Snowpark (Python) Orchestrator
| Type | Flexible |
|---|---|
| Image | $DATAOPS_SNOWPARKPYTHON_RUNNER_IMAGE |
| Feature Status | PriPrev |
The Snowpark (Python) orchestrator is a flexible orchestrator that provides an intuitive API for querying and processing data in a data pipeline. Using this API, you can build applications that process data in Snowflake without moving the data to the system where your application code runs.
Usage
A typical use case for this orchestrator is to connect to Snowflake and execute SQL statements during pipeline execution.
Snowpark provides the following benefits:
- Snowpark supports push-down for all operations, including Snowflake User-Defined Functions (UDFs): the operations are translated to SQL and executed inside Snowflake.
- Snowpark does not require a separate cluster outside of Snowflake. All computations run within Snowflake.
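The use case described above, executing SQL statements during a pipeline run, can be sketched as a small helper. This is a minimal sketch rather than the orchestrator's own implementation: `run_statements` is a hypothetical name, and `session` is assumed to be a Snowpark `Session` like the one created in the sample script. `Session.sql` only builds a lazy DataFrame; calling `collect()` is what sends the statement to Snowflake, where it executes.

```python
from typing import Any, Iterable, List


def run_statements(session: Any, statements: Iterable[str]) -> List[Any]:
    """Run each SQL statement in Snowflake and return the collected results.

    Hypothetical helper: `session` is assumed to be a
    snowflake.snowpark.Session; it is not part of the orchestrator.
    """
    results = []
    for statement in statements:
        # session.sql() is lazy: it returns a DataFrame without running anything.
        dataframe = session.sql(statement)
        # collect() triggers execution inside Snowflake and fetches the rows.
        results.append(dataframe.collect())
    return results
```

Under these assumptions, `run_statements(session, ["SELECT CURRENT_VERSION()"])` would return a one-element list holding the rows of that query. Because the helper only needs an object with a `sql` method, it can be exercised with a stand-in session when no Snowflake connection is available.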
Example jobs
You can create scripts that wrap your Snowpark Python usage in your project repository and use the SNOWPARK_PYTHON_SCRIPT variable to set the path for the script. Then run the script from inside your job:
"Snowpark-Summarize":
  extends:
    - .agent_tag
  image: $DATAOPS_SNOWPARKPYTHON_RUNNER_IMAGE
  stage: "Advanced Processing"
  variables:
    SNOWPARK_PYTHON_SCRIPT: $CI_PROJECT_DIR/script/summarize.py
    SNOWPARK_USER: DATAOPS_VAULT(SNOWFLAKE.TRANSFORM.USERNAME)
    SNOWPARK_PASSWORD: DATAOPS_VAULT(SNOWFLAKE.TRANSFORM.PASSWORD)
    SNOWPARK_ACCOUNT: DATAOPS_VAULT(SNOWFLAKE.ACCOUNT)
    SNOWPARK_ROLE: DATAOPS_VAULT(SNOWFLAKE.TRANSFORM.ROLE)
    SNOWPARK_WAREHOUSE: DATAOPS_VAULT(SNOWFLAKE.TRANSFORM.WAREHOUSE)
    SNOWPARK_DATABASE: SNOWPARK_DB
    SNOWPARK_SCHEMA: PUBLIC
  script:
    - /dataops
Sample script
As an example Python workload, let us review a scenario where you group rows by ID from a TEST_TABLE and write the counts per ID to the SUMMARY_RESULTS table:
from snowflake.snowpark import Session, GroupingSets
from snowflake.snowpark.functions import col
import os

# Connection settings come from the variables set on the job.
connection_parameters = {
    "account": os.environ['SNOWPARK_ACCOUNT'],
    "user": os.environ['SNOWPARK_USER'],
    "password": os.environ['SNOWPARK_PASSWORD'],
    "role": os.environ['SNOWPARK_ROLE'],
    "warehouse": os.environ['SNOWPARK_WAREHOUSE'],
    "database": os.environ['SNOWPARK_DATABASE'],
    "schema": os.environ['SNOWPARK_SCHEMA']
}

session = Session.builder.configs(connection_parameters).create()

# Group the rows of TEST_TABLE by ID and count the rows in each group.
df_table = session.table("TEST_TABLE").groupByGroupingSets(
    GroupingSets(
        [col("ID")]
    )
).count()

# Overwrite SUMMARY_RESULTS with the counts per ID.
df_table.write.mode("overwrite").saveAsTable("SUMMARY_RESULTS")
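The sample script reads each `SNOWPARK_*` variable with `os.environ[...]`, so a missing variable surfaces as a `KeyError` naming only the first one that is absent. As a hedged alternative, the sketch below collects every missing variable and reports them together before a session is created. `load_connection_parameters` and `REQUIRED_VARIABLES` are hypothetical names introduced for illustration; the variable names themselves come from the example job.

```python
import os
from typing import Dict, Mapping, Optional

# Connection keys mapped to the job variables from the example job.
# (Hypothetical constant, introduced for this sketch.)
REQUIRED_VARIABLES = {
    "account": "SNOWPARK_ACCOUNT",
    "user": "SNOWPARK_USER",
    "password": "SNOWPARK_PASSWORD",
    "role": "SNOWPARK_ROLE",
    "warehouse": "SNOWPARK_WAREHOUSE",
    "database": "SNOWPARK_DATABASE",
    "schema": "SNOWPARK_SCHEMA",
}


def load_connection_parameters(
    environ: Optional[Mapping[str, str]] = None,
) -> Dict[str, str]:
    """Build Snowpark connection parameters, failing early on missing variables."""
    env = os.environ if environ is None else environ
    # Treat unset and empty variables alike, and report all of them at once.
    missing = [name for name in REQUIRED_VARIABLES.values() if not env.get(name)]
    if missing:
        raise RuntimeError("Missing job variables: " + ", ".join(sorted(missing)))
    return {key: env[name] for key, name in REQUIRED_VARIABLES.items()}
```

The returned dictionary has the same keys as `connection_parameters` in the sample script, so it could be passed to `Session.builder.configs(...)` in its place. Accepting an optional mapping instead of always reading `os.environ` keeps the helper easy to check without touching the real environment.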