Python Orchestrators

Within your DataOps projects, you can use different Python versions for tasks ranging from running a single Python script to running fully developed applications with third-party dependencies. To select a Python version, pass the corresponding image variable into the Python job in your project.

Python 3.8 orchestrator

  • Type: Flexible
  • Image: $DATAOPS_PYTHON3_RUNNER_IMAGE

The Python 3.8 orchestrator is a flexible orchestrator that executes the commands and scripts using Python version 3.8 and comes pre-packaged with the following major Python libraries:

  • boto3
  • click
  • Jinja2
  • PyYAML
  • requests
  • responses
  • snowflake-connector-python

Usage

Both the Python 3.8 and Python 3.11 orchestrators execute a Python script, such as /scripts/my-script.py, inside your DataOps project. They are not limited to a single script: you can also run more complex Python projects with them.

If you need to pass variables into the Python job, you can set them in the YAML config file's variables block. For example:

pipelines/includes/local_includes/python3_jobs/my_python_job.yml
"My Python Job":
extends:
- .agent_tag
stage: "My Stage"
image: $DATAOPS_PYTHON3_RUNNER_IMAGE
variables:
MY_VAR: value
SPECIFIED_PATH: ${CI_PROJECT_DIR}/python_prj
DATAOPS_RUN_PYTHON_SCRIPT: ${CI_PROJECT_DIR}/python_prj/main.py
script:
- /dataops
icon: ${PYTHON_ICON}

Supported parameters

  • DATAOPS_RUN_PYTHON_SCRIPT (Optional): Path to the main Python script. If not specified, you must call the Python script (your-script.py) directly instead of using /dataops as the script parameter.
  • FILE (Optional; defaults to requirements.txt): Name of the file used to run pip install.
  • SPECIFIED_PATH (Optional; defaults to $CI_PROJECT_DIR): Path to the requirements.txt file.
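For instance, a job that keeps its dependency file under a custom path and name could combine these parameters as follows (the python_prj path and dev-requirements.txt file name are illustrative):

```yaml
"My Python Job With Custom Requirements":
  extends:
    - .agent_tag
  stage: "My Stage"
  image: $DATAOPS_PYTHON3_RUNNER_IMAGE
  variables:
    # Look for the pip dependency file in <project>/python_prj ...
    SPECIFIED_PATH: ${CI_PROJECT_DIR}/python_prj
    # ... and use dev-requirements.txt instead of requirements.txt
    FILE: dev-requirements.txt
    DATAOPS_RUN_PYTHON_SCRIPT: ${CI_PROJECT_DIR}/python_prj/main.py
  script:
    - /dataops
  icon: ${PYTHON_ICON}
```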

Example jobs

1. Run a script requiring no variables

The following example shows how to execute a Python script that does not need any input variables:

pipelines/includes/local_includes/python3_jobs/execute_python3_script.yml
"Execute Python3 Script":
extends:
- .agent_tag
stage: "Additional Configuration"
image: $DATAOPS_PYTHON3_RUNNER_IMAGE
variables:
DATAOPS_RUN_PYTHON_SCRIPT: $CI_PROJECT_DIR/scripts/my-script.py
script:
- /dataops
icon: ${PYTHON-ICON}

2. Retrieve variables from the DataOps vault

To retrieve the secrets PATH.TO.VAR1 and PATH.TO.VAR2 from the DataOps vault, add them as variables to the YAML config file as follows:

pipelines/includes/local_includes/python3_jobs/extract_from_vault.yml
"Extract from Vault":
extends:
- .agent_tag
stage: "Additional Configuration"
image: $DATAOPS_PYTHON3_RUNNER_IMAGE
variables:
DATAOPS_RUN_PYTHON_SCRIPT: $CI_PROJECT_DIR/scripts/my-script.py
VAR1: DATAOPS_VAULT(PATH.TO.VAR1)
VAR2: DATAOPS_VAULT(PATH.TO.VAR2)
script:
- /dataops
icon: ${PYTHON_ICON}

Access these variables using os.getenv() in your Python code, as the following code snippet indicates:

scripts/my-script.py
import os

var1 = os.getenv('VAR1')
var2 = os.getenv('VAR2')

3. Install additional Python packages

You can install Python packages that are not pre-installed in the Python 3.8 or Python 3.11 orchestrator images by adding a requirements.txt file with the list of required packages to your project.

The orchestrator searches for the file requirements.txt by default in the root of the project directory, but you can specify a different path using the SPECIFIED_PATH variable. You can also override the file name using the FILE variable.

The following example shows how to install additional packages:

requirements.txt
# Install the latest version of the package
numpy

# Install a specific version of the package
scipy==1.0.0

# Install the latest available package within the specified version range
pandas>=1.0.0,<2.0.0
pipelines/includes/local_includes/python3_jobs/execute_python3_script.yml
"Execute Python3 Script":
extends:
- .agent_tag
stage: "Additional Configuration"
image: $DATAOPS_PYTHON3_RUNNER_IMAGE
variables:
DATAOPS_RUN_PYTHON_SCRIPT: $CI_PROJECT_DIR/scripts/my_script.py
script:
- /dataops
icon: ${PYTHON-ICON}

4. Develop a Python project

There are times when you need to execute a fully-fledged Python project. This mini example shows you how to set one up.

At the end of this example, you will have completed the following actions:

  • Created a separate, self-contained directory for your Python project in /python_prj
  • Created a pip requirements.txt to install dependencies
  • Executed your main application in the context of the /dataops script

Additionally, the goal is to create the following Python project layout:

/python_prj
    __init__.py
    requirements.txt
    main.py
    module1.py

Step 1

Create the two files /python_prj/__init__.py and /python_prj/requirements.txt. While not used actively in this example, these files are your starting points for static helpers and project dependencies.

As an example, a starting requirements.txt is as follows:

/python_prj/requirements.txt
pyjson ~= 1.3

Step 2

Create your main application. In our use case, this application reports on commonly used DataOps variables and sets up a mode variable for further use in your app.

/python_prj/main.py
import logging
import os

from module1 import Module1

# Set up logging
log_level = logging.DEBUG if os.getenv('DATAOPS_DEBUG') else logging.INFO
logging.basicConfig(format='\033[38;5;99m%(filename)s\033[0m \033[90m%(levelname)s\033[0m %(message)s')
logger = logging.getLogger('python_prj')
logger.setLevel(log_level)


# Log commonly used DataOps variables
def log_dataops_vars():
    logger.info("Key DataOps variables")
    logger.info("\tDATAOPS_PREFIX=%s", os.getenv('DATAOPS_PREFIX'))
    logger.info("\tDATAOPS_DATABASE=%s", os.getenv('DATAOPS_DATABASE'))
    logger.info("\tDATAOPS_ENV_NAME=%s", os.getenv('DATAOPS_ENV_NAME'))
    logger.info("\tDATAOPS_PERSISTENT_CACHE_DIR=%s", os.getenv('DATAOPS_PERSISTENT_CACHE_DIR'))
    logger.info("\tDATAOPS_PIPELINE_CACHE_DIR=%s", os.getenv('DATAOPS_PIPELINE_CACHE_DIR'))
    logger.info("\tDATAOPS_DEBUG=%s", os.getenv('DATAOPS_DEBUG'))

    logger.info("Key pipeline variables")
    logger.info("\tCI_COMMIT_BRANCH=%s", os.getenv('CI_COMMIT_BRANCH'))
    logger.info("\tCI_JOB_NAME=%s", os.getenv('CI_JOB_NAME'))
    logger.info("\tCI_JOB_STAGE=%s", os.getenv('CI_JOB_STAGE'))


def main():
    logger.debug('main starting...')

    log_dataops_vars()

    mode = os.getenv('MODE') or 'default'
    logger.info("Running in mode %s", mode)

    m1 = Module1(logger=logger)

    try:
        m1.say_something("Hello!")
    except Exception as ex:
        logger.error(str(ex))

    logger.debug('main is done')


if __name__ == '__main__':
    main()

Step 3

Code your module as follows:

python_prj/module1.py
import logging


class Module1:

    def __init__(self, logger=logging):
        self.logger = logger

    def say_something(self, what):
        self.logger.info(what)

Step 4

Finally, run your main application from the Python 3.8 or Python 3.11 orchestrator job using the following YAML code:

pipelines/includes/local_includes/python_prj_jobs/run_main.yml
"Run Python Project":
extends:
- .agent_tag
stage: "Pipeline Initialisation"
image: $DATAOPS_PYTHON3_RUNNER_IMAGE
variables:
# user defined variable
MODE: run

# DataOps Parameters
SPECIFIED_PATH: ${CI_PROJECT_DIR}/python_prj
DATAOPS_RUN_PYTHON_SCRIPT: ${CI_PROJECT_DIR}/python_prj/main.py

script:
- /dataops
icon: ${PYTHON_ICON}

If the job executes successfully, the job log will be similar to the following example:

main.py DEBUG main starting...
main.py INFO Key DataOps variables
main.py INFO DATAOPS_PREFIX=DATAOPS
main.py INFO DATAOPS_DATABASE=None
main.py INFO DATAOPS_ENV_NAME=None
main.py INFO DATAOPS_PERSISTENT_CACHE_DIR=/agent_cache/persistent_cache
main.py INFO DATAOPS_PIPELINE_CACHE_DIR=/agent_cache/304076
main.py INFO DATAOPS_DEBUG=1
main.py INFO Key pipeline variables
main.py INFO CI_COMMIT_BRANCH=main
main.py INFO CI_JOB_NAME=Run Python Project
main.py INFO CI_JOB_STAGE=Pipeline Initialisation
main.py INFO Running in mode run
module1.py INFO Hello!
main.py DEBUG main is done

When you execute the job with the DATAOPS_DEBUG option set, the job log includes debug messages such as:

main.py DEBUG main starting...
main.py INFO Key DataOps variables
...
module1.py INFO Hello!
main.py DEBUG main is done

5. Export from Snowflake

info

This example is only compatible with Python 3.8.

This example builds on the concepts of the previous Python project. It demonstrates how to connect to Snowflake leveraging DataOps.live standard variables.

By the end of this example, you will have completed the following actions:

  • Created a separate, self-contained directory for your Python project in /python_snowflake
  • Created a pip requirements.txt to install the Snowflake connector dependencies
  • Executed your main application in the context of the /dataops script

Step 1

Create a new file python_snowflake/requirements.txt:

python_snowflake/requirements.txt
# Source https://github.com/snowflakedb/snowflake-connector-python/blob/main/tested_requirements/requirements_38.reqs
asn1crypto==1.5.1
certifi==2021.10.8
cffi==1.15.0
charset-normalizer==2.0.12
cryptography==36.0.2
idna==3.3
oscrypto==1.3.0
pycparser==2.21
pycryptodomex==3.14.1
PyJWT==2.3.0
pyOpenSSL==21.0.0
pytz==2021.3
requests==2.27.1
six==1.16.0
urllib3==1.26.9
snowflake-connector-python==2.7.6

This action ensures that the Snowflake Python Connector, compatible with Python 3.8, is available to your program.

Step 2

Create a new Python file python_snowflake/main.py:

python_snowflake/main.py
import csv
import logging
import os
import sys

import snowflake.connector
from snowflake.connector import DictCursor

# Set up logging
log_level = logging.DEBUG if os.getenv('DATAOPS_DEBUG') else logging.INFO
logging.basicConfig(format='\033[38;5;99m%(filename)s\033[0m \033[90m%(levelname)s\033[0m %(message)s')
logger = logging.getLogger('python_prj')
logger.setLevel(log_level)

# Get connection information
snowflake_account = os.getenv('SNOWFLAKE_ACCOUNT')
snowflake_user = os.getenv('SNOWFLAKE_USER')
snowflake_password = os.getenv('SNOWFLAKE_PASSWORD')
snowflake_role = os.getenv('SNOWFLAKE_ROLE')
snowflake_warehouse = os.getenv('SNOWFLAKE_WAREHOUSE')


def log_dataops_vars():
    """Log key variables that you can use to customize your program's behavior."""
    logger.info("Key DataOps variables")
    logger.info("\tDATAOPS_PREFIX=%s", os.getenv('DATAOPS_PREFIX'))
    logger.info("\tDATAOPS_DATABASE=%s", os.getenv('DATAOPS_DATABASE'))
    logger.info("\tDATAOPS_ENV_NAME=%s", os.getenv('DATAOPS_ENV_NAME'))
    logger.info("\tDATAOPS_PERSISTENT_CACHE_DIR=%s", os.getenv('DATAOPS_PERSISTENT_CACHE_DIR'))
    logger.info("\tDATAOPS_PIPELINE_CACHE_DIR=%s", os.getenv('DATAOPS_PIPELINE_CACHE_DIR'))
    logger.info("\tDATAOPS_DEBUG=%s", os.getenv('DATAOPS_DEBUG'))

    logger.info("Key pipeline variables")
    logger.info("\tCI_COMMIT_BRANCH=%s", os.getenv('CI_COMMIT_BRANCH'))
    logger.info("\tCI_JOB_NAME=%s", os.getenv('CI_JOB_NAME'))
    logger.info("\tCI_JOB_STAGE=%s", os.getenv('CI_JOB_STAGE'))


def log_snowflake_vars():
    """Log Snowflake connection variables. For the password, only report whether it is set."""
    logger.info("Snowflake connection variables")
    logger.info("\tSNOWFLAKE_ACCOUNT=%s", snowflake_account)
    logger.info("\tSNOWFLAKE_USER=%s", snowflake_user)
    logger.info("\tSNOWFLAKE_ROLE=%s", snowflake_role)
    logger.info("\tSNOWFLAKE_PASSWORD=%s", 'is set' if snowflake_password is not None else 'unset')
    logger.info("\tSNOWFLAKE_WAREHOUSE=%s", snowflake_warehouse)


def main():
    logger.debug('Simple Snowflake job starting...')

    log_dataops_vars()
    log_snowflake_vars()

    # Connect to Snowflake
    try:
        logger.info(
            'Connecting to Snowflake account {}.snowflakecomputing.com with user {}'.format(
                snowflake_account, snowflake_user))
        sf = snowflake.connector.connect(
            account=snowflake_account,
            user=snowflake_user,
            password=snowflake_password,
            role=snowflake_role,
            warehouse=snowflake_warehouse)
    except Exception as ex:
        logger.fatal('Unable to connect to Snowflake: {}'.format(str(ex)))
        sys.exit(1)

    # Execute SQL
    sql = "SELECT * FROM testtable WHERE col1 LIKE 'T%';"
    try:
        logger.info('Executing SQL: {}'.format(sql))
        cur = sf.cursor(DictCursor)
        cur.execute(sql)
        records = cur.fetchall()
        logger.info('Returned {} record(s) from Snowflake'.format(len(records)))
    except Exception as ex:
        logger.fatal('Error executing SQL: {}'.format(str(ex)))
        sys.exit(1)

    # Export as CSV to stdout
    outfile = sys.stdout
    headers = [i[0] for i in cur.description]
    writer = csv.DictWriter(outfile, fieldnames=headers)
    writer.writeheader()
    for rec in records:
        writer.writerow(rec)

    # Disconnect from Snowflake
    sf.close()

    logger.debug('Simple Snowflake job done')


if __name__ == '__main__':
    main()

Adjust the SQL statement and the outfile to suit your requirements. This program dumps the contents of your table as CSV to stdout.

Step 3

Finally, leverage the script from your job. Create this file, pipelines/includes/local_includes/python_prj_jobs/run_snowflake.yml, as follows:

pipelines/includes/local_includes/python_prj_jobs/run_snowflake.yml
"Extract from Snowflake":
  extends:
    - .agent_tag
  stage: "Snowflake Actions"
  image: $DATAOPS_PYTHON3_RUNNER_IMAGE
  variables:
    # Credentials
    SNOWFLAKE_ACCOUNT: DATAOPS_VAULT(SNOWFLAKE.SOLE.ACCOUNT)
    SNOWFLAKE_USER: DATAOPS_VAULT(SNOWFLAKE.SOLE.USERNAME)
    SNOWFLAKE_PASSWORD: DATAOPS_VAULT(SNOWFLAKE.SOLE.PASSWORD)
    SNOWFLAKE_ROLE: DATAOPS_VAULT(SNOWFLAKE.SOLE.ROLE)
    SNOWFLAKE_WAREHOUSE: DATAOPS_VAULT(SNOWFLAKE.SOLE.WAREHOUSE)

    # DataOps parameters
    SPECIFIED_PATH: ${CI_PROJECT_DIR}/python_snowflake
    DATAOPS_RUN_PYTHON_SCRIPT: ${CI_PROJECT_DIR}/python_snowflake/main.py
  script:
    - /dataops
  icon: ${PYTHON_ICON}

Project resources

Store your Python scripts or project in their own folder inside your DataOps project, e.g., python_prj, as shown in the example job in the Develop a Python project section.