Skip to main content

Stage Ingestion Orchestrator

Professional
Enterprise

Image$DATAOPS_STAGEINGESTION_RUNNER_IMAGE
Feature Status
Feature release status badge: PubPrev
PubPrev

The Stage Ingestion orchestrator ingests data from Snowflake stages into tables defined in the Snowflake configuration.

Multiple tables may be ingested, and these will run in parallel.

Usage

The Stage Ingestion orchestrator must always be used together with SOLE or SOLE for Data Products. SOLE will create the tables, file formats, and stages that the Stage Ingestion orchestrator uses.

The preview release currently supports ingestion from AWS S3 stages created by SOLE.

Supported parameters

ParameterRequired/DefaultDescription
DATAOPS_DIRECT_INGESTION_ACTIONREQUIREDMust be START
CONFIGURATION_DIRREQUIREDSet to $CI_PROJECT_DIR/dataops/snowflake to read the default Snowflake configuration files
DATAOPS_SOLE_ACCOUNTREQUIREDThe Snowflake account name to use
DATAOPS_SOLE_USERNAMEREQUIREDThe Snowflake username to use
DATAOPS_SOLE_PASSWORDREQUIREDThe Snowflake password to use
DATAOPS_SOLE_ROLEREQUIREDThe Snowflake role to use
DATAOPS_SOLE_WAREHOUSEREQUIREDThe Snowflake warehouse to use

Below is a typical job definition:

pipelines/includes/local_includes/ingestion_jobs/my_stage_ingest_job.yml
Stage Ingestion:
extends:
- .agent_tag
- .should_run_ingestion
image: $DATAOPS_STAGEINGESTION_RUNNER_IMAGE
stage: "Batch Ingestion"
variables:
DATAOPS_STAGE_INGESTION_ACTION: START
CONFIGURATION_DIR: $CI_PROJECT_DIR/dataops/snowflake
# Adjust the values below if required, to ingest as the appropriate user/role
DATAOPS_SOLE_ACCOUNT: DATAOPS_VAULT(SNOWFLAKE.ACCOUNT)
DATAOPS_SOLE_USERNAME: DATAOPS_VAULT(SNOWFLAKE.TRANSFORM.USERNAME)
DATAOPS_SOLE_PASSWORD: DATAOPS_VAULT(SNOWFLAKE.TRANSFORM.PASSWORD)
DATAOPS_SOLE_ROLE: DATAOPS_VAULT(SNOWFLAKE.TRANSFORM.ROLE)
DATAOPS_SOLE_WAREHOUSE: DATAOPS_VAULT(SNOWFLAKE.TRANSFORM.WAREHOUSE)
# prevent parallel ingestion for the same tables on the same branch
resource_group: $CI_JOB_NAME
script:
- /dataops
icon: ${SNOWFLAKEOBJECTLIFECYCLE_ICON}

Configuration tables for ingestion

You can configure the tables to be ingested in the Snowflake configuration files, which SOLE also reads. For each table, you can define an orchestrators: section. This section contains a stage_ingestion: section defining the ingestion parameters. Supported parameters are:

ParameterRequired/DefaultDescription
stageREQUIREDThe stage name containing the data. Currently, this must be in the same schema as the destination table.
file_formatREQUIREDThe file format name to be used to process the data. Currently, this must be in the same schema as the destination table.
pathREQUIREDThe path to the data file(s) inside the stage. This is appended to the stage name in the COPY INTO command.
methodOptional. Defaults to appendappend or full_load. full_load will truncate the table before ingestion. Not yet implemented.
truncateOptional. Defaults to falsetrue or false. Whether to truncate before ingestion. To be removed in a future release.
copyOptionalA key-value pair dictionary of options to be passed to the Snowflake COPY INTO command. See copy parameters for a definition of copy
patternOptionalA regular expression pattern string, enclosed in single quotes, specifying the file names and/or paths to match.

COPY INTO <table>

copy parameters

The copy parameter supports the following parameters:

Configuration KeyRequired/OptionalData Types and Values
size_limitOptionalInteger
forceOptionalBoolean
purgeOptionalBoolean
return_failed_lengthOptionalBoolean
enforce_lengthOptionalBoolean
truncatecolumnsOptionalBoolean
load_uncertain_filesOptionalBoolean
match_by_column_nameOptionalString: CASE_SENSITIVE, CASE_INSENSITIVE, NONE

See the Snowflake doc section for more information.

An example is shown below:

