
DataPrep Orchestrator

| Type | Image |
| --- | --- |
| Pre-Set | $DATAOPS_DATAPREP_RUNNER_IMAGE |

The DataOps DataPrep Orchestrator is a pre-set orchestrator that processes and synchronizes files between two locations, such as two different AWS S3 buckets or a local folder and an Azure Storage blob container.

All the files present in the input/source location are compared to those in the output/target location. Files that are not present in the target location are downloaded from the source, processed, and uploaded to the target.
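The comparison step can be sketched as a set difference over sorted file-name lists. This is an illustration only, not the orchestrator's actual implementation; the file names are hypothetical:

```shell
# Illustrative sketch of the comparison step: given sorted lists of file
# names from the input and output locations, the files to synchronize are
# those present in the input list but missing from the output list.
printf 'a.csv\nb.csv\nc.csv\n' > /tmp/input_files.txt
printf 'b.csv\n'              > /tmp/output_files.txt

# comm -23 prints lines unique to the first (input) list
to_sync=$(comm -23 /tmp/input_files.txt /tmp/output_files.txt)
echo "$to_sync"
```

Here `a.csv` and `c.csv` would be downloaded, processed, and uploaded, while `b.csv` is skipped because it already exists in the target.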

File processing is handled by user-provided scripts (for example, bash or Python), with the following locations supported and interchangeable:

  1. AWS S3 Bucket
  2. Local Directory
  3. Azure Storage Blob Container

Usage

The DataPrep Orchestrator workflow is as follows:

  1. Check the defined storage locations
    • For both input and output storage locations, check for required keys
    • Configure the environment
    • Test the connections with the supplied credentials
  2. Fetch the list of files
    • Fetch the list of files located in the input storage location
    • Fetch the list of files located in the output storage location
  3. Compare the file list by name
  4. Start synchronization for each file present in the input but not the output.
    • Download the file from the input storage location
    • Process the file using the user-specified script
    • Upload the processed file to the output storage location

Modes

The DataPrep Orchestrator supports the following modes:

1. SYNC Mode

Consider the following storage location layouts:

/<input storage>
/$DATAPREP_INPUT_PREFIX

/<output storage>
/$DATAPREP_OUTPUT_PREFIX

The workflow to process these files in SYNC Mode is as follows:

  1. Compare the input and output storage locations
  2. Process the files that are present in the input but not the output location
  3. Upload the processed file to the output storage location

2. ARCHIVE Mode

Consider the following storage location layouts:

/<input storage>
/$DATAPREP_INPUT_PREFIX
/$DATAPREP_ERROR_PREFIX
/$DATAPREP_ARCHIVE_PREFIX

/<output storage>
/$DATAPREP_OUTPUT_PREFIX

The workflow to process these files in ARCHIVE Mode is as follows:

  1. Compare the input and output storage locations using pagination
  2. Process the files that are present in the input but not the output location
  3. For each successfully processed file:
    • Move the raw input file to the archive folder in the input storage location
  4. For each file that fails processing:
    • Move the raw input file to the error folder in the input storage location
  5. Delete processed files from the input storage location
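The archive/error routing above can be sketched for a local directory. This is an illustration only (not the orchestrator's actual code), using the default folder names from `DATAPREP_ARCHIVE_PREFIX` (`archive`) and `DATAPREP_ERROR_PREFIX` (`error`) and hypothetical file names:

```shell
# Sketch of ARCHIVE-mode post-processing against a local input directory.
input_dir=$(mktemp -d)
mkdir -p "$input_dir/archive" "$input_dir/error"
touch "$input_dir/good.csv" "$input_dir/bad.csv"

# $1 = file name, $2 = exit status of the user processing script
archive_or_error() {
  if [ "$2" -eq 0 ]; then
    mv "$input_dir/$1" "$input_dir/archive/$1"   # success: archive the raw input file
  else
    mv "$input_dir/$1" "$input_dir/error/$1"     # failure: move it to the error folder
  fi
}

archive_or_error good.csv 0
archive_or_error bad.csv 1
```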

Lastly, here is an example of how to structure a YAML file for a DataPrep Orchestrator file sync job:

pipelines/includes/local_includes/dataprep_jobs/dataprep.yml
"Prep My Data":
  extends:
    - .agent_tag
  stage: "Clean Input Data"
  image: $DATAOPS_DATAPREP_RUNNER_IMAGE
  variables:
    PREP_ACTION: SYNC
    DATAPREP_SCRIPT_PATH: $CI_PROJECT_DIR/dataops/dataprep/
    DATAPREP_SCRIPT_NAME: clean-input-file.sh
    # storage specific configuration
    ## input storage
    DATAPREP_INPUT_STORAGE: AWS
    DATAPREP_INPUT_BUCKET: s3-input-bucket
    DATAPREP_INPUT_PREFIX: "your/input/"
    ## output storage
    DATAPREP_OUTPUT_STORAGE: AWS
    DATAPREP_OUTPUT_BUCKET: s3-output-bucket
    DATAPREP_OUTPUT_PREFIX: "your/output/"
    # file filter
    DATAPREP_FILE_FORMAT: all
  script:
    - /dataops
  icon: DATAPREP_ICON

Supported Parameters

The following parameters can be added to the file sync job:

| Parameter | Required/Default | Description |
| --- | --- | --- |
| PREP_ACTION | REQUIRED | Must be SYNC or ARCHIVE |
| DATAPREP_SCRIPT_NAME | REQUIRED | The script name to be run to process the file |
| Storage Specific Configuration | REQUIRED | Storage-specific configuration. See Storage Specific Configuration |
| DATAPREP_INPUT_STORAGE | Optional - defaults to AWS | Storage to be used as input location. Must be one of AWS, Azure, or Local - case insensitive |
| DATAPREP_OUTPUT_STORAGE | Optional - defaults to AWS | Storage to be used as output location. Must be one of AWS, Azure, or Local - case insensitive |
| DATAPREP_SCRIPT_PATH | Optional - defaults to CI_PROJECT_DIR | File path where the script is saved |
| DATAPREP_RUN_ENV | Optional - defaults to bash | Environment in which the script is run. Must be one of bash, python, python3, python2, node, r, or perl - case sensitive |
| DATAPREP_INPUT_PREFIX | Optional | Prefix/folder to be used as the directory for input files |
| DATAPREP_OUTPUT_PREFIX | Optional | Prefix/folder to be used as the directory for output files |
| DATAPREP_ARCHIVE_PREFIX | Optional - defaults to archive | Prefix/folder to be used as the directory for archive files in ARCHIVE mode |
| DATAPREP_ERROR_PREFIX | Optional - defaults to error | Prefix/folder to be used as the directory for error files in ARCHIVE mode |
| DATAPREP_FILE_FORMAT | Optional - defaults to csv | File format used to select and filter files from both storage locations |
| DATAPREP_MAX_JOBS | Optional - defaults to the number of cores on the system | Number of file-sync jobs to run in parallel |
| PAGE_SIZE | Optional - defaults to 200 | Page size used in ARCHIVE mode |

The following points describe several environment variables in detail:

DATAPREP_FILE_FORMAT

If no format is specified in the environment variable DATAPREP_FILE_FORMAT, only files ending with .csv are selected from both the input and output storage locations.

If multiple file formats are to be selected by the dataops-dataprep-orchestrator, specify them as comma-separated values in this variable.

Example: to match and process .csv, .tsv, and .txt files at once, use:

pipelines/includes/local_includes/dataprep_jobs/dataprep.yml
"Prep My Data":
  extends:
    - .agent_tag
  stage: "Clean Input Data"
  image: $DATAOPS_DATAPREP_RUNNER_IMAGE
  variables:
    PREP_ACTION: SYNC
    DATAPREP_SCRIPT_PATH: $CI_PROJECT_DIR/dataops/dataprep/
    DATAPREP_SCRIPT_NAME: clean-input-file.sh
    DATAPREP_INPUT_STORAGE: AWS
    DATAPREP_INPUT_BUCKET: s3-input-bucket
    DATAPREP_INPUT_PREFIX: "your/input/"
    DATAPREP_OUTPUT_STORAGE: AWS
    DATAPREP_OUTPUT_BUCKET: s3-output-bucket
    DATAPREP_OUTPUT_PREFIX: "your/output/"
    DATAPREP_FILE_FORMAT: "csv,tsv,txt"
  script:
    - /dataops
  icon: DATAPREP_ICON

tip

If all files present in the input storage location are to be selected by the dataops-dataprep-orchestrator, set the variable DATAPREP_FILE_FORMAT to all.

tip

all is a keyword: if the storage location contains files with the literal extension .all, note that setting DATAPREP_FILE_FORMAT to all selects all files rather than only those ending in .all.
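The comma-separated format value can be thought of as an extension filter. The following is a sketch only (the orchestrator's real matching logic may differ), using hypothetical file names:

```shell
# Turn a comma-separated DATAPREP_FILE_FORMAT value into a filename filter.
DATAPREP_FILE_FORMAT="csv,tsv,txt"

# Convert "csv,tsv,txt" into the alternation pattern "csv|tsv|txt"
pattern=$(printf '%s' "$DATAPREP_FILE_FORMAT" | tr ',' '|')

# Keep only file names ending in one of the listed extensions
matched=$(printf 'a.csv\nb.json\nc.tsv\nnotes.txt\n' | grep -E "\.($pattern)$")
echo "$matched"
```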

DATAPREP_MAX_JOBS

The value set in DATAPREP_MAX_JOBS determines how many data-prep operations run in parallel. If the variable is not set, the default equals the number of CPU cores on the system.

Setting this variable to a high value increases the number of jobs executed in parallel, at the cost of higher system resource usage.

If the files are very large, the user script is CPU-intensive, or system resources are limited, setting DATAPREP_MAX_JOBS to a smaller value may be advantageous.
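Bounded parallelism of this kind can be sketched with xargs. This is an illustration in the spirit of DATAPREP_MAX_JOBS, not the orchestrator's actual implementation; the file names and the echo stand-in for the processing script are hypothetical:

```shell
# Run at most MAX_JOBS "processing" commands at once; xargs -P caps the
# number of concurrently running child processes.
MAX_JOBS=2
out=$(printf 'a.csv\nb.csv\nc.csv\nd.csv\n' \
  | xargs -P "$MAX_JOBS" -I {} sh -c 'echo "processed {}"')

# Output order is nondeterministic under parallel execution, so sort it.
echo "$out" | sort
```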

Files created by the DataPrep Orchestrator are deleted when they are no longer needed to prevent the system from running out of disk space.

After the user script runs, the downloaded input file is deleted. Once the generated output file is uploaded, it is deleted as well.

AWS Storage Specific Configuration

AWS as Input Storage

"Prep My Data":
  variables:
    DATAPREP_INPUT_BUCKET: bucket
    DATAPREP_INPUT_AWS_REGION: us-west-2
    DATAPREP_INPUT_AWS_ACCESS_KEY_ID: $MY_AWS_ACCESS_KEY_ID
    DATAPREP_INPUT_AWS_SECRET_ACCESS_KEY: $MY_SECRET_ACCESS_KEY
| Parameter | Required or Optional (and default) | Description |
| --- | --- | --- |
| DATAPREP_INPUT_AWS_ACCESS_KEY_ID | REQUIRED | AWS_ACCESS_KEY_ID to be used for connecting to the input storage |
| DATAPREP_INPUT_AWS_SECRET_ACCESS_KEY | REQUIRED | AWS_SECRET_ACCESS_KEY to be used for connecting to the input storage |
| DATAPREP_INPUT_BUCKET | REQUIRED | Bucket to be used for the input storage location |
| DATAPREP_INPUT_AWS_REGION | Optional - defaults to us-west-2 | Region to be used for input storage |
| DATAPREP_INPUT_PROFILE_NAME | Optional - defaults to input | Name with which the profile for input storage is saved |

AWS as Output Storage

"Prep My Data":
  variables:
    DATAPREP_OUTPUT_BUCKET: bucket
    DATAPREP_OUTPUT_AWS_REGION: us-west-2
    DATAPREP_OUTPUT_AWS_ACCESS_KEY_ID: $MY_AWS_ACCESS_KEY_ID
    DATAPREP_OUTPUT_AWS_SECRET_ACCESS_KEY: $MY_SECRET_ACCESS_KEY
| Parameter | Required or Optional (and default) | Description |
| --- | --- | --- |
| DATAPREP_OUTPUT_AWS_ACCESS_KEY_ID | REQUIRED | AWS_ACCESS_KEY_ID to be used for connecting to the output storage |
| DATAPREP_OUTPUT_AWS_SECRET_ACCESS_KEY | REQUIRED | AWS_SECRET_ACCESS_KEY to be used for connecting to the output storage |
| DATAPREP_OUTPUT_BUCKET | REQUIRED | Bucket to be used for the output storage location |
| DATAPREP_OUTPUT_AWS_REGION | Optional - defaults to us-west-2 | Region to be used for output storage |
| DATAPREP_OUTPUT_PROFILE_NAME | Optional - defaults to output | Name with which the profile for output storage is saved |

Azure Storage Specific Configuration

Azure as Input Storage

"Prep My Data":
  variables:
    DATAPREP_INPUT_CONTAINER: sample-container
    DATAPREP_INPUT_AZURE_ACCOUNT: $MY_INPUT_AZURE_ACCOUNT
    DATAPREP_INPUT_AZURE_SAS_TOKEN: $MY_INPUT_AZURE_SAS_TOKEN
| Parameter | Required or Optional (and default) | Description |
| --- | --- | --- |
| DATAPREP_INPUT_AZURE_ACCOUNT | REQUIRED | Azure account to be used for connecting to the input storage |
| DATAPREP_INPUT_AZURE_SAS_TOKEN | REQUIRED | Azure SAS token to be used for connecting to the input storage |
| DATAPREP_INPUT_CONTAINER | REQUIRED | Container to be used for the input storage location |

Azure as Output Storage

"Prep My Data":
  variables:
    DATAPREP_OUTPUT_CONTAINER: sample-container
    DATAPREP_OUTPUT_AZURE_ACCOUNT: $MY_OUTPUT_AZURE_ACCOUNT
    DATAPREP_OUTPUT_AZURE_SAS_TOKEN: $MY_OUTPUT_AZURE_SAS_TOKEN
| Parameter | Required or Optional (and default) | Description |
| --- | --- | --- |
| DATAPREP_OUTPUT_AZURE_ACCOUNT | REQUIRED | Azure account to be used for connecting to the output storage |
| DATAPREP_OUTPUT_AZURE_SAS_TOKEN | REQUIRED | Azure SAS token to be used for connecting to the output storage |
| DATAPREP_OUTPUT_CONTAINER | REQUIRED | Container to be used for the output storage location |

Local Storage Specific Configuration

Local storage requires that the orchestrator has access to the directories you want to process. For large files, this requires modifying the DataOps Runner setup to add additional Docker volumes. For smaller files passed between jobs, you can reference the pipeline cache.

Local as Input Storage

"Prep My Data":
  variables:
    DATAPREP_INPUT_DIRECTORY: "/agent_cache/filetransfer/in"
| Parameter | Required or Optional (and default) | Description |
| --- | --- | --- |
| DATAPREP_INPUT_DIRECTORY | REQUIRED | Path of the directory to be used as input storage location |

Local as Output Storage

"Prep My Data":
  variables:
    DATAPREP_OUTPUT_DIRECTORY: "/agent_cache/filetransfer/out"
| Parameter | Required or Optional (and default) | Description |
| --- | --- | --- |
| DATAPREP_OUTPUT_DIRECTORY | REQUIRED | Path of the directory to be used as output storage location |

DATAPREP_RUN_ENV

  variables:
    DATAPREP_RUN_ENV: bash

The DataPrep Orchestrator supports the following environments to run the script in:

  • bash
    • Default environment if no environment is specified
    • Version: GNU bash, version 4.4.20(1)-release
  • python
    • Uses Python3
    • Version: 3.6.9
  • python3
    • Version: 3.6.9
  • python2
    • Version: 2.7.17
  • r
    • Version: 4.0.4 (2021-02-15) -- "Lost Library Book"
  • node
    • Version: v8.10.0
  • perl
    • Version: perl 5, version 26, subversion 1 (v5.26.1)

DATAPREP_SCRIPT_NAME

The DataPrep Orchestrator allows users to run their own scripts to modify the data as needed. For each file that data-prep detects for processing, the following arguments are passed to the script, in the given order:

  1. File Name - name of the input file
  2. Input Folder - folder in which the file exists
  3. Output Folder - folder in which the output file(s) are to be saved

The script can then transform the file as required and save the resulting file(s) in the specified output folder. At the end of the script's execution, DataPrep expects the output file(s) to be present in that folder.

We recommend placing your custom scripts in your DataOps project under /dataops/dataprep. During job execution on an orchestrator, refer to them with $CI_PROJECT_DIR/dataops/dataprep/your_script.

To get you started, below is a sample script, available in the orchestrator's /scripts/ directory, that copies the file from the input folder to the output folder:

/scripts/copy.sh
# Save file name and folder paths in variables from command line argument
FILE_NAME="$1"
INPUT_FOLDER="$2"
OUTPUT_FOLDER="$3"

INPUT_FILE_PATH="$INPUT_FOLDER/$FILE_NAME"
OUTPUT_FILE_PATH="$OUTPUT_FOLDER/$FILE_NAME"
echo "Starting copy from $INPUT_FILE_PATH to $OUTPUT_FILE_PATH"
# Copy file from input path to output path
cp -v "$INPUT_FILE_PATH" "$OUTPUT_FILE_PATH"
echo "Copy Complete"
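You can exercise such a script locally by passing the same three positional arguments the orchestrator supplies. The script body below recreates the copy logic inline (the temporary paths and the sample.csv file are hypothetical):

```shell
# Simulate the orchestrator's invocation: file name, input folder, output folder.
in_dir=$(mktemp -d); out_dir=$(mktemp -d)
echo "hello" > "$in_dir/sample.csv"

# Recreate the copy script inline so the example is self-contained.
cat > /tmp/copy_demo.sh <<'EOF'
FILE_NAME="$1"
INPUT_FOLDER="$2"
OUTPUT_FOLDER="$3"
cp "$INPUT_FOLDER/$FILE_NAME" "$OUTPUT_FOLDER/$FILE_NAME"
EOF

# Invoke it the way the orchestrator would.
bash /tmp/copy_demo.sh sample.csv "$in_dir" "$out_dir"
```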

Example Jobs

The following is an example of a DataPrep job in SYNC mode:

pipelines/includes/local_includes/dataprep_jobs/dataprep_erp_sync.yml
"Prep ERP Data":
  extends:
    - .agent_tag
  stage: "Snowflake orch and prep"
  image: $DATAOPS_DATAPREP_RUNNER_IMAGE
  variables:
    PREP_ACTION: SYNC
    DATAPREP_INPUT_BUCKET: XXXXX
    DATAPREP_INPUT_PREFIX: "input/"
    DATAPREP_OUTPUT_BUCKET: XXXXX
    DATAPREP_OUTPUT_PREFIX: "dataprep_aws_output/"
    DATAPREP_RUN_ENV: perl
    DATAPREP_SCRIPT_NAME: copy.pl
    DATAPREP_SCRIPT_PATH: /scripts/
  script:
    - /dataops
  icon: DATAPREP_ICON

The following is an example of a DataPrep job in ARCHIVE mode:

pipelines/includes/local_includes/dataprep_jobs/dataprep_erp_archive.yml
"Archive ERP Data":
  extends:
    - .agent_tag
  stage: "Batch ingestion"
  image: $DATAOPS_DATAPREP_RUNNER_IMAGE
  variables:
    PREP_ACTION: ARCHIVE
    DATAPREP_INPUT_AWS_ACCESS_KEY_ID: XXXXX
    DATAPREP_OUTPUT_AWS_ACCESS_KEY_ID: XXXXX
    DATAPREP_INPUT_AWS_SECRET_ACCESS_KEY: XXXXX
    DATAPREP_OUTPUT_AWS_SECRET_ACCESS_KEY: XXXXX
    DATAPREP_INPUT_BUCKET: XXXXX
    DATAPREP_INPUT_PREFIX: "input_archive/"
    DATAPREP_OUTPUT_BUCKET: XXXXX
    DATAPREP_OUTPUT_PREFIX: "dataprep_aws_output/"
    DATAPREP_RUN_ENV: perl
    DATAPREP_SCRIPT_NAME: copy.pl
    DATAPREP_SCRIPT_PATH: /scripts/
    PAGE_SIZE: 20
  script:
    - /dataops
  icon: DATAPREP_ICON

Project Resources

None

Host Dependencies (and Resources)

None