Orchestrate Data
Most data pipelines aren't run just once, but over and over again, to make sure additions and changes in the source eventually make their way to the destination.
To help you accomplish this, Meltano supports scheduled pipelines that can be orchestrated using Apache Airflow.
When a new pipeline schedule is created using the CLI, a DAG is automatically created in Airflow as well, which represents "a collection of all the tasks you want to run, organized in a way that reflects their relationships and dependencies".
Create a Schedule
Scheduling predefined jobs
To schedule your pipeline to run regularly, first define it as a job within your project. Then you can schedule it using the meltano schedule add command:
# Define a job
meltano job add tap-gitlab-to-target-postgres-with-dbt --tasks "tap-gitlab target-postgres dbt-postgres:run"
# Schedule the job
meltano schedule add daily-gitlab-load --job tap-gitlab-to-target-postgres-with-dbt --interval '@daily'
This would add the following schedule to your meltano.yml:
schedules:
- name: daily-gitlab-load
interval: '@daily'
job: tap-gitlab-to-target-postgres-with-dbt
If you have schedule-specific environment variables that you would like to pass to the invocation environments of the plugins run by the schedule, you can supply those via the env key like so:
schedules:
- name: daily-gitlab-load
interval: '@daily'
job: tap-gitlab-to-target-postgres-with-dbt
env:
SCHEDULE_SPECIFIC_ENV_VAR: schedule_specific_value
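One way such a variable can be consumed is by referencing it from a plugin setting in meltano.yml. A minimal sketch; the plugin and setting below are illustrative, and exact expansion behavior depends on your Meltano version:

plugins:
  extractors:
  - name: tap-gitlab
    config:
      private_token: $SCHEDULE_SPECIFIC_ENV_VAR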
Installing Airflow
While you can use Meltano's CLI to define pipeline schedules, actually executing them is the orchestrator's responsibility, so let's install Airflow:
Change directories so that you are inside your Meltano project, then run the following commands to add the default DAG generator to your project and make Airflow available to use via meltano invoke:
meltano add utility airflow
meltano invoke airflow:initialize
meltano invoke airflow users create -u admin@localhost -p password --role Admin -e admin@localhost -f admin -l admin
See the Airflow docs page on MeltanoHub for more details.
Using an existing Airflow installation
You can also use the Meltano DAG generator with an existing Airflow installation, as long as the MELTANO_PROJECT_ROOT environment variable is set to point at your Meltano project. In fact, all meltano invoke airflow ... does is populate MELTANO_PROJECT_ROOT, set Airflow's core.dags_folder setting to $MELTANO_PROJECT_ROOT/orchestrate/dags (where the DAG generator lives by default), and invoke the airflow executable with the provided arguments.
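For example, a rough equivalent using a standalone Airflow installation could look like this (the project path is hypothetical; AIRFLOW__CORE__DAGS_FOLDER is Airflow's standard environment-variable form of the core.dags_folder setting):

export MELTANO_PROJECT_ROOT=/path/to/my-meltano-project
export AIRFLOW__CORE__DAGS_FOLDER="$MELTANO_PROJECT_ROOT/orchestrate/dags"
airflow scheduler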
You can add the Meltano DAG generator to your project without also installing the Airflow orchestrator plugin by adding the airflow file bundle:
meltano add files files-airflow
Now you'll want to copy the DAG generator into your Airflow installation's dags_folder, or reconfigure it to look in your project's orchestrate/dags directory instead.
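A minimal sketch of the copy approach, assuming Airflow's default dags_folder of ~/airflow/dags and a hypothetical project path:

cp /path/to/my-meltano-project/orchestrate/dags/meltano.py ~/airflow/dags/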
This setup assumes you'll use meltano schedule to schedule your meltano elt pipelines, as described above, since the DAG generator iterates over the result of meltano schedule list --format=json and creates DAGs for each.
However, you can also create your own Airflow DAGs for any pipeline you fancy by using BashOperator with the meltano elt command, or DockerOperator with a project-specific Docker image.
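As one illustration, here is a minimal custom DAG using BashOperator. The DAG id, project path, pipeline plugins, and schedule are all hypothetical; adapt them to your project:

# A minimal sketch of a custom Airflow DAG that runs a Meltano pipeline.
# All names and paths are placeholders, not part of the bundled generator.
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="custom_gitlab_pipeline",
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    BashOperator(
        task_id="meltano_elt",
        # Run from the project root so Meltano can find meltano.yml
        bash_command="cd /path/to/my-meltano-project && meltano elt tap-gitlab target-postgres",
    )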
Starting the Airflow scheduler
Now that Airflow is installed and (automatically) configured to look at your project's Meltano DAG generator, let's start the scheduler:
meltano invoke airflow scheduler
Airflow will now run your pipelines on a schedule as long as the scheduler is running!
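In production you'll typically keep the scheduler running in the background. A minimal sketch using nohup; a process manager such as systemd is a more robust choice:

nohup meltano invoke airflow scheduler >> scheduler.log 2>&1 &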
Using Airflow directly
You are free to interact with Airflow directly through its own UI. You can start the web server like this:
meltano invoke airflow webserver
By default, you'll only see Meltano's pipeline DAGs here, which are created automatically using the dynamic DAG generator included with every Meltano project, located at orchestrate/dags/meltano.py.
You can use the bundled Airflow with custom DAGs by putting them inside the orchestrate/dags directory, where they'll be picked up by Airflow automatically. To learn more, check out the Apache Airflow documentation.
Meltano's use of Airflow will be unaffected by other usage of Airflow as long as orchestrate/dags/meltano.py remains untouched and pipelines are managed through Meltano's dedicated interface.
Other things you can do with Airflow
Currently, meltano invoke gives you raw access to the underlying plugin after any configuration hooks.
View the 'meltano' DAGs:
meltano invoke airflow dags list
Manually trigger a task to run:
meltano invoke airflow tasks run --raw meltano extract_load $(date -I)
Start the Airflow web UI (served at http://localhost:8080 by default):
meltano invoke airflow webserver
Start the Airflow scheduler, enabling job processing:
meltano invoke airflow scheduler
Trigger a DAG run:
meltano invoke airflow dags trigger meltano
Airflow is a full-featured orchestrator with many capabilities that are currently outside of Meltano's scope. As this integration improves, Meltano will surface more of these features to create a seamless experience with this orchestrator. Please refer to the Airflow documentation for more in-depth knowledge about Airflow.