Snowpark (Python) Orchestrator
Enterprise
| Image | $DATAOPS_SNOWPARKPYTHON_RUNNER_IMAGE |
|---|---|
The Snowpark (Python) orchestrator provides an intuitive API for querying and processing data in a data pipeline. Using this API, you can build applications that process data in Snowflake without moving the data to the system where your application code runs.
Usage
A typical use case for this orchestrator is to connect to Snowflake and execute SQL statements during pipeline execution.
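As an illustration of that use case, here is a minimal sketch of a helper that runs a list of SQL statements through a session. The helper name `run_statements` is hypothetical and not part of the orchestrator. It assumes only that the object passed in behaves like a `snowflake.snowpark.Session`, where `session.sql(text)` returns a lazy DataFrame and `.collect()` executes it.

```python
from typing import Any, Iterable, List


def run_statements(session: Any, statements: Iterable[str]) -> List[Any]:
    """Run each SQL statement in order and return the collected results.

    `session` is assumed to behave like snowflake.snowpark.Session:
    session.sql(text) builds a lazy DataFrame and .collect() executes it.
    """
    results = []
    for statement in statements:
        text = statement.strip()
        if not text:
            continue  # skip blank entries
        results.append(session.sql(text).collect())
    return results
```

Because the helper relies only on `sql()` and `collect()`, it can be exercised with a stand-in object before it is pointed at a real Snowflake session.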
Snowpark provides the following benefits:
- Snowpark supports push-down for all operations, including Snowflake User-Defined Functions (UDFs).
- Snowpark does not require a separate cluster outside of Snowflake for computations. All of the computations are done within Snowflake.
Example jobs
You can create scripts that wrap your Snowpark Python usage in your project repository and use the SNOWPARK_PYTHON_SCRIPT variable to set the path to the script. Then run the script from inside your job. The first job below authenticates with a username and password, and the second with a key pair:
# Authentication using username and password
"Snowpark-Summarize":
  extends:
    - .agent_tag
  image: $DATAOPS_SNOWPARKPYTHON_RUNNER_IMAGE
  stage: "Source Testing"
  variables:
    SNOWPARK_PYTHON_SCRIPT: $CI_PROJECT_DIR/script/summarize.py
    SNOWPARK_ACCOUNT: DATAOPS_VAULT(SNOWFLAKE.ACCOUNT)
    SNOWPARK_USER: DATAOPS_VAULT(SNOWFLAKE.TRANSFORM.USERNAME)
    SNOWPARK_PASSWORD: DATAOPS_VAULT(SNOWFLAKE.TRANSFORM.PASSWORD)
    SNOWPARK_ROLE: DATAOPS_VAULT(SNOWFLAKE.TRANSFORM.ROLE)
    SNOWPARK_WAREHOUSE: DATAOPS_VAULT(SNOWFLAKE.TRANSFORM.WAREHOUSE)
    SNOWPARK_DATABASE: SALES_DATABASE
    SNOWPARK_SCHEMA: SALES
  script:
    - /dataops
# Authentication using key pair
"Snowpark-Summarize":
  extends:
    - .agent_tag
  image: $DATAOPS_SNOWPARKPYTHON_RUNNER_IMAGE
  stage: "Source Testing"
  variables:
    SNOWPARK_PYTHON_SCRIPT: $CI_PROJECT_DIR/script/summarize.py
    SNOWPARK_ACCOUNT: DATAOPS_VAULT(SNOWFLAKE.ACCOUNT)
    SNOWPARK_USER: DATAOPS_VAULT(SNOWFLAKE.TRANSFORM.USERNAME)
    SNOWPARK_KEY_PAIR: DATAOPS_VAULT(SNOWFLAKE.TRANSFORM.KEY_PAIR)
    SNOWPARK_PASSPHRASE: DATAOPS_VAULT(SNOWFLAKE.TRANSFORM.PASSPHRASE)
    SNOWPARK_ROLE: DATAOPS_VAULT(SNOWFLAKE.TRANSFORM.ROLE)
    SNOWPARK_WAREHOUSE: DATAOPS_VAULT(SNOWFLAKE.TRANSFORM.WAREHOUSE)
    SNOWPARK_DATABASE: SALES_DATABASE
    SNOWPARK_SCHEMA: SALES
  script:
    - /dataops
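Since the job hands its settings to the script through environment variables, a script can check them up front and report every missing one at once. The following is a minimal sketch under that assumption. The function name `read_snowpark_settings` and the `auth` key it adds are hypothetical; the variable names are the ones used in the jobs above.

```python
import os
from typing import Dict, Mapping

# Variable names taken from the example jobs above.
COMMON_VARS = (
    "SNOWPARK_ACCOUNT", "SNOWPARK_USER", "SNOWPARK_ROLE",
    "SNOWPARK_WAREHOUSE", "SNOWPARK_DATABASE", "SNOWPARK_SCHEMA",
)
KEY_PAIR_VARS = ("SNOWPARK_KEY_PAIR", "SNOWPARK_PASSPHRASE")


def read_snowpark_settings(env: Mapping[str, str] = os.environ) -> Dict[str, str]:
    """Return the SNOWPARK_* settings, raising one error that lists every gap."""
    # Pick the auth mode from whichever credential variable is present.
    if "SNOWPARK_KEY_PAIR" in env:
        auth, required = "key_pair", COMMON_VARS + KEY_PAIR_VARS
    else:
        auth, required = "password", COMMON_VARS + ("SNOWPARK_PASSWORD",)

    missing = [name for name in required if not env.get(name)]
    if missing:
        raise ValueError("Missing job variables: " + ", ".join(missing))

    # Strip the prefix and lowercase the rest: SNOWPARK_ACCOUNT -> "account".
    settings = {name[len("SNOWPARK_"):].lower(): env[name] for name in required}
    settings["auth"] = auth  # hypothetical marker, not a connection parameter
    return settings
```

The returned dictionary is not meant to be passed to the session builder as is: the `auth`, `key_pair`, and `passphrase` entries are inputs for the script's own logic rather than connection parameters.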
Sample script
As an example Python workload, consider a script that shows all rows from the STORE table where SALESPERSONID equals 277. The first script below authenticates with a username and password, and the second with a key pair:
# Authentication using username and password
import os

from snowflake.snowpark import Session
from snowflake.snowpark.functions import col

# Connection settings come from the variables defined in the job.
connection_parameters = {
    "account": os.environ['SNOWPARK_ACCOUNT'],
    "user": os.environ['SNOWPARK_USER'],
    "password": os.environ['SNOWPARK_PASSWORD'],
    "role": os.environ['SNOWPARK_ROLE'],
    "warehouse": os.environ['SNOWPARK_WAREHOUSE'],
    "database": os.environ['SNOWPARK_DATABASE'],
    "schema": os.environ['SNOWPARK_SCHEMA']
}

# Open the session, then filter STORE to the rows for salesperson 277.
session = Session.builder.configs(connection_parameters).create()
df_table = session.table("STORE").filter(col("SALESPERSONID") == 277)
df_table.show()

# Authentication using key pair
import importlib.machinery
import importlib.util
import os

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from snowflake.snowpark import Session
from snowflake.snowpark.functions import col

# Load the dataops_utils helper module from its file path.
dataops_utils_path = "/runner-tools/dataops_utils.py"
module_name = os.path.basename(dataops_utils_path).replace('-', '_')
source_loader = importlib.machinery.SourceFileLoader(fullname=module_name, path=dataops_utils_path)
spec = importlib.util.spec_from_loader(name=module_name, loader=source_loader)
dataops_utils = importlib.util.module_from_spec(spec)
spec.loader.exec_module(dataops_utils)

key_pair = os.environ['SNOWPARK_KEY_PAIR']
passphrase = os.environ['SNOWPARK_PASSPHRASE']

# Write the key to a temporary file, then load and decrypt it with the passphrase.
temp_key_pair_path = "/tmp/key_pair.p8"
dataops_utils.save_key_pair_to_file(key_pair, temp_key_pair_path)
with open(temp_key_pair_path, 'rb') as f:
    p_key = serialization.load_pem_private_key(
        f.read(),
        password=passphrase.encode(),
        backend=default_backend()
    )

# Re-serialize the key as unencrypted DER bytes for the private_key parameter.
pkb = p_key.private_bytes(
    encoding=serialization.Encoding.DER,
    format=serialization.PrivateFormat.PKCS8,
    encryption_algorithm=serialization.NoEncryption()
)

# No "password" entry is needed: the private key above is already decrypted.
connection_parameters = {
    "account": os.environ['SNOWPARK_ACCOUNT'],
    "user": os.environ['SNOWPARK_USER'],
    "private_key": pkb,
    "role": os.environ['SNOWPARK_ROLE'],
    "warehouse": os.environ['SNOWPARK_WAREHOUSE'],
    "database": os.environ['SNOWPARK_DATABASE'],
    "schema": os.environ['SNOWPARK_SCHEMA']
}

# Open the session, then filter STORE to the rows for salesperson 277.
session = Session.builder.configs(connection_parameters).create()
df_table = session.table("STORE").filter(col("SALESPERSONID") == 277)
df_table.show()
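The key-pair script writes the key to /tmp/key_pair.p8 and leaves it there. One possible refinement, sketched below, is to write the key to a private temporary file and delete it once the key has been loaded. The context manager `temporary_key_file` is a hypothetical helper, not part of the orchestrator, and the sketch assumes that `save_key_pair_to_file` accepts any writable path and can overwrite an existing empty file.

```python
import os
import tempfile
from contextlib import contextmanager
from typing import Callable, Iterator


@contextmanager
def temporary_key_file(write_key: Callable[[str], None]) -> Iterator[str]:
    """Yield the path of a private temp file filled by write_key, then delete it."""
    # mkstemp creates the file readable and writable only by the current user.
    fd, path = tempfile.mkstemp(suffix=".p8")
    os.close(fd)
    try:
        write_key(path)
        yield path
    finally:
        # Remove the key material even if loading or connecting fails.
        if os.path.exists(path):
            os.remove(path)


# Possible use in the key-pair script (assumption: the helper accepts this path):
# with temporary_key_file(
#     lambda p: dataops_utils.save_key_pair_to_file(key_pair, p)
# ) as key_path:
#     with open(key_path, "rb") as f:
#         pem_bytes = f.read()
```

Passing the writer as a callable keeps the sketch independent of how the key is actually saved, so the same context manager works with any function that takes a target path.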