AutoIngestion Orchestrator
Type | Pre-Set |
---|---|
Image | $DATAOPS_AUTOINGESTION_RUNNER_IMAGE |
Feature Status | End of Life (EoL) |
The AutoIngestion orchestrator carries out data ingestion from external databases into Snowflake where the ingestion is performed directly by DataOps. It is used alongside orchestrators such as the Stitch and Talend orchestrators, which also ingest data and are likewise orchestrated by the DataOps.live data product platform.
Usage
This orchestrator takes the model configuration in /dataops/auto_ingestion/ and runs an ingestion job for each subfolder, where each subfolder is a separate source->target pair. Consequently, the configuration for an autoingestion-orchestrator is split into two parts:
- the tap and target configuration
- the pipeline configuration
Ingestion types
There are several ingestion types supported by the AutoIngestion orchestrator. Each ingestion type is set (where supported) per ingestion element: a Table/View in the case of a relational database or a file in the case of file ingestion.
The available ingestion types are as follows (a configuration sketch follows the list):
1. Full table ingestion
This ingestion type is sometimes called Truncate and Load. Every time this ingestion type is run, the source is read in full, and the target table is recreated from scratch.
2. Key-based incremental ingestion
This ingestion type uses a high-water mark key (e.g., amended_time) in the source system to determine which rows have been added or changed and replicates those changes. Schema changes are handled, but deleted rows are not captured.
3. Log-based incremental ingestion
This ingestion type uses replication logs (or similar) in the source to determine which rows have been added, changed, or deleted, replicates those changes, and handles all schema changes.
4. Hybrid ingestion
In line with the principles of robustness, we recommend that, if incremental methods are used for regular ingestion, you run a periodic full sync (i.e., trigger a Full Table ingestion) to re-baseline the target. This ensures that nothing has been missed and, in the case of Key-Based Incremental ingestion, picks up deleted rows. For example, if an incremental method runs for a table every hour, a full sync may be triggered as part of a weekly pipeline.
5. Tipping point ingestion
For some sources, there is no choice but to run incremental loads (e.g., a table with 1 billion rows and 1 million rows added each day). However, with the efficiency of the DataOps AutoIngestion engine and Snowflake, the point at which it makes sense to switch from Full Table to an incremental ingestion method is much higher than most people think. There are many situations where a daily load of a one-million-row table is a perfectly sensible approach.
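As a rough sketch, the first three types map to a per-table replication_method setting in the tap template (shown in full in the tap configuration example later on this page); hybrid and tipping point ingestion are operating patterns built from those types rather than separate settings. The INCREMENTAL, replication_key, and LOG_BASED values below are assumptions based on PipelineWise-style tap configuration and do not appear elsewhere on this page:

```yaml
# Sketch only: INCREMENTAL, replication_key, and LOG_BASED are assumed
# PipelineWise-style values; only FULL_TABLE appears in the examples on this page.
tables:
  - table_name: "Department"
    replication_method: "FULL_TABLE"   # truncate and load on every run
  - table_name: "Employee"
    replication_method: "INCREMENTAL"  # key-based: tracks a high-water mark column
    replication_key: "amended_time"
  - table_name: "EmployeeDepartmentHistory"
    replication_method: "LOG_BASED"    # log-based: reads replication/CDC logs
```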
Supported sources
Source | Full Table Supported | Key-Based Incremental Supported | Log-Based Incremental Supported |
---|---|---|---|
MySQL | Y | Y | Y |
MSSQL | Y | Y | Y |
Postgres | Y | Y | Y |
File | Y | N | N |
Supported parameters
Parameter | Required/Default | Description |
---|---|---|
DATABASE | REQUIRED | Usually calculated automatically by the pipeline |
INGEST_ACTION | REQUIRED | Must be one of RUN, VALIDATE, or FORCE_RESYNC |
INGESTION_JOB | REQUIRED | Must be the name of a valid directory, including the appropriate configuration files |
DEBUG | Optional. Defaults to false | Set to true (or any value) to enable further logging |
INGEST_PROJECT_PATH | Optional. Defaults to $CI_PROJECT_DIR/auto_ingestion | The location containing a set of subdirectories, one for each possible ingestion job |
INGESTION_TARGET | Optional. Defaults to Snowflake | Only Snowflake is supported initially |
INGESTION_LARGE_FILE_BYTES | Optional. Defaults to 10000000 (~10 MB) | This only applies to file ingestion. It is the minimum size, in bytes, at which a file is handled as a 'large file' |
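As an illustration of how these parameters combine, the sketch below shows a job that forces a periodic re-baseline, as recommended under hybrid ingestion. The job name and weekly schedule are hypothetical, the image, tags, and p1_sources anchor mirror the pipeline configuration example later on this page, and it assumes that FORCE_RESYNC triggers the full re-sync described above:

```yaml
# Hypothetical weekly re-baseline job; intended to run from a scheduled weekly pipeline.
weekly_resync_mssql_customer_data:
  <<: *p1_sources
  stage: batch_ingestion
  image: $AUTOINGESTION:$CI_PIPELINE_ID
  tags:
    - dataops-runner-build-agent-docker
  variables:
    INGEST_ACTION: FORCE_RESYNC         # assumed to force a full re-sync (re-baseline)
    INGESTION_JOB: mssql-customer-data  # must match a directory under auto_ingestion/
    DEBUG: "true"                       # optional: enable further logging
  script:
    - /dataops
```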
AutoIngestion persistent cache
Rather than starting afresh, incremental runs need the last ingestion state to be saved so that ingestion can resume from the last saved state. This is achieved by saving the bookmark, or state, of the ingestion.
Bookmarks are generated during the ingestion run to mark specific points in the incremental ingestion. These bookmarks are shared between all pipeline runs in the same environment so that new pipelines can continue the ingestion process rather than starting afresh.
When a new environment (branch/project) is set up, a Full Table ingestion takes place regardless of whether the table is marked for incremental or Full Table ingestion. This replication ingests all existing data (from start to finish).
If an ingestion job is marked for incremental ingestion (Key-Based or Log-Based), a state or bookmark is generated and used as the starting point for subsequent ingestion jobs.
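Purely for illustration, a bookmark for a key-based incremental table can be thought of as recording the last high-water mark value seen. The engine's actual persistent cache format is internal and not documented here, so the structure and key names below are assumptions:

```yaml
# Illustration only: not the engine's real cache format.
bookmarks:
  aw_humanresources-Employee:
    replication_key: amended_time
    replication_key_value: "2023-01-15T10:30:00Z"  # high-water mark from the last run
```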
In DataOps, ingestion jobs cannot be run concurrently in any given environment.
Ingestion jobs create lock files to denote that another instance of the same job is running. If a pipeline is started, resulting in two instances of the same ingestion job, an invalid ingestion (or other) error can occur.
Different environments can run the same ingestion concurrently, as the persistent cache is stored on a per-project and per-environment basis.
Example jobs
The following examples walk through the four parts of configuring this orchestrator:
1. Tap and target configuration
A tap file (the source) and a target file (Snowflake) must be defined for each source you want to ingest. The DataOps AutoIngestion orchestrator will execute each tap and target pair to ingest the data at runtime.
/dataops/
    auto_ingestion/
        mysql-transaction-data/
            config/
                tap_mysql.template
                target_snowflake.template
        mssql-customer-data/
            config/
                tap_mssql.template
                target_snowflake.template
        more_sources_etc/
The tap and target configuration defines essential details such as how to connect to the source database, which source schema to use, and which schema name to use in the target database. It does not include passwords or other sensitive credentials; these are replaced with {{ config }} tags and only loaded at runtime from the password vault.
2. Tap configuration
id: "mssql-rds-adventureworks-full-load-humanresources" # Unique identifier of the tap
name: "mssql-rds-adventureworks-full-load-humanresources" # Name of the tap
type: "tap-mssql" # !! THIS SHOULD NOT CHANGE !!
owner: "support@dataops.live" # Data owner to contact
db_conn:
host: "oursourcedb.eu-west-1.rds.amazonaws.com" # MsSQL server
port: 1433 # MsSQL port
user: "{{ SOURCE.MSSQL_RDS.USERNAME }}" # MsSQL user
password: "{{ SOURCE.MSSQL_RDS.PASSWORD }}" # Plain string or vault encrypted
database: "adventureworks_humanresources" # MsSQL database
target: "snowflake" # ID of the target connector where the data will be loaded
batch_size_rows: 20000 # Batch size for the stream to optimise load performance
#hard_delete: False
schemas:
- source_schema: "HumanResources" # Source schema in MsSQL with tables
target_schema: "aw_humanresources" # Target schema in the destination Data Warehouse
target_schema_select_permissions: # Optional: Grant SELECT on schema and tables that created
- "osmium_runner_role"
tables:
- table_name: "Department"
replication_method: "FULL_TABLE"
- table_name: "Employee"
replication_method: "FULL_TABLE"
- table_name: "EmployeeDepartmentHistory"
replication_method: "FULL_TABLE"
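In the tap configuration above, tags such as {{ SOURCE.MSSQL_RDS.USERNAME }} and {{ SOURCE.MSSQL_RDS.PASSWORD }} are resolved from the password vault at runtime. Assuming the usual nested-YAML vault layout, where a tag's dotted path mirrors nested vault keys (your project's vault structure may differ), the corresponding entries would look something like this:

```yaml
# Hypothetical vault excerpt (illustration only); values are placeholders.
SOURCE:
  MSSQL_RDS:
    USERNAME: ingest_user
    PASSWORD: "<secret stored in the vault>"
```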
3. Target configuration
# ------------------------------------------------------------------------------
# General Properties
# ------------------------------------------------------------------------------
id: "snowflake"  # Unique identifier of the target
name: "Snowflake"  # Name of the target
type: "target-snowflake"  # !! THIS SHOULD NOT CHANGE !!
# ------------------------------------------------------------------------------
# Target - Data Warehouse connection details
# ------------------------------------------------------------------------------
db_conn:
  account: "{{ SNOWFLAKE.ACCOUNT }}"  # Snowflake account
  dbname: "{{ ENV.DATABASE }}"  # Snowflake database name
  user: "{{ SNOWFLAKE.INGESTION.USERNAME }}"  # Snowflake user
  password: "{{ SNOWFLAKE.INGESTION.PASSWORD }}"  # Plain string or vault encrypted
  warehouse: "{{ SNOWFLAKE.INGESTION.WAREHOUSE }}"  # Snowflake virtual warehouse
  default_target_schema: "TARGET_SCHEMA"
  # We use an intermediate external stage on S3 to load data into Snowflake
  aws_access_key_id: "{{ AWS.STAGING.ACCESS_KEY_ID }}"
  aws_secret_access_key: "{{ AWS.STAGING.SECRET_ACCESS_KEY }}"
  s3_bucket: "{{ AWS.STAGING.S3_BUCKET }}"  # S3 external stage bucket name
  s3_key_prefix: "{{ AWS.STAGING.S3_PREFIX }}"  # Optional: S3 key prefix
  stage: "public.pipelinewise_stage"
  file_format: "public.pipelinewise_csv_format"
4. Pipeline configuration
agent_1_csv_batch_ingestion:
  <<: *p1_sources
  stage: batch_ingestion
  image: $AUTOINGESTION:$CI_PIPELINE_ID
  tags:
    - dataops-runner-build-agent-docker
  variables:
    INGEST_ACTION: RUN
    INGESTION_JOB: csv_on_s3
  script:
    - /dataops
This job expects to find a matching csv_on_s3 directory in the data product platform repository.
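Assuming the same layout as the earlier examples, that directory would look something like this (the tap template filename here is hypothetical):

/dataops/
    auto_ingestion/
        csv_on_s3/
            config/
                tap_s3_csv.template        # hypothetical file-ingestion tap template
                target_snowflake.template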
Authentication
The AutoIngestion orchestrator does not support key pair-based authentication. If DATAOPS_SNOWFLAKE_AUTH is set to KEY_PAIR, the AutoIngestion orchestrator job fails with the error: Key Pair authentication is not supported in autoingestion orchestrator, update credential accordingly.