Stage Ingestion Orchestrator

Type: Pre-Set
Image: $DATAOPS_STAGEINGESTION_RUNNER_IMAGE
Feature Status: Public Preview

The Stage Ingestion orchestrator is a pre-set orchestrator responsible for ingesting data from Snowflake stages into tables defined in the Snowflake configuration.

Multiple tables can be ingested, and their ingestions run in parallel.
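
For example, with two tables configured for ingestion along these lines, both tables are ingested in parallel. This is an abbreviated sketch: column definitions are omitted, and the table, stage, and file format names are purely illustrative.

```yaml
databases:
  "{{ env.DATAOPS_DATABASE }}":
    schemas:
      SCHEMA1:
        tables:
          ORDERS:
            # columns: ...
            orchestrators:
              stage_ingestion:
                stage: SRC_BUCKET
                file_format: CSV_FORMAT
                path: csv/orders.csv
          CUSTOMERS:
            # columns: ...
            orchestrators:
              stage_ingestion:
                stage: SRC_BUCKET
                file_format: CSV_FORMAT
                path: csv/customers.csv
```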

Usage

The Stage Ingestion orchestrator must always be used together with SOLE (the Snowflake Object Lifecycle Engine), which creates the tables, file formats, and stages that the orchestrator uses.

The preview release currently supports ingestion from AWS S3 stages created by SOLE.
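
SOLE declares these objects in the same configuration files. For example, the DOL_SOURCE_BUCKET stage and CSV_FORMAT file format used later on this page might be defined roughly as follows. This is a minimal sketch: the S3 URL and storage integration name are placeholders, and the exact SOLE keys for stages and file formats may differ in your project.

```yaml
databases:
  "{{ env.DATAOPS_DATABASE }}":
    schemas:
      SCHEMA1:
        stages:
          DOL_SOURCE_BUCKET:
            url: s3://my-source-bucket/          # placeholder bucket URL
            storage_integration: MY_S3_INTEGRATION # placeholder integration name
        file_formats:
          CSV_FORMAT:
            format_type: CSV
            skip_header: 1
```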

Supported parameters

| Parameter | Required/Default | Description |
| --- | --- | --- |
| `DATAOPS_STAGE_INGESTION_ACTION` | REQUIRED | Must be `START` |
| `CONFIGURATION_DIR` | REQUIRED | Set to `$CI_PROJECT_DIR/dataops/snowflake` to read the default Snowflake configuration files |
| `DATAOPS_SOLE_ACCOUNT` | REQUIRED | The Snowflake account name to use |
| `DATAOPS_SOLE_USERNAME` | REQUIRED | The Snowflake username to use |
| `DATAOPS_SOLE_PASSWORD` | REQUIRED | The Snowflake password to use |
| `DATAOPS_SOLE_ROLE` | REQUIRED | The Snowflake role to use |
| `DATAOPS_SOLE_WAREHOUSE` | REQUIRED | The Snowflake warehouse to use |

Below is a typical job definition:

pipelines/includes/local_includes/ingestion_jobs/my_stage_ingest_job.yml
```yaml
Stage Ingestion:
  extends:
    - .agent_tag
    - .should_run_ingestion
  image: dataopslive/dataops-stageingestion-orchestrator:5-latest
  stage: "Batch Ingestion"
  variables:
    DATAOPS_STAGE_INGESTION_ACTION: START
    CONFIGURATION_DIR: $CI_PROJECT_DIR/dataops/snowflake
    # Adjust the values below if required, to ingest as the appropriate user/role
    DATAOPS_SOLE_ACCOUNT: DATAOPS_VAULT(SNOWFLAKE.ACCOUNT)
    DATAOPS_SOLE_USERNAME: DATAOPS_VAULT(SNOWFLAKE.TRANSFORM.USERNAME)
    DATAOPS_SOLE_PASSWORD: DATAOPS_VAULT(SNOWFLAKE.TRANSFORM.PASSWORD)
    DATAOPS_SOLE_ROLE: DATAOPS_VAULT(SNOWFLAKE.TRANSFORM.ROLE)
    DATAOPS_SOLE_WAREHOUSE: DATAOPS_VAULT(SNOWFLAKE.TRANSFORM.WAREHOUSE)
  # Prevent parallel ingestion for the same tables on the same branch
  resource_group: $CI_JOB_NAME
  script:
    - /dataops
  icon: ${SNOWFLAKEOBJECTLIFECYCLE_ICON}
```

Configuration tables for ingestion

The tables to be ingested are configured in the Snowflake configuration files, which SOLE also reads. For each table, an orchestrators: section can be defined, containing a stage_ingestion: section that holds the ingestion parameters. The supported parameters are:

| Parameter | Required/Default | Description |
| --- | --- | --- |
| `stage` | REQUIRED | The name of the stage that contains the data. Currently, this must be in the same schema as the destination table. |
| `file_format` | REQUIRED | The name of the file format used to process the data. Currently, this must be in the same schema as the destination table. |
| `path` | REQUIRED | The path to the data file(s) inside the stage. This is appended to the stage name in the `COPY INTO` command. |
| `method` | Optional, defaults to `append` | `append` or `full_load`. `full_load` truncates the table before ingestion. Not yet implemented. |
| `truncate` | Optional, defaults to `false` | `true` or `false`: whether to truncate the table before ingestion. To be removed in a future release. |
| `copy` | Optional | A dictionary of key-value options passed through to the Snowflake `COPY INTO` command. A typical use is setting `force: true` to re-read all files in the stage. The only supported options are `force`, `match_by_column_name`, and `on_error` (see the sketch after this table). |
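
The copy options map onto the corresponding Snowflake COPY INTO options. A sketch follows: CASE_INSENSITIVE and CONTINUE are standard Snowflake values for MATCH_BY_COLUMN_NAME and ON_ERROR, but exactly which values the orchestrator passes through is an assumption here.

```yaml
# Nested under the table definition, as in the full example below
orchestrators:
  stage_ingestion:
    stage: DOL_SOURCE_BUCKET
    file_format: CSV_FORMAT
    path: csv/NASA_Patents.csv
    copy:
      match_by_column_name: CASE_INSENSITIVE
      on_error: CONTINUE
```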

A fuller example, showing a complete table definition, is shown below:

dataops/snowflake/databases.template.yml
```yaml
databases:
  "{{ env.DATAOPS_DATABASE }}":
    schemas:
      SCHEMA1:
        tables:
          NASA_PATENTS:
            columns:
              CENTER:
                type: TEXT
              STATUS:
                type: TEXT
              CASE_NUMBER:
                type: TEXT
              PATENT_NUMBER:
                type: VARCHAR(255)
              APPLICATION_SN:
                type: TEXT
              TITLE:
                type: TEXT
              PATENT_EXPIRATION_DATE:
                type: DATE
              _METADATA_FILENAME:
                type: VARCHAR(255)
              _METADATA_ROWNUM:
                type: NUMBER(38,0)
              _METADATA_INGEST_DATE:
                type: TIMESTAMP_NTZ(9)
            orchestrators:
              stage_ingestion:
                stage: DOL_SOURCE_BUCKET
                file_format: CSV_FORMAT
                path: csv/NASA_Patents.csv
                truncate: true
                copy:
                  force: true
```

Adding metadata columns

Several special metadata columns can be added to a table definition. If a column is present in the table definition, the ingestion process populates it automatically. The current set of columns is:

| Column name | Contents |
| --- | --- |
| `_METADATA_FILENAME` | The file that the data came from. |
| `_METADATA_ROWNUM` | The row in the file that the data came from. |
| `_METADATA_INGEST_DATE` | The database date and time at which the data was ingested. |
| `_METADATA_JOB_ID` | The DataOps.live job ID that ran the ingestion. |
| `_METADATA_PIPELINE_ID` | The DataOps.live pipeline ID that ran the ingestion. |
| `_METADATA_BRANCH_NAME` | The DataOps.live branch that ran the ingestion. |
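
To capture the job, pipeline, and branch metadata as well, add the corresponding columns to the table definition, just like the _METADATA_* columns in the earlier example. A sketch follows; the column types shown are assumptions, chosen by analogy with that example.

```yaml
columns:
  # ...business columns as before...
  _METADATA_JOB_ID:
    type: NUMBER(38,0)    # assumed numeric, like _METADATA_ROWNUM
  _METADATA_PIPELINE_ID:
    type: NUMBER(38,0)    # assumed numeric, like _METADATA_ROWNUM
  _METADATA_BRANCH_NAME:
    type: VARCHAR(255)    # assumed text, like _METADATA_FILENAME
```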

Project resources

None

Host dependencies (and Resources)

None