Python Orchestrators
Professional Enterprise
Within your DataOps projects, you can use different Python versions for tasks ranging from running a single Python script to running fully developed applications that have third-party dependency. To select your Python version, you only need to pass the corresponding image variable into the Python job in your project.
- Python 3.12
- Python 3.11
Python 3.12 orchestrator
Image | $DATAOPS_PYTHON3_RUNNER_IMAGE |
---|
The Python 3.12 orchestrator runs the commands and scripts using Python version 3.12 and comes pre-packaged with the following major Python libraries:
- boto3
- click
- Jinja2
- PyYAML
- requests
- responses
- snowflake-connector-python
Python 3.11 orchestrator
Image | $DATAOPS_PYTHON3_11_RUNNER_IMAGE |
---|
The Python 3.11 orchestrator runs the commands and scripts using Python version 3.11 and comes pre-packaged with the following major Python libraries:
- boto3
- click
- Jinja2
- PyYAML
- requests
- responses
Usage
Both orchestrators are used to execute a Python script, such as /scripts/my-script.py
, inside your DataOps project.
However, they can execute more than just a single Python script. You can also run more complex Python projects with them.
If you need to pass variables into the Python job, you can set them in the YAML config file's variables
block. For example:
- Python 3.12
- Python 3.11
"My Python Job":
extends:
- .agent_tag
stage: "My Stage"
image: $DATAOPS_PYTHON3_RUNNER_IMAGE
variables:
MY_VAR: value
SPECIFIED_PATH: ${CI_PROJECT_DIR}/python_prj
DATAOPS_RUN_PYTHON_SCRIPT: ${CI_PROJECT_DIR}/python_prj/main.py
script:
- /dataops
icon: ${PYTHON_ICON}
"My Python Job":
extends:
- .agent_tag
stage: "My Stage"
image: $DATAOPS_PYTHON3_11_RUNNER_IMAGE
variables:
MY_VAR: value
SPECIFIED_PATH: ${CI_PROJECT_DIR}/python_prj
DATAOPS_RUN_PYTHON_SCRIPT: ${CI_PROJECT_DIR}/python_prj/main.py
script:
- /dataops
icon: ${PYTHON_ICON}
Supported parameters
Parameter | Required/Optional/Default | Description |
---|---|---|
DATAOPS_RUN_PYTHON_SCRIPT | Optional | Path to the main Python script. If not specified, you must call the Python script (your-script.py ) directly instead of using /dataops as the script parameter |
FILE | Optional. Defaults to requirements.txt | Name of the file used to run pip install |
SPECIFIED_PATH | Optional. Defaults to $CI_PROJECT_DIR | Path to requirements.txt file |
Example jobs
1. Run a script requiring no variables
The following example shows how to execute a Python script that does not need any input variables:
- Python 3.12
- Python 3.11
"Execute Python3 Script":
extends:
- .agent_tag
stage: "Additional Configuration"
image: $DATAOPS_PYTHON3_RUNNER_IMAGE
variables:
DATAOPS_RUN_PYTHON_SCRIPT: $CI_PROJECT_DIR/scripts/my-script.py
script:
- /dataops
icon: ${PYTHON-ICON}
"Execute Python3 Script":
extends:
- .agent_tag
stage: "Additional Configuration"
image: $DATAOPS_PYTHON3_11_RUNNER_IMAGE
variables:
DATAOPS_RUN_PYTHON_SCRIPT: $CI_PROJECT_DIR/scripts/my-script.py
script:
- /dataops
icon: ${PYTHON-ICON}
2. Retrieve variables from the DataOps vault
To retrieve the secrets PATH.TO.VAR1
and PATH.TO.VAR2
from the DataOps vault, add them as variables to the YAML config file as follows:
- Python 3.12
- Python 3.11
"Extract from Vault":
extends:
- .agent_tag
stage: "Additional Configuration"
image: $DATAOPS_PYTHON3_RUNNER_IMAGE
variables:
DATAOPS_RUN_PYTHON_SCRIPT: $CI_PROJECT_DIR/scripts/my-script.py
VAR1: DATAOPS_VAULT(PATH.TO.VAR1)
VAR2: DATAOPS_VAULT(PATH.TO.VAR2)
script:
- /dataops
icon: ${PYTHON_ICON}
"Extract from Vault":
extends:
- .agent_tag
stage: "Additional Configuration"
image: $DATAOPS_PYTHON3_11_RUNNER_IMAGE
variables:
DATAOPS_RUN_PYTHON_SCRIPT: $CI_PROJECT_DIR/scripts/my-script.py
VAR1: DATAOPS_VAULT(PATH.TO.VAR1)
VAR2: DATAOPS_VAULT(PATH.TO.VAR2)
script:
- /dataops
icon: ${PYTHON_ICON}
Access these variables using os.getenv()
in your Python code, as the following code snippet indicates:
os.getenv('VAR1')
os.getenv('VAR2')
3. Install additional Python packages
You can install Python packages that are not pre-installed in the Python 3.12 or Python 3.11 orchestrator images by adding a requirements.txt
file with the list of required packages to your project.
The orchestrator searches for the file requirements.txt
by default in the root of the project directory, but you can specify a different path using the SPECIFIED_PATH
variable. You can also override the file name using the FILE
variable.
Following is an example of how you can install additional packages:
# Install the latest version of the package
numpy
# Install a specific version of the package
scipy==1.0.0
# Install the latest available package within the specified version range
pandas>=1.0.0,<2.0.0
- Python 3.12
- Python 3.11
"Execute Python3 Script":
extends:
- .agent_tag
stage: "Additional Configuration"
image: $DATAOPS_PYTHON3_RUNNER_IMAGE
variables:
DATAOPS_RUN_PYTHON_SCRIPT: $CI_PROJECT_DIR/scripts/my_script.py
script:
- /dataops
icon: ${PYTHON-ICON}
"Execute Python3 Script":
extends:
- .agent_tag
stage: "Additional Configuration"
image: $DATAOPS_PYTHON3_11_RUNNER_IMAGE
variables:
DATAOPS_RUN_PYTHON_SCRIPT: $CI_PROJECT_DIR/scripts/my_script.py
script:
- /dataops
icon: ${PYTHON-ICON}
4. Develop a Python project
There are times when you need to execute a fully-fledged Python project. This mini example shows you how to set one up.
At the end of this example, you will have completed the following actions:
- Have a separate, self-contained directory for your Python project in the directory
/python_prj
- Have created a pip
requirements.txt
to install dependencies - Have executed your main application in the context of the
/dataops
script
Additionally, the goal is to create the following Python project layout:
/python_prj
__init__.py
requirements.txt
main.py
module1.py
Step 1
Create the two files /python_prj/__init__.py
and /python_prj/requirements.txt
.
While not used actively in this example, these files are your starting points for static helpers and project dependencies.
Therefore, as an example, a starting requirements.txt
is as follows:
pyjson ~= 1.3
Step 2
Create your main application. In our use case, this application reports on commonly used DataOps variables and sets up a mode
variable for further use in your app.
import logging
import os
from module1 import Module1
## Set up logging
log_level = logging.DEBUG if os.getenv('DATAOPS_DEBUG') else logging.INFO
logging.basicConfig(format='\033[38;5;99m%(filename)s\033[0m \033[90m%(levelname)s\033[0m %(message)s')
logger = logging.getLogger('python_prj')
logger.setLevel(log_level)
### Log commonly used DataOps variables
def log_dataops_vars():
logger.info("Key DataOps variables")
logger.info("\tDATAOPS_PREFIX=%s", os.getenv('DATAOPS_PREFIX'))
logger.info("\tDATAOPS_DATABASE=%s", os.getenv('DATAOPS_DATABASE'))
logger.info("\tDATAOPS_ENV_NAME=%s", os.getenv('DATAOPS_ENV_NAME'))
logger.info("\tDATAOPS_PERSISTENT_CACHE_DIR=%s", os.getenv('DATAOPS_PERSISTENT_CACHE_DIR'))
logger.info("\tDATAOPS_PIPELINE_CACHE_DIR=%s", os.getenv('DATAOPS_PIPELINE_CACHE_DIR'))
logger.info("\tDATAOPS_PERSISTENT_CACHE_DIR=%s", os.getenv('DATAOPS_PERSISTENT_CACHE_DIR'))
logger.info("\tDATAOPS_DEBUG=%s", os.getenv('DATAOPS_DEBUG'))
logger.info("Key pipeline variables")
logger.info("\tCI_COMMIT_BRANCH=%s", os.getenv('CI_COMMIT_BRANCH'))
logger.info("\tCI_JOB_NAME=%s", os.getenv('CI_JOB_NAME'))
logger.info("\tCI_JOB_STAGE=%s", os.getenv('CI_JOB_STAGE'))
def main():
logger.debug('main starting...')
log_dataops_vars()
mode = os.getenv('MODE') or 'default'
logger.info("Running in mode %s", mode)
m1 = Module1(logger=logger)
try:
m1.say_something("Hello!")
except Exception as ex:
logger.error(str(ex))
logger.debug('main is done')
if __name__ == '__main__':
main()
Step 3
Code your module as follows:
import logging
class Module1:
def __init__(self, logger=logging):
self.logger = logger
pass
def say_something(self, what):
self.logger.info(what)
Step 4
Finally, run your main application from the Python 3.12 or Python 3.11 orchestrator job using the following YAML code:
- Python 3.12
- Python 3.11
"Run Python Project":
extends:
- .agent_tag
stage: "Pipeline Initialisation"
image: $DATAOPS_PYTHON3_RUNNER_IMAGE
variables:
# user defined variable
MODE: run
# DataOps Parameters
SPECIFIED_PATH: ${CI_PROJECT_DIR}/python_prj
DATAOPS_RUN_PYTHON_SCRIPT: ${CI_PROJECT_DIR}/python_prj/main.py
script:
- /dataops
icon: ${PYTHON_ICON}
"Run Python Project":
extends:
- .agent_tag
stage: "Pipeline Initialisation"
image: $DATAOPS_PYTHON3_11_RUNNER_IMAGE
variables:
# user defined variable
MODE: run
# DataOps Parameters
SPECIFIED_PATH: ${CI_PROJECT_DIR}/python_prj
DATAOPS_RUN_PYTHON_SCRIPT: ${CI_PROJECT_DIR}/python_prj/main.py
script:
- /dataops
icon: ${PYTHON_ICON}
If the job executes successfully, the job log will be similar to the following example:
main.py DEBUG main starting...
main.py INFO Key DataOps variables
main.py INFO DATAOPS_PREFIX=DATAOPS
main.py INFO DATAOPS_DATABASE=None
main.py INFO DATAOPS_ENV_NAME=None
main.py INFO DATAOPS_PERSISTENT_CACHE_DIR=/agent_cache/persistent_cache
main.py INFO DATAOPS_PIPELINE_CACHE_DIR=/agent_cache/304076
main.py INFO DATAOPS_PERSISTENT_CACHE_DIR=/agent_cache/persistent_cache
main.py INFO DATAOPS_DEBUG=1
main.py INFO Key pipeline variables
main.py INFO CI_COMMIT_BRANCH=main
main.py INFO CI_JOB_NAME=Run Python Project
main.py INFO CI_JOB_STAGE=Pipeline Initialisation
main.py INFO Running in mode run
module1.py INFO Hello!
main.py DEBUG main is done
If you execute the job with DATAOPS_DEBUG
option set, the job log will include the following debug messages:
main.py DEBUG main starting...
main.py INFO Key DataOps variables
...
module1.py INFO Hello!
main.py DEBUG main is done
5. Export from Snowflake
This example is only compatible with Python 3.12.
This example builds on the concepts of the previous Python project. It demonstrates how to connect to Snowflake leveraging DataOps.live standard variables.
By the end of this example, you will have completed the following actions:
- Have a separate, self-contained directory for your Python project in the directory
/python_snowflake
- Have created a pip
requirements.txt
to install the Snowflake connector dependencies - Have executed your main application in the context of the
/dataops
script
Step 1
Create a new file python_snowflake/requirements.txt
:
# Source https://github.com/snowflakedb/snowflake-connector-python/blob/main/tested_requirements/requirements_38.reqs
asn1crypto==1.5.1
certifi==2021.10.8
cffi==1.15.0
charset-normalizer==2.0.12
cryptography==36.0.2
idna==3.3
oscrypto==1.3.0
pycparser==2.21
pycryptodomex==3.14.1
PyJWT==2.3.0
pyOpenSSL==21.0.0
pytz==2021.3
requests==2.27.1
six==1.16.0
urllib3==1.26.9
snowflake-connector-python==2.7.6
This action will ensure that the Snowflake Python Connector, compatible with Python 3.12 is available to your program.
Step 2
Create a new Python file python_snowflake/main.py
:
import csv
import logging
import os
import sys
import snowflake.connector
from snowflake.connector import DictCursor
## Set up logging
log_level = logging.DEBUG if os.getenv('DATAOPS_DEBUG') else logging.INFO
logging.basicConfig(format='\033[38;5;99m%(filename)s\033[0m \033[90m%(levelname)s\033[0m %(message)s')
logger = logging.getLogger('python_prj')
logger.setLevel(log_level)
## get connection information
snowflake_account = os.getenv('SNOWFLAKE_ACCOUNT')
snowflake_user = os.getenv('SNOWFLAKE_USER')
snowflake_password = os.getenv('SNOWFLAKE_PASSWORD')
snowflake_role = os.getenv('SNOWFLAKE_ROLE')
snowflake_warehouse = os.getenv('SNOWFLAKE_WAREHOUSE')
def log_dataops_vars():
""" Log key variables that you can use to customize your program's behavior"""
logger.info("Key DataOps variables")
logger.info("\tDATAOPS_PREFIX=%s", os.getenv('DATAOPS_PREFIX'))
logger.info("\tDATAOPS_DATABASE=%s", os.getenv('DATAOPS_DATABASE'))
logger.info("\tDATAOPS_ENV_NAME=%s", os.getenv('DATAOPS_ENV_NAME'))
logger.info("\tDATAOPS_PERSISTENT_CACHE_DIR=%s", os.getenv('DATAOPS_PERSISTENT_CACHE_DIR'))
logger.info("\tDATAOPS_PIPELINE_CACHE_DIR=%s", os.getenv('DATAOPS_PIPELINE_CACHE_DIR'))
logger.info("\tDATAOPS_PERSISTENT_CACHE_DIR=%s", os.getenv('DATAOPS_PERSISTENT_CACHE_DIR'))
logger.info("\tDATAOPS_DEBUG=%s", os.getenv('DATAOPS_DEBUG'))
logger.info("Key pipeline variables")
logger.info("\tCI_COMMIT_BRANCH=%s", os.getenv('CI_COMMIT_BRANCH'))
logger.info("\tCI_JOB_NAME=%s", os.getenv('CI_JOB_NAME'))
logger.info("\tCI_JOB_STAGE=%s", os.getenv('CI_JOB_STAGE'))
def log_snowflake_vars():
""" Log Snowflake connection variables. For password only report if set/unset"""
logger.info("Snowflake connection variables")
logger.info("\SNOWFLAKE_ACCOUNT=%s", snowflake_account)
logger.info("\SNOWFLAKE_USER=%s", snowflake_user)
logger.info("\SNOWFLAKE_ROLE=%s", snowflake_role)
logger.info("\SNOWFLAKE_PASSWORD=%s", 'is set' if snowflake_password is not None else 'unset' )
logger.info("\SNOWFLAKE_WAREHOUSE=%s", snowflake_warehouse)
def main():
logger.debug('Simple Snowflake job starting...')
log_dataops_vars()
log_snowflake_vars
# Connect to Snowflake
try:
logger.info(
'Connecting to Snowflake account {}.snowflakecomputing.com with user {}'.format(snowflake_account, snowflake_user))
sf = snowflake.connector.connect(
account=snowflake_account,
user=snowflake_user,
password=snowflake_password,
role=snowflake_role,
warehouse=snowflake_warehouse)
except Exception as ex:
logger.fatal('Unable to connect to Snowflake: {}'.format(str(ex)))
sys.exit(1)
# Execute SQL
sql = "SELECT * FROM testtable WHERE col1 LIKE 'T%';"
try:
logger.info('Executing SQL: {}'.format(sql))
cur = sf.cursor(DictCursor)
cur.execute(sql)
records = cur.fetchall()
logger.info('Returned {} record(s) from Snowflake'.format(len(records)))
except Exception as ex:
logger.fatal('Error executing SQL: {}'.format(str(ex)))
sys.exit(1)
# using stdout
outfile = sys.stdout
# export as CSV
headers = [i[0] for i in cur.description]
writer = csv.DictWriter(outfile, fieldnames=headers)
writer.writeheader()
for rec in list(records):
writer.writerow(rec)
# Disconnect from Snowflake
sf.close()
logger.debug('Simple Snowflake job done')
if __name__ == '__main__':
main()
Adjust the sql
statement and the outfile
to suit your requirements. This program dumps the content of your table as CSV to stdout.
Step 3
Finally, leverage the script from your job. Create this file, pipelines/includes/local_includes/python_prj_jobs/run_snowflake.yml
, as follows:
Extract from Snowflake:
extends:
- .agent_tag
stage: "Snowflake Actions"
image: $DATAOPS_PYTHON3_RUNNER_IMAGE
variables:
# credentials
SNOWFLAKE_ACCOUNT: DATAOPS_VAULT(SNOWFLAKE.SOLE.ACCOUNT)
SNOWFLAKE_USER: DATAOPS_VAULT(SNOWFLAKE.SOLE.USERNAME)
SNOWFLAKE_PASSWORD: DATAOPS_VAULT(SNOWFLAKE.SOLE.PASSWORD)
SNOWFLAKE_ROLE: DATAOPS_VAULT(SNOWFLAKE.SOLE.ROLE)
SNOWFLAKE_WAREHOUSE: DATAOPS_VAULT(SNOWFLAKE.SOLE.WAREHOUSE)
# DataOps Parameters
SPECIFIED_PATH: ${CI_PROJECT_DIR}/python_snowflake
DATAOPS_RUN_PYTHON_SCRIPT: ${CI_PROJECT_DIR}/python_snowflake/main.py
script:
- /dataops
icon: ${PYTHON_ICON}
Project resources
Store your Python scripts or project in its own folder inside your DataOps project, e.g., python_prj
as shown in the example job in the Develop a Python Project section.