AutoIngestion Orchestrator

Type: Pre-Set
Image: $DATAOPS_AUTOINGESTION_RUNNER_IMAGE
Feature Status: Deprecated

This orchestrator is deprecated and will be removed in the next version.

The DataOps AutoIngestion Orchestrator ingests data from external databases into Snowflake when DataOps performs the ingestion directly. It is used alongside orchestrators such as the Stitch and Talend orchestrators, which also ingest data and are still orchestrated by the DataOps platform.

Usage

This orchestrator takes the model configuration in /dataops/auto_ingestion/ and runs an ingestion job for each subfolder, where each subfolder is a separate source->target pair. Consequently, the configuration for the AutoIngestion Orchestrator is split into two parts:

  • the tap and target configuration
  • the pipeline configuration

Ingestion Types

There are several ingestion types supported by the DataOps AutoIngestion Orchestrator. Each ingestion type is set (where supported) per ingestion element: a Table/View in the case of a relational database or a file in the case of file ingestion.

The available ingestion types are as follows:

1. Full Table Ingestion

This ingestion type is sometimes called Truncate and Load. Every time this ingestion type is run, the source is read in full, and the target table is recreated from scratch.

2. Key-Based Incremental Ingestion

This ingestion type uses high-water mark keys (e.g., amended_time) in the source system to determine which rows have been added or changed, and replicates those changes. Schema changes are handled, but deleted rows are not captured.
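
The high-water mark idea can be sketched in a few lines of Python. This is only an illustration, not the orchestrator's implementation: the function name incremental_batch and the in-memory rows are assumptions made for the example. It also shows why a deleted row goes unnoticed.

```python
from datetime import datetime

def incremental_batch(rows, bookmark, key="amended_time"):
    """Return the rows changed since `bookmark` and the new high-water mark."""
    changed = [r for r in rows if bookmark is None or r[key] > bookmark]
    # If nothing changed, the bookmark stays where it was.
    new_bookmark = max((r[key] for r in changed), default=bookmark)
    return changed, new_bookmark

# Hypothetical source table held in memory for the example.
source = [
    {"id": 1, "amended_time": datetime(2024, 1, 1)},
    {"id": 2, "amended_time": datetime(2024, 1, 2)},
]

# First run: no bookmark yet, so every row is read.
first, mark = incremental_batch(source, None)

# Row 1 is deleted in the source and row 3 is added.
source.pop(0)
source.append({"id": 3, "amended_time": datetime(2024, 1, 3)})

# Second run: only row 3 is newer than the bookmark.
# The deletion of row 1 leaves no trace that this method can see.
second, mark = incremental_batch(source, mark)
```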

3. Log-Based Incremental Ingestion

This ingestion type uses replication logs (or a similar mechanism) in the source to determine which rows have been added, changed, or deleted, and replicates these changes. It also handles all schema changes.

4. Hybrid Ingestion

In line with the principles of robustness, we recommend that, if incremental methods are used for regular ingestion, you also run a periodic full sync (i.e., trigger a Full Table ingestion) to reset the baseline. This ensures that nothing has been missed and picks up deleted rows in the case of Key-Based Incremental Ingestion. For example, if an incremental method runs for a table every hour, a full sync may be triggered as part of a weekly pipeline.

5. Tipping Point Ingestion

For some sources, there is no choice but to run incremental loads (e.g., a table with 1 billion rows and 1 million rows added each day). However, with the efficiency of the DataOps Auto Ingestion Engine and Snowflake, the point at which it makes sense to switch from Full Table to an incremental ingestion method is a lot higher than most people think. There are many situations where a daily full load of a one-million-row table is a perfectly sensible approach.

Supported Sources

| Source | Full Table Supported | Key-Based Incremental Supported | Log-Based Incremental Supported |
| --- | --- | --- | --- |
| MySQL | Y | Y | Y |
| MSSQL | Y | Y | Y |
| Postgres | Y | Y | Y |
| File | Y | N | N |

Supported Parameters

| Parameter | Required/Default | Description |
| --- | --- | --- |
| DATABASE | REQUIRED | Usually calculated automatically by the pipeline |
| INGEST_ACTION | REQUIRED | Must be either RUN, VALIDATE, or FORCE_RESYNC |
| INGESTION_JOB | REQUIRED | Must be the name of a valid directory, including the appropriate configuration files |
| DEBUG | OPTIONAL, defaults to false | Set to true (or any value) to enable further logging |
| INGEST_PROJECT_PATH | OPTIONAL, defaults to $CI_PROJECT_DIR/auto_ingestion | The location contains a set of subdirectories, one for each possible INGESTION job |
| INGESTION_TARGET | OPTIONAL, defaults to Snowflake | Only Snowflake is supported initially |
| INGESTION_LARGE_FILE_BYTES | OPTIONAL, defaults to 10000000 (~10M) | This only applies to file ingestion. It represents the size a file must be in order to be handled as a 'large file' |
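
As a rough illustration of how these parameters fit together, the following Python sketch validates the required values and applies the documented defaults. The function resolve_parameters and the returned dictionary keys are hypothetical; the orchestrator's own validation logic is not documented here and may differ.

```python
from pathlib import Path

VALID_ACTIONS = {"RUN", "VALIDATE", "FORCE_RESYNC"}

def resolve_parameters(env):
    """Validate required parameters and fill in the documented defaults."""
    for name in ("DATABASE", "INGEST_ACTION", "INGESTION_JOB"):
        if not env.get(name):
            raise ValueError(f"{name} is required")
    if env["INGEST_ACTION"] not in VALID_ACTIONS:
        raise ValueError(f"INGEST_ACTION must be one of {sorted(VALID_ACTIONS)}")

    # INGEST_PROJECT_PATH defaults to $CI_PROJECT_DIR/auto_ingestion.
    project_path = Path(
        env.get("INGEST_PROJECT_PATH")
        or Path(env.get("CI_PROJECT_DIR", ".")) / "auto_ingestion"
    )
    # INGESTION_JOB must name an existing subdirectory of the project path.
    job_dir = project_path / env["INGESTION_JOB"]
    if not job_dir.is_dir():
        raise ValueError(f"INGESTION_JOB directory not found: {job_dir}")

    return {
        "database": env["DATABASE"],
        "action": env["INGEST_ACTION"],
        "job_dir": job_dir,
        "debug": bool(env.get("DEBUG")),  # any non-empty value enables logging
        "target": env.get("INGESTION_TARGET", "Snowflake"),
        "large_file_bytes": int(env.get("INGESTION_LARGE_FILE_BYTES", 10_000_000)),
    }
```

Calling resolve_parameters(os.environ) at the start of a job would surface a misconfigured variable before any ingestion work begins.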

AutoIngestion Persistent Cache

For an incremental run to resume from where the previous run stopped, rather than start afresh, the last ingestion state must be saved. This is achieved by saving the bookmark, or state, of the ingestion.

Bookmarks are generated during the ingestion run to mark specific points in the incremental ingestion. These bookmarks are shared between all pipeline runs in the same environment so that new pipelines can continue the ingestion process rather than starting afresh.

When a new environment (branch/project) is set up, Full Table ingestion takes place whether the table is marked for incremental or Full Table ingestion. This replication ingests all existing data (from start to finish).

If an ingestion job is marked for incremental ingestion (Key-Based or Log-Based), a state or bookmark is generated and used as the starting point for subsequent ingestion jobs.
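
A minimal sketch of this behaviour, assuming the state is kept as one JSON file per environment and job. The file layout and the function names are invented for the example and do not describe the real cache format.

```python
import json
from pathlib import Path

def state_path(cache_root, environment, job):
    """Hypothetical layout: one state file per environment and ingestion job."""
    return Path(cache_root) / environment / job / "state.json"

def load_bookmarks(cache_root, environment, job):
    """Return the saved bookmarks, or None when the environment is new."""
    path = state_path(cache_root, environment, job)
    if not path.exists():
        return None  # no saved state: a full ingestion is needed
    return json.loads(path.read_text())

def save_bookmarks(cache_root, environment, job, bookmarks):
    """Persist bookmarks so that the next pipeline run can resume from them."""
    path = state_path(cache_root, environment, job)
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(bookmarks))
```

Because the environment is part of the path, a newly created branch finds no state and starts with a full load, while later pipelines in the same environment pick up the saved bookmarks.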

Danger: In DataOps, ingestion jobs cannot be run concurrently in any given environment.

Each ingestion job creates a lock file to signal that an instance of that job is running. If a pipeline starts a second instance of the same ingestion job while the first is still running, an invalid ingestion (or other) error can occur.

Different environments can run the same ingestion concurrently because the persistent cache is stored per project and per environment.
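
The lock-file mechanism can be illustrated with a short Python sketch. It assumes an exclusive-create lock on a local path; the helper job_lock is hypothetical, and the orchestrator's actual locking code is not shown in this document.

```python
import os
from contextlib import contextmanager

@contextmanager
def job_lock(lock_path):
    """Hold an exclusive lock file for the duration of one ingestion job."""
    try:
        # O_EXCL makes the create fail if the file already exists,
        # so only one instance can take the lock.
        fd = os.open(lock_path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    except FileExistsError:
        raise RuntimeError(f"another instance holds {lock_path}") from None
    try:
        os.write(fd, str(os.getpid()).encode())
        yield
    finally:
        # Release the lock even if the job fails.
        os.close(fd)
        os.remove(lock_path)
```

Placing the lock file under a per-environment directory would let two environments run the same job at once while still blocking a duplicate within one environment.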

Example Jobs

The following examples demonstrate four different use cases for this orchestrator:

1. Tap and Target Configuration

A tap file (the source) and a target file (Snowflake) must be defined for each source you want to ingest. The DataOps AutoIngestion Orchestrator will execute each tap and target pair to ingest the data at runtime.

/dataops/
  auto_ingestion/
    mysql-transaction-data/
      config/
        tap_mysql.template
        target_snowflake.template
    mssql-customer-data/
      config/
        tap_mssql.template
        target_snowflake.template
    more_sources_etc/

The tap and target configuration defines essential details such as how to connect to the source database, which source schema to read, and which schema name to use in the target database. It does not include passwords or other sensitive credentials. These are replaced with {{ config }} tags and loaded only at runtime from the password vault.
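
To make the tag substitution concrete, here is a small Python sketch that resolves dotted {{ ... }} tags against a nested dictionary standing in for the vault. The render function, the regular expression, and the sample vault contents are assumptions for illustration only; the orchestrator's real template engine is not described here.

```python
import re

# Matches tags such as {{ SOURCE.MSSQL_RDS.USERNAME }}.
TAG = re.compile(r"\{\{\s*([A-Za-z0-9_.]+)\s*\}\}")

def render(template, secrets):
    """Replace each tag with the value found at that dotted path in `secrets`."""
    def lookup(match):
        value = secrets
        for part in match.group(1).split("."):
            value = value[part]  # raises KeyError if the vault lacks the path
        return str(value)
    return TAG.sub(lookup, template)

# Hypothetical vault contents; real values never live in the repository.
vault = {"SOURCE": {"MSSQL_RDS": {"USERNAME": "ingest_user"}}}
line = 'user: "{{ SOURCE.MSSQL_RDS.USERNAME }}"'
rendered = render(line, vault)
```

Failing loudly on a missing path is a deliberate choice in this sketch: a silently empty credential would be harder to diagnose than an error at render time.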

2. Tap Configuration

id: "mssql-rds-adventureworks-full-load-humanresources" # Unique identifier of the tap
name: "mssql-rds-adventureworks-full-load-humanresources" # Name of the tap
type: "tap-mssql" # !! THIS SHOULD NOT CHANGE !!
owner: "support@dataops.live" # Data owner to contact
db_conn:
  host: "oursourcedb.eu-west-1.rds.amazonaws.com" # MsSQL server
  port: 1433 # MsSQL port
  user: "{{ SOURCE.MSSQL_RDS.USERNAME }}" # MsSQL user
  password: "{{ SOURCE.MSSQL_RDS.PASSWORD }}" # Plain string or vault encrypted
  database: "adventureworks_humanresources" # MsSQL database
target: "snowflake" # ID of the target connector where the data will be loaded
batch_size_rows: 20000 # Batch size for the stream to optimise load performance
#hard_delete: False
schemas:
  - source_schema: "HumanResources" # Source schema in MsSQL with tables
    target_schema: "aw_humanresources" # Target schema in the destination Data Warehouse
    target_schema_select_permissions: # Optional: Grant SELECT on schema and tables that are created
      - "osmium_runner_role"

    tables:
      - table_name: "Department"
        replication_method: "FULL_TABLE"
      - table_name: "Employee"
        replication_method: "FULL_TABLE"
      - table_name: "EmployeeDepartmentHistory"
        replication_method: "FULL_TABLE"

3. Target Configuration

# ------------------------------------------------------------------------------
# General Properties
# ------------------------------------------------------------------------------
id: "snowflake" # Unique identifier of the target
name: "Snowflake" # Name of the target
type: "target-snowflake" # !! THIS SHOULD NOT CHANGE !!


# ------------------------------------------------------------------------------
# Target - Data Warehouse connection details
# ------------------------------------------------------------------------------
db_conn:

  account: "{{ SNOWFLAKE.ACCOUNT }}" # Snowflake account
  dbname: "{{ ENV.DATABASE }}" # Snowflake database name
  user: "{{ SNOWFLAKE.INGESTION.USERNAME }}" # Snowflake user
  password: "{{ SNOWFLAKE.INGESTION.PASSWORD }}" # Plain string or vault encrypted
  warehouse: "{{ SNOWFLAKE.INGESTION.WAREHOUSE }}" # Snowflake virtual warehouse
  default_target_schema: "TARGET_SCHEMA"

  # We use an intermediate external stage on S3 to load data into Snowflake
  aws_access_key_id: "{{ AWS.STAGING.ACCESS_KEY_ID }}"
  aws_secret_access_key: "{{ AWS.STAGING.SECRET_ACCESS_KEY }}"

  s3_bucket: "{{ AWS.STAGING.S3_BUCKET }}" # S3 external stage bucket name
  s3_key_prefix: "{{ AWS.STAGING.S3_PREFIX }}" # Optional: S3 key prefix

  stage: "public.pipelinewise_stage"
  file_format: "public.pipelinewise_csv_format"

4. Pipeline Configuration

agent_1_csv_batch_ingestion:
  <<: *p1_sources
  stage: batch_ingestion
  image: $AUTOINGESTION:$CI_PIPELINE_ID
  tags:
    - dataops-runner-build-agent-docker
  variables:
    INGEST_ACTION: RUN
    INGESTION_JOB: csv_on_s3
  script:
    - /dataops

Info: This script expects to find a directory in the platform repository.

Authentication

Key Pair Authentication

The AutoIngestion Orchestrator does not support key-pair authentication. If DATAOPS_SNOWFLAKE_AUTH is set to KEY_PAIR, the orchestrator job fails with the error: Key Pair authentication is not supported in autoingestion orchestrator, update credential accordingly.

Project Resources

None

Host Dependencies (and Resources)

None