Skip to main content

Orchestrate Data

Most data pipelines aren't run just once, but over and over again, to make sure additions and changes in the source eventually make their way to the destination.

To help you realize this, Meltano supports scheduled pipelines that can be orchestrated using Apache Airflow.

When a new pipeline schedule is created using the CLI, a DAG is automatically created in Airflow as well, which represents "a collection of all the tasks you want to run, organized in a way that reflects their relationships and dependencies".

Create a Schedule

Scheduling predefined jobs

To regularly schedule your pipeline to run first define it as a job within your project. Then you can schedule it using the meltano schedule add command:

# Define a job
meltano job add tap-gitlab-to-target-postgres-with-dbt --tasks "tap-gitlab target-postgres dbt-postgres:run"

# Schedule the job
meltano schedule add daily-gitlab-load --job tap-gitlab-to-target-postgres-with-dbt --interval '@daily'

This would add the following schedule to your meltano.yml:

schedules:
- name: daily-gitlab-load
interval: '@daily'
job: tap-gitlab-to-target-postgres-with-dbt

If you have schedule-specific environment variables that you would like to pass to the invocation environments of the plugins run by the schedule, you can supply those via the env key like so:

schedules:
- name: daily-gitlab-load
interval: '@daily'
job: tap-gitlab-to-target-postgres-with-dbt
env:
SCHEDULE_SPECIFIC_ENV_VAR: schedule_specific_value

Installing Airflow

While you can use Meltano's CLI define pipeline schedules, actually executing them is the orchestrator's responsibility, so let's install Airflow:

Change directories so that you are inside your Meltano project, and then run the following command to add the default DAG generator to your project and make Airflow available to use via meltano invoke:

meltano add utility airflow
meltano invoke airflow:initialize
meltano invoke airflow users create -u admin@localhost -p password --role Admin -e admin@localhost -f admin -l admin

See the Airflow docs page on MeltanoHub for more details.

Using an existing Airflow installation

You can also use the Meltano DAG generator with an existing Airflow installation, as long as the MELTANO_PROJECT_ROOT environment variable is set to point at your Meltano project.

In fact, all meltano invoke airflow ... does is populate MELTANO_PROJECT_ROOT, set Airflow's core.dags_folder setting to $MELTANO_PROJECT_ROOT/orchestrate/dags (where the DAG generator lives by default), and invoke the airflow executable with the provided arguments.

You can add the Meltano DAG generator to your project without also installing the Airflow orchestrator plugin by adding the airflow file bundle:

meltano add files files-airflow

Now, you'll want to copy the DAG generator in to your Airflow installation's dags_folder, or reconfigure it to look in your project's orchestrate/dags directory instead.

This setup assumes you'll use meltano schedule to schedule your meltano elt pipelines, as described above, since the DAG generator iterates over the result of meltano schedule list --format=json and creates DAGs for each. However, you can also create your own Airflow DAGs for any pipeline you fancy by using BashOperator with the meltano elt command, or DockerOperator with a project-specific Docker image.

Starting the Airflow scheduler

Now that Airflow is installed and (automatically) configured to look at your project's Meltano DAG generator, let's start the scheduler:

meltano invoke airflow scheduler

Airflow will now run your pipelines on a schedule as long as the scheduler is running!

Using Airflow directly

You are free to interact with Airflow directly through its own UI. You can start the web like this:

meltano invoke airflow webserver

By default, you'll only see Meltano's pipeline DAGs here, which are created automatically using the dynamic DAG generator included with every Meltano project, located at orchestrate/dags/meltano.py.

You can use the bundled Airflow with custom DAGs by putting them inside the orchestrate/dags directory, where they'll be picked up by Airflow automatically. To learn more, check out the Apache Airflow documentation.

Meltano's use of Airflow will be unaffected by other usage of Airflow as long as orchestrate/dags/meltano.py remains untouched and pipelines are managed through the dedicated interface.

Other things you can do with Airflow

Currently, meltano invoke gives you raw access to the underlying plugin after any configuration hooks.

View 'meltano' dags:

meltano invoke airflow dags list

Manually trigger a task to run:

meltano invoke airflow tasks run --raw meltano extract_load $(date -I)

Start the Airflow UI: (will start in a separate browser)

meltano invoke airflow webserver

Start the Airflow scheduler, enabling job processing:

meltano invoke airflow scheduler

Trigger a dag run:

meltano invoke airflow dags trigger meltano

Airflow is a full-featured orchestrator that has a lot of features that are currently outside of Meltano's scope. As we are improving this integration, Meltano will facade more of these feature to create a seamless experience using this orchestrator. Please refer to the Airflow documentation for more in-depth knowledge about Airflow.