# Python3 Orchestrator
Type | Flexible |
---|---|
Image | `$DATAOPS_PYTHON3_RUNNER_IMAGE` |
The Python3 orchestrator is a flexible orchestrator that includes Python3 as well as the following preinstalled Python libraries (a short import sketch follows the list):
- requests
- responses
- boto3
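Because these libraries ship with the orchestrator image, a script can import them without declaring them in a `requirements.txt`. A minimal sketch (the URL is illustrative):

```python
import requests

# requests is preinstalled in the orchestrator image, so no pip install is needed
response = requests.get("https://example.com/health")
print(response.status_code)
```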
## Usage
This orchestrator is used to execute a Python3 script, such as `/scripts/my-script.py`, inside your DataOps project.
However, this orchestrator can execute more than just a single Python3 script. You can also run more complex Python3 projects with it.
Should you need to pass variables into the Python3 job, you can set them in the YAML config file's `variables` block. For example:
"My Python Job":
extends:
- .agent_tag
stage: "My Stage"
image: $DATAOPS_PYTHON3_RUNNER_IMAGE
variables:
MY_VAR: value
SPECIFIED_PATH: ${CI_PROJECT_DIR}/python_prj
DATAOPS_RUN_PYTHON_SCRIPT: ${CI_PROJECT_DIR}/python_prj/main.py
script:
- /dataops
icon: ${PYTHON_ICON}
## Supported parameters
Parameter | Required or Optional (and default) | Description |
---|---|---|
`DATAOPS_RUN_PYTHON_SCRIPT` | Optional | The path to the main Python3 script. If not specified, you must call the Python3 script (`your-script.py`) directly instead of using `/dataops` as the `script` parameter |
`FILE` | Optional. Defaults to `requirements.txt` | The name of the file used to run `pip install` |
`SPECIFIED_PATH` | Optional. Defaults to `$CI_PROJECT_DIR` | The path to the `requirements.txt` file |
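As a sketch of how `FILE` and `SPECIFIED_PATH` combine, the hypothetical job below installs dependencies from a file named `dev-requirements.txt` kept in `python_prj` (the job name, stage, and requirements file name are illustrative):

```yaml
"Run With Custom Requirements":
  extends:
    - .agent_tag
  stage: "My Stage"
  image: $DATAOPS_PYTHON3_RUNNER_IMAGE
  variables:
    # Assumption: the orchestrator resolves the requirements file as ${SPECIFIED_PATH}/${FILE}
    SPECIFIED_PATH: ${CI_PROJECT_DIR}/python_prj
    FILE: dev-requirements.txt
    DATAOPS_RUN_PYTHON_SCRIPT: ${CI_PROJECT_DIR}/python_prj/main.py
  script:
    - /dataops
  icon: ${PYTHON_ICON}
```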
## Example jobs
### 1. Run a script requiring no variables
The following example shows how to execute a Python3 script that does not need any input variables:
"Execute Python3 Script":
extends:
- .agent_tag
stage: "Additional Configuration"
image: $DATAOPS_PYTHON3_RUNNER_IMAGE
variables:
DATAOPS_RUN_PYTHON_SCRIPT: $CI_PROJECT_DIR/scripts/my-script.py
script:
- /dataops
icon: ${PYTHON-ICON}
### 2. Retrieve variables from the DataOps vault
To retrieve the secrets `PATH.TO.VAR1` and `PATH.TO.VAR2` from the DataOps vault, add them as variables to the YAML config file as follows:
"Extract from Vault":
extends:
- .agent_tag
stage: "Additional Configuration"
image: $DATAOPS_PYTHON3_RUNNER_IMAGE
variables:
DATAOPS_RUN_PYTHON_SCRIPT: $CI_PROJECT_DIR/scripts/my-script.py
VAR1: DATAOPS_VAULT(PATH.TO.VAR1)
VAR2: DATAOPS_VAULT(PATH.TO.VAR2)
script:
- /dataops
icon: ${PYTHON_ICON}
Access these variables using `os.getenv()` in your Python3 code, as the following code snippet shows:
```python
import os

var1 = os.getenv('VAR1')
var2 = os.getenv('VAR2')
```
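If a secret is mandatory, it can help to fail the job early when the lookup returns nothing. A minimal sketch, assuming the variable names from the job above:

```python
import os
import sys

# Fail fast if a mandatory secret was not injected into the environment
var1 = os.getenv('VAR1')
if not var1:
    print('VAR1 is not set; check the vault path in the job variables', file=sys.stderr)
    sys.exit(1)
```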
### 3. Develop a Python3 project
There are times when you need to execute a fully-fledged Python3 project. This mini example shows you how to set one up.
At the end of this example, you will have completed the following actions:

- Created a separate, self-contained directory for your Python project, `/python_prj`
- Created a pip `requirements.txt` to install dependencies
- Executed your main application in the context of the `/dataops` script
Additionally, the goal is to create the following Python project layout:

```text
/python_prj
    __init__.py
    requirements.txt
    main.py
    module1.py
```
#### Step 1
The first step is to create the two files `/python_prj/__init__.py` and `/python_prj/requirements.txt`. While not actively used in this example, these files are your starting points for static helpers and project dependencies. As an example, a starting `requirements.txt` is as follows:
```text
pyjson ~= 1.3
```
#### Step 2
The next step is to create your main application, `/python_prj/main.py`. In our use case, this application reports on commonly used DataOps variables and sets up a `mode` variable for further use in your app.
```python
import logging
import os

from module1 import Module1

# Set up logging
log_level = logging.DEBUG if os.getenv('DATAOPS_DEBUG') else logging.INFO
logging.basicConfig(format='\033[38;5;99m%(filename)s\033[0m \033[90m%(levelname)s\033[0m %(message)s')
logger = logging.getLogger('python_prj')
logger.setLevel(log_level)


def log_dataops_vars():
    """Log commonly used DataOps variables."""
    logger.info("Key DataOps variables")
    logger.info("\tDATAOPS_PREFIX=%s", os.getenv('DATAOPS_PREFIX'))
    logger.info("\tDATAOPS_DATABASE=%s", os.getenv('DATAOPS_DATABASE'))
    logger.info("\tDATAOPS_ENV_NAME=%s", os.getenv('DATAOPS_ENV_NAME'))
    logger.info("\tDATAOPS_PERSISTENT_CACHE_DIR=%s", os.getenv('DATAOPS_PERSISTENT_CACHE_DIR'))
    logger.info("\tDATAOPS_PIPELINE_CACHE_DIR=%s", os.getenv('DATAOPS_PIPELINE_CACHE_DIR'))
    logger.info("\tDATAOPS_DEBUG=%s", os.getenv('DATAOPS_DEBUG'))
    logger.info("Key pipeline variables")
    logger.info("\tCI_COMMIT_BRANCH=%s", os.getenv('CI_COMMIT_BRANCH'))
    logger.info("\tCI_JOB_NAME=%s", os.getenv('CI_JOB_NAME'))
    logger.info("\tCI_JOB_STAGE=%s", os.getenv('CI_JOB_STAGE'))


def main():
    logger.debug('main starting...')
    log_dataops_vars()

    # MODE is a user-defined job variable; fall back to a sensible default
    mode = os.getenv('MODE') or 'default'
    logger.info("Running in mode %s", mode)

    m1 = Module1(logger=logger)
    try:
        m1.say_something("Hello!")
    except Exception as ex:
        logger.error(str(ex))

    logger.debug('main is done')


if __name__ == '__main__':
    main()
```
#### Step 3
The third step is to code your module, `/python_prj/module1.py`, as follows:
```python
import logging


class Module1:
    def __init__(self, logger=logging):
        # Default to the logging module itself so the class also works standalone
        self.logger = logger

    def say_something(self, what):
        self.logger.info(what)
```
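Because `Module1` defaults to the `logging` module when no logger is passed, you can smoke-test it locally outside a pipeline. A minimal sketch, run from inside `/python_prj` (the `basicConfig` call is only needed to surface INFO messages locally):

```python
import logging

from module1 import Module1

# Surface INFO messages locally; inside the pipeline, main.py configures logging instead
logging.basicConfig(level=logging.INFO)

m1 = Module1()  # falls back to the logging module itself
m1.say_something("smoke test")
```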
#### Step 4
Finally, you can run your main application from the Python3 orchestrator job using the following YAML code:
"Run Python Project":
extends:
- .agent_tag
stage: "Pipeline Initialisation"
image: $DATAOPS_PYTHON3_RUNNER_IMAGE
variables:
# user defined variable
MODE: run
# DataOps Parameters
SPECIFIED_PATH: ${CI_PROJECT_DIR}/python_prj
DATAOPS_RUN_PYTHON_SCRIPT: ${CI_PROJECT_DIR}/python_prj/main.py
script:
- /dataops
icon: ${PYTHON_ICON}
If the job executes successfully, its job log will be similar to the following example:
```text
main.py DEBUG main starting...
main.py INFO Key DataOps variables
main.py INFO     DATAOPS_PREFIX=DATAOPS
main.py INFO     DATAOPS_DATABASE=None
main.py INFO     DATAOPS_ENV_NAME=None
main.py INFO     DATAOPS_PERSISTENT_CACHE_DIR=/agent_cache/persistent_cache
main.py INFO     DATAOPS_PIPELINE_CACHE_DIR=/agent_cache/304076
main.py INFO     DATAOPS_DEBUG=1
main.py INFO Key pipeline variables
main.py INFO     CI_COMMIT_BRANCH=master
main.py INFO     CI_JOB_NAME=Run Python Project
main.py INFO     CI_JOB_STAGE=Pipeline Initialisation
main.py INFO Running in mode run
module1.py INFO Hello!
main.py DEBUG main is done
```
Note that the `DEBUG` messages appear because this run had `DATAOPS_DEBUG` set (see `DATAOPS_DEBUG=1` above). With `DATAOPS_DEBUG` unset, only the `INFO` lines are printed; with it set, the job log includes debug messages such as:

```text
main.py DEBUG main starting...
main.py INFO Key DataOps variables
...
module1.py INFO Hello!
main.py DEBUG main is done
```
### 4. Export from Snowflake
This example builds on the concepts of the previous Python project. It demonstrates how to connect to Snowflake by leveraging standard DataOps variables.
By the end of this example, you will have completed the following actions:

- Created a separate, self-contained directory for your Python project, `/python_snowflake`
- Created a pip `requirements.txt` to install the Snowflake connector dependencies
- Executed your main application in the context of the `/dataops` script
#### Step 1
The first step is to create a new file `python_snowflake/requirements.txt`:
```text
# Source https://github.com/snowflakedb/snowflake-connector-python/blob/main/tested_requirements/requirements_38.reqs
asn1crypto==1.5.1
certifi==2021.10.8
cffi==1.15.0
charset-normalizer==2.0.12
cryptography==36.0.2
idna==3.3
oscrypto==1.3.0
pycparser==2.21
pycryptodomex==3.14.1
PyJWT==2.3.0
pyOpenSSL==21.0.0
pytz==2021.3
requests==2.27.1
six==1.16.0
urllib3==1.26.9
snowflake-connector-python==2.7.6
```
This ensures that a version of the Snowflake Python Connector compatible with Python 3.8 is available to your program.
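To confirm the pinned connector was picked up, a quick check (the expected version follows the `requirements.txt` above):

```python
import snowflake.connector

# Expect 2.7.6, matching the pin in requirements.txt
print(snowflake.connector.__version__)
```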
#### Step 2
Secondly, create a new Python3 file `python_snowflake/main.py`:
```python
import csv
import logging
import os
import sys

import snowflake.connector
from snowflake.connector import DictCursor

# Set up logging
log_level = logging.DEBUG if os.getenv('DATAOPS_DEBUG') else logging.INFO
logging.basicConfig(format='\033[38;5;99m%(filename)s\033[0m \033[90m%(levelname)s\033[0m %(message)s')
logger = logging.getLogger('python_prj')
logger.setLevel(log_level)

# Get connection information
snowflake_account = os.getenv('SNOWFLAKE_ACCOUNT')
snowflake_user = os.getenv('SNOWFLAKE_USER')
snowflake_password = os.getenv('SNOWFLAKE_PASSWORD')
snowflake_role = os.getenv('SNOWFLAKE_ROLE')
snowflake_warehouse = os.getenv('SNOWFLAKE_WAREHOUSE')


def log_dataops_vars():
    """Log key variables that you can use to customize your program's behavior."""
    logger.info("Key DataOps variables")
    logger.info("\tDATAOPS_PREFIX=%s", os.getenv('DATAOPS_PREFIX'))
    logger.info("\tDATAOPS_DATABASE=%s", os.getenv('DATAOPS_DATABASE'))
    logger.info("\tDATAOPS_ENV_NAME=%s", os.getenv('DATAOPS_ENV_NAME'))
    logger.info("\tDATAOPS_PERSISTENT_CACHE_DIR=%s", os.getenv('DATAOPS_PERSISTENT_CACHE_DIR'))
    logger.info("\tDATAOPS_PIPELINE_CACHE_DIR=%s", os.getenv('DATAOPS_PIPELINE_CACHE_DIR'))
    logger.info("\tDATAOPS_DEBUG=%s", os.getenv('DATAOPS_DEBUG'))
    logger.info("Key pipeline variables")
    logger.info("\tCI_COMMIT_BRANCH=%s", os.getenv('CI_COMMIT_BRANCH'))
    logger.info("\tCI_JOB_NAME=%s", os.getenv('CI_JOB_NAME'))
    logger.info("\tCI_JOB_STAGE=%s", os.getenv('CI_JOB_STAGE'))


def log_snowflake_vars():
    """Log Snowflake connection variables. For the password, only report if set/unset."""
    logger.info("Snowflake connection variables")
    logger.info("\tSNOWFLAKE_ACCOUNT=%s", snowflake_account)
    logger.info("\tSNOWFLAKE_USER=%s", snowflake_user)
    logger.info("\tSNOWFLAKE_ROLE=%s", snowflake_role)
    logger.info("\tSNOWFLAKE_PASSWORD=%s", 'is set' if snowflake_password is not None else 'unset')
    logger.info("\tSNOWFLAKE_WAREHOUSE=%s", snowflake_warehouse)


def main():
    logger.debug('Simple Snowflake job starting...')
    log_dataops_vars()
    log_snowflake_vars()

    # Connect to Snowflake
    try:
        logger.info(
            'Connecting to Snowflake account {}.snowflakecomputing.com with user {}'.format(
                snowflake_account, snowflake_user))
        sf = snowflake.connector.connect(
            account=snowflake_account,
            user=snowflake_user,
            password=snowflake_password,
            role=snowflake_role,
            warehouse=snowflake_warehouse)
    except Exception as ex:
        logger.fatal('Unable to connect to Snowflake: {}'.format(str(ex)))
        sys.exit(1)

    # Execute SQL
    sql = "SELECT * FROM testtable WHERE col1 LIKE 'T%';"
    try:
        logger.info('Executing SQL: {}'.format(sql))
        cur = sf.cursor(DictCursor)
        cur.execute(sql)
        records = cur.fetchall()
        logger.info('Returned {} record(s) from Snowflake'.format(len(records)))
    except Exception as ex:
        logger.fatal('Error executing SQL: {}'.format(str(ex)))
        sys.exit(1)

    # Using stdout
    outfile = sys.stdout

    # Export as CSV
    headers = [i[0] for i in cur.description]
    writer = csv.DictWriter(outfile, fieldnames=headers)
    writer.writeheader()
    for rec in records:
        writer.writerow(rec)

    # Disconnect from Snowflake
    sf.close()
    logger.debug('Simple Snowflake job done')


if __name__ == '__main__':
    main()
```
Adjust the `sql` statement and the `outfile` to suit your requirements. As written, this program dumps the content of your table as CSV to stdout.
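For example, to write the CSV to a file instead of stdout, you could replace the `outfile` assignment. A minimal sketch, assuming the `headers` and `records` names from the listing above (the `export.csv` file name is illustrative):

```python
import csv
import os

# `headers` and `records` come from the cursor in main() above; shown here as
# placeholders so the sketch stands alone
headers = ['COL1', 'COL2']
records = [{'COL1': 'T1', 'COL2': 42}]

out_dir = os.getenv('DATAOPS_PIPELINE_CACHE_DIR', '.')  # standard pipeline cache variable
out_path = os.path.join(out_dir, 'export.csv')          # illustrative file name

with open(out_path, 'w', newline='') as outfile:
    writer = csv.DictWriter(outfile, fieldnames=headers)
    writer.writeheader()
    writer.writerows(records)
```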
#### Step 3
Finally, leverage the script from your job. Create the file `pipelines/includes/local_includes/python_prj_jobs/run_snowflake.yml` as follows:
```yaml
"Extract from Snowflake":
  extends:
    - .agent_tag
  stage: "Snowflake Actions"
  image: $DATAOPS_PYTHON3_RUNNER_IMAGE
  variables:
    # Credentials
    SNOWFLAKE_ACCOUNT: DATAOPS_VAULT(SNOWFLAKE.SOLE.ACCOUNT)
    SNOWFLAKE_USER: DATAOPS_VAULT(SNOWFLAKE.SOLE.USERNAME)
    SNOWFLAKE_PASSWORD: DATAOPS_VAULT(SNOWFLAKE.SOLE.PASSWORD)
    SNOWFLAKE_ROLE: DATAOPS_VAULT(SNOWFLAKE.SOLE.ROLE)
    SNOWFLAKE_WAREHOUSE: DATAOPS_VAULT(SNOWFLAKE.SOLE.WAREHOUSE)
    # DataOps parameters
    SPECIFIED_PATH: ${CI_PROJECT_DIR}/python_snowflake
    DATAOPS_RUN_PYTHON_SCRIPT: ${CI_PROJECT_DIR}/python_snowflake/main.py
  script:
    - /dataops
  icon: ${PYTHON_ICON}
```
## Project resources
Store your Python scripts or project in a dedicated folder inside your DataOps project, e.g., `python_prj`, as shown in the example job in the Develop a Python3 project section.
## Host dependencies (and Resources)
None