Snowpark (Python) Orchestrator

Enterprise

Image: $DATAOPS_SNOWPARKPYTHON_RUNNER_IMAGE

The Snowpark (Python) orchestrator provides an intuitive API for querying and processing data in a data pipeline. Using this API, you can build applications that process data in Snowflake without moving the data to the system where your application code runs.

Usage

A typical use case for this orchestrator is to connect to Snowflake and execute SQL statements during pipeline execution.

Snowpark provides the following benefits:

  • Snowpark supports push-down for all operations, including Snowflake User-Defined Functions (UDFs).
  • Snowpark does not require a separate cluster outside of Snowflake for computations. All of the computations are done within Snowflake.
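To illustrate what push-down means, here is a toy sketch in plain Python. It is not the Snowpark API: the `LazyTable` class is hypothetical. It records a filter instead of fetching rows and filtering them locally, then compiles the recorded operations into a single SQL query for the server to run. Snowpark DataFrames follow the same principle. They are evaluated lazily, and the generated SQL runs in Snowflake only when an action such as `show()` or `collect()` is called.

```python
from dataclasses import dataclass
from typing import Tuple


@dataclass(frozen=True)
class LazyTable:
    """Hypothetical toy query builder; illustrates push-down, not Snowpark's API."""

    name: str
    predicates: Tuple[str, ...] = ()

    def filter(self, predicate: str) -> "LazyTable":
        # Record the predicate instead of fetching and filtering rows locally.
        return LazyTable(self.name, self.predicates + (predicate,))

    def to_sql(self) -> str:
        # Compile every recorded operation into one query for the server to run.
        sql = f"SELECT * FROM {self.name}"
        if self.predicates:
            sql += " WHERE " + " AND ".join(f"({p})" for p in self.predicates)
        return sql


query = LazyTable("STORE").filter("SALESPERSONID = 277").to_sql()
print(query)  # prints: SELECT * FROM STORE WHERE (SALESPERSONID = 277)
```

Only the query text leaves the client, so the filtering happens where the data lives rather than on the machine running the script.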

Example jobs

Create a script in your project repository that wraps your Snowpark Python code, and set the SNOWPARK_PYTHON_SCRIPT variable to the path of that script. The job then runs the script:

pipelines/includes/local_includes/snowpark-python.yml

```yaml
"Snowpark-Summarize":
  extends:
    - .agent_tag
  image: $DATAOPS_SNOWPARKPYTHON_RUNNER_IMAGE
  stage: "Source Testing"
  variables:
    SNOWPARK_PYTHON_SCRIPT: $CI_PROJECT_DIR/script/summarize.py
    SNOWPARK_ACCOUNT: DATAOPS_VAULT(SNOWFLAKE.ACCOUNT)
    SNOWPARK_USER: DATAOPS_VAULT(SNOWFLAKE.TRANSFORM.USERNAME)
    SNOWPARK_PASSWORD: DATAOPS_VAULT(SNOWFLAKE.TRANSFORM.PASSWORD)
    SNOWPARK_ROLE: DATAOPS_VAULT(SNOWFLAKE.TRANSFORM.ROLE)
    SNOWPARK_WAREHOUSE: DATAOPS_VAULT(SNOWFLAKE.TRANSFORM.WAREHOUSE)
    SNOWPARK_DATABASE: SALES_DATABASE
    SNOWPARK_SCHEMA: SALES
  script:
    - /dataops
```

Sample script

As an example Python workload, the following script selects the rows of the STORE table where SALESPERSONID equals 277 and prints them. Note that `DataFrame.show()` prints only the first 10 rows by default:

/script/summarize.py

```python
import os

from snowflake.snowpark import Session
from snowflake.snowpark.functions import col

# Connection settings come from the SNOWPARK_* variables defined in the job.
connection_parameters = {
    "account": os.environ["SNOWPARK_ACCOUNT"],
    "user": os.environ["SNOWPARK_USER"],
    "password": os.environ["SNOWPARK_PASSWORD"],
    "role": os.environ["SNOWPARK_ROLE"],
    "warehouse": os.environ["SNOWPARK_WAREHOUSE"],
    "database": os.environ["SNOWPARK_DATABASE"],
    "schema": os.environ["SNOWPARK_SCHEMA"],
}
session = Session.builder.configs(connection_parameters).create()

# The filter runs in Snowflake; show() prints the first 10 matching rows.
df_table = session.table("STORE").filter(col("SALESPERSONID") == 277)
df_table.show()

# Release the Snowflake connection once the work is done.
session.close()
```
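If a variable is missing from the job configuration, the sample script stops with a bare `KeyError` that names only the first missing variable. The following is a minimal sketch of a helper that reports every missing `SNOWPARK_*` variable at once. The names `ENV_VARS` and `connection_parameters_from_env` are hypothetical and are not part of Snowpark or the orchestrator. The helper also treats an empty value as missing, which is an assumption about what counts as misconfigured.

```python
import os
from typing import Dict, Mapping

# Hypothetical mapping: Snowpark connection key -> job variable that supplies it.
ENV_VARS = {
    "account": "SNOWPARK_ACCOUNT",
    "user": "SNOWPARK_USER",
    "password": "SNOWPARK_PASSWORD",
    "role": "SNOWPARK_ROLE",
    "warehouse": "SNOWPARK_WAREHOUSE",
    "database": "SNOWPARK_DATABASE",
    "schema": "SNOWPARK_SCHEMA",
}


def connection_parameters_from_env(env: Mapping[str, str] = os.environ) -> Dict[str, str]:
    """Build connection parameters, naming every missing or empty variable at once."""
    missing = [var for var in ENV_VARS.values() if not env.get(var)]
    if missing:
        raise RuntimeError("Missing job variables: " + ", ".join(missing))
    return {key: env[var] for key, var in ENV_VARS.items()}
```

In the sample script, you could then replace the literal `connection_parameters` dictionary with a call to `connection_parameters_from_env()` before passing the result to `Session.builder.configs(...)`.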