Snowflake Data Ingestion from Azure Using Snowpipe

Diwakar · Beer&Diapers.ai · Dec 24, 2021

1. Overview

  1. Data files are loaded in a stage.
  2. A blob storage event message informs Snowpipe via Event Grid that files are ready to load. Snowpipe copies the files into a queue.
  3. A Snowflake-provided virtual warehouse loads data from the queued files into the target table based on parameters defined in the specified pipe.

2. Azure Prerequisites

2a) Create a Resource Group (Use Existing Group if already available)

2b) Create Storage Account

Associate this storage account with an existing resource group, or use an existing storage account.

2c) Create a Container

From the Storage account → Containers section, create a container.

This container will be used to stage the files that will be loaded by Snowpipe.

2d) Create a Storage Queue

From the Storage account → Queues section, create a queue.

This queue will be used as the endpoint of the event subscription.

2e) Create an Event Subscription

From the Storage account → Events section, create an event subscription with the options below.

Select Blob Created, Blob Deleted, and Blob Renamed under Event Types.

Select “Storage Queue” as the endpoint type, then select the storage account created in step 2b and the queue created in step 2d.

An event subscription is a registration indicating that a particular event is significant to a particular system and specifying the processing to perform when the triggering event occurs.

If you get an error saying the resource provider is not registered, go to your Subscription → Resource providers and register the Microsoft.EventGrid provider.

3. Information required from Azure

3a) Queue URL

From the Storage account → Queues section, copy the queue URL.

3b) Tenant ID

From the Azure Active Directory home page, copy the Tenant ID.

3c) Storage URL

Go to Storage account → Containers, select the container, open Properties, and copy the URL:

https://snowflakestoragedd.blob.core.windows.net/snowflakecontainer

Replace https with azure; this URL will be used to create the stage:

azure://snowflakestoragedd.blob.core.windows.net/snowflakecontainer

3d) Shared Access Signature (SAS) Token

Go to Storage account → Shared access signature. Select “Container” and “Object” under “Allowed resource types”, provide an expiration date, and click Generate.

Copy the “SAS token” from the generated values.

4. Snowflake Changes

4a) Create Sample Database

--Create Demo Database
Create Database Snowflake_Demo;
use Database Snowflake_Demo;
use role ACCOUNTADMIN;

4b) Create Notification

A notification integration is a Snowflake object that provides an interface between Snowflake and a third-party cloud message queuing service such as Azure Event Grid.

--Create Notification
CREATE NOTIFICATION INTEGRATION Demo_Notification
ENABLED =TRUE
TYPE=QUEUE
NOTIFICATION_PROVIDER=AZURE_STORAGE_QUEUE
AZURE_STORAGE_QUEUE_PRIMARY_URI='https://snowflakestoragedd.queue.core.windows.net/snowflakequeue' --QUEUE URL
AZURE_TENANT_ID='<tenantid>'; --Tenant ID
SHOW INTEGRATIONS;
desc NOTIFICATION INTEGRATION Demo_Notification;

Run DESC NOTIFICATION INTEGRATION to see the integration’s properties, then open the AZURE_CONSENT_URL in a browser to provide authorization.

Once accepted, this application will show up under Azure → Enterprise applications.

Once the application shows up, go to Storage account → Access Control (IAM) → Role assignments → Add role assignment.

Search for the Storage Queue Data Contributor role and click Next.

Add the application from above to the members list, then click “Review + assign”.

The role should now appear under the Role assignments tab.
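The Azure side is now configured. If you later want to create the stage and pipe with a role other than ACCOUNTADMIN, that role also needs usage on the notification integration; a minimal sketch, assuming a hypothetical role named LOADER:

--Optional: allow a non-ACCOUNTADMIN role (hypothetical role LOADER) to use the integration
GRANT USAGE ON INTEGRATION Demo_Notification TO ROLE LOADER;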

4c) Create Stage

Create an external stage that references your Azure container using the CREATE STAGE command. Snowpipe fetches your data files from the stage and temporarily queues them before loading them into your target table.

--Create a Stage
CREATE OR REPLACE STAGE SNOWPIPE_STAGE
url = 'azure://snowflakestoragedd.blob.core.windows.net/snowflakecontainer' //Storage URL
credentials = (azure_sas_token=
'?sv=2020-08-04&ss=bfqt&srt=c&sp=rwdlacupitfx&se=2021-12-22T20:33:40Z&st=2021-12-22T12:33:40Z&spr=https&sig=ygGVVtZ36zumDy0BDNAb1EGUyZv5Pj1%2BtT083bxor4M%3D' //SAS key
);
--to see all stages
show stages;
--To view contents of stage
ls @SNOWPIPE_STAGE;
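To double-check that the stage points at the right container URL and has credentials registered, you can also describe it (a quick sanity check, not required for the setup):

--Inspect the stage definition
desc stage SNOWPIPE_STAGE;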

4d) Create Table

--Create TABLE
CREATE OR REPLACE TABLE "SNOWFLAKE_DEMO"."PUBLIC"."EMPLOYEE" (
id STRING,
name STRING
);

4e) Create Snowpipe

Create a pipe using the CREATE PIPE command. The pipe defines the COPY INTO <table> statement used by Snowpipe to load data from the ingestion queue into the target table.

Snowpipe enables loading data from files as soon as they’re available in a stage. This means you can load data from files in micro-batches, making it available to users within minutes, rather than manually executing COPY statements on a schedule to load larger batches.

--Create Pipe
CREATE OR REPLACE pipe "SNOWFLAKE_DEMO"."PUBLIC"."EMPLOYEE_PIPE"
auto_ingest = true
integration = 'DEMO_NOTIFICATION'
as
copy into "SNOWFLAKE_DEMO"."PUBLIC"."EMPLOYEE"
from @"SNOWPIPE_STAGE"
file_format = (type = 'CSV');
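To confirm the pipe was created and is listening for event notifications, you can list the pipes and check the pipe status; in the JSON returned by SYSTEM$PIPE_STATUS, the executionState should be RUNNING:

--Verify the pipe exists and is running
show pipes;
select system$pipe_status('SNOWFLAKE_DEMO.PUBLIC.EMPLOYEE_PIPE');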

4f) Create a sample CSV file and upload it into the Azure container

Create a sample CSV file and upload it into the container.

From Snowflake, run the command below to see the list of files:

--To view contents of stage
ls @SNOWPIPE_STAGE;

Once the file is uploaded, the data will show up in the table.
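A simple way to verify the load is to query the target table; the rows from the uploaded CSV should appear, usually within a minute or so:

--Verify that the staged file was ingested
select * from "SNOWFLAKE_DEMO"."PUBLIC"."EMPLOYEE";
select count(*) from "SNOWFLAKE_DEMO"."PUBLIC"."EMPLOYEE";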

4g) Load Historical Files

To load any backlog of data files that existed in the external stage before Event Grid messages were configured, execute an ALTER PIPE … REFRESH statement.

--Load Historical Files
ALTER PIPE "SNOWFLAKE_DEMO"."PUBLIC"."EMPLOYEE_PIPE" REFRESH;

4h) Troubleshooting and testing

The query below provides an audit of the files processed by the pipe:

select *
from table(information_schema.copy_history(table_name=>'EMPLOYEE', start_time=> dateadd(hours, -1, current_timestamp())));

Upload another file and wait for the data to show up automatically. When new data files are added to the Azure container, the event message informs Snowpipe to load them into the target table defined in the pipe.
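In addition to COPY_HISTORY, the PIPE_USAGE_HISTORY table function shows the credits consumed and the number of files and bytes ingested by the pipe; for example, for the last hour:

--Credits, files and bytes processed by the pipe in the last hour
select *
from table(information_schema.pipe_usage_history(
  date_range_start => dateadd('hour', -1, current_timestamp()),
  pipe_name => 'SNOWFLAKE_DEMO.PUBLIC.EMPLOYEE_PIPE'));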

4i) Drop all objects

--Drop all objects
drop integration if exists Demo_Notification;
DROP STAGE SNOWPIPE_STAGE;
DROP pipe "EMPLOYEE_PIPE";
DROP Database Snowflake_Demo;

5. Snowpipe Considerations

Recommended Load File Size

For the most efficient and cost-effective load experience with Snowpipe, it is recommended to follow the file sizing recommendations in File Sizing Best Practices and Limitations and to stage files once per minute. This approach typically provides a good balance between cost (i.e. the resources spent on Snowpipe queue management and the actual load) and performance (i.e. load latency).

https://docs.snowflake.com/en/user-guide/data-load-considerations-prepare.html#label-snowpipe-file-size

Load Order of Data Files

For each pipe object, Snowflake establishes a single queue to sequence data files awaiting loading. As new data files are discovered in a stage, Snowpipe appends them to the queue. However, multiple processes pull files from the queue; so while Snowpipe generally loads older files first, there is no guarantee that files are loaded in the same order in which they were staged.

Data Duplication

Snowpipe uses file loading metadata associated with each pipe object to prevent reloading the same files (and duplicating data) in a table. This metadata stores the path (i.e. prefix) and name of each loaded file, and prevents loading files with the same name even if they were later modified.

Estimating Snowpipe Latency

Given the number of factors that can differentiate Snowpipe loads, it is very difficult for Snowflake to estimate latency. File formats and sizes, and the complexity of the COPY statement (including any SELECT statement used for transformations), all impact the amount of time required for a Snowpipe load.

What if we deliver the same file again, but with another name? The result is that the second file will not be loaded a second time.

What if we change something in the file but keep the name the same? The result is that the file is not loaded into Snowflake.
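If you do need to reload a modified file that keeps the same name, one option is to bypass the pipe and run a manual COPY with the FORCE option, which loads files regardless of the load metadata (note that this can create duplicate rows):

--Manually reload staged files, ignoring load metadata (may duplicate data)
copy into "SNOWFLAKE_DEMO"."PUBLIC"."EMPLOYEE"
from @SNOWPIPE_STAGE
file_format = (type = 'CSV')
force = TRUE;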