dataops/snowflake/databases.template.yml
databases:
"{{ env.DATAOPS_DATABASE }}":
schemas:
SCHEMA1:
tables:
NASA_PATENTS:
columns:
CENTER:
type: TEXT
STATUS:
type: TEXT
CASE_NUMBER:
type: TEXT
PATENT_NUMBER:
type: VARCHAR(255)
APPLICATION_SN:
type: TEXT
TITLE:
type: TEXT
PATENT_EXPIRATION_DATE:
type: DATE
_METADATA_FILENAME:
type: VARCHAR(255)
_METADATA_ROWNUM:
type: NUMBER(38,0)
_METADATA_INGEST_DATE:
type: TIMESTAMP_NTZ(9)
orchestrators:
stage_ingestion:
stage: DOL_SOURCE_BUCKET
file_format: CSV_FORMAT
path: csv/NASA_Patents.csv
truncate: true
copy:
force: true
DISCHARGE_REPORT_BY_CODE_MS_2:
columns:
GEOGRAPHIC_UNIT:
type: TEXT
STUDENT_CATEGORY:
type: TEXT
CODE:
type: TEXT
CODE_TYPE:
type: TEXT
DESCRIPTION_OF_CODE:
type: TEXT
COUNT_OF_STUDENTS:
type: VARCHAR(255)
TOTAL_ENROLLED_STUDENTS:
type: NUMBER
orchestrators:
stage_ingestion:
stage: SOURCE_STAGE
file_format: CSV_FORMAT
PATH: csv
PATTERN: ".*/.*/.*[.]csv"
truncate: true
copy:
force: true
size_limit: 6000
purge: true
return_failed_length: true
enforce_length: true
truncatecolumns: false
load_uncertain_files: true
BEHAVIORAL_RISK_FACTOR_SURVEILLANCE_SYSTEM:
columns:
DATA:
type: VARIANT
orchestrators:
stage_ingestion:
stage: SOURCE_STAGE
file_format: PARQUET_FORMAT
PATH: parquet/Behavioral_Risk_Factor_Surveillance_System__BRFSS__Prevalence_Data__2011_to_present_.parquet
truncate: true
copy:
force: true
ASSET_OUTAGE:
comment: "assetOutage from eNAMS"
columns:
JSON_DATA:
type: VARIANT
orchestrators:
stage_ingestion:
stage: external_data_stage
file_format: JSON_FORMAT
file_type: json
path: json/history_asset_outage.json
truncate: true
copy:
force: true

Adding metadata columns

You can add several special columns to a table definition. If a column is present in the table definition, the ingestion process will populate the column as part of the ingestion. The current set of columns is:

Column nameContents
_METADATA_FILENAMEThe file that the data came from.
_METADATA_ROWNUMThe row in the file that the data came from.
_METADATA_INGEST_DATEThe database date and time that the data was ingested.
_METADATA_JOB_IDThe DataOps.live job ID that ran the ingestion.
_METADATA_PIPELINE_IDThe DataOps.live pipeline ID that ran the ingestion.
_METADATA_BRANCH_NAMEThe DataOps.live branch that ran the ingestion.

Loading files using column matching

Load files from an external stage into the table with the match_by_column_name copy option. This involves performing a case-insensitive match between the column names in the files and those defined in the table. This option allows flexibility, as the column ordering in the files does not have to align with the column ordering in the table.

