Snowpipe with SNS & SQS

2024 Snowpipe Auto-Ingest via SNS & SQS → a definitive guide with all the controls!

Introduction:

What is Snowpipe? Snowflake offers Snowpipe, a fully managed data ingestion service. Snowpipe can be thought of as an "automated COPY command," analogous to how the COPY command is used for batch data loading. Pipes are first-class Snowflake objects, so they can be created and managed with SQL just like any other object. Snowpipe uses notifications about newly landed files to automatically load them from an external stage.

AUTO-INGEST vs REST API set-up of Snowpipe

Let me briefly discuss the differences between these two modes of configuring Snowpipe.

REST API Method:

This involves explicit API calls to trigger the Snowpipe process to load the data into Snowflake. The files to load, and the order in which to load them, are specified explicitly.

Use Cases: Perfect in situations where we need exact control over the timing of data loading. Ideal for scenarios in which we wish to start loading files based on specific business logic or triggers, or when file arrival in the stage is irregular.
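
For illustration, below is a minimal sketch (with hypothetical object names) of a pipe meant for REST-API-driven loads: it is created without auto-ingest, and the client then submits the staged file names explicitly through the Snowpipe insertFiles REST endpoint.

--Hypothetical pipe for REST API loads. AUTO_INGEST is left at its default (FALSE),
--so nothing is loaded until the client calls the insertFiles REST endpoint.
CREATE OR REPLACE PIPE demo_restapi_pipe
AS
COPY INTO DEMO_DB.DEMO_SCHEMA.TARGET_TABLE
FROM @my_external_stage
FILE_FORMAT = (TYPE = 'CSV');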

AUTO-INGEST Method:

The AUTO_INGEST setting automatically starts the Snowpipe process whenever new files are found, by using event notifications from cloud storage (such as AWS S3 or Azure Blob Storage). As soon as a file is staged, Snowflake automatically ingests it; no human intervention is required.

Use Cases: Perfect in situations where files arrive frequently and you want to automate ingestion so that data is available almost instantly. Ideal for scenarios where minimizing the lag between file arrival and data availability in Snowflake is essential.

What is the prime focus of this blog on Snowpipe?

In this blog we will understand, in detail, how to set up the "AUTO-INGEST" capability of Snowpipe, meaning that as soon as a file lands in the external stage it gets loaded into Snowflake. The two different methods are:

Option 1 → Set up Snowpipe via the AWS SQS service and ingest the data into Snowflake via this set-up.

Option 2 → Set up Snowpipe via the AWS SNS service and ingest the data into Snowflake via this set-up.

We will also compare how these two set-ups differ in terms of the configuration needed.

It is also worth noting that Snowpipe can be set up in several other ways, such as:

  • Using the Snowpipe REST API to load the data (using a client to call the REST API, or using AWS Lambda functions to call it).
  • Setting up Amazon EventBridge for auto-ingest.

Fundamentally, Snowpipe with AUTO-INGEST automates data loads by using cloud storage event notifications to alert Snowpipe when new data files are available for loading.

Snowpipe uses an internal queue to receive these event notifications. Think of it as Snowpipe having its own SQS queue from which it polls for events. Based on the parameters specified in a given pipe object, Snowpipe uses the metadata in the queue to load the new data files into the target table in a continuous, serverless manner.
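
You cannot query this internal queue directly, but SYSTEM$PIPE_STATUS offers a window into it: its JSON output includes fields such as executionState, pendingFileCount and lastIngestedTimestamp. A quick sketch, using the SQS-based pipe created later in this post:

-- pendingFileCount shows files that have been notified but not yet loaded;
-- executionState shows whether the pipe is RUNNING or PAUSED.
SELECT SYSTEM$PIPE_STATUS('demomysnowpipesqs');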

Option 1: Snowpipe set-up via AWS-SQS services

We create all the necessary prerequisites before creating the Snowpipe. These are as follows:

Step 1.1: Create a storage integration object.

Step 1.2: Create the file format.

Step 1.3: Create the external stage.

Code-wise, below are the steps that have been followed:

--Storage integration object:
CREATE OR REPLACE STORAGE INTEGRATION s3_storage_obj_int_forpipe
TYPE = EXTERNAL_STAGE
STORAGE_PROVIDER = 'S3'
ENABLED = TRUE
STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::434362003194:role/demorolesnfpipe_01'
STORAGE_ALLOWED_LOCATIONS = ('s3://mydemosnowpipe01/');
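
(One intermediate step, not detailed here: the DESC output of this integration provides the STORAGE_AWS_IAM_USER_ARN and STORAGE_AWS_EXTERNAL_ID values that go into the trust policy of the IAM role referenced above. That IAM wiring is assumed to be in place for the rest of this walkthrough.)

-- Retrieve the values needed for the IAM role's trust relationship in AWS:
DESCRIBE INTEGRATION s3_storage_obj_int_forpipe;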

--File format:
create or replace file format demo_csv_format
type = csv field_delimiter = ',' skip_header = 1
field_optionally_enclosed_by = '"'
null_if = ('NULL', 'null')
empty_field_as_null = true;

--external stage
create or replace stage ext_stage_pipe_demo url="s3://mydemosnowpipe01/"
STORAGE_INTEGRATION=s3_storage_obj_int_forpipe
file_format = demo_csv_format;

Step 2: Once this set-up is done, we are all set to configure the Snowpipe for near-real-time ingestion. Below is the syntax and the command with which it is created:

Snow-pipe creation
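
The screenshot above corresponds to a statement along the following lines (a sketch: the pipe name is the one used in the governance commands later in this post, and the target table is assumed to be the same QUARTERLY_SALES table used in Option 2).

--Creating the Snowpipe with auto-ingest (SQS-based notification channel):
CREATE OR REPLACE PIPE demomysnowpipesqs
AUTO_INGEST = TRUE
AS
COPY INTO DEMO_DB.DEMO_SCHEMA.QUARTERLY_SALES
FROM @ext_stage_pipe_demo
FILE_FORMAT = demo_csv_format;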

The AUTO_INGEST parameter, which indicates whether you wish to load files based on received event notifications (TRUE), is the crucial one to pay attention to here. Had we been loading through REST API calls, we would have set it to FALSE.

Step 3: Configuring event notifications in AWS for this set-up

A notification channel is automatically assigned to a Snowpipe object by Snowflake upon its creation when AUTO_INGEST = TRUE is set. If you are using Amazon Web Services (AWS), Snowflake uses Amazon Simple Queue Service (SQS) to receive these notifications. The notification_channel column in the DESC PIPE <pipe_name>; output contains the ARN of that SQS queue.

Snowflake notification channel
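
For reference, the value shown in the screenshot comes from the following command; the notification_channel column holds the ARN of the Snowflake-managed SQS queue, and this ARN is the destination we will use when configuring the S3 event notification in the next step.

-- Copy the notification_channel value (SQS queue ARN) from this output:
DESC PIPE demomysnowpipesqs;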

Step 4: Creating an event notification in S3 within AWS console.

To load files using Snowpipe, open the S3 bucket that contains the desired files. Locate the event notification configuration under the Properties tab. Create a new event notification with the desired name and complete the configuration, using the SQS queue ARN (the notification channel) retrieved above as the destination.

The SQS integration has to be done between Snowflake & AWS

Step 5: That is it; we are all set to see this in action.

Start loading files into AWS S3, and you will see the same files getting loaded into Snowflake.
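
One simple way to confirm the load is to query the copy history of the target table (a sketch, assuming the pipe loads into the same QUARTERLY_SALES table used in Option 2):

-- Files loaded into the target table over the last hour, including which pipe loaded them.
SELECT file_name, pipe_name, last_load_time, row_count, status
FROM TABLE(INFORMATION_SCHEMA.COPY_HISTORY(
    TABLE_NAME => 'DEMO_DB.DEMO_SCHEMA.QUARTERLY_SALES',
    START_TIME => DATEADD(HOUR, -1, CURRENT_TIMESTAMP())));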

Method to check the time taken to load into Snowflake via SQS:

For this use case, the set-up we are considering is given below:

File format: CSV, File size: 262 bytes, Load pattern: 1 file at a time.

With this set-up, let us now measure the time taken in practice.

The file load timestamp in S3 is given below; it is in UTC+05:30 format.

S3 snapshot

Hence, the file "data_ingestion_v6.csv" has a timestamp of June 27, 2024, 22:24:22 (UTC+05:30), which when converted to UTC becomes June 27, 2024, 16:54:22 (UTC).

Now, let us see the Snowpipe stats. Please execute the command below before triggering the pipe, so that the session timezone is changed to UTC.

-- Change the timezone to UTC in Snowflake.
ALTER SESSION SET TIMEZONE='UTC';

-- Post that, trigger the Snowpipe & check the status with the below command:
SELECT SYSTEM$PIPE_STATUS('demomysnowpipesqs');

The snapshot after loading the file is given below.

If you observe it, "lastIngestedTimestamp": "2024-06-27T16:54:36.382Z".

Now we do the calculation:

Time of file arrival in S3: June 27, 2024, 16:54:22 (UTC)

Snowpipe load completed at: 2024-06-27T16:54:36 (UTC)

Hence the time taken to load was 14 seconds (this is almost real time).

Option 2: Snowpipe set-up via AWS-SNS topics

We create all the necessary prerequisites before creating the Snowpipe. These are as follows:

Step 1.1: Create a storage integration object.

Step 1.2: Create the file format.

Step 1.3: Create the external stage.

Code-wise, below are the steps that have been followed:

-- Storage integration object. This is created now.
CREATE OR REPLACE STORAGE INTEGRATION s3_storage_obj_int_forsnspipe
TYPE = EXTERNAL_STAGE
STORAGE_PROVIDER = 'S3'
ENABLED = TRUE
STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::434362003194:role/demorolesnspipe01'
STORAGE_ALLOWED_LOCATIONS = ('s3://mydemosnspipe01/');


DESCRIBE integration s3_storage_obj_int_forsnspipe;

-- Creating the file format:
create or replace file format demo_csv_format_sns
type = csv field_delimiter = ',' skip_header = 1
field_optionally_enclosed_by = '"'
null_if = ('NULL', 'null')
empty_field_as_null = true;


-- Creating the external stage:
create or replace stage ext_stage_pipe_snsdemo url="s3://mydemosnspipe01/"
STORAGE_INTEGRATION=s3_storage_obj_int_forsnspipe
file_format = demo_csv_format_sns;

LIST @ext_stage_pipe_snsdemo;

Step 2: SNS topic creation within AWS console.

Create an SNS topic using the options shown below. Once it is created, please make sure the ARN is noted and kept handy for the further integration steps.

SNS topic creation(demosnsdatacloud)

Step 3: SNS topic ARN to be used in Snowflake

Below is how we use the ARN of the SNS topic to get the AWS SNS IAM policy as metadata from Snowflake. The command used within Snowflake is:

select system$get_aws_sns_iam_policy('arn:aws:sns:us-east-1:434362003194:demosnsdatacloud');

Step 4: Now we have to carefully configure the SNS topic

Please note that these steps are important, as this is where we do the integration between Snowflake and AWS.

Go to the SNS topic console and edit the topic, navigating to the "Access policy" option.

Access policy to be updated.
Changes to be done from line number 45 onwards.
Complete the integration of SNS as mentioned above

Please note that when we copy the details from the Snowflake output, we start from the Sid element, as highlighted in the screenshot.

Step 5: Go to the AWS S3 console and complete the set-up on the AWS side

Create the event notification, this time with the SNS topic as the destination instead of SQS.

SNS configuration

Step 6: Create the Snowpipe with SNS

Please note the additional "aws_sns_topic" parameter that has to be supplied here. We need to get this value from the AWS SNS console.

--Creating the Snowpipe with SNS:
CREATE OR REPLACE PIPE demomysnowpipesns
AUTO_INGEST = TRUE
AWS_SNS_TOPIC='arn:aws:sns:us-east-1:434362003194:demosnsdatacloud'
AS
COPY INTO DEMO_DB.DEMO_SCHEMA.QUARTERLY_SALES
FROM @ext_stage_pipe_snsdemo
FILE_FORMAT = demo_csv_format_sns;
Snowpipe with SNS set-up

Step 7: Snowpipe execution.

Now the Snowpipe will start executing; the same can also be checked:

Checking the pipe status
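
For completeness, the status check behind that screenshot is the same SYSTEM$PIPE_STATUS call used in Option 1:

-- executionState should be RUNNING once the SNS-backed pipe is live.
SELECT SYSTEM$PIPE_STATUS('demomysnowpipesns');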

Method to check the time taken to load into Snowflake via SNS:

We follow the exact same method as we did for the SQS loads. Let us now see the time taken by Snowpipe to load the file with the SNS set-up.

And the benchmarking conditions remain the same:

File format: CSV, File size: 262 bytes, Load pattern: 1 file at a time.

S3 load time: June 27, 2024, 22:59:18 (UTC+05:30), i.e., June 27, 2024, 17:29:18 in UTC (note this value).

Snowpipe load time:

The output of SELECT SYSTEM$PIPE_STATUS('demomysnowpipesns'); shows:

"lastIngestedTimestamp": "2024-06-27T17:29:26.341Z"

Time of file arrival in S3: June 27, 2024, 17:29:18 (UTC)

Snowpipe load completed at: 2024-06-27T17:29:26 (UTC)

Hence the time taken to load was 8 seconds (again, almost real time).

To summarize: with SQS the latency was 14 seconds and with SNS the latency was 8 seconds, i.e., roughly 40% lower latency with the SNS configuration compared to SQS in this example.

Comparison of Snowpipe set-up with SNS & SQS:

Below is a chart comparing the Snowpipe set-up with SNS and the Snowpipe set-up with SQS.

The comparison between Snowpipe with SQS vs Snowpipe with SNS

Governance of Snowpipe:

Now that we understand all the details of the Snowpipe AUTO_INGEST methods, it is also important to understand how to govern a Snowpipe, i.e., how to:

  • PAUSE the Pipe.
  • RESUME the Pipe.
  • Continuously monitor the Pipe.
  • Drop the Pipe.
  • Track its consumption, etc.

Below is the list of commands, along with explanations, to perform those activities.

-- To see the number of pipes within an account --
SHOW PIPES;

-- To get the metadata details about the pipe --
DESCRIBE PIPE <pipe_name>;

-- To continuously monitor the pipe --
SELECT SYSTEM$PIPE_STATUS('<pipe_name>');

-- To pause the pipe --
ALTER PIPE demomysnowpipesqs SET PIPE_EXECUTION_PAUSED = TRUE;

-- To resume the pipe --
ALTER PIPE demomysnowpipesqs SET PIPE_EXECUTION_PAUSED = FALSE;

-- To drop the pipe --
DROP PIPE demomysnowpipesqs;


-- Important queries to track its consumption --
SELECT
*
FROM SNOWFLAKE.ACCOUNT_USAGE.PIPES;

SELECT * FROM
SNOWFLAKE.ACCOUNT_USAGE.PIPE_USAGE_HISTORY
ORDER BY START_TIME DESC;
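
If you want to go one step further on consumption tracking, PIPE_USAGE_HISTORY can be aggregated per pipe and per day (a sketch; as with other ACCOUNT_USAGE views, the data can lag by a few hours):

-- Daily Snowpipe credit consumption per pipe.
SELECT pipe_name,
       DATE_TRUNC('day', start_time) AS usage_day,
       SUM(credits_used)             AS credits_used,
       SUM(files_inserted)           AS files_loaded,
       SUM(bytes_inserted)           AS bytes_loaded
FROM SNOWFLAKE.ACCOUNT_USAGE.PIPE_USAGE_HISTORY
GROUP BY pipe_name, usage_day
ORDER BY usage_day DESC;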

SUMMARY:

I have shared some key insights into Snowpipe's features along with detailed steps for configuring it. Below is a summary of this amazing feature within Snowflake.

Snowpipe-Summary

Please keep reading my blogs it is only going to encourage me in posting more such content. You can find me on LinkedIn by clicking here and on Medium here. Happy Learning :)

Awarded consecutively as “Data Superhero by Snowflake for year 2024 & 2023”. Links: here

Disclaimer: The views expressed here are mine alone and do not necessarily reflect the view of my current, former, or future employers.
