Building Data Products
Data products can be standalone or composite, depending on their purpose and operational approach. Running DataOps pipelines builds data products, updates them, and refreshes their data, creating new versions of the data products whenever anything changes.
A data product is the output of a single DataOps pipeline - a parent pipeline that may use child pipelines. Building a data product involves data quality, data governance, reproducibility, scalability, and maintainability. DataOps.live achieves this by using robust engineering practices, version control, testing, automation, and collaboration methodologies, all enhanced by the powerful SOLE and MATE engines and the orchestration capabilities.
The DataOps.live platform offers a data product registry and metadata objects managed within the platform infrastructure at a group or project level to manage data products and their dependencies.
Standalone data products
Standalone data products workflow
The basic top-level workflow to create a standalone data product is as follows:
- Configure the data product definition in the specification file.
- Add the Data Product orchestrator to the pipeline.
- If this is the first data product in your DataOps project, create the necessary deploy tokens.
- Run the data product pipeline manually for the first time. This creates an entry for the data product in the data product registry.
- Schedule (or trigger) the data product to run regularly.
- At the end of each run, the data product registry gets a new entry for the latest version of the data product, including information such as updated time and automated test results.
- Optionally, push to your data catalog for business users.
Building a standalone data product
Log in to the DataOps.live platform.
Navigate to your project and create a data product specification file under /dataops in a new folder in your project directory.
Data product definition template:
id: [data product id]
name: [data product name]
description: [data product description]
schema_version: 1.0.0
dataset:
  name: [Name of the dataset]
  description: [description of the dataset]
  mate_models:
    - select: [selector of the models that we would like to include in the data product]
output_ports:
  - id: [id of the output_port]
    name: [output port name]
    type: [type usually Tables]
    description: [description of the output port]
    service_level_indicators:
      - mate_tests:
          - select: [selector of the test that we would like to include in the data product]
    service_level_objectives:
      - name: [name of the SLO]
        description: [description of the SLO]
        test_select: [selector of the test that is related to this SLO]
Example of a data product specification file:
DataProductA.yml:
id: DataProductA
name: Data Product A - Person data
description: Data Product A - Person data - for data product demos - derived from WidgetCo Customer Segmentation Data Product and Streamlit app.
schema_version: 1.0.0
output_ports:
  - id: PersonData
    name: Data Product A - Person data
    type: tables
    description: Data Product A - Person data - for data product demos
    service_level_indicators:
      - mate_tests:
          - select: dim_customer
    service_level_objectives:
      - name: PERSON_DATAPRODUCT models test
        description: All PERSON_DATAPRODUCT models tests should pass
        test_select: dim_customer
dataset:
  name: Person_data
  description: Data Product A - Person data
  mate_models:
    - select: dim_customer
Add a Data Product orchestrator job to the pipeline.
The Data Product orchestrator enriches the data product specification with the metadata from the pipeline run. By using the provided MATE selectors in the specification file, the orchestrator adds all the objects and tests that are part of the data product. Once you run the pipeline that includes the orchestrator, a data product manifest is uploaded to the data product registry.
Note: You can use different methods to transform data in your data products. For more information, see Creating Custom Data Products Snippets.
Create a data_product_orchestrator.yml job file with the following structure:
data_product_orchestrator.yml:
"Data Product Orchestrator":
  extends:
    - .agent_tag
  stage: "Data Product"
  image: dataopslive/dataops-data-product-definition-runner:DATAOPS-8619
  variables:
    DATAOPS_DATA_PRODUCT_SPECIFICATION_FILE: [path to the data product specification file]
    DATAOPS_DATA_PRODUCT_REFERENCE_FILE: [path to the Data Product Manifest that we would like to use for backward compatibility] [optional]
    DATAOPS_DATA_PRODUCT_REFERENCE_FILE_FROM_DIFFERENT_BRANCH: 1 [optional]
  script:
    - /dataops
  artifacts:
    paths:
      - dataops/report/data_product_manifest_viewer/
    name: "Data Product Manifest Viewer"
    expose_as: "dataopsreport"
    icon: ${DATAOPS_ICON}
The artifacts: section of this job adds a human-readable output of the data product manifest to the report section of the pipeline. Click the report name to view the data product metadata generated by the pipeline.
Add the Data Product stage towards the bottom of the list, before Clean Up, in the file pipelines/includes/config/stages.yml of the pipelines. The predefined job will be in this stage.
stages.yml:
stages:
- Pipeline Initialisation
- Vault Initialisation
- Snowflake Setup
- Additional Configuration
- Data Ingestion
- Source Testing
- Data Transformation
- Transformation Testing
- Generate Docs
- Data Product
- Clean Up
Navigate to the project Settings > Repository and scroll to the Deploy tokens section. Create a deploy token with read and write registry access with the name gitlab-deploy-token. Using this name means the token will be available for the pipeline job as CI_DEPLOY_PASSWORD.
Run the full-ci.yml file containing the pipeline definition in your project directory. See Running Pipelines for more information about the methods to run pipelines.
Building a multi-version data product pipeline
You can load multiple versions of one data product in a single pipeline. However, we recommend using a different database schema for each version. To avoid data duplication, you can create a view on top of one version and include this view in the other version. This approach is possible when the objects don't change between the data product versions.
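As an illustration, the per-version schema separation could be expressed in the MATE (dbt-style) project configuration. This is a minimal sketch only; the project name, folder names, and schema names below are assumptions to adapt to your project, and by default dbt appends such a custom schema to the target schema name.
dataops/modelling/dbt_project.yml (excerpt, illustrative):
models:
  my_project:
    # models kept under dataops/modelling/models/data_product_v1
    data_product_v1:
      +schema: PERSON_DATA_V1
    # models kept under dataops/modelling/models/data_product_v2
    data_product_v2:
      +schema: PERSON_DATA_V2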
1. Create the new models for the new data product version
- In your project directory, navigate to /dataops/modelling.
- Create new models for the new version. We recommend a separate subfolder for each version.
2. Create a data product specification file for the new data product version
You can create a copy of the current data product version specification and add a suffix to the name, e.g. my_data_product_v2.yml.
Change the schema_version attribute in the specification. Change the select under mate_models and under the mate_tests selector.
my_data_product_v2.yml:
dataset:
  mate_models:
    - select:
        [
          selector of the models that we would like to use for the new data product,
        ]
service_level_indicators:
  - mate_tests:
      - select:
          [
            selector of the test that we would like to use for the new data product,
          ]
service_level_objectives:
  - name: [name of the SLO]
    description: [description of the SLO]
    test_select:
      [
        selector of the test that we would like to use for the new data product in this SLO,
      ]
The new selector selects the models and tests that will be part of the new data product version.
Save the data product specification for the new data product version.
3. Create MATE jobs for the new models and tests
Create or update the MATE jobs with new models and tests representing your new specification.
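For example, a MATE job for the new models could look like the sketch below. It assumes the standard MATE orchestrator variables and base jobs; the job name and the model selector are illustrative, and a similar job with TRANSFORM_ACTION: TEST would cover the new tests.
"Build Data Product Models v2":
  extends:
    - .modelling_and_transformation_base
    - .agent_tag
  stage: "Data Transformation"
  variables:
    # run only the models in the v2 subfolder (illustrative selector)
    TRANSFORM_ACTION: RUN
    TRANSFORM_MODEL_SELECTOR: models/data_product_v2
  script:
    - /dataops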
4. Create a Data Product orchestrator job that will use the new version specification file
Create a new orchestrator job that points to the new version's specification file:
"Data Product Orchestrator v2":
  extends:
    - .agent_tag
  stage: "Data Product"
  image: dataopslive/dataops-data-product-definition-runner:DATAOPS-8619
  variables:
    DATAOPS_DATA_PRODUCT_SPECIFICATION_FILE: dataops/data-product-definitions/my_data_product_v2.yml
  script:
    - /dataops
  artifacts:
    paths:
      - dataops/report/data_product_manifest_viewer/
    # the name of the artifacts should be different for each version
    name: "Data Product Manifest Viewer v2"
    expose_as: "dataopsreport"
    icon: ${DATAOPS_ICON}
5. Include the new jobs in the -ci.yml file
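As a sketch, assuming the new job files live under pipelines/includes/local_includes/ (both file names below are illustrative), the include list of the -ci.yml file could be extended with the v2 jobs:
include:
  # existing project includes, for example the bootstrap file
  - /pipelines/includes/bootstrap.yml
  # new jobs for the v2 data product (illustrative file names)
  - /pipelines/includes/local_includes/mate_jobs_v2.yml
  - /pipelines/includes/local_includes/data_product_orchestrator_v2.yml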
Composite data products
Composite data products workflow
The primary consideration when building a logical hierarchy of data products, where end-user products are built on intermediate products built on source data products, is how to automate, propagate, and trigger data refresh through this hierarchy.
Every data product must have its attributes and data structure representation stored in the central project registry. You can write rules using these attributes to decide the behavior of your data products, with no limits on the nesting and depth of composite data products.
Here is a typical top-level workflow to create a composite data product:
- Configure the data product definition in the specification file.
- Create required transformation, testing configurations etc.
- (Optional) Create rules for dependencies. The composite data product will rerun every time the rules are met.
- Run the data product pipeline for the first time. This creates an entry for the composite data product in the data product registry.
- Optionally, push to the data catalog for business users.
Building a composite data product
The DataOps.live platform uses a data product engine to handle complex data product dependencies, ensuring efficient building and management of data products.
This engine handles data product orchestration by triggering pipelines based on the rules set in the data product specification. You can have multiple active versions of the specification and the data product engine.
Log in to the DataOps.live platform.
Navigate to your project and create a data product specification file in a new folder in your project directory that includes the rules and actions for the composite data products. See Composite data products engine rules for more information.
Composite_template.yml:
id: [data product id]
name: [data product name]
description: [data product description]
schema_version: 1.0.0
dataset:
  name: [Name of the dataset]
  description: [description of the dataset]
  mate_models:
    - select: [selector of the models that we would like to include in the data product]
output_ports:
  - id: [id of the output_port]
    name: [output port name]
    type: [type usually Tables]
    description: [description of the output port]
    service_level_indicators:
      - mate_tests:
          - select: [selector of the test that we would like to include in the data product]
    service_level_objectives:
      - name: [name of the SLO]
        description: [description of the SLO]
        test_select: [selector of the test that is related to this SLO]
input_ports:
  - name: [identifier that we will use for reference]
    type: [optional, defaults to `DataOps.live Data Product`]
    dataproduct_id: [id of the dataproduct]
    output_port_id: [output_port_id that we are referencing]
    schema_version: [schema version of the source data product]
    branch: [name of the branch]
    data_product_registry: [path to the Data Product Registry]
rules:
  # action defines what to do when this rule passes - in this case, build the Product
  - action: build_DataProductR
    # the rule defines the logic for triggering the action. In this case:
    # Data Product A has been published within the last hour
    # AND Data Product B has been published within the last hour
    # AND Data Product C has been published within the last hour
    # AND this Data Product was last triggered more than an hour ago.
    #
    # In summary, when all upstream products are less than an hour old, and this product hasn't
    # been run in the last hour, trigger the pipeline.
    rule: |-
      DataProductA.publication_datetime > engine.now - t"PT1H"
      and DataProductB.publication_datetime > engine.now - t"PT1H"
      and DataProductC.publication_datetime > engine.now - t"PT1H"
      and engine.DataProductR.last_triggered < engine.now - t"PT1H"
actions:
  - name: [name of the action reference in the rules section]
    type: trigger-pipeline
    trigger_pipeline: # section is mandatory if type = trigger_pipeline
      pipeline_file: [name of the yml file that should be triggered]
      branch: [branch name]
      variables: [optional, list of variables]
        - name: [variable_name]
          value: [variable_value]
Example of a composite data product specification file:
DataProductR.yml:
id: DataProductR
name: Data Product R - Customer 360 data
description: Data Product R - Customer 360 data - for data product demos - composable data product from products A, B, and C. Derived from WidgetCo Customer Segmentation Data Product and Streamlit App.
schema_version: 1.0.0
output_ports:
  - id: Customer360
    name: Data Product R - Customer 360 data
    type: tables
    description: Data Product R - Customer 360 data - for data product demos
    service_level_indicators:
      - mate_tests:
          - select: customer_360
          - select: customer_lifetime_spend
    service_level_objectives:
      - name: Customer 360 models test
        description: All Customer 360 tests should pass
        test_select: customer_360
        test_select: customer_lifetime_spend
dataset:
  name: Customer_360
  description: Data Product R - Customer 360 data
  mate_models:
    - select: customer_360
    - select: customer_lifetime_spend
governance:
  - collibra:
      collibra_community: Snowflake Summit Demo 2023
      collibra_domain: CRM
input_ports:
  - name: DataProductB
    type: DataOps.live Data Product
    dataproduct_id: DataProductB
    output_port_id: SalesData
    schema_version: 1.0.0
    branch: main
    data_product_registry: https://app.dataops.live/dataops-demo-project/data-products-demos/data-product-b
  - name: DataProductA
    type: DataOps.live Data Product
    dataproduct_id: DataProductA
    output_port_id: PersonData
    schema_version: 1.0.0
    branch: main
    data_product_registry: https://app.dataops.live/dataops-demo-project/data-products-demos/data-product-a
  - name: DataProductC
    type: DataOps.live Data Product
    dataproduct_id: DataProductC
    output_port_id: ProductionData
    schema_version: 1.0.0
    branch: main
    data_product_registry: https://app.dataops.live/dataops-demo-project/data-products-demos/data-product-c
rules:
  # action defines what to do when this rule passes - in this case, build the Product
  - action: build_DataProductR
    # the rule defines the logic for triggering the action. In this case:
    # Data Product A has been published within the last hour
    # AND Data Product B has been published within the last hour
    # AND Data Product C has been published within the last hour
    # AND this Data Product was last triggered more than an hour ago.
    #
    # In summary, when all upstream products are less than an hour old, and this product hasn't
    # been run in the last hour, trigger the pipeline.
    rule: |-
      DataProductA.publication_datetime > engine.now - t"PT1H"
      and DataProductB.publication_datetime > engine.now - t"PT1H"
      and DataProductC.publication_datetime > engine.now - t"PT1H"
      and engine.DataProductR.last_triggered < engine.now - t"PT1H"
actions:
  - name: build_DataProductR
    type: trigger-pipeline
    trigger_pipeline:
      pipeline_file: full-ci.yml
      branch: main
Add a Data Product orchestrator job to the pipeline.
The Data Product orchestrator enriches the data product specification with the metadata from the pipeline run. By using the provided MATE selectors in the specification file, the orchestrator adds all the objects and tests that are part of the data product. Once you run the pipeline that includes the orchestrator, a data product manifest is uploaded to the registry.
Create a data_product_orchestrator.yml job file with the following structure:
"Data Product Orchestrator":
  extends:
    - .agent_tag
  stage: "Data Product"
  image: dataopslive/dataops-data-product-definition-runner:DATAOPS-8619
  variables:
    DATAOPS_DATA_PRODUCT_SPECIFICATION_FILE: path to the Data Product Specification file
    DATAOPS_DATA_PRODUCT_REFERENCE_FILE: path to the Data Product Manifest that we would like to use for backward compatibility
    DATAOPS_DATA_PRODUCT_REFERENCE_FILE_FROM_DIFFERENT_BRANCH: 1
  script:
    - /dataops
  artifacts:
    paths:
      - dataops/report/data_product_manifest_viewer/
    name: "Data Product Manifest Viewer"
    expose_as: "dataopsreport"
    icon: ${DATAOPS_ICON}
The artifacts: section of this job adds a human-readable output of the data product manifest to the report section of the pipeline. Click the report name to view the data product metadata generated by the pipeline.
Add the Data Product stage towards the bottom of the list, before Clean Up, in the file pipelines/includes/config/stages.yml of the pipelines. The predefined job will be in this stage.
stages.yml:
stages:
- Pipeline Initialisation
- Validate and Setup
- Vault Initialisation
- Snowflake Setup
- Additional Configuration
- Ingestion
- Query Snowflake
- Source Testing
- Curation Build
- Curation Testing
- Data Transformation
- Data Transformation Testing
- Run Snowpark
- Advanced Processing
- Model Execution
- Validate Configurations
- Data Vault Build
- Data Vault Test
- Datascience Build
- Datascience Testing
- Datascience Processing
- Publish Build
- Publish Testing
- Grant Permissions
- Build Apps
- Deploy Apps
- Meta Data Actions
- Model Retraining
- Generate Docs
- Data Product
- Data Catalog Push
- Clean Up
Navigate to the project Settings > Repository and scroll to the Deploy tokens section. Create a deploy token with read and write registry access with the name gitlab-deploy-token. Using this name means the token will be available for the pipeline job as CI_DEPLOY_PASSWORD.
At the root of your project file structure, run the -ci.yml file containing the pipeline definition.
Composite data products engine rules
You can use the rules: and actions: sections in the data product specification to define the auto-trigger conditions of composite data products. The rules: section must describe when to perform an action, and the actions: section must define what each action does.
Let's see an example where, once its rule is met, the build_aggregate_data action aggregates all sources and gives an overview of the company's daily orders.
id: agg_data
name: agg_data
description: Aggregate all sources and give an overview of the daily orders of the company
schema_version: 1.0
input_ports:
  - id: supp_data
    schema_version: 1.0
    branch: enrich_dpd
    data_product_registry: https://app.dataops.live/dataops-internal/data-product-dependencies/supplier-data
  - id: crm_data
    schema_version: 1.0
    branch: enrich_dpd
    data_product_registry: https://app.dataops.live/dataops-internal/data-product-dependencies/crm_data
rules:
  - action: build_aggregate_data
    rule: |-
      crm_data.publication_datetime > engine.now - t"PT1H"
      and supp_data.publication_datetime > engine.now - t"PT1H"
      and engine.agg_data.last_triggered < engine.now - t"PT30M"
      and engine.agg_data.last_triggered < crm_data.publication_datetime
      and engine.agg_data.last_triggered < supp_data.publication_datetime
actions:
  - name: build_aggregate_data
    type: trigger-pipeline
    trigger_pipeline:
      pipeline_file: Aggregate_DataProduct-ci.yml
      branch: enrich_dpd
      variables:
        - name: DATA_PRODUCT_FOLDER
          value: dataops/data-product-definitions
        - name: DATA_PRODUCT_SOURCE_FILE
          value: Aggregate_Data.yml
The rule has five parts combined using the logical AND operator. All parts must evaluate to true for the rule to pass. The first two parts say both crm_data and supp_data must have been published within the last hour.
crm_data.publication_datetime > engine.now - t"PT1H"
and supp_data.publication_datetime > engine.now - t"PT1H"
The next three parts add three more criteria.
and engine.agg_data.last_triggered < engine.now - t"PT30M"
and engine.agg_data.last_triggered < crm_data.publication_datetime
and engine.agg_data.last_triggered < supp_data.publication_datetime
- The data product engine must not have triggered the pipeline for agg_data in the last 30 minutes - we recommend including a similar rule to prevent unexpected parallel runs of a pipeline.
- crm_data must have been published since the last trigger of the pipeline for agg_data.
- supp_data must have been published since the last trigger of the pipeline for agg_data.
The build_aggregate_data
action triggers a pipeline run on the main branch of this project using the Aggregate_DataProduct-ci.yml
pipeline file. Also, some pipeline variables are made available to the pipeline run.
- name: build_aggregate_data
  type: trigger-pipeline
  trigger_pipeline:
    pipeline_file: Aggregate_DataProduct-ci.yml
    branch: main
    variables:
      - name: DATA_PRODUCT_FOLDER
        value: dataops/data-product-definitions
      - name: DATA_PRODUCT_SOURCE_FILE
        value: Aggregate_Data.yml
Working with data products in DDE
You can use the ready-to-code DDE (DataOps Development Environment) to speed up the development process and automatically assemble all the necessary resources to create more robust data products and manage their lifecycle.
The basic top-level workflow to create a data product using DDE within the DataOps.live platform is as follows:
- Create a project from a template.
- Define the data product infrastructure using SOLE (see the sketch after this list).
- Configure orchestration for data ingestion using MATE capabilities.
- Configure data transformation using the transformation and auto-ingestion capabilities in MATE.
- Configure automated testing using MATE.
- Iterate and test.
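For the SOLE step above, a minimal infrastructure sketch could look like the following. This is an assumption-heavy illustration only - the file location, database templating, and schema name are examples, and the SOLE reference describes the full set of supported parameters.
dataops/snowflake/databases.yml (illustrative):
databases:
  # the environment database managed by SOLE
  "{{ env.DATAOPS_DATABASE }}":
    comment: Database for the data product
    schemas:
      PERSON_DATA:
        comment: Schema holding the data product output tables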
Setting up DDE
To instruct Gitpod to use the correct container, add a .gitpod.yml file to the directory of the project containing the data product. For example:
tasks:
  - name: Setup
    before: |
      # Needed when comparing from a feature branch
      gp env GOLDEN_MANIFEST_FROM_DIFFERENT_BRANCH=1
      /dataops-cde/scripts/dataops_cde_setup.sh
      # Installs optional pre-commit hook(s)
      # pip install pre-commit
      # pre-commit install
# The current image used for data products in DDE
image: dataopslive/dataops-development-workspace:DATAOPS-8619
Running tests for a single data product
Once in the DDE, you can test a single data product by running these commands.
For this to work, you must set up the MATE environment variables and have access to Snowflake.
# The DBT project folder
cd dataops/modelling
dbt docs generate
cd ../../
# change the variables to actual files
dpd --spec ${specification_file} --ref ${reference_manifest}
The generated report for this data product will pop up in your browser. Make sure your browser does not block this popup.
Adding a mapping between data product specifications and reference specifications
You need the mapping to run the data-product-test.sh script. This script is used by the pre-commit hook and the VS Code plugin for testing data products.
Add a file named data_products_reference.yml
in the root directory. It holds the mapping between data product specifications and what they will be tested against. For example:
- spec: dataops/data-product-definitions/CRM_data.yml
ref: dataops/data-product-definitions/CRM_data_reference.yml
Once this is done, you can use this VS Code button:
Adding an optional pre-commit hook
You can add a pre-commit hook to validate the data products for breaking changes.
Add the pre-commit script by copying it once you are in the DDE:
mkdir hooks
cp /runner-tools/data-product-test.sh ./hooks/data-product-test.sh
Add the pre-commit tool configuration:
.pre-commit-config.yaml:
repos:
  - repo: local
    hooks:
      - id: custom-hook
        language: system
        name: custom-hook
        entry: hooks/data-product-test.sh