dataops/snowflake/databases.template.yml
databases:
"{{ env.DATAOPS_DATABASE }}":
schemas:
SCHEMA1:
tables:
BEHAVIORAL_RISK_FACTOR_SURVEILLANCE_SYSTEM:
columns:
BREAKOUTCATEGORYID:
type: VARCHAR
BREAK_OUT:
type: VARCHAR
BREAK_OUT_CATEGORY:
type: VARCHAR
BREAKOUTID:
type: VARCHAR
CLASSID:
type: VARCHAR
CONFIDENCE_LIMIT_HIGH:
type: VARCHAR
DATASOURCE:
type: VARCHAR
DATA_VALUE_FOOTNOTE_SYMBOL:
type: VARCHAR
DATA_VALUE:
type: VARCHAR
DISPLAY_ORDER:
type: VARCHAR
GEOLOCATION:
type: VARCHAR
LOCATIONID:
type: VARCHAR
QUESTIONID:
type: VARCHAR
RESPONSEID:
type: VARCHAR
TOPICID:
type: VARCHAR
YEAR:
type: VARCHAR
orchestrators:
stage_ingestion:
stage: SOURCE_STAGE
file_format: PARQUET_FORMAT
PATH: parquet/Behavioral_Risk_Factor_Surveillance_System__BRFSS__Prevalence_Data__2011_to_present_.parquet
include_metadata: true | false
method: append | full_load
post_hook: sql|shell
truncate: true
copy:
force: true
match_by_column_name: CASE_INSENSITIVE
ASSET_OUTAGE:
comment: "assetOutage from eNAMS"
columns:
EN_NASAP_Circuit_Code__c:
type: VARCHAR(50)
EN_Record_Type_Name__c:
type: VARCHAR(50)
Name:
type: VARCHAR(50)
EN_Outage__c:
type: VARCHAR(50)
LastModifiedDate:
type: VARCHAR(50)
IsDeleted:
type: VARCHAR(50)
Id:
type: VARCHAR(50)
orchestrators:
stage_ingestion:
stage: SOURCE_STAGE
file_format: JSON_FORMAT
file_type: json
path: json/history_asset_outage.json
truncate: true
copy:
force: true
match_by_column_name: CASE_INSENSITIVE

Loading using pattern matching

Load data from compressed CSV files located in any path within a table's stage using pattern matching, and then populate the table with the loaded data.

dataops/snowflake/databases.template.yml
databases:
"{{ env.DATAOPS_DATABASE }}":
schemas:
SCHEMA1:
tables:
DISCHARGE_REPORT_BY_CODE_MS_2:
columns:
GEOGRAPHIC_UNIT:
type: TEXT
STUDENT_CATEGORY:
type: TEXT
CODE:
type: TEXT
CODE_TYPE:
type: TEXT
DESCRIPTION_OF_CODE:
type: TEXT
COUNT_OF_STUDENTS:
type: VARCHAR(255)
TOTAL_ENROLLED_STUDENTS:
type: NUMBER
orchestrators:
stage_ingestion:
stage: SOURCE_STAGE
file_format: CSV_FORMAT
PATH: csv
PATTERN: ".*/.*/.*[.]csv"
truncate: true
copy:
force: true
size_limit: 6000
purge: true
return_failed_length: true
enforce_length: true
truncatecolumns: false
load_uncertain_files: true

Where .\* is interpreted as “zero or more occurrences of any character.” The square brackets escape the period character (.) that precedes a file extension.

Loading JSON data into a VARIANT column

The following example loads JSON data into a table with a single column of type VARIANT.

{
"EN_NASAP_Circuit_Code__c": "Circuit123",
"EN_Outage__c": true,
"EN_RECORD_Type_Name__c": "TypeA",
"Id": "12345",
"IsDeleted": false,
"LastModifiedDate": "2023-10-24T14:30:00",
"Name": "AssetA"
}
dataops/snowflake/databases.template.yml
databases:
"{{ env.DATAOPS_DATABASE }}":
schemas:
SCHEMA1:
tables:
ASSET_OUTAGE:
comment: "assetOutage from eNAMS"
columns:
JSON_DATA:
type: VARIANT
orchestrators:
stage_ingestion:
stage: external_data_stage
file_format: JSON_FORMAT
file_type: json
path: json/history_asset_outage.json
truncate: true
copy:
force: true

Bulk loading from Amazon S3

Use the COPY INTO <table> command to transfer the data from the staged files into a table within a Snowflake database. While it's possible to load directly from the bucket, Snowflake advises creating an external stage that points to the bucket and utilizing the external stage instead.

Irrespective of the chosen method, this process mandates a currently active virtual warehouse for the session. Whether executing the command manually or within a script, the warehouse furnishes the computational resources necessary for the insertion of rows into the table.

Data load bulk S3 !!shadow!!

See the Snowflake documentation for more information.

Auto-Generating sources from SOLE ingestion tables

Eliminate the need for manual addition of dbt sources by sharing table definitions between Snowflake Object Lifecycle Engine (SOLE) and Modelling and Transformation Engine (MATE). This streamlines onboarding for new data sources, reduces time to launch projects, and prevents duplicated efforts.

Sharing SOLE configuration files enables automatic metadata generation and transformation into ready-to-use MATE sources, providing a significant productivity advantage by eliminating the need for duplicate table definitions.

See the Auto-Generating Sources from SOLE Ingestion Tables documentation for more information.

For more information about generating sources for tables not managed by SOLE, see Generating Sources for Tables Managed Outside SOLE.