Snowpark (Python) Orchestrator
Enterprise
| Image | `$DATAOPS_SNOWPARKPYTHON_RUNNER_IMAGE` |
| --- | --- |
The Snowpark (Python) orchestrator runs Python scripts that use the Snowpark API to query and process data in a data pipeline. With Snowpark, you can build applications that process data in Snowflake without moving the data to the system where your application code runs.
Usage
A typical use case for this orchestrator is to connect to Snowflake and execute SQL statements during pipeline execution.
Snowpark provides the following benefits:
- Snowpark supports push-down for all operations, including Snowflake User-Defined Functions (UDFs).
- Snowpark does not require a separate cluster outside of Snowflake for computations. All of the computations are done within Snowflake.
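Push-down works because Snowpark DataFrames are evaluated lazily: operations such as `filter` are recorded and compiled into SQL, and Snowflake executes that SQL only when an action such as `show()` or `collect()` runs. The toy model below is purely illustrative and is not Snowpark's implementation; `LazyTable`, its `filter(column, value)` method, and `to_sql` are hypothetical names chosen to show the idea that a filter becomes a `WHERE` clause instead of rows being pulled to the client.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class LazyTable:
    """Hypothetical stand-in for a DataFrame; NOT the Snowpark API.

    Each transformation returns a new object that only records the
    operation. No data is touched until the query text is generated.
    """

    name: str
    predicates: tuple = ()

    def filter(self, column: str, value: int) -> "LazyTable":
        # Record the predicate instead of evaluating it on the client.
        return LazyTable(self.name, self.predicates + (f"{column} = {value}",))

    def to_sql(self) -> str:
        # Compile the recorded operations into a single statement that the
        # database, not the client, would execute.
        sql = f"SELECT * FROM {self.name}"
        if self.predicates:
            sql += " WHERE " + " AND ".join(self.predicates)
        return sql


query = LazyTable("STORE").filter("SALESPERSONID", 277)
print(query.to_sql())  # SELECT * FROM STORE WHERE SALESPERSONID = 277
```

With real Snowpark DataFrames, the `queries` property should let you inspect the SQL that will be sent to Snowflake before any action runs.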
Example jobs
Wrap your Snowpark Python code in a script stored in your project repository, and set the `SNOWPARK_PYTHON_SCRIPT` variable to the script's path. The job then runs the script when it calls `/dataops`. The two examples below differ only in how they authenticate to Snowflake:
Authentication using username and password:

```yaml
"Snowpark-Summarize":
  extends:
    - .agent_tag
  image: $DATAOPS_SNOWPARKPYTHON_RUNNER_IMAGE
  stage: "Source Testing"
  variables:
    SNOWPARK_PYTHON_SCRIPT: $CI_PROJECT_DIR/script/summarize.py
    SNOWPARK_ACCOUNT: DATAOPS_VAULT(SNOWFLAKE.ACCOUNT)
    SNOWPARK_USER: DATAOPS_VAULT(SNOWFLAKE.TRANSFORM.USERNAME)
    SNOWPARK_PASSWORD: DATAOPS_VAULT(SNOWFLAKE.TRANSFORM.PASSWORD)
    SNOWPARK_ROLE: DATAOPS_VAULT(SNOWFLAKE.TRANSFORM.ROLE)
    SNOWPARK_WAREHOUSE: DATAOPS_VAULT(SNOWFLAKE.TRANSFORM.WAREHOUSE)
    SNOWPARK_DATABASE: SALES_DATABASE
    SNOWPARK_SCHEMA: SALES
  script:
    - /dataops
```

Authentication using key pair:

```yaml
"Snowpark-Summarize":
  extends:
    - .agent_tag
  image: $DATAOPS_SNOWPARKPYTHON_RUNNER_IMAGE
  stage: "Source Testing"
  variables:
    SNOWPARK_PYTHON_SCRIPT: $CI_PROJECT_DIR/script/summarize.py
    SNOWPARK_ACCOUNT: DATAOPS_VAULT(SNOWFLAKE.ACCOUNT)
    SNOWPARK_USER: DATAOPS_VAULT(SNOWFLAKE.TRANSFORM.USERNAME)
    SNOWPARK_KEY_PAIR: DATAOPS_VAULT(SNOWFLAKE.TRANSFORM.KEY_PAIR)
    SNOWPARK_PASSPHRASE: DATAOPS_VAULT(SNOWFLAKE.TRANSFORM.PASSPHRASE)
    SNOWPARK_ROLE: DATAOPS_VAULT(SNOWFLAKE.TRANSFORM.ROLE)
    SNOWPARK_WAREHOUSE: DATAOPS_VAULT(SNOWFLAKE.TRANSFORM.WAREHOUSE)
    SNOWPARK_DATABASE: SALES_DATABASE
    SNOWPARK_SCHEMA: SALES
  script:
    - /dataops
```
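The jobs above hand the connection settings to the script as environment variables. The sketch below uses a hypothetical helper, `connection_parameters_from_env`, to collect the settings that both variants share and to report every missing variable at once rather than failing on the first `KeyError`. The authentication variables (`SNOWPARK_PASSWORD`, or `SNOWPARK_KEY_PAIR` and `SNOWPARK_PASSPHRASE`) are deliberately left out because they differ between the two variants.

```python
import os
from typing import Dict, Mapping, Optional

# Snowpark connection parameter name -> job variable that supplies it.
# The mapping mirrors the YAML examples; the helper itself is hypothetical.
REQUIRED_VARIABLES = {
    "account": "SNOWPARK_ACCOUNT",
    "user": "SNOWPARK_USER",
    "role": "SNOWPARK_ROLE",
    "warehouse": "SNOWPARK_WAREHOUSE",
    "database": "SNOWPARK_DATABASE",
    "schema": "SNOWPARK_SCHEMA",
}


def connection_parameters_from_env(
    env: Optional[Mapping[str, str]] = None,
) -> Dict[str, str]:
    """Collect the shared connection settings from SNOWPARK_* variables.

    Unset or empty variables are all reported in a single error, which
    makes a misconfigured job easier to diagnose.
    """
    env = os.environ if env is None else env
    missing = [var for var in REQUIRED_VARIABLES.values() if not env.get(var)]
    if missing:
        raise RuntimeError("Missing job variables: " + ", ".join(missing))
    return {param: env[var] for param, var in REQUIRED_VARIABLES.items()}
```

A script could then add either `password` or `private_key` to the returned dictionary before passing it to `Session.builder.configs(...)`.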
Sample script
As an example Python workload, the following script selects the rows in the `STORE` table where `SALESPERSONID` equals 277 and prints them with `show()`, which displays the first 10 rows by default:
Authentication using username and password:

```python
import os

from snowflake.snowpark import Session
from snowflake.snowpark.functions import col

# Build the connection settings from the variables set in the job definition.
connection_parameters = {
    "account": os.environ['SNOWPARK_ACCOUNT'],
    "user": os.environ['SNOWPARK_USER'],
    "password": os.environ['SNOWPARK_PASSWORD'],
    "role": os.environ['SNOWPARK_ROLE'],
    "warehouse": os.environ['SNOWPARK_WAREHOUSE'],
    "database": os.environ['SNOWPARK_DATABASE'],
    "schema": os.environ['SNOWPARK_SCHEMA'],
}

session = Session.builder.configs(connection_parameters).create()
try:
    # The filter is pushed down to Snowflake; show() runs the query
    # and prints the first 10 matching rows.
    df_table = session.table("STORE").filter(col("SALESPERSONID") == 277)
    df_table.show()
finally:
    session.close()
```

Authentication using key pair:
```python
import importlib.machinery
import importlib.util
import os

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from snowflake.snowpark import Session
from snowflake.snowpark.functions import col

# Load the dataops_utils helper module shipped in the runner image.
dataops_utils_path = "/runner-tools/dataops_utils.py"
module_name = os.path.splitext(os.path.basename(dataops_utils_path))[0]
source_loader = importlib.machinery.SourceFileLoader(fullname=module_name, path=dataops_utils_path)
spec = importlib.util.spec_from_loader(name=module_name, loader=source_loader)
dataops_utils = importlib.util.module_from_spec(spec)
spec.loader.exec_module(dataops_utils)

# Write the private key from the job variable to a temporary file.
key_pair = os.environ['SNOWPARK_KEY_PAIR']
passphrase = os.environ.get('SNOWPARK_PASSPHRASE')
temp_key_pair_path = "/tmp/key_pair.p8"
dataops_utils.save_key_pair_to_file(key_pair, temp_key_pair_path)

try:
    # Decrypt the PEM key with the passphrase; an unencrypted key needs None.
    with open(temp_key_pair_path, 'rb') as f:
        p_key = serialization.load_pem_private_key(
            f.read(),
            password=passphrase.encode() if passphrase else None,
            backend=default_backend(),
        )
finally:
    # Do not leave the key material on the runner's disk.
    os.remove(temp_key_pair_path)

# Snowflake expects the private key as unencrypted DER-encoded PKCS#8 bytes.
pkb = p_key.private_bytes(
    encoding=serialization.Encoding.DER,
    format=serialization.PrivateFormat.PKCS8,
    encryption_algorithm=serialization.NoEncryption(),
)

# Key-pair authentication passes private_key; no password is needed.
connection_parameters = {
    "account": os.environ['SNOWPARK_ACCOUNT'],
    "user": os.environ['SNOWPARK_USER'],
    "private_key": pkb,
    "role": os.environ['SNOWPARK_ROLE'],
    "warehouse": os.environ['SNOWPARK_WAREHOUSE'],
    "database": os.environ['SNOWPARK_DATABASE'],
    "schema": os.environ['SNOWPARK_SCHEMA'],
}

session = Session.builder.configs(connection_parameters).create()
try:
    df_table = session.table("STORE").filter(col("SALESPERSONID") == 277)
    df_table.show()
finally:
    session.close()
```
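The two scripts differ only in how they build the authentication part of `connection_parameters`. If you want a single script that serves both job variants, a small dispatch step is one option. The sketch below assumes a hypothetical `choose_auth_method` function, and its precedence rule (key pair before password) is a design choice for illustration, not documented orchestrator behavior.

```python
import os
from typing import Mapping, Optional


def choose_auth_method(env: Optional[Mapping[str, str]] = None) -> str:
    """Return "key_pair" or "password" depending on which variables are set.

    Hypothetical helper: a non-empty SNOWPARK_KEY_PAIR takes precedence,
    and a non-empty SNOWPARK_PASSWORD is the fallback.
    """
    env = os.environ if env is None else env
    if env.get("SNOWPARK_KEY_PAIR"):
        return "key_pair"
    if env.get("SNOWPARK_PASSWORD"):
        return "password"
    raise RuntimeError("Set SNOWPARK_KEY_PAIR or SNOWPARK_PASSWORD in the job variables")
```

For `"password"`, the script would add `os.environ['SNOWPARK_PASSWORD']` as the `password` parameter; for `"key_pair"`, it would run the key-loading steps from the key-pair script and add the resulting bytes as `private_key`.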