DataPrep Orchestrator
Type | Pre-Set |
---|---|
Image | $DATAOPS_DATAPREP_RUNNER_IMAGE |
The DataOps DataPrep Orchestrator is a pre-set orchestrator that processes and synchronizes files between two locations, such as two different AWS S3 buckets or a local folder and an Azure Storage blob container.
All files present in the input/source location are compared against the output/target location. Files that are not present in the target location are downloaded from the source, processed, and uploaded to the target.
File processing is handled by user-provided scripts (for example, Bash, Python, R, Node.js, or Perl), with the following locations supported and interchangeable:
- AWS S3 Bucket
- Local Directory
- Azure Storage Blob Container
Usage
The DataPrep Orchestrator workflow is as follows:
- Check the defined storage locations:
  - For both input and output storage locations, check for the required keys
  - Configure the environment
  - Test the connections with the supplied credentials
- Fetch the lists of files:
  - Fetch the list of files in the input storage location
  - Fetch the list of files in the output storage location
- Compare the file lists by name
- Start synchronization for each file that is present in the input but not the output location:
  - Download the file from the input storage location
  - Process the file using the user-specified script
  - Upload the processed file to the output storage location
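Conceptually, this comparison-and-sync loop is a set difference on file names, followed by a per-file download, process, and upload. The following Python sketch only illustrates the idea; it is not the orchestrator's implementation, and the helper names (`list_files`, `run_user_script`, `sync`) are hypothetical. It uses local directories and a trivial copy step in place of the supported storage backends and the user script:

```python
import os
import shutil

def list_files(directory):
    """Stand-in for listing the file names in a storage location (here: a local directory)."""
    return set(os.listdir(directory))

def run_user_script(src_path, dst_path):
    """Stand-in for the user-provided processing script; here it simply copies the file."""
    shutil.copyfile(src_path, dst_path)

def sync(input_dir, output_dir):
    """Process every file present in the input location but missing from the output location."""
    missing = list_files(input_dir) - list_files(output_dir)
    for name in sorted(missing):
        run_user_script(os.path.join(input_dir, name), os.path.join(output_dir, name))
```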
Modes
The DataPrep Orchestrator supports the following modes:
1. SYNC Mode
Consider the following storage location layouts:
/<input storage>
    /$DATAPREP_INPUT_PREFIX
/<output storage>
    /$DATAPREP_OUTPUT_PREFIX
The workflow to process these files in SYNC Mode is as follows:
- Compare the input and output storage locations
- Process the files that are present in the input but not the output location
- Upload the processed file to the output storage location
2. ARCHIVE Mode
Consider the following storage location layouts:
/<input storage>
    /$DATAPREP_INPUT_PREFIX
    /$DATAPREP_ERROR_PREFIX
    /$DATAPREP_ARCHIVE_PREFIX
/<output storage>
    /$DATAPREP_OUTPUT_PREFIX
The workflow to process these files in ARCHIVE Mode is as follows:
- Compare the input and output storage locations using pagination
- Process files that are present in the input but not in the output location
- For each successfully processed file:
  - Move the raw input file to the archive folder in the input storage location
- For each file that fails processing:
  - Move the raw input file to the error folder in the input storage location
- Delete processed files from the input storage
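The following Python sketch illustrates only the ARCHIVE-mode bookkeeping described in the list above; it is not the orchestrator's implementation, the helper name `archive_raw_input` is hypothetical, and it uses local directories with the documented default prefix names:

```python
import os
import shutil

# Default prefix names as documented under "Supported Parameters"; real jobs can
# override them via DATAPREP_ARCHIVE_PREFIX and DATAPREP_ERROR_PREFIX.
ARCHIVE_PREFIX = "archive"
ERROR_PREFIX = "error"

def archive_raw_input(input_dir, name, succeeded):
    """Move the raw input file to the archive folder on success, or to the error folder on failure."""
    target_dir = os.path.join(input_dir, ARCHIVE_PREFIX if succeeded else ERROR_PREFIX)
    os.makedirs(target_dir, exist_ok=True)
    shutil.move(os.path.join(input_dir, name), os.path.join(target_dir, name))
```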
Lastly, here is an example of how to structure a YAML file for a DataPrep Orchestrator file sync job:
"Prep My Data":
extends:
- .agent_tag
stage: "Clean Input Data"
image: $DATAOPS_DATAPREP_RUNNER_IMAGE
variables:
PREP_ACTION: SYNC
DATAPREP_SCRIPT_PATH: $CI_PROJECT_DIR/dataops/dataprep/
DATAPREP_SCRIPT_NAME: clean-input-file.sh
# storage specific configuration
## input storage
DATAPREP_INPUT_STORAGE: AWS
DATAPREP_INPUT_BUCKET: s3-input-bucket
DATAPREP_INPUT_PREFIX: "your/input/"
## output storage
DATAPREP_OUTPUT_STORAGE: AWS
DATAPREP_OUTPUT_BUCKET: s3-output-bucket
DATAPREP_OUTPUT_PREFIX: "your/output/"
# file filter
DATAPREP_FILE_FORMAT: all
script:
- /dataops
icon: DATAPREP_ICON
Supported Parameters
The following parameters can be added to the file sync job:
Parameter | Required/Default | Description |
---|---|---|
PREP_ACTION | REQUIRED | Must be SYNC or ARCHIVE |
DATAPREP_SCRIPT_NAME | REQUIRED | The script name to be run to process the file |
Storage Specific Configuration | REQUIRED | Storage-specific configuration for the input and output locations. See the storage-specific configuration sections below |
DATAPREP_INPUT_STORAGE | Optional - defaults to AWS | Storage to be used as input location. Must be one of AWS, Azure, or Local - case insensitive |
DATAPREP_OUTPUT_STORAGE | Optional - defaults to AWS | Storage to be used as output location. Must be one of AWS, Azure, or Local - case insensitive |
DATAPREP_SCRIPT_PATH | Optional - defaults to CI_PROJECT_DIR | File path where the script is saved |
DATAPREP_RUN_ENV | Optional - defaults to bash | Environment in which the script will be run. Must be one of bash, python, python3, python2, node, r, or perl - case sensitive |
DATAPREP_INPUT_PREFIX | Optional | Prefix/folder to be set as directory for input files |
DATAPREP_OUTPUT_PREFIX | Optional | Prefix/folder to be set as directory for output files |
DATAPREP_ARCHIVE_PREFIX | Optional - defaults to archive | Prefix/folder to be set as directory for archive files in ARCHIVE mode |
DATAPREP_ERROR_PREFIX | Optional - defaults to error | Prefix/folder to be set as directory for error files in ARCHIVE mode |
DATAPREP_FILE_FORMAT | Optional - defaults to csv | File format to select and filter files from both storage locations |
DATAPREP_MAX_JOBS | Optional - defaults to number of cores on system | Number of file-sync jobs to run in parallel |
PAGE_SIZE | Optional - defaults to 200 | Value for Page Size in ARCHIVE mode |
The following sections describe several of these environment variables in more detail:
DATAPREP_FILE_FORMAT
If no format is specified in the environment variable DATAPREP_FILE_FORMAT, only files ending with .csv are selected from both the input and output storage locations.
To select multiple file formats, specify them as comma-separated values in this variable.
Example: You can match multiple file types at once. If .csv, .tsv, and .txt files must be processed, use:
"Prep My Data":
extends:
- .agent_tag
stage: "Clean Input Data"
image: $DATAOPS_DATAPREP_RUNNER_IMAGE
variables:
PREP_ACTION: SYNC
DATAPREP_SCRIPT_PATH: $CI_PROJECT_DIR/dataops/dataprep/
DATAPREP_SCRIPT_NAME: clean-input-file.sh
DATAPREP_INPUT_STORAGE: AWS
DATAPREP_INPUT_BUCKET: s3-input-bucket
DATAPREP_INPUT_PREFIX: "your/input/"
DATAPREP_OUTPUT_STORAGE: AWS
DATAPREP_OUTPUT_BUCKET: s3-output-bucket
DATAPREP_OUTPUT_PREFIX: "your/output/"
DATAPREP_FILE_FORMAT: "csv,tsv,txt"
script:
- /dataops
icon: DATAPREP_ICON
::: tip
If all files present in the input storage location are to be selected by the dataops-dataprep-orchestrator, the value of the variable DATAPREP_FILE_FORMAT must be set to all.
:::
::: tip
If the storage location contains files with the extension .all, set DATAPREP_FILE_FORMAT to all, (with a trailing comma) so that the value is treated as a file extension rather than as the keyword that selects all files.
:::
DATAPREP_MAX_JOBS
The value set in DATAPREP_MAX_JOBS determines how many data-prep operations run in parallel. If the variable is not set, the default value equals the number of CPU cores on the system.
Setting this variable to a high value increases the number of jobs executed in parallel, at the cost of higher system resource usage.
If the files are very large, the user script is CPU-intensive, or system resources are limited, setting DATAPREP_MAX_JOBS to a small value may be advantageous.
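As a rough mental model only (not the orchestrator's actual code), the behavior is equivalent to a worker pool whose size is taken from DATAPREP_MAX_JOBS and falls back to the CPU core count. The helper name `sync_one_file` and the file list below are hypothetical:

```python
import os
from concurrent.futures import ProcessPoolExecutor

def sync_one_file(name):
    """Placeholder for a single download -> process -> upload cycle."""
    return name

if __name__ == "__main__":
    files_to_sync = ["a.csv", "b.csv", "c.csv"]  # hypothetical file list
    # Pool size comes from DATAPREP_MAX_JOBS, defaulting to the number of CPU cores.
    max_jobs = int(os.environ.get("DATAPREP_MAX_JOBS", os.cpu_count() or 1))
    with ProcessPoolExecutor(max_workers=max_jobs) as pool:
        results = list(pool.map(sync_one_file, files_to_sync))
```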
Files created by the DataPrep Orchestrator are deleted when they are no longer needed, to prevent the system from running out of disk space.
After the user script has run, the downloaded input file is deleted. Once the generated output file has been uploaded, it is deleted as well.
AWS Storage Specific Configuration
AWS as Input Storage
"Prep My Data":
variables:
DATAPREP_INPUT_BUCKET: bucket
DATAPREP_INPUT_AWS_REGION: us-west-2
DATAPREP_INPUT_AWS_ACCESS_KEY_ID: $MY_AWS_ACCESS_KEY_ID
DATAPREP_INPUT_AWS_SECRET_ACCESS_KEY: $MY_SECRET_ACCESS_KEY
Parameter | Required or Optional (and default) | Description |
---|---|---|
DATAPREP_INPUT_AWS_ACCESS_KEY_ID | REQUIRED | AWS_ACCESS_KEY_ID to be used for connecting to the input storage |
DATAPREP_INPUT_AWS_SECRET_ACCESS_KEY | REQUIRED | AWS_SECRET_ACCESS_KEY to be used for connecting to the input storage |
DATAPREP_INPUT_BUCKET | REQUIRED | Bucket to be used for the input storage location |
DATAPREP_INPUT_AWS_REGION | Optional - defaults to us-west-2 | Region to be used |
DATAPREP_INPUT_PROFILE_NAME | Optional - defaults to input | Name with which the profile for input storage is saved |
AWS as Output Storage
"Prep My Data":
variables:
DATAPREP_OUTPUT_BUCKET: bucket
DATAPREP_OUTPUT_AWS_REGION: us-west-2
DATAPREP_OUTPUT_AWS_ACCESS_KEY_ID: $MY_AWS_ACCESS_KEY_ID
DATAPREP_OUTPUT_AWS_SECRET_ACCESS_KEY: $MY_SECRET_ACCESS_KEY
Parameter | Required or Optional (and default) | Description |
---|---|---|
DATAPREP_OUTPUT_AWS_ACCESS_KEY_ID | REQUIRED | AWS_ACCESS_KEY_ID to be used for connecting to the output storage |
DATAPREP_OUTPUT_AWS_SECRET_ACCESS_KEY | REQUIRED | AWS_SECRET_ACCESS_KEY to be used for connecting to the output storage |
DATAPREP_OUTPUT_BUCKET | REQUIRED | Bucket to be used for the output storage location |
DATAPREP_OUTPUT_AWS_REGION | Optional - defaults to us-west-2 | Region to be used for output storage |
DATAPREP_OUTPUT_PROFILE_NAME | Optional - defaults to output | Name with which the profile for output storage is saved |
Azure Storage Specific Configuration
Azure as Input Storage
"Prep My Data":
variables:
DATAPREP_INPUT_CONTAINER: sample-container
DATAPREP_INPUT_AZURE_ACCOUNT: $MY_INPUT_AZURE_ACCOUNT
DATAPREP_INPUT_AZURE_SAS_TOKEN: $MY_INPUT_AZURE_SAS_TOKEN
Parameter | Required or Optional (and default) | Description |
---|---|---|
DATAPREP_INPUT_AZURE_ACCOUNT | REQUIRED | Azure account to be used for connecting to the input storage |
DATAPREP_INPUT_AZURE_SAS_TOKEN | REQUIRED | Azure SAS token to be used for connecting to the input storage |
DATAPREP_INPUT_CONTAINER | REQUIRED | Container to be used for the input storage location |
Azure as Output Storage
"Prep My Data":
variables:
DATAPREP_OUTPUT_CONTAINER: sample-container
DATAPREP_OUTPUT_AZURE_ACCOUNT: $MY_OUTPUT_AZURE_ACCOUNT
DATAPREP_OUTPUT_AZURE_SAS_TOKEN: $MY_OUTPUT_AZURE_SAS_TOKEN
Parameter | Required or Optional (and default) | Description |
---|---|---|
DATAPREP_OUTPUT_AZURE_ACCOUNT | REQUIRED | Azure account to be used for connecting to the output storage |
DATAPREP_OUTPUT_AZURE_SAS_TOKEN | REQUIRED | Azure SAS token to be used for connecting to the output storage |
DATAPREP_OUTPUT_CONTAINER | REQUIRED | Container to be used for the output storage location |
Local Storage Specific Configuration
Local storage requires that the orchestrator has access to the directories you want to process. For large files, this requires modifying the DataOps Runner setup to add additional Docker volumes. For smaller files that are passed between jobs, you can reference the pipeline cache.
Local as Input Storage
"Prep My Data":
variables:
DATAPREP_INPUT_DIRECTORY: "/agent_cache/filetransfer/in"
Parameter | Required or Optional (and default) | Description |
---|---|---|
DATAPREP_INPUT_DIRECTORY | REQUIRED | Path of the directory to be used as input storage location |
Local as Output Storage
"Prep My Data":
variables:
DATAPREP_INPUT_DIRECTORY: "/agent_cache/filetransfer/out"
Parameter | Required or Optional (and default) | Description |
---|---|---|
DATAPREP_OUTPUT_DIRECTORY | REQUIRED | Path of the directory to be used as output storage location |
DATAPREP_RUN_ENV
variables:
  DATAPREP_RUN_ENV: bash
The DataPrep Orchestrator supports the following environments to run the script in:
bash
- Default environment if no environment is specified
- Version: GNU bash, version 4.4.20(1)-release
python
- Uses Python3
- Version: 3.6.9
python3
- Version: 3.6.9
python2
- Version: 2.7.17
r
- Version: 4.0.4 (2021-02-15) -- "Lost Library Book"
node
- Version: v8.10.0
perl
- Version: perl 5, version 26, subversion 1 (v5.26.1)
DATAPREP_SCRIPT_NAME
The DataPrep Orchestrator allows users to run their own scripts to modify the data as required. For each file that DataPrep detects for processing, the following arguments are passed to the script, in this order:
- File Name - name of the input file
- Input Folder - folder in which the file exists
- Output Folder - folder in which the output file(s) are to be saved
The script can then transform the file as required and save the resulting file(s) in the specified output folder. When the script finishes, DataPrep expects the output file(s) to be present in that folder.
We recommend placing your custom scripts in the /dataops/dataprep folder of your DataOps project.
During job execution on an orchestrator, refer to a script as $CI_PROJECT_DIR/dataops/dataprep/your_script.
To get you started, below are sample scripts, available in the orchestrator's /scripts/ directory, that copy the file from the input folder to the output folder:
- Shell
- Python
- R
- JavaScript
- Perl
# Save file name and folder paths in variables from command line argument
FILE_NAME="$1"
INPUT_FOLDER="$2"
OUTPUT_FOLDER="$3"
INPUT_FILE_PATH="$INPUT_FOLDER/$FILE_NAME"
OUTPUT_FILE_PATH="$OUTPUT_FOLDER/$FILE_NAME"
echo "Starting copy from $INPUT_FILE_PATH to $OUTPUT_FILE_PATH"
# Copy file from input path to output path
cp -v "$INPUT_FILE_PATH" "$OUTPUT_FILE_PATH"
echo "Copy Complete"
import shutil
import sys
# Save file name and folder paths in variables from command line argument
file_name = sys.argv[1]
input_folder = sys.argv[2]
output_folder = sys.argv[3]
input_file_path = f"{input_folder}/{file_name}"
output_file_path = f"{output_folder}/{file_name}"
print(f"Starting copy from {input_file_path} to {output_file_path}")
# Copy file from input path to output path
shutil.copyfile(input_file_path, output_file_path)
print("Copy Complete")
# Save file name and folder paths in variables from command line argument
args <- commandArgs()
filename <- args[6]
inputFolder <- args[7]
outputFolder <- args[8]
inputFilePath <- paste(inputFolder, filename, sep="/")
outputFilePath <- paste(outputFolder, filename, sep="/")
("Starting copy from", inputFolder, "to", outputFolder, sep=" ")
# Copy file from input path to output path
file.copy(inputFilePath, outputFilePath)
print("Copy Complete")
const fs = require('fs');
// Save file name and folder paths in variables from command line argument
const fileName = process.argv[2]
const inputFolder = process.argv[3]
const outputFolder = process.argv[4]
const inputFilePath = `${inputFolder}/${fileName}`
const outputFilePath = `${outputFolder}/${fileName}`
console.log(`Starting copy from ${inputFolder} to ${outputFolder}`)
// Copy file from input path to output path
fs.copyFile(inputFilePath, outputFilePath, (err) => {
if (err) throw err;
console.log("Copy Complete")
})
use warnings FATAL => 'all';
use strict;
use File::Copy;
# Save file name and folder paths in variables from command line argument
my $fileName = $ARGV[0];
my $inputFolder = $ARGV[1];
my $outputFolder = $ARGV[2];
my $inputFilePath = $inputFolder . "/" . $fileName;
my $outputFilePath = $outputFolder . "/" . $fileName;
print("Starting copy from $inputFilePath to $outputFilePath\n");
# Copy file from input path to output path
copy($inputFilePath,$outputFilePath) or die "Copy failed: $!";
print("Copy Complete")
Example Jobs
The following is an example of a DataPrep job in SYNC mode:
"Prep ERP Data":
extends:
- .agent_tag
stage: "Snowflake orch and prep"
image: $DATAOPS_DATAPREP_RUNNER_IMAGE
variables:
PREP_ACTION: SYNC
DATAPREP_INPUT_BUCKET: XXXXX
DATAPREP_INPUT_PREFIX: "input/"
DATAPREP_OUTPUT_BUCKET: XXXXX
DATAPREP_OUTPUT_PREFIX: "dataprep_aws_output/"
DATAPREP_RUN_ENV: perl
DATAPREP_SCRIPT_NAME: copy.pl
DATAPREP_SCRIPT_PATH: /scripts/
script:
- /dataops
icon: DATAPREP_ICON
The following is an example of a DataPrep job in ARCHIVE mode:
"Archive ERP Data":
extends:
- .agent_tag
stage: "Batch ingestion"
image: $DATAOPS_DATAPREP_RUNNER_IMAGE
variables:
PREP_ACTION: ARCHIVE
DATAPREP_INPUT_AWS_ACCESS_KEY_ID: XXXXX
DATAPREP_OUTPUT_AWS_ACCESS_KEY_ID: XXXXX
DATAPREP_INPUT_AWS_SECRET_ACCESS_KEY: XXXXX
DATAPREP_OUTPUT_AWS_SECRET_ACCESS_KEY: XXXXX
DATAPREP_INPUT_BUCKET: XXXXX
DATAPREP_INPUT_PREFIX: "input_archive/"
DATAPREP_OUTPUT_BUCKET: XXXXX
DATAPREP_OUTPUT_PREFIX: "dataprep_aws_output/"
DATAPREP_RUN_ENV: perl
DATAPREP_SCRIPT_NAME: copy.pl
DATAPREP_SCRIPT_PATH: /scripts/
PAGE_SIZE: 20
script:
- /dataops
icon: DATAPREP_ICON
Project Resources
None
Host Dependencies (and Resources)
None