DataPrep Orchestrator
Enterprise
Image: $DATAOPS_DATAPREP_RUNNER_IMAGE
The DataPrep orchestrator allows the processing and synchronizing of files between two locations, such as two different AWS S3 buckets or a local folder and an Azure Storage Blob.
All the files in the input/source location are compared to the output/target location. Files not present at the target location are downloaded from the source, processed, and uploaded to the target.
File processing is handled by a user-provided script (for example, a Bash or Python script), and the following storage locations are supported and interchangeable:
- AWS S3 Bucket
- Local Directory
- Azure Storage Blob Container
Usage
The DataPrep orchestrator workflow is as follows (a simplified sketch of this loop follows the list):
- Check the defined storage locations:
  - For both the input and output storage locations, check for the required keys
  - Configure the environment
  - Test the connections with the supplied credentials
- Fetch the file lists:
  - Fetch the list of files in the input storage location
  - Fetch the list of files in the output storage location
- Compare the file lists by name.
- Start synchronization for each file present in the input but not in the output:
  - Download the file from the input storage location
  - Process the file using the user-specified script
  - Upload the processed file to the output storage location
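To make these steps concrete, here is a minimal Python sketch of the SYNC loop. It is purely illustrative: list_files, download, run_script, and upload are hypothetical placeholder helpers, not the orchestrator's actual API.

# Conceptual sketch of the SYNC workflow -- list_files, download, run_script,
# and upload are hypothetical placeholders, not the orchestrator's API.
def sync(input_store, output_store, script, in_dir, out_dir):
    # Fetch the file names from both locations and compare them by name.
    missing = set(list_files(input_store)) - set(list_files(output_store))
    for name in sorted(missing):
        # Download the raw file from the input storage location.
        download(input_store, name, in_dir)
        # Process it with the user-specified script; the script receives the
        # file name, the input folder, and the output folder, in that order.
        run_script(script, name, in_dir, out_dir)
        # Upload the processed file to the output storage location.
        upload(output_store, f"{out_dir}/{name}")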
Modes
The DataPrep orchestrator supports the following modes:
1. SYNC mode
Consider the following storage location layouts:
/<input storage>
  /$DATAPREP_INPUT_PREFIX
/<output storage>
  /$DATAPREP_OUTPUT_PREFIX
The workflow to process these files in SYNC mode is as follows:
- Compare the input and output storage locations
- Process the files that are present at the input but not at the output location
- Upload the processed file to the output storage location
2. ARCHIVE mode
Consider the following storage location layouts:
/<input storage>
  /$DATAPREP_INPUT_PREFIX
  /$DATAPREP_ERROR_PREFIX
  /$DATAPREP_ARCHIVE_PREFIX
/<output storage>
  /$DATAPREP_OUTPUT_PREFIX
The workflow to process these files in ARCHIVE mode is as follows (a simplified sketch follows the list):
- Compare the input and output storage locations using pagination.
- Process the files present in the input but not in the output location.
- For each successfully processed file, move the raw input file to the archive folder in the input storage location.
- For each file that fails processing, move the raw input file to the error folder in the input storage location.
- Delete processed files from the input storage.
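The following Python sketch illustrates this ARCHIVE flow conceptually; every helper name in it (list_missing_files, download, run_script, upload, move_within_input, delete_local) is a hypothetical placeholder rather than the orchestrator's real API.

# Conceptual sketch of the ARCHIVE workflow -- all helper names are
# hypothetical placeholders, not the orchestrator's real implementation.
def archive(input_store, output_store, script, in_dir, out_dir, page_size):
    while True:
        # Fetch one page (batch) of input files that are not yet in the output.
        batch = list_missing_files(input_store, output_store, limit=page_size)
        if not batch:
            break  # every file in the input storage location has been handled
        for name in batch:
            download(input_store, name, in_dir)
            try:
                run_script(script, name, in_dir, out_dir)
                upload(output_store, f"{out_dir}/{name}")
                # Success: move the raw input file to the archive prefix.
                move_within_input(input_store, name, "archive/")
            except Exception:
                # Failure: move the raw input file to the error prefix.
                move_within_input(input_store, name, "error/")
            finally:
                # Local copies are removed so the runner does not fill its disk.
                delete_local(in_dir, out_dir, name)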
Lastly, here is an example of how to structure a YAML file for a DataPrep orchestrator file sync job:
"Prep My Data":
extends:
- .agent_tag
stage: "Clean Input Data"
image: $DATAOPS_DATAPREP_RUNNER_IMAGE
variables:
PREP_ACTION: SYNC
DATAPREP_SCRIPT_PATH: $CI_PROJECT_DIR/dataops/dataprep/
DATAPREP_SCRIPT_NAME: clean-input-file.sh
# storage-specific configuration
## input storage
DATAPREP_INPUT_STORAGE: AWS
DATAPREP_INPUT_BUCKET: s3-input-bucket
DATAPREP_INPUT_PREFIX: "your/input/"
## output storage
DATAPREP_OUTPUT_STORAGE: AWS
DATAPREP_OUTPUT_BUCKET: s3-output-bucket
DATAPREP_OUTPUT_PREFIX: "your/output/"
# file filter
DATAPREP_FILE_FORMAT: all
script:
- /dataops
icon: DATAPREP_ICON
Supported parameters
You can add the following parameters to the file sync job:
Parameter | Required/Default | Description |
---|---|---|
PREP_ACTION | REQUIRED | Must be SYNC or ARCHIVE |
DATAPREP_SCRIPT_NAME | REQUIRED | The script name to run to process the file |
Storage-specific configuration | REQUIRED | Storage-specific configuration for the chosen input and output locations. See Storage-specific configuration |
DATAPREP_INPUT_STORAGE | Optional - defaults to AWS | Storage to use as the input location. Must be one of AWS, Azure, or Local (case-insensitive) |
DATAPREP_OUTPUT_STORAGE | Optional - defaults to AWS | Storage to use as the output location. Must be one of AWS, Azure, or Local (case-insensitive) |
DATAPREP_SCRIPT_PATH | Optional - defaults to CI_PROJECT_DIR | File path where the script is saved |
DATAPREP_RUN_ENV | Optional - defaults to bash | Environment in which the script is run. Must be one of bash, python, python3, node, r, or perl (case-sensitive) |
DATAPREP_INPUT_PREFIX | Optional | Prefix/folder to set as a directory for input files |
DATAPREP_OUTPUT_PREFIX | Optional | Prefix/folder to set as a directory for output files |
DATAPREP_ARCHIVE_PREFIX | Optional - defaults to archive | Prefix/folder to set as a directory for archive files in ARCHIVE mode |
DATAPREP_ERROR_PREFIX | Optional - defaults to error | Prefix/folder to set as a directory for error files in ARCHIVE mode |
DATAPREP_FILE_FORMAT | Optional - defaults to csv | File format to select and filter files from both storage locations |
DATAPREP_MAX_JOBS | Optional - defaults to number of cores on system | Number of file-sync jobs to run in parallel |
DATAOPS_DATAPREP_FAIL_ON_ERROR | Optional | Set a value to fail the script if the error folder has any file. Note: If DATAPREP_ERROR_PREFIX is not set, the orchestrator uses a default value. |
PAGE_SIZE | Optional - defaults to 200 | Value for page size in ARCHIVE mode |
The following topics describe several environment variables in detail:
DATAPREP_FILE_FORMAT
If no format is specified in the environment variable DATAPREP_FILE_FORMAT, only files ending with .csv are selected from both the input and output storage locations.
To select multiple file formats, specify them as comma-separated values in this variable. For example, if .csv, .tsv, and .txt files must be processed, use:
"Prep My Data":
extends:
- .agent_tag
stage: "Clean Input Data"
image: $DATAOPS_DATAPREP_RUNNER_IMAGE
variables:
PREP_ACTION: SYNC
DATAPREP_SCRIPT_PATH: $CI_PROJECT_DIR/dataops/dataprep/
DATAPREP_SCRIPT_NAME: clean-input-file.sh
DATAPREP_INPUT_STORAGE: AWS
DATAPREP_INPUT_BUCKET: s3-input-bucket
DATAPREP_INPUT_PREFIX: "your/input/"
DATAPREP_OUTPUT_STORAGE: AWS
DATAPREP_OUTPUT_BUCKET: s3-output-bucket
DATAPREP_OUTPUT_PREFIX: "your/output/"
DATAPREP_FILE_FORMAT: "csv,tsv,txt"
script:
- /dataops
icon: DATAPREP_ICON
To select all files present in the input storage location, set the variable DATAPREP_FILE_FORMAT to all. If the storage location contains files with the literal extension .all, set DATAPREP_FILE_FORMAT to all, (note the trailing comma) so the value is treated as a file extension rather than the keyword that selects all files.
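A hypothetical Python sketch of how such a filter could behave is shown below; the matches function is illustrative only and is not part of the orchestrator.

# Illustrative sketch of the file-format filter -- not the orchestrator's code.
def matches(file_name, file_format):
    formats = [f.strip() for f in file_format.split(",") if f.strip()]
    # A bare "all" (no trailing comma) is the keyword that selects every file.
    if formats == ["all"] and "," not in file_format:
        return True
    # Otherwise each comma-separated entry is treated as a file extension.
    return any(file_name.endswith("." + ext.lstrip(".")) for ext in formats)

print(matches("data.tsv", "csv,tsv,txt"))  # True
print(matches("data.bin", "all"))          # True  - keyword: all files match
print(matches("data.all", "all,"))         # True  - trailing comma: extension .all
print(matches("data.bin", "all,"))         # False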
DATAPREP_MAX_JOBS
The value set in DATAPREP_MAX_JOBS determines how many data-prep operations run in parallel. If the variable is not set, the default equals the number of CPU cores on the system.
A higher value increases the number of parallel jobs at the cost of system resource usage. If the files being processed are very large, the user script is CPU-intensive, or system availability is low, setting a smaller value for DATAPREP_MAX_JOBS may be advantageous.
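As a rough illustration of this behavior, the sketch below runs a hypothetical process_file helper in a worker pool sized from DATAPREP_MAX_JOBS, falling back to the CPU count; it is a conceptual example, not the orchestrator's implementation.

import os
from concurrent.futures import ThreadPoolExecutor

# process_file is a hypothetical helper that downloads, runs the user script,
# and uploads one file, returning True on success.
def run_all(file_names, process_file):
    # Default to the number of CPU cores when DATAPREP_MAX_JOBS is unset.
    max_jobs = int(os.environ.get("DATAPREP_MAX_JOBS", os.cpu_count() or 1))
    with ThreadPoolExecutor(max_workers=max_jobs) as pool:
        results = list(pool.map(process_file, file_names))
    for name, ok in zip(file_names, results):
        print(f"{name}: {'processed' if ok else 'failed'}")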
Files created by the DataPrep orchestrator are deleted when they are no longer needed to prevent the system from running out of disk space. After the user script runs, the downloaded input file is deleted; once the generated output file is uploaded, it is deleted as well.
PAGE_SIZE
In ARCHIVE mode, instead of fetching the list of all files in the storage location at once, the orchestrator retrieves only a limited number of files at a time. This number is controlled by the variable PAGE_SIZE.
Using pagination, the orchestrator fetches up to PAGE_SIZE files (defaults to 2000), which form a batch. It then processes the files in that batch, with the number of files processed in parallel controlled by DATAPREP_MAX_JOBS.
Once the orchestrator has processed all the files in the batch, it fetches the next set of files to form a new batch. This repeats until all the files in the storage location have been processed.
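The pagination itself can be pictured as a generator that yields one batch at a time. The sketch below assumes a hypothetical list_page(token, limit) callable that returns a page of file names plus a continuation token; it is not the orchestrator's real API.

# Illustrative pagination sketch -- list_page is a hypothetical callable that
# returns (names, next_token), with next_token set to None on the last page.
def paginate(list_page, page_size):
    token = None
    while True:
        names, token = list_page(token, page_size)
        if names:
            yield names  # one batch of at most page_size file names
        if token is None:
            break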
See Improving CPU utilization in ARCHIVE mode below for guidance on tuning performance.
DATAPREP_RUN_ENV
variables:
  DATAPREP_RUN_ENV: bash
The DataPrep orchestrator supports the following environments to run the script in:
bash
- Default environment if no environment is specified
- Version: GNU bash, version 4.4.20(1)-release
python
- Uses Python3
- Version: 3.6.9
python3
- Version: 3.6.9
r
- Version: 4.0.4 (2021-02-15) -- "Lost Library Book"
node
- Version: v8.10.0
perl
- Version: perl 5, version 26, subversion 1 (v5.26.1)
DATAPREP_SCRIPT_NAME
The DataPrep orchestrator allows users to run their own scripts to modify the data as needed. For each file the orchestrator selects for processing, the following arguments are passed to the script, in this order:
- File Name - the name of the input file
- Input Folder - the folder in which the file exists
- Output Folder - the folder in which the output file(s) are to be saved
The script can then transform the file as required and save the resulting file(s) in the specified output folder. When the script finishes, DataPrep assumes the output file(s) are present in that folder.
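For illustration, an invocation for the default bash run environment could look like the sketch below; the script path, file name, and folder paths are made-up examples, and process_with_script is not the orchestrator's actual function.

import subprocess

def process_with_script(script_path, file_name, input_folder, output_folder):
    # The user script receives the file name, the input folder, and the
    # output folder, in that order.
    subprocess.run(
        ["bash", script_path, file_name, input_folder, output_folder],
        check=True,  # a non-zero exit status marks this file as failed
    )

# Example call with illustrative paths only.
process_with_script(
    "dataops/dataprep/clean-input-file.sh",
    "orders.csv",
    "/tmp/dataprep/input",
    "/tmp/dataprep/output",
)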
We recommend placing custom scripts into your DataOps project folder /dataops/dataprep. During job execution on an orchestrator, refer to a script with $CI_PROJECT_DIR/dataops/dataprep/your_script.
To get you started, below are sample scripts, available in the orchestrator's /scripts/ directory, that copy the file from the input folder to the output folder:
- Shell
- Python
- R
- JavaScript
- Perl
# Save file name and folder paths in variables from the command line argument
FILE_NAME="$1"
INPUT_FOLDER="$2"
OUTPUT_FOLDER="$3"
INPUT_FILE_PATH="$INPUT_FOLDER/$FILE_NAME"
OUTPUT_FILE_PATH="$OUTPUT_FOLDER/$FILE_NAME"
echo "Starting copy from $INPUT_FILE_PATH to $OUTPUT_FILE_PATH"
# Copy the file from the input path to the output path
cp -v "$INPUT_FILE_PATH" "$OUTPUT_FILE_PATH"
echo "Copy Complete"
import shutil
import sys
# Save file name and folder paths in variables from the command line argument
file_name = sys.argv[1]
input_folder = sys.argv[2]
output_folder = sys.argv[3]
input_file_path = f"{input_folder}/{file_name}"
output_file_path = f"{output_folder}/{file_name}"
print(f"Starting copy from {input_file_path} to {output_file_path}")
# Copy the file from the input path to the output path
shutil.copyfile(input_file_path, output_file_path)
print("Copy Complete")
# Save file name and folder paths in variables from the command line argument
args <- commandArgs()
filename <- args[6]
inputFolder <- args[7]
outputFolder <- args[8]
inputFilePath <- paste(inputFolder, filename, sep="/")
outputFilePath <- paste(outputFolder, filename, sep="/")
("Starting copy from", inputFolder, "to", outputFolder, sep=" ")
# Copy the file from the input path to the output path
file.copy(inputFilePath, outputFilePath)
print("Copy Complete")
const fs = require("fs");
// Save file name and folder paths in variables from the command line argument
const fileName = process.argv[2];
const inputFolder = process.argv[3];
const outputFolder = process.argv[4];
const inputFilePath = `${inputFolder}/${fileName}`;
const outputFilePath = `${outputFolder}/${fileName}`;
console.log(`Starting copy from ${inputFolder} to ${outputFolder}`);
// Copy the file from the input path to the output path
fs.copyFile(inputFilePath, outputFilePath, (err) => {
if (err) throw err;
console.log("Copy Complete");
});
use warnings FATAL => 'all';
use strict;
use File::Copy;
# Save file name and folder paths in variables from the command line argument
my $fileName = $ARGV[0];
my $inputFolder = $ARGV[1];
my $outputFolder = $ARGV[2];
my $inputFilePath = "$inputFolder/$fileName";
my $outputFilePath = "$outputFolder/$fileName";
print("Starting copy from $inputFilePath to $outputFilePath\n");
# Copy the file from the input path to the output path
copy($inputFilePath,$outputFilePath) or die "Copy failed: $!";
print("Copy Complete")
Storage-specific configuration
You must configure both storage locations when working with the DataPrep orchestrator. Each location can be an AWS S3 bucket, an Azure Storage Blob container, or a local directory, and the input and output locations are configured independently.
AWS storage-specific configuration
AWS as input storage
"Prep My Data":
variables:
DATAPREP_INPUT_BUCKET: bucket
DATAPREP_INPUT_AWS_REGION: us-west-2
DATAPREP_INPUT_AWS_ACCESS_KEY_ID: $MY_AWS_ACCESS_KEY_ID
DATAPREP_INPUT_AWS_SECRET_ACCESS_KEY: $MY_SECRET_ACCESS_KEY
Parameter | Required or Optional (and default) | Description |
---|---|---|
DATAPREP_INPUT_AWS_ACCESS_KEY_ID | REQUIRED | AWS_ACCESS_KEY_ID to be used for connecting to the input storage |
DATAPREP_INPUT_AWS_SECRET_ACCESS_KEY | REQUIRED | AWS_SECRET_ACCESS_KEY to be used for connecting to the input storage |
DATAPREP_INPUT_BUCKET | REQUIRED | Bucket to be used for the input storage location |
DATAPREP_INPUT_AWS_REGION | Optional - defaults to us-west-2 | Region to be used |
DATAPREP_INPUT_PROFILE_NAME | Optional - defaults to input | Name with which the profile for input storage is saved |
AWS as output storage
"Prep My Data":
variables:
DATAPREP_OUTPUT_BUCKET: bucket
DATAPREP_OUTPUT_AWS_REGION: us-west-2
DATAPREP_OUTPUT_AWS_ACCESS_KEY_ID: $MY_AWS_ACCESS_KEY_ID
DATAPREP_OUTPUT_AWS_SECRET_ACCESS_KEY: $MY_SECRET_ACCESS_KEY
Parameter | Required or Optional (and default) | Description |
---|---|---|
DATAPREP_OUTPUT_AWS_ACCESS_KEY_ID | REQUIRED | AWS_ACCESS_KEY_ID to be used for connecting to the output storage |
DATAPREP_OUTPUT_AWS_SECRET_ACCESS_KEY | REQUIRED | AWS_SECRET_ACCESS_KEY to be used for connecting to the output storage |
DATAPREP_OUTPUT_BUCKET | REQUIRED | Bucket to be used for the output storage location |
DATAPREP_OUTPUT_AWS_REGION | Optional - defaults to us-west-2 | Region to be used for output storage |
DATAPREP_OUTPUT_PROFILE_NAME | Optional - defaults to output | Name with which the profile for output storage is saved |
Azure storage-specific configuration
Azure as input storage
"Prep My Data":
variables:
DATAPREP_INPUT_CONTAINER: sample-container
DATAPREP_INPUT_AZURE_ACCOUNT: $MY_INPUT_AZURE_ACCOUNT
DATAPREP_INPUT_AZURE_SAS_TOKEN: $MY_INPUT_AZURE_SAS_TOKEN
Parameter | Required or Optional (and default) | Description |
---|---|---|
DATAPREP_INPUT_AZURE_ACCOUNT | REQUIRED | Azure account to be used for connecting to the input storage |
DATAPREP_INPUT_AZURE_SAS_TOKEN | REQUIRED | Azure SAS token to be used for connecting to the input storage |
DATAPREP_INPUT_CONTAINER | REQUIRED | Container to be used for the input storage location |
Azure as output storage
"Prep My Data":
variables:
DATAPREP_OUTPUT_CONTAINER: sample-container
DATAPREP_OUTPUT_AZURE_ACCOUNT: $MY_OUTPUT_AZURE_ACCOUNT
DATAPREP_OUTPUT_AZURE_SAS_TOKEN: $MY_OUTPUT_AZURE_SAS_TOKEN
Parameter | Required or Optional (and default) | Description |
---|---|---|
DATAPREP_OUTPUT_AZURE_ACCOUNT | REQUIRED | Azure account to be used for connecting to the output storage |
DATAPREP_OUTPUT_AZURE_SAS_TOKEN | REQUIRED | Azure SAS token to be used for connecting to the output storage |
DATAPREP_OUTPUT_CONTAINER | REQUIRED | Container to be used for the output storage location |
Local storage-specific configuration
Local storage requires that the orchestrator has access to the directories you want to process. For large files, this requires modifying the DataOps Runner setup and adding additional Docker volumes. For smaller files passed between jobs, you can reference the pipeline cache.
Local as input storage
"Prep My Data":
variables:
DATAPREP_INPUT_DIRECTORY: "/agent_cache/filetransfer/in"
Parameter | Required or Optional (and default) | Description |
---|---|---|
DATAPREP_INPUT_DIRECTORY | REQUIRED | Path of the directory to be used as input storage location |
Local as output storage
"Prep My Data":
variables:
DATAPREP_INPUT_DIRECTORY: "/agent_cache/filetransfer/out"
Parameter | Required or Optional (and default) | Description |
---|---|---|
DATAPREP_OUTPUT_DIRECTORY | REQUIRED | Path of the directory to be used as output storage location |
Improving CPU utilization in ARCHIVE mode
The following tips can help improve CPU utilization and overall performance in ARCHIVE mode.
Small number of cores
If the runner system has a small number of cores, the default or even a small value for PAGE_SIZE can already give good performance. Relative to the number of cores, the batch is large, so there is almost always a file ready for the orchestrator to process.
Only after a batch completes is the next set of files retrieved, leaving just a small window during new batch initialization in which the CPU cores are not fully utilized.
Large number of cores
If the runner system has a large number of cores, a larger value for PAGE_SIZE yields better results. A large value initializes a large batch, so even if individual files are processed quickly (in a few seconds), there is still a long queue of files waiting to be processed.
As a rule of thumb, set PAGE_SIZE to 3-4 times the number of CPU cores. If the average processing time per file is small, increasing the value further can improve performance.
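As a rough illustration of this rule of thumb (an assumption drawn from the guidance above, not an official formula), a starting value could be derived like this:

import os

# Rule-of-thumb sizing sketch: 3-4x the CPU core count, leaning larger when
# files process quickly. Purely illustrative.
def suggest_page_size(avg_seconds_per_file):
    cores = os.cpu_count() or 1
    factor = 4 if avg_seconds_per_file < 5 else 3
    return cores * factor

print(suggest_page_size(2))   # e.g. 32 on an 8-core runner
print(suggest_page_size(30))  # e.g. 24 on an 8-core runner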
Example jobs
The following is an example of a DataPrep job in SYNC mode:
"Prep ERP Data":
extends:
- .agent_tag
stage: "Snowflake orch and prep"
image: $DATAOPS_DATAPREP_RUNNER_IMAGE
variables:
PREP_ACTION: SYNC
DATAPREP_INPUT_BUCKET: XXXXX
DATAPREP_INPUT_PREFIX: "input/"
DATAPREP_OUTPUT_BUCKET: XXXXX
DATAPREP_OUTPUT_PREFIX: "dataprep_aws_output/"
DATAPREP_RUN_ENV: perl
DATAPREP_SCRIPT_NAME: copy.pl
DATAPREP_SCRIPT_PATH: /scripts/
script:
- /dataops
icon: DATAPREP_ICON
The following is an example of a DataPrep job in ARCHIVE mode:
"Archive ERP Data":
extends:
- .agent_tag
stage: "Batch ingestion"
image: $DATAOPS_DATAPREP_RUNNER_IMAGE
variables:
PREP_ACTION: ARCHIVE
DATAPREP_INPUT_AWS_ACCESS_KEY_ID: XXXXX
DATAPREP_OUTPUT_AWS_ACCESS_KEY_ID: XXXXX
DATAPREP_INPUT_AWS_SECRET_ACCESS_KEY: XXXXX
DATAPREP_OUTPUT_AWS_SECRET_ACCESS_KEY: XXXXX
DATAPREP_INPUT_BUCKET: XXXXX
DATAPREP_INPUT_PREFIX: "input_archive/"
DATAPREP_OUTPUT_BUCKET: XXXXX
DATAPREP_OUTPUT_PREFIX: "dataprep_aws_output/"
DATAPREP_RUN_ENV: perl
DATAPREP_SCRIPT_NAME: copy.pl
DATAPREP_SCRIPT_PATH: /scripts/
PAGE_SIZE: 20
script:
- /dataops
icon: DATAPREP_ICON