Auto-Generating Sources from SOLE Ingestion Tables
You can use the ingestion tables you manage in Snowflake Object Lifecycle Engine (SOLE) configuration files as dbt sources for MATE, without manually adding a new dbt source (with the same schema, columns, and so on) every time you create a table. The benefits of this approach are:
- Faster onboarding of new data sources
- Shorter time to go live for new projects
- No duplication of effort
Sharing ingestion table definitions allows:
- The automatic generation of the metadata of any table you manage in the SOLE configuration files
- The automatic transformation of table metadata into ready-to-use MATE sources
This mechanism is a significant productivity win, as you never have to create the same table definition twice.
For more information about using a script to generate sources for tables not managed by SOLE, see Generating Sources for Tables Managed Outside SOLE.
Prerequisites
Before sharing ingestion table definitions between engines, ensure you enable the autogeneration of MATE dbt sources, as described in Enabling/Disabling the auto-generation of the MATE source.
Ingesting data
Declaring a table in SOLE and using it in MATE
To start sharing sources between SOLE for Data Products and MATE, follow these steps:

1. Configure a table in the SOLE configuration file `dataops/snowflake/databases.template.yml`.
2. Run a pipeline so that the SOLE job prepares the data needed by MATE to generate a new `sources.yml` file. The name of the generated file is suffixed by `CI_PIPELINE_ID`.
3. Enable at least one MATE job to generate the source file.
4. To use the newly generated file in MATE, add the directory `autogenerated_sources` to the `dbt_project.yml` file:

dataops/modelling/dbt_project.yml

## Project
name: MyProject
version: 0.1
config-version: 2

## Sources
model-paths: [models, sources, autogenerated_sources]

By adding `autogenerated_sources` as an additional sources directory, MATE finds and uses the new source file during the pipeline run.

The new `sources.yml` file is available only during a pipeline run. It does not get hardcoded in your repository. See Capturing the generated source for more information.
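The pipeline-ID suffix means every run produces a uniquely named file. A minimal sketch of the resulting name, assuming the `sources{ci_pipeline_id}.yml` pattern shown later on this page and an illustrative pipeline ID:

```python
# Illustrative only: the actual file is produced by the pipeline.
ci_pipeline_id = "12345"  # hypothetical value of CI_PIPELINE_ID
filename = f"sources{ci_pipeline_id}.yml"
print(filename)  # sources12345.yml
```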
The examples below show in detail how to ingest data from a single source system, ingest data from two source systems, declare MATE tests in SOLE, use tags to build specific tables in MATE, and generate a joined table of manual and autogenerated sources.
Ingesting data from a single source system
In this example, you want to ingest customer data from Salesforce:
- Default Configuration
- Data Products Configuration
databases:
CUSTOMER_DATA:
namespacing: none
orchestrators:
mate:
source_system: SALESFORCE
schemas:
STAGING:
tables:
CUSTOMER:
columns:
CUSTOMER_NAME:
type: INT
CUSTOMER_ADDRESS:
type: VARCHAR
PURCHASES:
columns:
PRODUCT_NAME:
type: VARCHAR
QUANTITY:
type: VARCHAR
PURCHASE_DATE:
type: DATE
PURCHASE_COUNT:
type: INT
- database:
name: CUSTOMER_DATA
namespacing: none
orchestrators:
mate:
source_system: SALESFORCE
- schema:
name: STAGING
database: rel(database.CUSTOMER_DATA)
- table:
name: CUSTOMER
database: rel(database.CUSTOMER_DATA)
schema: rel(schema.STAGING)
columns:
CUSTOMER_NAME:
type: INT
CUSTOMER_ADDRESS:
type: VARCHAR
- table:
name: PURCHASES
database: rel(database.CUSTOMER_DATA)
schema: rel(schema.STAGING)
columns:
PRODUCT_NAME:
type: VARCHAR
QUANTITY:
type: VARCHAR
PURCHASE_DATE:
type: DATE
PURCHASE_COUNT:
type: INT
Without further setup, you can immediately use the ingestion tables as MATE source tables.
SELECT
CUSTOMER_NAME,
CUSTOMER_ADDRESS
FROM {{ source('DATAOPS_STAGING_SALESFORCE', 'CUSTOMER') }}
The source's name always starts with the `DATAOPS_` prefix, followed by the schema name. If `source_system` is provided, its value is appended as a suffix. In this example, `STAGING` is the schema name, `SALESFORCE` is the name of the source system, and `CUSTOMER` is the table name.
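The naming rule can be sketched as a small helper function (illustrative only; the actual name is produced by the pipeline):

```python
def mate_source_name(schema, source_system=None):
    """Sketch of the autogenerated source naming rule:
    DATAOPS_ prefix + schema name + optional source_system suffix."""
    name = f"DATAOPS_{schema}"
    if source_system:
        name += f"_{source_system}"
    return name

print(mate_source_name("STAGING", "SALESFORCE"))  # DATAOPS_STAGING_SALESFORCE
print(mate_source_name("STAGING"))                # DATAOPS_STAGING
```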
Ingesting data from two source systems
In this second example, you want to ingest customer data from Salesforce and SAP:
- Default Configuration
- Data Products Configuration
databases:
CUSTOMER_DATA:
namespacing: none
comment: This is the main DataOps database for environment {{ env.DATAOPS_ENV_NAME }}
orchestrators:
mate:
source_system: SALESFORCE
schemas:
STAGING:
tables:
CUSTOMER:
columns:
ID:
type: INT
CUSTOMER_NAME:
type: INT
CUSTOMER_ADDRESS:
type: VARCHAR
PURCHASES:
columns:
CUSTOMER_ID:
type: INT
PRODUCT_NAME:
type: VARCHAR
QUANTITY:
type: VARCHAR
PURCHASE_DATE:
type: DATE
PURCHASE_COUNT:
type: INT
DB:
namespacing: none
comment: this DB has the same schemas and tables as above, but the source_system key will ensure there is no name conflict
orchestrators:
mate:
source_system: SAP
schemas:
STAGING:
tables:
CUSTOMER:
columns:
ID:
type: INT
CUSTOMER_NAME:
type: INT
CUSTOMER_ADDRESS:
type: VARCHAR
PURCHASES:
columns:
CUSTOMER_ID:
type: INT
PRODUCT_NAME:
type: VARCHAR
QUANTITY:
type: VARCHAR
PURCHASE_DATE:
type: DATE
PURCHASE_COUNT:
type: INT
external_tables:
DEALER:
file_format:
format_type: "CSV"
compression: "AUTO"
field_delimiter: ","
skip_header: 1
skip_blank_lines: false
location:
database: "EXT_DATABASE"
schema: "PUBLIC"
stage: "EXT_STAGE"
columns:
Name:
type: "text"
as: "(value:c1::text)"
Location:
type: "text"
as: "(value:c2::text)"
comment: "Test external table 1"
auto_refresh: true
copy_grants: false
refresh_on_create: true
- database:
name: CUSTOMER_DATA
namespacing: none
comment: This is the main DataOps database for environment {{ env.DATAOPS_ENV_NAME }}
orchestrators:
mate:
source_system: SALESFORCE
- schema:
name: STAGING
database: rel(database.CUSTOMER_DATA)
- table:
name: CUSTOMER
database: rel(database.CUSTOMER_DATA)
schema: rel(schema.STAGING)
comment: "Customer data"
columns:
ID:
type: INT
CUSTOMER_NAME:
type: INT
CUSTOMER_ADDRESS:
type: VARCHAR
- table:
name: PURCHASES
database: rel(database.CUSTOMER_DATA)
schema: rel(schema.STAGING)
columns:
CUSTOMER_ID:
type: INT
PRODUCT_NAME:
type: VARCHAR
QUANTITY:
type: VARCHAR
PURCHASE_DATE:
type: DATE
PURCHASE_COUNT:
type: INT
- database:
name: DB
namespacing: none
comment: this DB has the same schemas and tables as above, but the source_system key will ensure there is no name conflict
orchestrators:
mate:
source_system: SAP
- schema:
name: STAGING
database: rel(database.DB)
- table:
name: CUSTOMER
database: rel(database.DB)
schema: rel(schema.STAGING)
comment: "Customer data"
columns:
ID:
type: INT
CUSTOMER_NAME:
type: INT
CUSTOMER_ADDRESS:
type: VARCHAR
- table:
name: PURCHASES
database: rel(database.DB)
schema: rel(schema.STAGING)
columns:
CUSTOMER_ID:
type: INT
PRODUCT_NAME:
type: VARCHAR
QUANTITY:
type: VARCHAR
PURCHASE_DATE:
type: DATE
PURCHASE_COUNT:
type: INT
- external_table:
name: DEALER
database: rel(database.DB)
schema: rel(schema.STAGING)
file_format:
format_type: "CSV"
compression: "AUTO"
field_delimiter: ","
skip_header: 1
skip_blank_lines: false
location:
database: rel(database.EXT_DATABASE)
schema: rel(schema.PUBLIC)
stage: rel(stage.EXT_STAGE)
columns:
Name:
type: "text"
as: "(value:c1::text)"
Location:
type: "text"
as: "(value:c2::text)"
comment: "Test external table 1"
auto_refresh: true
copy_grants: false
refresh_on_create: true
When ingesting from two different sources, a sample of the generated `sources.yml` file looks like this. Note the `CI_PIPELINE_ID` suffix in the file name:
version: 2
sources:
- name: DATAOPS_STAGING_SALESFORCE
description: ""
database: CUSTOMER_DATA
schema: STAGING
tables:
- name: CUSTOMER
description: ""
columns:
- name: ID
description: ""
- name: CUSTOMER_NAME
description: ""
- name: CUSTOMER_ADDRESS
description: ""
- name: PURCHASES
description: ""
columns:
- name: CUSTOMER_ID
description: ""
- name: PRODUCT_NAME
description: ""
- name: QUANTITY
description: ""
- name: PURCHASE_DATE
description: ""
- name: PURCHASE_COUNT
description: ""
- name: DATAOPS_STAGING_SAP
description: ""
database: DB
schema: STAGING
tables:
- name: CUSTOMER
description: ""
columns:
- name: ID
description: ""
- name: CUSTOMER_NAME
description: ""
- name: CUSTOMER_ADDRESS
description: ""
- name: PURCHASES
description: ""
columns:
- name: CUSTOMER_ID
description: ""
- name: PRODUCT_NAME
description: ""
- name: QUANTITY
description: ""
- name: PURCHASE_DATE
description: ""
- name: PURCHASE_COUNT
description: ""
- name: DEALER
description: ""
columns:
- name: Name
description: ""
- name: Location
description: ""
Without further setup, you can immediately use the ingestion tables as MATE source tables.
SELECT
CUSTOMER_NAME,
(SELECT COUNT(*) FROM {{ source('DATAOPS_STAGING_SAP', 'PURCHASES') }}
WHERE ID = CUSTOMER_ID)
AS "Number Purchases"
FROM {{ source('DATAOPS_STAGING_SALESFORCE', 'CUSTOMER') }}
The source's name always starts with the `DATAOPS_` prefix, followed by the schema name. If `source_system` is provided, its value is appended as a suffix. In this example, `STAGING` is the schema name for the two sources, `SALESFORCE` and `SAP` are the names of the source systems, and `CUSTOMER` and `PURCHASES` are the table names.
Declaring MATE tests in SOLE
In this third example, you want to use the tags you define on columns in SOLE to run a MATE source testing stage independently for Salesforce and SAP, using a job that selects models by tag.
- Default Configuration
- Data Products Configuration
databases:
CUSTOMER_DATA:
namespacing: none
comment: This is the main DataOps database for environment {{ env.DATAOPS_ENV_NAME }}
orchestrators:
mate:
source_system: SALESFORCE
schemas:
STAGING:
tables:
CUSTOMER:
columns:
ID:
type: INT
tests:
- unique
orchestrators:
mate:
tags:
- src.salesforce
CUSTOMER_NAME:
type: INT
CUSTOMER_ADDRESS:
type: VARCHAR
PURCHASES:
columns:
CUSTOMER_ID:
type: INT
PRODUCT_NAME:
type: VARCHAR
QUANTITY:
type: VARCHAR
PURCHASE_DATE:
type: DATE
PURCHASE_COUNT:
type: INT
tests:
- unique
orchestrators:
mate:
tags:
- src.salesforce
DB:
namespacing: none
comment: this DB has the same schemas and tables as above, but the source_system key will ensure there is no name conflict
orchestrators:
mate:
source_system: SAP
schemas:
STAGING:
tables:
CUSTOMER:
columns:
ID:
type: INT
CUSTOMER_NAME:
type: INT
CUSTOMER_ADDRESS:
type: VARCHAR
PURCHASES:
columns:
CUSTOMER_ID:
type: INT
PRODUCT_NAME:
type: VARCHAR
QUANTITY:
type: VARCHAR
PURCHASE_DATE:
type: DATE
PURCHASE_COUNT:
type: INT
- database:
name: CUSTOMER_DATA
namespacing: none
comment: "This is the main DataOps database for environment {{ env.DATAOPS_ENV_NAME }}"
orchestrators:
mate:
source_system: SALESFORCE
- schema:
name: STAGING
database: rel(database.CUSTOMER_DATA)
- table:
name: CUSTOMER
database: rel(database.CUSTOMER_DATA)
schema: rel(schema.STAGING)
comment: "Customer data"
columns:
ID:
type: INT
tests:
- unique
orchestrators:
mate:
tags:
- src.salesforce
CUSTOMER_NAME:
type: INT
CUSTOMER_ADDRESS:
type: VARCHAR
- table:
name: PURCHASES
database: rel(database.CUSTOMER_DATA)
schema: rel(schema.STAGING)
columns:
CUSTOMER_ID:
type: INT
PRODUCT_NAME:
type: VARCHAR
QUANTITY:
type: VARCHAR
PURCHASE_DATE:
type: DATE
PURCHASE_COUNT:
type: INT
tests:
- unique
orchestrators:
mate:
tags:
- src.salesforce
- database:
name: DB
namespacing: none
comment: this DB has the same schemas and tables as above, but the source_system key will ensure there is no name conflict
orchestrators:
mate:
source_system: SAP
- schema:
name: STAGING
database: rel(database.DB)
- table:
name: CUSTOMER
database: rel(database.DB)
schema: rel(schema.STAGING)
comment: "Customer data"
columns:
ID:
type: INT
CUSTOMER_NAME:
type: INT
CUSTOMER_ADDRESS:
type: VARCHAR
- table:
name: PURCHASES
database: rel(database.DB)
schema: rel(schema.STAGING)
columns:
CUSTOMER_ID:
type: INT
PRODUCT_NAME:
type: VARCHAR
QUANTITY:
type: VARCHAR
PURCHASE_DATE:
type: DATE
PURCHASE_COUNT:
type: INT
Without further setup, you can immediately use the tags you define on columns in SOLE to run MATE source tests.
Test only Salesforce with tag:
extends:
- .modelling_and_transformation_base
- .agent_tag
variables:
TRANSFORM_ACTION: TEST
TRANSFORM_MODEL_SELECTOR: tag:src.salesforce
stage: Source Testing
script:
- /dataops
icon: ${TESTING_ICON}
artifacts:
when: always
reports:
junit: $CI_PROJECT_DIR/report.xml
The two `src.salesforce` tags applied to the columns `ID` and `CUSTOMER_ID` in the `SALESFORCE` source system are used to run the MATE source testing stage.
Using tags to build specific tables in MATE
In this fourth example, you want to use the tags you define on tables in SOLE to build specific tables in MATE, using a job that selects models by tag.
- Default Configuration
- Data Products Configuration
databases:
CUSTOMER_DATA:
namespacing: none
comment: This is the main DataOps database for environment {{ env.DATAOPS_ENV_NAME }}
orchestrators:
mate:
source_system: SALESFORCE
schemas:
STAGING:
tables:
CUSTOMER:
orchestrators:
mate:
tags:
- sf_table_tag
columns:
ID:
type: INT
CUSTOMER_NAME:
type: INT
CUSTOMER_ADDRESS:
type: VARCHAR
PURCHASES:
orchestrators:
mate:
tags:
- sf_table_tag
columns:
CUSTOMER_ID:
type: INT
PRODUCT_NAME:
type: VARCHAR
QUANTITY:
type: VARCHAR
PURCHASE_DATE:
type: DATE
PURCHASE_COUNT:
type: INT
DB:
namespacing: none
comment: this DB has the same schemas and tables as above, but the `source_system` tag will ensure there is no name conflict
orchestrators:
mate:
source_system: SAP
schemas:
STAGING:
tables:
CUSTOMER:
columns:
ID:
type: INT
CUSTOMER_NAME:
type: INT
CUSTOMER_ADDRESS:
type: VARCHAR
PURCHASES:
columns:
CUSTOMER_ID:
type: INT
PRODUCT_NAME:
type: VARCHAR
QUANTITY:
type: VARCHAR
PURCHASE_DATE:
type: DATE
PURCHASE_COUNT:
type: INT
- database:
name: CUSTOMER_DATA
namespacing: none
comment: This is the main DataOps database for environment {{ env.DATAOPS_ENV_NAME }}
orchestrators:
mate:
source_system: SALESFORCE
- schema:
name: STAGING
database: rel(database.CUSTOMER_DATA)
- table:
name: CUSTOMER
database: rel(database.CUSTOMER_DATA)
schema: rel(schema.STAGING)
comment: "Customer data"
orchestrators:
mate:
tags:
- sf_table_tag
columns:
ID:
type: INT
CUSTOMER_NAME:
type: INT
CUSTOMER_ADDRESS:
type: VARCHAR
- table:
name: PURCHASES
database: rel(database.CUSTOMER_DATA)
schema: rel(schema.STAGING)
orchestrators:
mate:
tags:
- sf_table_tag
columns:
CUSTOMER_ID:
type: INT
PRODUCT_NAME:
type: VARCHAR
QUANTITY:
type: VARCHAR
PURCHASE_DATE:
type: DATE
PURCHASE_COUNT:
type: INT
- database:
name: DB
namespacing: none
comment: this DB has the same schemas and tables as above, but the `source_system` tag will ensure there is no name conflict
orchestrators:
mate:
source_system: SAP
- schema:
name: STAGING
database: rel(database.DB)
- table:
name: CUSTOMER
database: rel(database.DB)
schema: rel(schema.STAGING)
columns:
ID:
type: INT
CUSTOMER_NAME:
type: INT
CUSTOMER_ADDRESS:
type: VARCHAR
- table:
name: PURCHASES
database: rel(database.DB)
schema: rel(schema.STAGING)
columns:
CUSTOMER_ID:
type: INT
PRODUCT_NAME:
type: VARCHAR
QUANTITY:
type: VARCHAR
PURCHASE_DATE:
type: DATE
PURCHASE_COUNT:
type: INT
Without further setup, you can immediately use the tags you define on tables in SOLE to build the tagged tables in MATE.
Build all Models:
extends:
- .modelling_and_transformation_base
- .agent_tag
variables:
TRANSFORM_ACTION: RUN
TRANSFORM_MODEL_SELECTOR: tag:sf_table_tag
stage: Data Transformation
script:
- /dataops
icon: ${TRANSFORM_ICON}
Capturing the generated source
Sometimes you may need to check all the details of the generated file, for example for troubleshooting. However, since the new `sources.yml` file is available only during a pipeline run, here are a few simple steps to capture the file:
- Export the file as an artifact or from the log of the MATE job.
- Upload the file to the project for additional configuration.
- Disable the ingestion tables sharing to avoid conflict.
- Subsequently, consume the new tables.
The example below shows how to configure the job to export the generated file as a pipeline artifact:
Test all Sources:
extends:
- .modelling_and_transformation_base
- .agent_tag
variables:
TRANSFORM_ACTION: TEST
stage: Source Testing
script:
- /dataops
icon: ${TESTING_ICON}
artifacts:
paths:
- $CI_PROJECT_DIR/dataops/modelling/autogenerated_sources/
User-defined and autogenerated sources
Below is an example that shows the differences between a user-defined source with a freshness clause and an autogenerated source based on the same ingestion table:
version: 2
sources:
- name: MANUALLY_DEFINED_EXAMPLE_SCHEMA_SALESFORCE
freshness:
warn_after:
count: 1
period: day
description: ''
database: CUSTOMER_DATA
schema: EXAMPLE_SCHEMA
tables:
- name: PURCHASES
description: ''
columns:
- name: CUSTOMER_ID
description: ''
- name: PRODUCT_NAME
description: ''
- name: QUANTITY
description: ''
- name: PURCHASE_DATE
description: ''
- name: PURCHASE_COUNT
description: ''
tags:
- src.salesforce
tests:
- unique
The corresponding autogenerated source file, `sources{ci_pipeline_id}.yml`:
version: 2
sources:
- name: DATAOPS_EXAMPLE_SCHEMA_SALESFORCE
description: ''
database: CUSTOMER_DATA
schema: EXAMPLE_SCHEMA
tables:
- name: CUSTOMER
description: ''
columns:
- name: ID
description: ''
tags:
- src.salesforce
tests:
- unique
- name: CUSTOMER_NAME
description: ''
- name: CUSTOMER_ADDRESS
description: ''
- name: PURCHASES
description: ''
columns:
- name: CUSTOMER_ID
description: ''
- name: PRODUCT_NAME
description: ''
- name: QUANTITY
description: ''
- name: PURCHASE_DATE
description: ''
- name: PURCHASE_COUNT
description: ''
tags:
- src.salesforce
tests:
- unique
- name: DATAOPS_EXAMPLE_SCHEMA_SAP
description: ''
database: DB
schema: EXAMPLE_SCHEMA
tables:
- name: CUSTOMER
description: ''
columns:
- name: ID
description: ''
- name: CUSTOMER_NAME
description: ''
- name: CUSTOMER_ADDRESS
description: ''
- name: PURCHASES
description: ''
columns:
- name: CUSTOMER_ID
description: ''
- name: PRODUCT_NAME
description: ''
- name: QUANTITY
description: ''
- name: PURCHASE_DATE
description: ''
- name: PURCHASE_COUNT
description: ''
Without further setup, you can immediately generate a joined table of manual and autogenerated sources.
{{ config(alias='table_using_autogenerated_and_client_generated_source') -}}
SELECT
CUSTOMER_NAME,
PURCHASE_DATE
FROM {{ source('DATAOPS_EXAMPLE_SCHEMA_SALESFORCE', 'CUSTOMER') }}
JOIN {{ source('MANUALLY_DEFINED_EXAMPLE_SCHEMA_SALESFORCE', 'PURCHASES') }}
ON ID = CUSTOMER_ID
Enabling/Disabling the auto-generation of the MATE source
The variable `DATAOPS_GENERATE_TRANSFORM_SOURCES` controls this behavior and is set to `0` (false) by default.
Setting this variable to true allows the metadata of any table you create in the SOLE configuration file to be automatically generated and transformed into a ready-to-use MATE dbt `sources.yml` file. This file contains all the details from the configuration file `databases.template.yml` in the template project.
To enable autogeneration:

1. Add the variable `DATAOPS_GENERATE_TRANSFORM_SOURCES` to the `variables.yml` file in your project settings.
2. Set this variable to `1` (true).
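For example, the setting could look like this in your variables file (the surrounding layout is an assumption; follow your project's existing structure):

```yaml
# Hypothetical variables.yml fragment: enable MATE source autogeneration
variables:
  DATAOPS_GENERATE_TRANSFORM_SOURCES: 1
```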
Once set, each run of MATE automatically generates ready-to-use MATE sources from SOLE.
With the sources file generated, your next step is to give explicit permission to use the generated dbt sources in your MATE project by making a few changes to the `dbt_project.yml` file. Head to ingesting data for the details.
To disable the autogeneration of the MATE sources and stop sharing ingestion table definitions, set the variable `DATAOPS_GENERATE_TRANSFORM_SOURCES` back to `0` so the sources file is no longer generated.