Pipe
You can provide configuration to Snowflake Object Lifecycle Engine for the following operations with pipe:
- Manage the lifecycle of new and existing pipes
- Manage grants of pipe
Usage
- Default Configuration
- Data Products Configuration
databases:
<database-name>:
schemas:
<schema-name>:
pipes:
<pipe-name>:
<configuration-key>: <value>
grants:
<privilege>:
- <role-name>
- <role-name>
- pipe:
name: <pipe-name>
<configuration-key>: <value>
database: rel(database.<database-name>)
schema: rel(schema.<schema-name>)
grants:
<privilege>:
- rel(role.<role-name>)
- rel(role.<role-name>)
Supported parameters
The engine supports the parameters listed below.
Configuration Key | Required/Optional | Data Types and Values | Description |
---|---|---|---|
copy_statement | Required | Object: See here for a definition of copy_statement | Specifies the copy statement for the pipe. Check the Snowflake Docs for COPY INTO table copy options and file format. |
auto_ingest | Optional | Boolean | Specifies an auto_ingest param for the pipe |
aws_sns_topic_arn | Optional | String | Specifies the Amazon Resource Name (ARN) for the SNS topic for your S3 bucket |
comment | Optional | String | Specifies a comment for the pipe |
deleted | Optional | Boolean: True enables deletion prevention, False does nothing | Specifies what objects are allowed to be deleted |
error_integration | Optional | String | Specifies the name of the notification integration used to communicate with Amazon SNS. Info: Required only when configuring Snowpipe to send error notifications using Amazon Simple Notification Service (SNS). For more info, see Enabling Error Notifications for Snowpipe.. |
grants | Optional | Map: See Supported Pipe Grants to Roles | List of Privileges and Roles to which privileges are granted on the current pipe |
integration | Optional | String | Specifies the existing notification integration used to access the storage queue. Info: Required only when configuring AUTO_INGEST for Google Cloud Storage or Microsoft Azure stages. For more info, see Automating Snowpipe for Google Cloud Storage and Automating Snowpipe for Microsoft Azure Blob Storage. Note: AUTO_INGEST must be set to TRUE . |
manage_mode | Optional | Enumeration: all (default), none , grants | Configures what properties to manage for the pipe. See Changing Manage Mode before changing the value. |
During subsequent pipeline runs a force-replacement behavior might be observed in PLAN and APPLY phases for a few parameters like copy_statement
, auto_ingest
, aws_sns_topic_arn
, etc.
Supported pipe grants to roles
Following are the privileges you can grant to roles in the pipe definition:
- ALL PRIVILEGES
- MONITOR
- OPERATE
- OWNERSHIP
When you define ALL PRIVILEGES in the SOLE configuration file, you grant all the privileges listed above to roles on this object except OWNERSHIP. However, the management of ALL PRIVILEGES in SOLE differs from its handling in Snowflake. See Handling ALL PRIVILEGES in SOLE for more information.
Examples
Below are some examples of configuring pipes.
Pipe configuration with the type
parameter
- Default Configuration
- Data Products Configuration
databases:
SALES_RECORD:
comment: "product sales record"
schemas:
SCHEMA_1:
comment: "sales records for year 2021"
pipes:
PIPE_1:
comment: A pipe.
auto_ingest: false
error_integration: "NOTIFICATION_INTEGRATION_SNS"
copy_statement:
into:
database: "DATABASE_1"
schema: "SCHEMA_1"
table: "TABLE_1"
from:
database: "DUMMY_DATABASE"
schema: "PUBLIC"
stage: "LIFECYCLE_STAGE"
pattern: ".*[.]csv"
file_format:
type: "CSV"
compression: "AUTO"
binary_format: "utf8"
trim_space: false
enable_octal: false
strip_outer_array: false
strip_null_values: false
replace_invalid_characters: false
ignore_utf8_errors: true
skip_byte_order_mark: false
enforce_length: false
truncate_columns: true
grants:
MONITOR:
- SYSADMIN
- ACCOUNTADMIN
- pipe:
name: PIPE_1
database: rel(database.SALES_RECORD)
schema: rel(schema.SCHEMA_1)
comment: A pipe.
auto_ingest: false
error_integration: "NOTIFICATION_INTEGRATION_SNS"
copy_statement:
into:
database: rel(database.DATABASE_1)
schema: rel(schema.SCHEMA_1)
table: rel(table.TABLE_1)
from:
database: rel(database.DUMMY_DATABASE)
schema: rel(schema.PUBLIC)
stage: rel(stage.LIFECYCLE_STAGE)
pattern: ".*[.]csv"
file_format:
type: "CSV"
compression: "AUTO"
binary_format: "utf8"
trim_space: false
enable_octal: false
strip_outer_array: false
strip_null_values: false
replace_invalid_characters: false
ignore_utf8_errors: true
skip_byte_order_mark: false
enforce_length: false
truncate_columns: true
grants:
MONITOR:
- SYSADMIN
- ACCOUNTADMIN
Pipe configuration without the type
parameter
- Default Configuration
- Data Products Configuration
databases:
SALES_RECORD_FEB:
comment: "product sales record"
schemas:
SCHEMA_1:
comment: "sales records for year 2021 of feb month"
pipes:
PIPE_2:
comment: A pipe
integration: "NOTIFICATION_INTEGRATION_GCP"
auto_ingest: false
copy_statement:
into:
database: "DATABASE_2"
schema: "SCHEMA_2"
table: "TABLE_2"
from:
database: "DUMMY_DATABASE"
schema: "PUBLIC"
stage: "LIFECYCLE_STAGE_GCP"
pattern: ".*[.]csv"
file_format:
format_database: "DUMMY_DB"
format_schema: "PUBLIC"
format_name: "DMY_FORMAT"
grants:
OPERATE:
- SYSADMIN
- pipe:
name: PIPE_2
database: rel(database.SALES_RECORD)
schema: rel(schema.SCHEMA_1)
comment: A pipe
integration: "NOTIFICATION_INTEGRATION_GCP"
auto_ingest: false
copy_statement:
into:
database: rel(database.DATABASE_2)
schema: rel(schema.SCHEMA_2)
table: rel(table.TABLE_2)
from:
database: rel(database.DUMMY_DATABASE)
schema: rel(schema.PUBLIC)
stage: rel(stage.LIFECYCLE_STAGE_GCP)
file_format:
format_database: rel(database.DUMMY_DB)
format_schema: rel(schema.PUBLIC)
format_name: rel(file_format.DMY_FORMAT)
grants:
OPERATE:
- SYSADMIN