Replicate Data

Meltano lets you easily extract and load data from and to databases, SaaS APIs, and file formats using primarily Singer taps and targets, which take the role of your project's extractors and loaders.

Meltano manages your tap and target configuration for you, makes it easy to select which entities and attributes to extract, and keeps track of the incremental replication state, so that subsequent pipeline runs with the same state ID will always pick up right where the previous run left off.

You can run EL(T) pipelines using meltano run. If you run into trouble running a pipeline, see our troubleshooting tips for commonly seen errors.

Plugin configuration

As described in the Configuration guide, meltano run will determine the configuration of your extractor + loader pairs and any optional utilities (e.g. dbt) by looking in the environment, your project's .env file, the system database, and finally your meltano.yml project file, falling back to a default value if nothing was found.
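
The lookup order described above can be pictured as a chain of layers consulted from highest to lowest priority. The following is only a minimal sketch of that idea, not Meltano's implementation: the resolve_setting helper, the layer contents, and the page_size setting are all hypothetical, and keys are simplified to plain setting names.

```python
from typing import Any, Mapping, Optional, Sequence


def resolve_setting(
    name: str,
    layers: Sequence[Mapping[str, Any]],
    default: Optional[Any] = None,
) -> Any:
    """Return the first value found for `name`, checking layers in priority order."""
    for layer in layers:
        if name in layer:
            return layer[name]
    return default


# Hypothetical values for each configuration layer, highest priority first.
environment = {"start_date": "2023-01-01"}
dotenv_file = {"api_url": "https://gitlab.example.com"}
system_database = {}
meltano_yml = {"start_date": "2022-03-01T00:00:00Z", "projects": "meltano/meltano"}

layers = [environment, dotenv_file, system_database, meltano_yml]

start_date = resolve_setting("start_date", layers)  # found in the environment
projects = resolve_setting("projects", layers)  # found in meltano.yml
page_size = resolve_setting("page_size", layers, default=100)  # default value
```

Here the environment value for start_date shadows the one in meltano.yml, which is the behavior that makes per-run overrides possible.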

You can use meltano config <plugin> list to list all available settings with their names, environment variables, and current values. meltano config <plugin> will print the current configuration in JSON format.

If supported by the plugin type, its configuration can be tested using meltano config <plugin> test.

Extractor catalog generation

Many extractors (Singer taps) expect to be provided a catalog when they are run in sync mode using meltano run or meltano invoke. This catalog is a JSON file describing the schemas of the available entities (streams, tables) and attributes (properties, columns), along with metadata to indicate (among other things) which entities and attributes should (or should not) be extracted.
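
To make the catalog format more concrete, here is a small sketch of what such a file might contain, built as a Python dictionary. The commits stream and its two attributes are hypothetical, and the selected_attributes helper is only an illustration of how the metadata entries can be read.

```python
import json

# Illustrative Singer catalog with one hypothetical "commits" stream.
catalog = {
    "streams": [
        {
            "tap_stream_id": "commits",
            "stream": "commits",
            "schema": {
                "type": "object",
                "properties": {
                    "id": {"type": "string"},
                    "message": {"type": ["string", "null"]},
                },
            },
            "metadata": [
                # An empty breadcrumb addresses the stream (entity) itself.
                {"breadcrumb": [], "metadata": {"selected": True}},
                # A ["properties", <name>] breadcrumb addresses one attribute.
                {"breadcrumb": ["properties", "id"], "metadata": {"selected": True}},
                {"breadcrumb": ["properties", "message"], "metadata": {"selected": False}},
            ],
        }
    ]
}


def selected_attributes(stream: dict) -> list:
    """List the attribute names whose metadata marks them as selected."""
    return [
        entry["breadcrumb"][1]
        for entry in stream["metadata"]
        if len(entry["breadcrumb"]) == 2 and entry["metadata"].get("selected")
    ]


# Serialize the catalog the way it would appear in a JSON file.
catalog_json = json.dumps(catalog, indent=2)
commit_attributes = selected_attributes(catalog["streams"][0])
```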

A catalog can be generated by running the extractor in discovery mode and making the desired modifications to the schemas and metadata for the discovered entities and attributes. Because these catalog files can be very large and can get outdated as data sources evolve, this process can be tedious and error-prone.

To save you a headache, Meltano can handle catalog generation for you, by letting you describe your desired modifications using entity selection, metadata, and schema rules that can be configured like any other setting, and are applied to the discovered catalog on the fly when the extractor is run using meltano run or meltano invoke.

If you'd like to manually inspect the generated catalog for debugging purposes, you can dump it to STDOUT or a file using the --dump=catalog option on meltano invoke or meltano el.

Note that if you've already manually discovered a catalog and modified it to your liking, it can be provided explicitly using meltano el's --catalog option or the catalog extractor extra.

In some cases, like when a tap has extra command line options like --about or --version that don't need a catalog, or when you only need to dump the tap configuration with --dump=config, Meltano avoids running discovery to save the overhead and to not require the tap to be configured with valid credentials.

Selecting entities and attributes for extraction

Extractors are often capable of extracting many more entities and attributes than your use case may require. To save on bandwidth and storage, it's usually a good idea to instruct your extractor to only select those entities and attributes you actually plan on using.

Meltano makes it easy to select specific entities and attributes for inclusion or exclusion using meltano select and the select extractor extra, which let you specify inclusion and exclusion rules that can contain Unix shell-style wildcards to match multiple entities and/or attributes at once.

Note that exclusion takes precedence over inclusion: if an entity or attribute is matched by an exclusion pattern, there is no way to get it back using an inclusion pattern unless the exclusion pattern is manually removed from your meltano.yml project file first.

If no rules are defined using meltano select, Meltano will fall back on the catch-all rule *.* so that all entities and attributes are selected.

meltano select <plugin> --list --all

# For example:
meltano select tap-gitlab --list --all

meltano select <plugin> <entity> <attribute>
meltano select <plugin> --exclude <entity> <attribute>

# For example:
meltano select tap-gitlab commits id
meltano select tap-gitlab commits project_id
meltano select tap-gitlab commits created_at
meltano select tap-gitlab commits author_name
meltano select tap-gitlab commits message

# Include all attributes of an entity
meltano select tap-gitlab tags "*"

# Exclude matching attributes of all entities
meltano select tap-gitlab --exclude "*" "*_url"

This will add the selection rules to your meltano.yml project file:

extractors:
- name: tap-gitlab
  config:
    projects: meltano/meltano meltano/tap-gitlab
    start_date: "2022-03-01T00:00:00Z"
  select:
  - commits.id
  - commits.project_id
  - commits.created_at
  - commits.author_name
  - commits.message
  - tags.*
  - "!*.*_url"
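
The interaction between inclusion and exclusion rules can be sketched with Python's fnmatch module, which implements Unix shell-style wildcards. This is a simplified illustration using the rules from the example above, not Meltano's actual matching code, which may handle additional cases; the is_selected helper is hypothetical.

```python
from fnmatch import fnmatchcase

# The select rules from the meltano.yml example; "!" marks an exclusion.
RULES = [
    "commits.id",
    "commits.project_id",
    "commits.created_at",
    "commits.author_name",
    "commits.message",
    "tags.*",
    "!*.*_url",
]


def is_selected(entity: str, attribute: str, rules=RULES) -> bool:
    """Return True if entity.attribute is included and not excluded."""
    path = f"{entity}.{attribute}"
    includes = [rule for rule in rules if not rule.startswith("!")]
    excludes = [rule[1:] for rule in rules if rule.startswith("!")]
    if any(fnmatchcase(path, pattern) for pattern in excludes):
        return False  # exclusion takes precedence over inclusion
    return any(fnmatchcase(path, pattern) for pattern in includes)


commit_id = is_selected("commits", "id")  # matched by commits.id
tag_name = is_selected("tags", "name")  # matched by tags.*
tag_url = is_selected("tags", "web_url")  # tags.* matches, but !*.*_url excludes it
commit_title = is_selected("commits", "title")  # matched by no inclusion rule
```

Note how tags.web_url is rejected even though tags.* would include it: the exclusion pattern is checked first and cannot be overridden by an inclusion rule.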

Setting metadata

Additional Singer stream and property metadata (like replication-method and replication-key) can be specified using the metadata extractor extra, which can be treated like a _metadata setting with nested properties _metadata.<entity>.<key> and _metadata.<entity>.<attribute>.<key>.

meltano config <plugin> set _metadata <entity> replication-method <LOG_BASED|INCREMENTAL|FULL_TABLE>

# For example:
meltano config tap-postgres set _metadata some_entity_id replication-method INCREMENTAL
meltano config tap-postgres set _metadata other_entity replication-method FULL_TABLE

# Set replication-method metadata for all entities
meltano config tap-postgres set _metadata '*' replication-method INCREMENTAL

# Set replication-method metadata for matching entities
meltano config tap-postgres set _metadata '*_full' replication-method FULL_TABLE

This will add the metadata rules to your meltano.yml project file:

extractors:
- name: tap-postgres
  metadata:
    some_entity_id:
      replication-method: INCREMENTAL
      replication-key: id
    other_entity:
      replication-method: FULL_TABLE
    "*":
      replication-method: INCREMENTAL
    "*_full":
      replication-method: FULL_TABLE
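
The relationship between the nested _metadata.<entity>.<key> property path and the nested mapping in meltano.yml can be sketched as follows. The set_nested helper is hypothetical and only illustrates the idea of a path of keys addressing a value in a nested structure; it is not how Meltano itself writes the file.

```python
from typing import Any, Dict, Sequence


def set_nested(config: Dict[str, Any], path: Sequence[str], value: Any) -> None:
    """Store `value` under the nested keys in `path`, creating mappings as needed."""
    node = config
    for key in path[:-1]:
        node = node.setdefault(key, {})
    node[path[-1]] = value


metadata: Dict[str, Any] = {}
# Mirrors: meltano config tap-postgres set _metadata some_entity_id replication-method INCREMENTAL
set_nested(metadata, ["some_entity_id", "replication-method"], "INCREMENTAL")
set_nested(metadata, ["some_entity_id", "replication-key"], "id")
# Wildcard entity names are stored as ordinary keys.
set_nested(metadata, ["*_full", "replication-method"], "FULL_TABLE")
```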

Overriding schemas

Similarly, a schema extractor extra is available that lets you easily override Singer stream schema descriptions. Here too, Unix shell-style wildcards can be used to match multiple entities and/or attributes at once.

Replication methods

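Extractors can replicate data from a source using one of the following methods, each selected through a replication-method stream metadata value:

  • Log-based Incremental Replication (LOG_BASED)
  • Key-based Incremental Replication (INCREMENTAL)
  • Full Table Replication (FULL_TABLE)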

Extractors for SaaS APIs typically hard-code the appropriate replication method for each supported entity. Most database extractors, on the other hand, support two or more of these methods and require you to choose an appropriate option for each table through the replication-method stream metadata key.

To support incremental replication, where a data integration pipeline run picks up where the previous run left off, Meltano keeps track of incremental replication state.

Log-based Incremental Replication

The extractor uses the database's binary log files to identify what records were inserted, updated, and deleted from the table since the last run (if any), and extracts only these records.

This option is not supported by all databases and database extractors.

To learn more about how Log-based Incremental Replication works and its limitations, refer to the Stitch Docs, which by and large also apply to Singer taps used with Meltano.

Key-based Incremental Replication

The extractor uses the value of a specific column on the table (the Replication Key, e.g. an updated_at timestamp or incrementing id integer) to identify what records were inserted or updated (but not deleted) since the last run (if any), and extracts only those records.

To learn more about how Key-based Incremental Replication works and its limitations, refer to the Stitch Docs, which by and large also apply to Singer taps used with Meltano.
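
The core idea of Key-based Incremental Replication can be sketched in a few lines. This is an illustrative model under stated assumptions, not the code of any particular tap: the extract_incremental helper and the table rows are hypothetical, and the comparison is strict, whereas real extractors may use an inclusive comparison and re-extract rows equal to the saved value.

```python
from typing import Any, Dict, List, Optional, Tuple


def extract_incremental(
    rows: List[Dict[str, Any]],
    replication_key: str,
    bookmark: Optional[Any] = None,
) -> Tuple[List[Dict[str, Any]], Optional[Any]]:
    """Return rows newer than `bookmark` plus the bookmark for the next run."""
    if bookmark is None:
        new_rows = list(rows)  # first run: nothing to compare against
    else:
        new_rows = [row for row in rows if row[replication_key] > bookmark]
    if new_rows:
        bookmark = max(row[replication_key] for row in new_rows)
    return new_rows, bookmark


# Hypothetical table contents, using an ISO 8601 updated_at as the Replication Key.
table = [
    {"id": 1, "updated_at": "2023-01-01T00:00:00Z"},
    {"id": 2, "updated_at": "2023-01-02T00:00:00Z"},
]
first_run, state = extract_incremental(table, "updated_at")

table.append({"id": 3, "updated_at": "2023-01-03T00:00:00Z"})
second_run, state = extract_incremental(table, "updated_at", state)
```

Because only rows present in the table are ever compared against the saved value, a row deleted from the source never shows up in a later run, which is why this method cannot detect deletions.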

Replication Key

Replication Keys are columns that database extractors use to identify new and updated data for replication.

When you set a table to use Key-based Incremental Replication, you’ll also need to define a Replication Key for that table by setting the replication-key stream metadata key.

To learn more about replication keys, refer to the Stitch Docs, which by and large also apply to Singer taps used with Meltano.

Full Table Replication

The extractor extracts all available records in the table on every run.

To learn more about how Full-Table Replication works and its limitations, refer to the Stitch Docs, which by and large also apply to Singer taps used with Meltano.

Incremental replication state

Most extractors (Singer taps) generate state when they are run. This state can be passed along with a subsequent invocation so that the extractor picks up where it left off the previous time. Meltano handles this automatically for meltano run; for meltano elt, it requires the --state-id argument.

Meltano stores this pipeline state in its state backend, identified by a State ID that meltano run automatically generates from the extractor name, loader name, and active environment name (see more about incremental state for elt).

If you'd like to manually inspect a job's state for debugging purposes, or so that you can store it somewhere other than the system database, you can use the meltano state command to do things like list all states, get state by name, set state, etc.

Internal State Merge Logic

When running Extract and Load pipelines via run, Meltano will retrieve your pipeline state (see the state backends docs for details on where it's stored) for the most recently completed pipeline based on State ID. If a state record from a completed job is found, its data is passed along to the extractor.

If the most recent pipeline aborted before a particular stream completed, then it might have generated a partial state record. If one or more partial state records are found, the partial data is merged with the last completed state, to produce an up-to-date state artifact which will be passed along to the extractor. This allows your pipeline to pick up where it left off even if the last sync aborted midway through.

The same merge behavior is performed whenever a user runs meltano state get. This command returns the merged result of the latest completed state plus any newer partial state records, if they exist. This works because Singer targets are expected to emit STATE messages only after persisting data for a given stream.

Partial state records can also be inserted manually via meltano state merge. Unlike meltano state merge, meltano state set will insert a complete record, which causes Meltano to ignore any previous state records, whether completed or partial.
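
The difference between merging a partial record and setting a complete one can be sketched as a recursive dictionary overlay. This is a minimal illustration, not Meltano's internal merge code: the merge_state helper is hypothetical, and the bookmarks layout of the state payloads is an assumption made for the example.

```python
from copy import deepcopy
from typing import Any, Dict


def merge_state(completed: Dict[str, Any], partial: Dict[str, Any]) -> Dict[str, Any]:
    """Recursively overlay `partial` on top of `completed` without mutating either."""
    merged = deepcopy(completed)
    for key, value in partial.items():
        if isinstance(value, dict) and isinstance(merged.get(key), dict):
            merged[key] = merge_state(merged[key], value)
        else:
            merged[key] = deepcopy(value)
    return merged


# Hypothetical state payloads; the "bookmarks" layout is an assumption.
completed = {
    "bookmarks": {
        "commits": {"since": "2023-01-01"},
        "tags": {"since": "2023-01-01"},
    }
}
partial = {"bookmarks": {"commits": {"since": "2023-01-05"}}}

merged = merge_state(completed, partial)  # analogous to a merge: both streams kept
replaced = deepcopy(partial)  # analogous to a set: earlier records are ignored
```

In the merged result, commits advances to the newer value while tags keeps its value from the last completed run; in the replaced result, the tags entry is gone entirely.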

Not seeing state picked up after a failed run?

Some loaders only emit state once their work is completely done, even if some data has already been persisted and earlier state messages from the extractor could have been forwarded to Meltano. When a pipeline with such a loader fails or is otherwise interrupted, no state will have been emitted yet, and a subsequent ELT run will not be able to pick up where this run actually left off.

Pipeline-specific configuration

If you'd like to specify (or override) the values of certain settings at runtime, on a per-pipeline basis, you can set them in the meltano run execution environment using environment variables.

This lets you use the same extractors and loaders in multiple pipelines, configured differently each time, as an alternative to creating multiple configurations using plugin inheritance.

In a shell, you can explicitly export environment variables, which are then passed along to every subsequent command invocation, or you can specify them inline with a specific invocation, ahead of the command:

export TAP_FOO_BAR=bar
export TAP_FOO_BAZ=baz

# For example overriding a start date
export TAP_GITHUB_START_DATE="2023-01-01"

meltano run ...

TAP_FOO_BAR=bar TAP_FOO_BAZ=baz meltano run ...

To verify that these environment variables will be picked up by Meltano as you intended, you can test them with meltano config <plugin> before running meltano run.
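
When pipelines are launched from a script rather than a shell, the same per-invocation override can be sketched with Python's subprocess module. The run_with_overrides helper is hypothetical, and the child command below is only a stand-in that echoes one variable; a real pipeline would pass a meltano run command instead.

```python
import os
import subprocess
import sys
from typing import Dict, List


def run_with_overrides(command: List[str], overrides: Dict[str, str]) -> str:
    """Run `command` with `overrides` layered on the current environment."""
    env = {**os.environ, **overrides}  # the parent process environment is untouched
    result = subprocess.run(
        command, env=env, capture_output=True, text=True, check=True
    )
    return result.stdout.strip()


# Stand-in child process that prints one variable it received.
child = [sys.executable, "-c", "import os; print(os.environ['TAP_FOO_BAR'])"]
output = run_with_overrides(child, {"TAP_FOO_BAR": "bar", "TAP_FOO_BAZ": "baz"})
```

This mirrors the inline shell form: the variables apply to that one invocation only and do not leak into later commands.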

Running pipelines with elt

Originally, the elt command was the only way to run pipelines. More recently, the run command has become the recommended approach for most use cases due to its flexibility. For some use cases, however, the elt command is still preferred because it offers more fine-grained control over extract + load pairs.

Incremental replication state (elt)

For meltano elt, the State ID has to be created and set manually using the --state-id argument. Use a unique string identifier for each pipeline, and include it in every execution, since it must be present for incremental replication to work.

Also note that if you already have a state file you'd like to use, it can be provided explicitly using meltano elt's --state option or the state extractor extra.

Pipeline-specific schedule configuration

If you're using the elt syntax with meltano schedule to schedule your pipelines, you can specify environment variables for each pipeline in your meltano.yml project file, where each entry in the schedules array can have an env dictionary:

schedules:
- name: foo-to-bar
  extractor: tap-foo
  loader: target-bar
  transform: skip
  interval: '@hourly'
  env:
    TAP_FOO_BAR: bar
    TAP_FOO_BAZ: baz

Different runners and execution/orchestration platforms will have their own way of specifying environment variables along with a command invocation.

Airflow's BashOperator, for example, supports an env parameter:

BashOperator(
    # ...
    bash_command="meltano elt ...",
    env={
        "TAP_FOO_BAR": "bar",
        "TAP_FOO_BAZ": "baz",
    },
)

Pipeline environment variables

To allow loaders and transformers to adapt their configuration and behavior based on the extractor and loader they are run with, meltano elt dynamically sets a number of pipeline-specific environment variables before compiling their configuration and invoking their executables.

Extractor variables

In addition to variables available to all plugins, the following variables describing the extractor are available to loaders and transformers:

  • MELTANO_EXTRACTOR_NAME: the extractor's name, e.g. tap-gitlab
  • MELTANO_EXTRACTOR_NAMESPACE: the extractor's namespace, e.g. tap_gitlab
  • MELTANO_EXTRACT_<SETTING_NAME>: one environment variable for each of the extractor's settings and extras, e.g. MELTANO_EXTRACT_PRIVATE_TOKEN for the private_token setting, and MELTANO_EXTRACT__LOAD_SCHEMA for the load_schema extra
  • <SETTING_ENV>: all of the extractor's regular configuration environment variables, as listed by meltano config <plugin> list, e.g. TAP_GITLAB_API_URL for the api_url setting

Loader variables

Additionally, the following variables describing the loader are available to transformers:

  • MELTANO_LOADER_NAME: the loader's name, e.g. target-postgres
  • MELTANO_LOADER_NAMESPACE: the loader's namespace, e.g. postgres
  • MELTANO_LOAD_<SETTING_NAME>: one environment variable for each of the loader's settings and extras, e.g. MELTANO_LOAD_SCHEMA for the schema setting, and MELTANO_LOAD__DIALECT for the dialect extra
  • <SETTING_ENV>: all of the loader's regular configuration environment variables, as listed by meltano config <plugin> list, e.g. TARGET_POSTGRES_HOST for the host setting

Transform variables

Additionally, the following variables describing the transform are available to transformers:

  • MELTANO_TRANSFORM_NAME: the transform's name, e.g. tap-gitlab
  • MELTANO_TRANSFORM_NAMESPACE: the transform's namespace, e.g. tap_gitlab
  • MELTANO_TRANSFORM_<SETTING_NAME>: one environment variable for each of the transform's settings and extras, e.g. MELTANO_TRANSFORM__PACKAGE_NAME for the package_name extra
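
The naming pattern shared by the three lists above can be sketched as a small helper. This is an illustration inferred from the examples given, not Meltano's own code: the pipeline_env_name function is hypothetical, and it assumes simple setting names without nesting.

```python
def pipeline_env_name(role: str, name: str, extra: bool = False) -> str:
    """Build a MELTANO_<ROLE>_<NAME> variable name.

    `role` is EXTRACT, LOAD, or TRANSFORM. Extras carry a leading
    underscore, which shows up as a double underscore in the result.
    """
    prefix = "_" if extra else ""
    return f"MELTANO_{role.upper()}_{prefix}{name}".upper()


private_token_var = pipeline_env_name("EXTRACT", "private_token")
load_schema_var = pipeline_env_name("EXTRACT", "load_schema", extra=True)
schema_var = pipeline_env_name("LOAD", "schema")
dialect_var = pipeline_env_name("LOAD", "dialect", extra=True)
package_name_var = pipeline_env_name("TRANSFORM", "package_name", extra=True)
```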

How to use

Inside your loader or transformer's config object in your meltano.yml project file, you can reference these (and other) environment variables as $VAR (as a single word) or ${VAR} (inside a word). Inside your plugin, you can reference them through os.environ as usual (assuming you're using Python).

This feature is used to dynamically configure the target-postgres and target-snowflake loaders and dbt transformer as appropriate, independent of the specific extractor and loader used:

  • Default value for the target-postgres and target-snowflake schema settings:
    • $MELTANO_EXTRACT__LOAD_SCHEMA, e.g. tap_gitlab for tap-gitlab
  • Default value for dbt's target setting:
    • $MELTANO_LOAD__DIALECT, e.g. postgres for target-postgres and snowflake for target-snowflake, which correspond to the target names in transform/profile/profiles.yml
  • Default value for dbt's models setting:
    • $MELTANO_TRANSFORM__PACKAGE_NAME $MELTANO_EXTRACTOR_NAMESPACE my_meltano_model, e.g. tap_gitlab tap_gitlab my_meltano_model for the tap-gitlab transform and tap-gitlab extractor
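
The $VAR and ${VAR} reference styles can be tried out with Python's string.Template, which happens to use the same two forms. This is a sketch of the expansion idea only, not Meltano's own expansion code: the expand helper is hypothetical, and the pipeline_env values are assumed for a tap-gitlab to target-postgres pipeline.

```python
from string import Template

# Hypothetical pipeline environment for a tap-gitlab to target-postgres run.
pipeline_env = {
    "MELTANO_LOAD__DIALECT": "postgres",
    "MELTANO_TRANSFORM__PACKAGE_NAME": "tap_gitlab",
    "MELTANO_EXTRACTOR_NAMESPACE": "tap_gitlab",
}


def expand(value: str, env: dict) -> str:
    """Expand $VAR and ${VAR} references, leaving unknown names untouched."""
    return Template(value).safe_substitute(env)


# $VAR works when the reference stands alone as a single word.
dbt_target = expand("$MELTANO_LOAD__DIALECT", pipeline_env)
dbt_models = expand(
    "$MELTANO_TRANSFORM__PACKAGE_NAME $MELTANO_EXTRACTOR_NAMESPACE my_meltano_model",
    pipeline_env,
)
# ${VAR} is needed inside a word, so the name does not absorb the suffix.
inside_word = expand("${MELTANO_EXTRACTOR_NAMESPACE}_raw", pipeline_env)
```

Without the braces, $MELTANO_EXTRACTOR_NAMESPACE_raw would be read as a reference to a different, longer variable name, which is why the braced form exists.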

Troubleshooting

See the troubleshooting guide for more information on resolving any issues that arise.