Snowflake Data Ingestion from AWS using Snowpipe

Diwakar · Published in Beer&Diapers.ai · 4 min read · Jan 22, 2022

1. Overview

Snowpipe is Snowflake's event-based data ingestion service. It provides two main methods for triggering a data loading event:

  1. Cloud storage notifications. On AWS, this is the S3 "object created" event, delivered either directly or via AWS SNS (Simple Notification Service). This method is used for continuous data loading from cloud storage.
  2. The Snowpipe REST API, by calling the insertFiles endpoint. This amounts to queuing files for loading through explicit API calls.

Below is a simplified view of the integration process with S3:

  1. An external stage references a storage integration.
  2. Snowflake associates the integration with an IAM user created in Snowflake's own AWS account.
  3. That IAM user is granted access to the S3 bucket containing the data to be loaded.

2. AWS Configuration

2a AWS S3 Bucket (create a new bucket or use an existing one)

2b Create policy

Go to IAM → Policies and click on "Create policy".

On the next page select "S3" as the service, and under Actions grant the "List" and "Read" permissions.

Under Resources, select Bucket and provide the bucket name so that this policy only grants access to the bucket created in the previous step. For all other resources select Any, then click Next.

On the next tab provide a policy name and click "Create policy".
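For reference, the policy generated by these selections is roughly equivalent to the JSON below. This is a minimal sketch: the exact action list produced by the console's "List" and "Read" groups may be broader, and the bucket name assumes the snowpipeawsbucket used later in this post.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["s3:GetObject", "s3:GetObjectVersion"],
      "Resource": "arn:aws:s3:::snowpipeawsbucket/*"
    },
    {
      "Effect": "Allow",
      "Action": ["s3:ListBucket", "s3:GetBucketLocation"],
      "Resource": "arn:aws:s3:::snowpipeawsbucket"
    }
  ]
}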

2c Create a Role

Go to IAM → Roles and click "Create role".

Select "Another AWS account", provide your AWS account ID, check "Require external ID", and enter 0000 as a placeholder ID (it will be replaced later with the external ID Snowflake generates).

On the next page select the policy created earlier.

Provide a role name and click Finish.

Copy the Role ARN; it is needed for the Snowflake configuration.

3. Snowflake Configuration

Create Database & Table

--Storage integrations require ACCOUNTADMIN (or a role with the CREATE INTEGRATION privilege)
use role ACCOUNTADMIN;

--Create Database
Create Database Snowflake_Demo;
use Database Snowflake_Demo;

--Create Table
CREATE OR REPLACE TABLE "SNOWFLAKE_DEMO"."PUBLIC"."EMPLOYEE" (
  id   STRING,
  name STRING
);

Create Integration

--Create Integration
create storage integration s3_int
  type = external_stage
  storage_provider = s3
  enabled = true
  storage_aws_role_arn = 'arn:aws:iam::0706816153:role/snowpiperole'
  storage_allowed_locations = ('s3://snowpipeawsbucket/');

show integrations;
desc integration s3_int;

Run desc integration s3_int and copy the STORAGE_AWS_IAM_USER_ARN and STORAGE_AWS_EXTERNAL_ID values.

Grant the IAM User Permissions to Access Bucket Objects

Go to IAM → Roles and select the role created for Snowpipe.

Go to the "Trust relationships" tab and click "Edit trust relationship", which shows a JSON policy like the one below. Replace the AWS principal ARN with the STORAGE_AWS_IAM_USER_ARN and the external ID with the STORAGE_AWS_EXTERNAL_ID noted from desc integration in Snowflake.

Click "Update Trust Policy".

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::0706196153:root"
      },
      "Action": "sts:AssumeRole",
      "Condition": {
        "StringEquals": {
          "sts:ExternalId": "0000"
        }
      }
    }
  ]
}

Create Stage

--Create Stage
Create or Replace STAGE "SNOWFLAKE_DEMO"."PUBLIC"."CUSTOMERSTAGEAWS"
url='s3://snowpipeawsbucket/'
storage_integration = s3_int;
show stages;

Create Pipe

--Create Pipe
Create or Replace PIPE "SNOWFLAKE_DEMO"."PUBLIC"."CUSTOMERAWSPIPE"
auto_ingest=true
as
copy into "SNOWFLAKE_DEMO"."PUBLIC"."EMPLOYEE"
from @"SNOWFLAKE_DEMO"."PUBLIC"."CUSTOMERSTAGEAWS"
file_format=(type = 'CSV');
show pipes;

After running show pipes, copy the value of the notification_channel column.
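If you prefer not to pick the ARN out of the SHOW output by eye, RESULT_SCAN can select just that column from the last query. A small sketch using the standard SHOW PIPES output column name:

show pipes;
--Pull only the notification channel ARN from the SHOW output above
select "notification_channel"
from table(result_scan(last_query_id()));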

4. Adding the Event Notification in AWS

In AWS, navigate to your bucket, go to Properties → Event notifications, and select "Create event notification".

Enter an event name and select "All object create events" under Object creation.

Under Destination, select "SQS queue" as the destination and paste the notification_channel value from the Snowflake pipe description into the "SQS queue" field.

5. Test Snowpipe

Create a CSV file with some sample data and upload it to the S3 bucket.
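For example, a file matching the two columns of the EMPLOYEE table could look like this (hypothetical file name and rows):

employee.csv:
1,Alice
2,Bob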

Run the command below to check that you can list the contents of the S3 bucket through the stage.

--To view contents of stage
ls @"SNOWFLAKE_DEMO"."PUBLIC"."CUSTOMERSTAGEAWS";
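Snowpipe typically loads the file within a minute or so of the S3 event. Once it has, the rows should be visible in the target table:

--Verify the loaded rows
select * from "SNOWFLAKE_DEMO"."PUBLIC"."EMPLOYEE";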

Below are some other useful commands:

--To view status of the pipe
Select system$pipe_status('SNOWFLAKE_DEMO.PUBLIC.CUSTOMERAWSPIPE');
--Load historic data (files that were already staged before the pipe existed)
Alter pipe "SNOWFLAKE_DEMO"."PUBLIC"."CUSTOMERAWSPIPE" refresh;
--To see details of any errors encountered
select *
from table(information_schema.copy_history(table_name=>'EMPLOYEE', start_time=> dateadd(hours, -1, current_timestamp())));

6. Clean Up

--Drop all objects
Drop Integration s3_int;
DROP STAGE "SNOWFLAKE_DEMO"."PUBLIC"."CUSTOMERSTAGEAWS";
DROP pipe "SNOWFLAKE_DEMO"."PUBLIC"."CUSTOMERAWSPIPE";
DROP Database Snowflake_Demo;
