# Stage Ingestion Orchestrator
| Type | Pre-Set |
| --- | --- |
| Image | $DATAOPS_STAGEINGESTION_RUNNER_IMAGE |
| Feature Status | Public Preview |
The Stage Ingestion orchestrator is a pre-set orchestrator responsible for ingesting data from Snowflake stages into tables defined in the Snowflake configuration.
You can ingest multiple tables, and their ingestion jobs run in parallel.
## Usage
The Stage Ingestion orchestrator must always be used together with SOLE. SOLE will create the tables, file formats, and stages that the Stage Ingestion orchestrator uses.
The preview release currently supports ingestion from AWS S3 stages created by SOLE.
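For context, the sketch below shows what such SOLE-managed objects might look like in the Snowflake configuration. The bucket URL, storage integration name, and exact configuration keys here are illustrative assumptions, not prescribed values:

```yaml
databases:
  "{{ env.DATAOPS_DATABASE }}":
    schemas:
      SCHEMA1:
        file_formats:
          CSV_FORMAT:
            format_type: CSV # a CSV file format the ingestion can reference
            skip_header: 1
        stages:
          SOURCE_STAGE:
            url: s3://example-source-bucket/data/ # illustrative bucket
            storage_integration: EXAMPLE_S3_INTEGRATION # assumed to exist already
```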
## Supported parameters
| Parameter | Required/Default | Description |
| --- | --- | --- |
| DATAOPS_STAGE_INGESTION_ACTION | REQUIRED | Must be `START` |
| CONFIGURATION_DIR | REQUIRED | Set to `$CI_PROJECT_DIR/dataops/snowflake` to read the default Snowflake configuration files |
| DATAOPS_SOLE_ACCOUNT | REQUIRED | The Snowflake account name to use |
| DATAOPS_SOLE_USERNAME | REQUIRED | The Snowflake username to use |
| DATAOPS_SOLE_PASSWORD | REQUIRED | The Snowflake password to use |
| DATAOPS_SOLE_ROLE | REQUIRED | The Snowflake role to use |
| DATAOPS_SOLE_WAREHOUSE | REQUIRED | The Snowflake warehouse to use |
Below is a typical job definition:
```yaml
Stage Ingestion:
  extends:
    - .agent_tag
    - .should_run_ingestion
  image: $DATAOPS_STAGEINGESTION_RUNNER_IMAGE
  stage: "Batch Ingestion"
  variables:
    DATAOPS_STAGE_INGESTION_ACTION: START
    CONFIGURATION_DIR: $CI_PROJECT_DIR/dataops/snowflake
    # Adjust the values below if required, to ingest as the appropriate user/role
    DATAOPS_SOLE_ACCOUNT: DATAOPS_VAULT(SNOWFLAKE.ACCOUNT)
    DATAOPS_SOLE_USERNAME: DATAOPS_VAULT(SNOWFLAKE.TRANSFORM.USERNAME)
    DATAOPS_SOLE_PASSWORD: DATAOPS_VAULT(SNOWFLAKE.TRANSFORM.PASSWORD)
    DATAOPS_SOLE_ROLE: DATAOPS_VAULT(SNOWFLAKE.TRANSFORM.ROLE)
    DATAOPS_SOLE_WAREHOUSE: DATAOPS_VAULT(SNOWFLAKE.TRANSFORM.WAREHOUSE)
  # Prevent parallel ingestion for the same tables on the same branch
  resource_group: $CI_JOB_NAME
  script:
    - /dataops
  icon: ${SNOWFLAKEOBJECTLIFECYCLE_ICON}
```
## Configuring tables for ingestion
You can configure the tables to be ingested in the Snowflake configuration files, which SOLE also reads. For each table, you can define an `orchestrators:` section containing a `stage_ingestion:` block that defines the ingestion parameters. Supported parameters are:
Parameter | Required/Default | Description |
---|---|---|
stage | REQUIRED | The stage name containing the data. Currently, this must be in the same schema as the destination table. |
file_format | REQUIRED | The file format name to be used to process the data. Currently, this must be in the same schema as the destination table. |
path | REQUIRED | The path to the data file(s) inside the stage. This is appended to the stage name in the COPY INTO command. |
method | Optional. Defaults to append | append or full_load . full_load will truncate the table before ingestion. Not yet implemented. |
truncate | Optional. Defaults to false | true or false . Whether to truncate before ingestion. To be removed in a future release. |
copy | Optional | A key-value pair dictionary of options to be passed to the Snowflake COPY INTO command. See copy parameters for a definition of copy |
pattern | Optional | A regular expression pattern string, enclosed in single quotes, specifying the file names and/or paths to match. |
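As an illustration of how these parameters fit together, consider this hedged sketch. The object names are made up, and the SQL in the comment is an approximation of the resulting `COPY INTO`, not the orchestrator's literal output:

```yaml
orchestrators:
  stage_ingestion:
    stage: SOURCE_STAGE       # stage in the same schema as the destination table
    file_format: CSV_FORMAT   # file format in the same schema as the destination table
    path: csv                 # appended to the stage name: @SOURCE_STAGE/csv
    pattern: '.*[.]csv'       # only load files ending in .csv
    # Roughly equivalent to:
    #   COPY INTO <table> FROM @SOURCE_STAGE/csv
    #   FILE_FORMAT = (FORMAT_NAME = CSV_FORMAT) PATTERN = '.*[.]csv'
```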
### `copy` parameters
The `copy` parameter supports the following keys:

| Configuration Key | Required/Optional | Data Types and Values |
| --- | --- | --- |
| size_limit | Optional | Integer |
| force | Optional | Boolean |
| purge | Optional | Boolean |
| return_failed_only | Optional | Boolean |
| enforce_length | Optional | Boolean |
| truncatecolumns | Optional | Boolean |
| load_uncertain_files | Optional | Boolean |
See the Snowflake documentation on `COPY INTO <table>` copy options for more information.
An example is shown below:
```yaml
databases:
  "{{ env.DATAOPS_DATABASE }}":
    schemas:
      SCHEMA1:
        tables:
          NASA_PATENTS:
            columns:
              CENTER:
                type: TEXT
              STATUS:
                type: TEXT
              CASE_NUMBER:
                type: TEXT
              PATENT_NUMBER:
                type: VARCHAR(255)
              APPLICATION_SN:
                type: TEXT
              TITLE:
                type: TEXT
              PATENT_EXPIRATION_DATE:
                type: DATE
              _METADATA_FILENAME:
                type: VARCHAR(255)
              _METADATA_ROWNUM:
                type: NUMBER(38,0)
              _METADATA_INGEST_DATE:
                type: TIMESTAMP_NTZ(9)
            orchestrators:
              stage_ingestion:
                stage: DOL_SOURCE_BUCKET
                file_format: CSV_FORMAT
                path: csv/NASA_Patents.csv
                truncate: true
                copy:
                  force: true
          DISCHARGE_REPORT_BY_CODE_MS_2:
            columns:
              GEOGRAPHIC_UNIT:
                type: TEXT
              STUDENT_CATEGORY:
                type: TEXT
              CODE:
                type: TEXT
              CODE_TYPE:
                type: TEXT
              DESCRIPTION_OF_CODE:
                type: TEXT
              COUNT_OF_STUDENTS:
                type: VARCHAR(255)
              TOTAL_ENROLLED_STUDENTS:
                type: NUMBER
            orchestrators:
              stage_ingestion:
                stage: SOURCE_STAGE
                file_format: CSV_FORMAT
                path: csv
                pattern: ".*/.*/.*[.]csv"
                truncate: true
                copy:
                  force: true
                  size_limit: 6000
                  purge: true
                  return_failed_only: true
                  enforce_length: true
                  truncatecolumns: false
                  load_uncertain_files: true
```
## Adding metadata columns
Several special columns can be added to a table definition; if one of these columns is present, the ingestion process populates it automatically as part of the ingestion. The current set of columns is:
| Column name | Contents |
| --- | --- |
| _METADATA_FILENAME | The file that the data came from. |
| _METADATA_ROWNUM | The row in the file that the data came from. |
| _METADATA_INGEST_DATE | The database date and time at which the data was ingested. |
| _METADATA_JOB_ID | The DataOps.live job ID that ran the ingestion. |
| _METADATA_PIPELINE_ID | The DataOps.live pipeline ID that ran the ingestion. |
| _METADATA_BRANCH_NAME | The DataOps.live branch that ran the ingestion. |
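For example, a table definition might opt in to a subset of these columns as in the minimal sketch below. The column types shown for `_METADATA_JOB_ID` and `_METADATA_BRANCH_NAME` are assumptions; adjust them to your project's conventions:

```yaml
tables:
  NASA_PATENTS:
    columns:
      # ...business columns as in the example above...
      _METADATA_FILENAME:
        type: VARCHAR(255) # populated with the source file name
      _METADATA_INGEST_DATE:
        type: TIMESTAMP_NTZ(9) # populated with the ingestion timestamp
      _METADATA_JOB_ID:
        type: NUMBER(38,0) # assumed type; populated with the DataOps.live job ID
      _METADATA_BRANCH_NAME:
        type: VARCHAR(255) # assumed type; populated with the pipeline branch
```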