
Snowpark (Python) Orchestrator

Enterprise

Image: $DATAOPS_SNOWPARKPYTHON_RUNNER_IMAGE
Feature Status: PubPrev

The Snowpark (Python) orchestrator runs Python scripts that use the Snowpark API to query and process data as part of a data pipeline. With Snowpark, you can build applications that process data in Snowflake without moving the data to the system where your application code runs.

Usage

A typical use case for this orchestrator is to connect to Snowflake and execute SQL statements during pipeline execution.

Snowpark provides the following benefits:

  • Snowpark pushes all operations down to Snowflake, including Snowflake user-defined functions (UDFs).
  • Snowpark does not need a separate compute cluster outside Snowflake; all computation runs within Snowflake.
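Push-down works because Snowpark DataFrames are lazily evaluated: each transformation adds to a SQL query, and the query is sent to Snowflake only when an action such as collect() or save_as_table() runs. The following toy sketch illustrates the idea using Python's built-in sqlite3 module as a stand-in for Snowflake. The LazyTable class and its methods are hypothetical and are not part of the Snowpark API; the rows are made up.

```python
import sqlite3


class LazyTable:
    """Toy lazy table (hypothetical, not the Snowpark API).

    Transformations only build SQL text; nothing executes until collect().
    """

    def __init__(self, conn: sqlite3.Connection, sql: str) -> None:
        self.conn = conn
        self.sql = sql

    def group_by_count(self, column: str) -> "LazyTable":
        # Wrap the current query in an aggregation; still nothing runs.
        # The column name is interpolated directly, so it must be trusted.
        return LazyTable(
            self.conn,
            f"SELECT {column}, COUNT(*) AS COUNT FROM ({self.sql}) GROUP BY {column}",
        )

    def collect(self) -> list:
        # Action: the database engine runs the whole query and returns only the results.
        return self.conn.execute(self.sql).fetchall()


# In-memory SQLite database standing in for Snowflake, with made-up rows
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE TEST_TABLE (ID INTEGER, NAME TEXT)")
conn.executemany(
    "INSERT INTO TEST_TABLE VALUES (?, ?)",
    [(1, "a"), (1, "b"), (2, "c"), (3, "d"), (3, "e"), (3, "f")],
)

summary = LazyTable(conn, "SELECT * FROM TEST_TABLE").group_by_count("ID")
print(summary.sql)  # SQL built so far; no rows have been read yet
rows = sorted(summary.collect())
print(rows)  # [(1, 2), (2, 1), (3, 3)]
```

Only the three aggregated rows come back to Python; the grouping and counting happen inside the database engine, which is the same division of work Snowpark relies on when it sends queries to Snowflake.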

Example jobs

Write your Snowpark Python code as a script in your project repository and set the SNOWPARK_PYTHON_SCRIPT variable to the script's path. The job then runs the script, passing the Snowflake connection details as SNOWPARK_* variables, most of them read from the DataOps vault:

pipelines/includes/local_includes/snowpark-python.yml
```yaml
"Snowpark-Summarize":
  extends:
    - .agent_tag
  image: $DATAOPS_SNOWPARKPYTHON_RUNNER_IMAGE
  stage: "Advanced Processing"
  variables:
    SNOWPARK_PYTHON_SCRIPT: $CI_PROJECT_DIR/script/summarize.py
    SNOWPARK_USER: DATAOPS_VAULT(SNOWFLAKE.TRANSFORM.USERNAME)
    SNOWPARK_PASSWORD: DATAOPS_VAULT(SNOWFLAKE.TRANSFORM.PASSWORD)
    SNOWPARK_ACCOUNT: DATAOPS_VAULT(SNOWFLAKE.ACCOUNT)
    SNOWPARK_ROLE: DATAOPS_VAULT(SNOWFLAKE.TRANSFORM.ROLE)
    SNOWPARK_WAREHOUSE: DATAOPS_VAULT(SNOWFLAKE.TRANSFORM.WAREHOUSE)
    SNOWPARK_DATABASE: SNOWPARK_DB
    SNOWPARK_SCHEMA: PUBLIC
  script:
    - /dataops
```

Sample script

As an example workload, the following script groups the rows in TEST_TABLE by ID and writes the number of rows for each ID to the SUMMARY_RESULTS table:

/script/summarize.py
```python
import os

from snowflake.snowpark import GroupingSets, Session
from snowflake.snowpark.functions import col

# Connection details come from the SNOWPARK_* variables set in the job
connection_parameters = {
    "account": os.environ["SNOWPARK_ACCOUNT"],
    "user": os.environ["SNOWPARK_USER"],
    "password": os.environ["SNOWPARK_PASSWORD"],
    "role": os.environ["SNOWPARK_ROLE"],
    "warehouse": os.environ["SNOWPARK_WAREHOUSE"],
    "database": os.environ["SNOWPARK_DATABASE"],
    "schema": os.environ["SNOWPARK_SCHEMA"],
}
session = Session.builder.configs(connection_parameters).create()

# Group TEST_TABLE rows by ID and count them; this only builds the query,
# which runs inside Snowflake when the result is written
df_table = session.table("TEST_TABLE").group_by_grouping_sets(
    GroupingSets([col("ID")])
).count()

# Replace SUMMARY_RESULTS with one row per ID and its row count
df_table.write.mode("overwrite").save_as_table("SUMMARY_RESULTS")

session.close()
```
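The sample script reads each setting with os.environ[...], so a missing variable stops the job with a KeyError that names only the first missing key. A minimal sketch of a helper that checks all the variables from the example job at once and reports every missing one. The names build_connection_parameters and REQUIRED_VARIABLES are hypothetical and are not part of the orchestrator; the helper also assumes that an empty value should count as missing.

```python
import os
from typing import Mapping

# Hypothetical mapping from Session connection keys to the job variables above
REQUIRED_VARIABLES = {
    "account": "SNOWPARK_ACCOUNT",
    "user": "SNOWPARK_USER",
    "password": "SNOWPARK_PASSWORD",
    "role": "SNOWPARK_ROLE",
    "warehouse": "SNOWPARK_WAREHOUSE",
    "database": "SNOWPARK_DATABASE",
    "schema": "SNOWPARK_SCHEMA",
}


def build_connection_parameters(environ: Mapping[str, str] = os.environ) -> dict:
    """Return Session connection parameters or raise one error listing every gap."""
    # Treat unset and empty variables alike as missing
    missing = [name for name in REQUIRED_VARIABLES.values() if not environ.get(name)]
    if missing:
        raise RuntimeError("Missing Snowpark variables: " + ", ".join(missing))
    return {key: environ[name] for key, name in REQUIRED_VARIABLES.items()}
```

With this helper in summarize.py, the session could be created with Session.builder.configs(build_connection_parameters()).create(), and a misconfigured job would fail with a single message listing every unset variable.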