DataPrep Orchestrator

Enterprise

Image: $DATAOPS_DATAPREP_RUNNER_IMAGE

The DataPrep orchestrator processes and synchronizes files between two locations, such as two different AWS S3 buckets or a local folder and an Azure Storage Blob.

All files in the input/source location are compared with those in the output/target location. Files not present at the target are downloaded from the source, processed, and uploaded to the target.

File processing is handled by either Python or Bash scripts (other run environments are also supported; see DATAPREP_RUN_ENV), with the following storage locations supported and interchangeable, as sketched after this list:

  1. AWS S3 Bucket
  2. Local Directory
  3. Azure Storage Blob Container
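Because the locations are interchangeable, the input and output can use different storage types. As a hedged illustration, the storage-related variables of a job that reads from an AWS S3 bucket and writes to an Azure Storage Blob container might look like the following sketch (bucket, container, and credential names are placeholders; see Storage-specific configuration below for the full set of keys):

"Prep My Data":
  variables:
    ## input storage: AWS S3 (placeholder names)
    DATAPREP_INPUT_STORAGE: AWS
    DATAPREP_INPUT_BUCKET: s3-input-bucket
    DATAPREP_INPUT_AWS_ACCESS_KEY_ID: $MY_AWS_ACCESS_KEY_ID
    DATAPREP_INPUT_AWS_SECRET_ACCESS_KEY: $MY_SECRET_ACCESS_KEY
    ## output storage: Azure Storage Blob (placeholder names)
    DATAPREP_OUTPUT_STORAGE: AZURE
    DATAPREP_OUTPUT_CONTAINER: sample-container
    DATAPREP_OUTPUT_AZURE_ACCOUNT: $MY_OUTPUT_AZURE_ACCOUNT
    DATAPREP_OUTPUT_AZURE_SAS_TOKEN: $MY_OUTPUT_AZURE_SAS_TOKEN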

Usage

The DataPrep orchestrator workflow is as follows:

  1. Check the defined storage locations.

    • For both input and output storage locations, check for required keys
    • Configure the environment
    • Test the connections with the supplied credentials
  2. Fetch the list of files.

    • Fetch the list of files located in the input storage location
    • Fetch the list of files located in the output storage location
  3. Compare the two file lists by name.

  4. Start synchronization for each file present in the input but not the output.

    • Download the file from the input storage location
    • Process the file using the user-specified script
    • Upload the processed file to the output storage location

Modes

The DataPrep orchestrator supports the following modes:

1. SYNC mode

Consider the following storage location layouts:

/<input storage>
    /$DATAPREP_INPUT_PREFIX

/<output storage>
    /$DATAPREP_OUTPUT_PREFIX

The workflow to process these files in SYNC mode is as follows:

  1. Compare the input and output storage locations
  2. Process the files that are present at the input but not at the output location
  3. Upload the processed file to the output storage location

2. ARCHIVE mode

Consider the following storage location layouts:

/<input storage>
    /$DATAPREP_INPUT_PREFIX
    /$DATAPREP_ERROR_PREFIX
    /$DATAPREP_ARCHIVE_PREFIX

/<output storage>
    /$DATAPREP_OUTPUT_PREFIX

The workflow to process these files in ARCHIVE mode is as follows (see the variables sketch after this list for the ARCHIVE-specific settings):

  1. Compare the input and output storage locations using pagination.
  2. Process the files present in the input but not in the output location.
  3. For each successfully processed file, move the raw input file to the archive folder in the input storage location.
  4. For each file that fails processing, move the raw input file to the error folder in the input storage location.
  5. Delete processed files from the input storage.
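The archive and error folders are controlled by DATAPREP_ARCHIVE_PREFIX and DATAPREP_ERROR_PREFIX (see Supported parameters below). As a hedged sketch, the ARCHIVE-specific variables of such a job could look like this — the job name and prefix values are illustrative placeholders:

"Archive My Data":
  variables:
    PREP_ACTION: ARCHIVE
    DATAPREP_INPUT_PREFIX: "your/input/"
    DATAPREP_ARCHIVE_PREFIX: "your/archive/"  # raw files of successfully processed files are moved here
    DATAPREP_ERROR_PREFIX: "your/error/"      # raw files of failed files are moved here
    PAGE_SIZE: 200                            # number of files fetched per batch
    DATAOPS_DATAPREP_FAIL_ON_ERROR: 1         # set a value to fail the job if the error folder contains any file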

Lastly, here is an example of how to structure a YAML file for a DataPrep orchestrator file sync job:

pipelines/includes/local_includes/dataprep_jobs/dataprep.yml
"Prep My Data":
  extends:
    - .agent_tag
  stage: "Clean Input Data"
  image: $DATAOPS_DATAPREP_RUNNER_IMAGE
  variables:
    PREP_ACTION: SYNC
    DATAPREP_SCRIPT_PATH: $CI_PROJECT_DIR/dataops/dataprep/
    DATAPREP_SCRIPT_NAME: clean-input-file.sh
    # storage-specific configuration
    ## input storage
    DATAPREP_INPUT_STORAGE: AWS
    DATAPREP_INPUT_BUCKET: s3-input-bucket
    DATAPREP_INPUT_PREFIX: "your/input/"
    ## output storage
    DATAPREP_OUTPUT_STORAGE: AWS
    DATAPREP_OUTPUT_BUCKET: s3-output-bucket
    DATAPREP_OUTPUT_PREFIX: "your/output/"
    # file filter
    DATAPREP_FILE_FORMAT: all
  script:
    - /dataops
  icon: DATAPREP_ICON

Supported parameters

You can add the following parameters to the file sync job:

Parameter | Required/Default | Description
PREP_ACTION | REQUIRED | Must be SYNC or ARCHIVE
DATAPREP_SCRIPT_NAME | REQUIRED | The script name to run to process the file
Storage-specific configuration | REQUIRED | Storage-specific configuration. See Storage-specific configuration below
DATAPREP_INPUT_STORAGE | Optional - defaults to AWS | Storage to use as the input location. Must be one of AWS, Azure, or Local - case insensitive
DATAPREP_OUTPUT_STORAGE | Optional - defaults to AWS | Storage to use as the output location. Must be one of AWS, Azure, or Local - case insensitive
DATAPREP_SCRIPT_PATH | Optional - defaults to CI_PROJECT_DIR | File path where the script is saved
DATAPREP_RUN_ENV | Optional - defaults to bash | Environment in which the script is run. Must be one of bash, python, python3, node, r, or perl - case sensitive
DATAPREP_INPUT_PREFIX | Optional | Prefix/folder to set as the directory for input files
DATAPREP_OUTPUT_PREFIX | Optional | Prefix/folder to set as the directory for output files
DATAPREP_ARCHIVE_PREFIX | Optional - defaults to archive | Prefix/folder to set as the directory for archive files in ARCHIVE mode
DATAPREP_ERROR_PREFIX | Optional - defaults to error | Prefix/folder to set as the directory for error files in ARCHIVE mode
DATAPREP_FILE_FORMAT | Optional - defaults to csv | File format used to select and filter files from both storage locations
DATAPREP_MAX_JOBS | Optional - defaults to the number of cores on the system | Number of file-sync jobs to run in parallel
DATAOPS_DATAPREP_FAIL_ON_ERROR | Optional | Set a value to fail the script if the error folder has any file. Note: if DATAPREP_ERROR_PREFIX is not set, the orchestrator uses its default value.
PAGE_SIZE | Optional - defaults to 200 | Page size used in ARCHIVE mode

The following topics describe several environment variables in detail:

DATAPREP_FILE_FORMAT

If no format is specified in the environment variable DATAPREP_FILE_FORMAT, only files ending with .csv are selected from both the input and output storage locations.

If multiple file formats are to be selected by the DataPrep orchestrator, you can specify all file formats as comma-separated values in this variable.

For example, to match multiple file types at once and process .csv, .tsv, and .txt files, use:

pipelines/includes/local_includes/dataprep_jobs/dataprep.yml
"Prep My Data":
  extends:
    - .agent_tag
  stage: "Clean Input Data"
  image: $DATAOPS_DATAPREP_RUNNER_IMAGE
  variables:
    PREP_ACTION: SYNC
    DATAPREP_SCRIPT_PATH: $CI_PROJECT_DIR/dataops/dataprep/
    DATAPREP_SCRIPT_NAME: clean-input-file.sh
    DATAPREP_INPUT_STORAGE: AWS
    DATAPREP_INPUT_BUCKET: s3-input-bucket
    DATAPREP_INPUT_PREFIX: "your/input/"
    DATAPREP_OUTPUT_STORAGE: AWS
    DATAPREP_OUTPUT_BUCKET: s3-output-bucket
    DATAPREP_OUTPUT_PREFIX: "your/output/"
    DATAPREP_FILE_FORMAT: "csv,tsv,txt"
  script:
    - /dataops
  icon: DATAPREP_ICON
Tip: If all files present in the input storage location are to be selected by the DataPrep orchestrator, set DATAPREP_FILE_FORMAT to all.

Tip: If the storage location contains files with the extension .all, setting DATAPREP_FILE_FORMAT to all specifies .all as the file extension filter rather than selecting all files.

DATAPREP_MAX_JOBS

The value set in DATAPREP_MAX_JOBS determines how many data-prep operations run in parallel. If no value is set, the default equals the number of CPU cores on the system.

Setting this variable to a high value increases the number of jobs executed in parallel at the cost of higher system resource usage.

If the files are very large, the user script is CPU-intensive, or system availability is low, setting DATAPREP_MAX_JOBS to a small value may be advantageous.
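For example, a minimal sketch — the value is purely illustrative — that limits parallelism for very large files or a CPU-heavy script:

variables:
  DATAPREP_MAX_JOBS: 2  # process at most two files in parallel (illustrative value)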

Files created by the DataPrep orchestrator are deleted when they are no longer needed to prevent the system from running out of disk space.

After running the user script, the downloaded input file is deleted. Once the generated output file is uploaded, it is deleted as well.

PAGE_SIZE

In ARCHIVE mode, the orchestrator retrieves only a limited number of files at a time instead of fetching the list of all files in the storage location. The number of files retrieved at a time is controlled by the variable PAGE_SIZE.

The orchestrator uses pagination to fetch a limited number of files, defined by PAGE_SIZE (defaults to 200), which form a batch. It then processes the files available in the batch; the number of files processed at a time is controlled by the variable DATAPREP_MAX_JOBS.

Once the orchestrator processes all the files in the batch, it fetches a new set of files to initialize another batch. This process is repeated until all the files present in the storage location have been processed.
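As a hypothetical illustration of the batching: with 1,000 files in the input location, the default PAGE_SIZE of 200, and DATAPREP_MAX_JOBS set to 8, the orchestrator would work through five batches, processing up to eight files at a time within each batch:

variables:
  PAGE_SIZE: 200        # 1,000 files / 200 per batch = 5 batches (illustrative figures)
  DATAPREP_MAX_JOBS: 8  # up to 8 files processed in parallel within each batch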

See section Improving CPU utilization in ARCHIVE mode to read more about improving performance.

DATAPREP_RUN_ENV

variables:
  DATAPREP_RUN_ENV: bash

The DataPrep orchestrator supports the following environments to run the script in:

  • bash
    • Default environment if no environment is specified
    • Version: GNU bash, version 4.4.20(1)-release
  • python
    • Uses Python3
    • Version: 3.6.9
  • python3
    • Version: 3.6.9
  • r
    • Version: 4.0.4 (2021-02-15) -- "Lost Library Book"
  • node
    • Version: v8.10.0
  • perl
    • Version: perl 5, version 26, subversion 1 (v5.26.1)

DATAPREP_SCRIPT_NAME

The DataPrep orchestrator allows users to run their own scripts to modify the data as per their needs. For each file that data-prep detects for processing, the following arguments are passed to the script in the given order:

  1. File Name - the name of the input file
  2. Input Folder - the folder in which the file exists
  3. Output Folder - the folder in which the output file(s) are to be saved

The script can then perform the necessary changes to mold the file as required and save the file(s) in the specified output folder.

At the end of the script's execution, DataPrep expects the resulting file(s) to be present in the specified output folder.

We recommend placing custom scripts into your DataOps project folder /dataops/dataprep. During job execution on an orchestrator, refer to it with $CI_PROJECT_DIR/dataops/dataprep/your_script.

To get you started, below is a sample script, available in the orchestrator's /scripts/ directory, that copies the file from the input folder to the output folder:

/scripts/copy.sh
# Save file name and folder paths in variables from the command line argument
FILE_NAME="$1"
INPUT_FOLDER="$2"
OUTPUT_FOLDER="$3"

INPUT_FILE_PATH="$INPUT_FOLDER/$FILE_NAME"
OUTPUT_FILE_PATH="$OUTPUT_FOLDER/$FILE_NAME"
echo "Starting copy from $INPUT_FILE_PATH to $OUTPUT_FILE_PATH"
# Copy the file from the input path to the output path
cp -v "$INPUT_FILE_PATH" "$OUTPUT_FILE_PATH"
echo "Copy Complete"

Storage-specific configuration

Configuring both storage locations is mandatory when working with the DataPrep orchestrator.

AWS storage-specific configuration

AWS as input storage

"Prep My Data":
variables:
DATAPREP_INPUT_BUCKET: bucket
DATAPREP_INPUT_AWS_REGION: us-west-2
DATAPREP_INPUT_AWS_ACCESS_KEY_ID: $MY_AWS_ACCESS_KEY_ID
DATAPREP_INPUT_AWS_SECRET_ACCESS_KEY: $MY_SECRET_ACCESS_KEY
ParameterRequired or Optional (and default)Description
DATAPREP_INPUT_AWS_ACCESS_KEY_IDREQUIREDAWS_ACCESS_KEY_ID to be used for connecting to the input storage
DATAPREP_INPUT_AWS_SECRET_ACCESS_KEYREQUIREDAWS_SECRET_ACCESS_KEY to be used for connecting to the input storage
DATAPREP_INPUT_BUCKETREQUIREDBucket to be used for the input storage location
DATAPREP_INPUT_AWS_REGIONOptional - defaults to us-west-2Region to be used
DATAPREP_INPUT_PROFILE_NAMEOptional - defaults to inputName with which the profile for input storage is saved

AWS as output storage

"Prep My Data":
variables:
DATAPREP_OUTPUT_BUCKET: bucket
DATAPREP_OUTPUT_AWS_REGION: us-west-2
DATAPREP_OUTPUT_AWS_ACCESS_KEY_ID: $MY_AWS_ACCESS_KEY_ID
DATAPREP_OUTPUT_AWS_SECRET_ACCESS_KEY: $MY_SECRET_ACCESS_KEY
ParameterRequired or Optional (and default)Description
DATAPREP_OUTPUT_AWS_ACCESS_KEY_IDREQUIREDAWS_ACCESS_KEY_ID to be used for connecting to the output storage
DATAPREP_OUTPUT_AWS_SECRET_ACCESS_KEYREQUIREDAWS_SECRET_ACCESS_KEY to be used for connecting to the output storage
DATAPREP_OUTPUT_BUCKETREQUIREDBucket to be used for the output storage location
DATAPREP_OUTPUT_AWS_REGIONOptional - defaults to us-west-2Region to be used for output storage
DATAPREP_OUTPUT_PROFILE_NAMEOptional - defaults to outputName with which the profile for output storage is saved

Azure storage-specific configuration

Azure as input storage

"Prep My Data":
variables:
DATAPREP_INPUT_CONTAINER: sample-container
DATAPREP_INPUT_AZURE_ACCOUNT: $MY_INPUT_AZURE_ACCOUNT
DATAPREP_INPUT_AZURE_SAS_TOKEN: $MY_INPUT_AZURE_SAS_TOKEN
ParameterRequired or Optional (and default)Description
DATAPREP_INPUT_AZURE_ACCOUNTREQUIREDAzure account to be used for connecting to the input storage
DATAPREP_INPUT_AZURE_SAS_TOKENREQUIREDAzure SAS token to be used for connecting to the input storage
DATAPREP_INPUT_CONTAINERREQUIREDContainer to be used for the input storage location

Azure as output storage

"Prep My Data":
variables:
DATAPREP_OUTPUT_CONTAINER: sample-container
DATAPREP_OUTPUT_AZURE_ACCOUNT: $MY_OUTPUT_AZURE_ACCOUNT
DATAPREP_OUTPUT_AZURE_SAS_TOKEN: $MY_OUTPUT_AZURE_SAS_TOKEN
ParameterRequired or Optional (and default)Description
DATAPREP_OUTPUT_AZURE_ACCOUNTREQUIREDAzure account to be used for connecting to the output storage
DATAPREP_OUTPUT_AZURE_SAS_TOKENREQUIREDAzure SAS token to be used for connecting to the output storage
DATAPREP_OUTPUT_CONTAINERREQUIREDContainer to be used for the output storage location

Local storage-specific configuration

Local storage requires that the orchestrator has access to the directories you want to process. For large files, this requires modifying the DataOps Runner setup and adding additional docker volumes. For smaller files that are being passed between jobs, you can reference the pipeline cache.

Local as input storage

"Prep My Data":
variables:
DATAPREP_INPUT_DIRECTORY: "/agent_cache/filetransfer/in"
ParameterRequired or Optional (and default)Description
DATAPREP_INPUT_DIRECTORYREQUIREDPath of the directory to be used as input storage location

Local as output storage

"Prep My Data":
variables:
DATAPREP_INPUT_DIRECTORY: "/agent_cache/filetransfer/out"
ParameterRequired or Optional (and default)Description
DATAPREP_OUTPUT_DIRECTORYREQUIREDPath of the directory to be used as output storage location
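For example, to pass smaller files between jobs through the pipeline cache, both locations can point at cache directories — a minimal sketch reusing the cache paths shown above:

"Prep My Data":
  variables:
    DATAPREP_INPUT_STORAGE: LOCAL
    DATAPREP_OUTPUT_STORAGE: LOCAL
    DATAPREP_INPUT_DIRECTORY: "/agent_cache/filetransfer/in"
    DATAPREP_OUTPUT_DIRECTORY: "/agent_cache/filetransfer/out"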

Improving CPU utilization in ARCHIVE mode

The following tips can be used to improve CPU utilization and overall performance.

Small number of cores

If the runner system has a small number of cores, using the default or a small value for PAGE_SIZE can improve performance. Relative to the number of cores, the batch is still large, so there is always a file waiting to be processed by the orchestrator.

Only once the batch has been completed is the next set of files retrieved to create a new batch, leaving only a small window during batch initialization in which the CPU cores are not fully utilized.

Large number of cores

If the runner system has a large number of cores, a large PAGE_SIZE value yields better results. A large value initializes a large batch, so even if individual files are processed quickly (a few seconds), there is still a long queue of files keeping the cores busy.

Improving Performance

As a rule of thumb, set PAGE_SIZE to 3-4 times the number of CPU cores. If the average processing time per file is small, increasing the value further can improve performance.
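For example, on a hypothetical runner with 8 CPU cores, the rule of thumb gives roughly:

variables:
  DATAPREP_MAX_JOBS: 8  # the default would also equal the 8 cores (illustrative)
  PAGE_SIZE: 32         # 4 x 8 cores, per the rule of thumb above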

Example jobs

The following is an example of a DataPrep job in SYNC mode:

pipelines/includes/local_includes/dataprep_jobs/dataprep_erp_sync.yml
"Prep ERP Data":
  extends:
    - .agent_tag
  stage: "Snowflake orch and prep"
  image: $DATAOPS_DATAPREP_RUNNER_IMAGE
  variables:
    PREP_ACTION: SYNC
    DATAPREP_INPUT_BUCKET: XXXXX
    DATAPREP_INPUT_PREFIX: "input/"
    DATAPREP_OUTPUT_BUCKET: XXXXX
    DATAPREP_OUTPUT_PREFIX: "dataprep_aws_output/"
    DATAPREP_RUN_ENV: perl
    DATAPREP_SCRIPT_NAME: copy.pl
    DATAPREP_SCRIPT_PATH: /scripts/
  script:
    - /dataops
  icon: DATAPREP_ICON

The following is an example of a DataPrep job in ARCHIVE mode:

pipelines/includes/local_includes/dataprep_jobs/dataprep_erp_archive.yml
"Archive ERP Data":
  extends:
    - .agent_tag
  stage: "Batch ingestion"
  image: $DATAOPS_DATAPREP_RUNNER_IMAGE
  variables:
    PREP_ACTION: ARCHIVE
    DATAPREP_INPUT_AWS_ACCESS_KEY_ID: XXXXX
    DATAPREP_OUTPUT_AWS_ACCESS_KEY_ID: XXXXX
    DATAPREP_INPUT_AWS_SECRET_ACCESS_KEY: XXXXX
    DATAPREP_OUTPUT_AWS_SECRET_ACCESS_KEY: XXXXX
    DATAPREP_INPUT_BUCKET: XXXXX
    DATAPREP_INPUT_PREFIX: "input_archive/"
    DATAPREP_OUTPUT_BUCKET: XXXXX
    DATAPREP_OUTPUT_PREFIX: "dataprep_aws_output/"
    DATAPREP_RUN_ENV: perl
    DATAPREP_SCRIPT_NAME: copy.pl
    DATAPREP_SCRIPT_PATH: /scripts/
    PAGE_SIZE: 20
  script:
    - /dataops
  icon: DATAPREP_ICON