Snowflake Data ingestion from AWS using Snowpipe
Snowpipe is an event-based data ingestion tool. It provides two main methods for triggering a data-loading event:
- Cloud storage notifications: for AWS, this is the S3 "object created" event, delivered directly or via AWS SNS (Simple Notification Service). This method is used for continuous data loading from cloud storage.
- The Snowpipe REST API, by calling the insertFiles endpoint. This amounts to queuing files for loading through an explicit REST call.
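The REST path can be sketched in Python. The helper below only constructs the request URL and JSON body for the insertFiles endpoint; the account locator, pipe name, and file path are hypothetical, and authentication (a key-pair JWT in the Authorization header) is omitted:

```python
# Sketch only: builds the URL and body for a Snowpipe REST insertFiles call.
# Sending the request would additionally need a key-pair JWT in the
# Authorization header, which is not shown here.
def build_insert_files_request(account, pipe, files):
    """Return (url, payload) for a Snowpipe insertFiles call."""
    url = (f"https://{account}.snowflakecomputing.com"
           f"/v1/data/pipes/{pipe}/insertFiles")
    payload = {"files": [{"path": path} for path in files]}
    return url, payload

# Hypothetical account locator and staged file path:
url, payload = build_insert_files_request(
    "myaccount",
    "SNOWFLAKE_DEMO.PUBLIC.CUSTOMERAWSPIPE",
    ["load/employee_2024.csv"],
)
```

Each entry in `files` is a path relative to the pipe's stage; Snowpipe queues the listed files and loads them asynchronously.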
Below is a simplified view of the integration with S3:
- An external stage references a storage integration.
- Snowflake associates the integration with an S3 IAM user created for your account.
- The IAM user is granted access to the S3 bucket that contains the data to be consumed.
2. AWS Configuration
2a AWS Bucket (create a new bucket or use an existing one)
2b Create policy
Go to IAM → Policies and click on "Create policy".
On the next page select "S3" as the Service; under Actions, grant the "List" and "Read" permissions.
Under Resources, select Bucket and provide the bucket name, so that this user only has access to the bucket created in the previous step. For all other resources select "Any", then click Next.
On the next tab, provide a policy name and click on "Create policy".
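If you prefer the JSON editor over the visual editor, the resulting policy looks roughly like the sketch below. The bucket name is the one used throughout this walkthrough; the actions are the read/list permissions Snowflake needs for loading:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["s3:GetObject", "s3:GetObjectVersion"],
      "Resource": "arn:aws:s3:::snowpipeawsbucket/*"
    },
    {
      "Effect": "Allow",
      "Action": ["s3:ListBucket", "s3:GetBucketLocation"],
      "Resource": "arn:aws:s3:::snowpipeawsbucket"
    }
  ]
}
```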
2c Create a Role
Go to IAM → Roles and click "Create role".
Select "Another AWS account", provide your Account ID, check "Require external ID", and enter 0000 as a placeholder ID (it will be replaced later with the value generated by Snowflake).
On the next page, select the policy created earlier.
Provide a role name and click Finish.
Copy the Role ARN for the Snowflake configuration.
3 Snowflake Configuration
Create Database & Table
Create Database Snowflake_Demo;
use Database Snowflake_Demo;
use role ACCOUNTADMIN;

--Create Table (the column list below is illustrative; adjust it to your data)
CREATE OR REPLACE TABLE "SNOWFLAKE_DEMO"."PUBLIC"."EMPLOYEE" (
  ID NUMBER,
  NAME VARCHAR,
  DEPARTMENT VARCHAR
);
--Create Storage Integration
create storage integration s3_int
  type = external_stage
  storage_provider = s3
  enabled = true
  storage_aws_role_arn = 'arn:aws:iam::0706816153:role/snowpiperole'
  storage_allowed_locations = ('s3://snowpipeawsbucket/');

show integrations;
desc integration s3_int;
Run `desc integration s3_int` and copy the STORAGE_AWS_IAM_USER_ARN and STORAGE_AWS_EXTERNAL_ID values.
Grant the IAM User Permissions to Access Bucket Objects
Go to IAM → Roles and select the role created for Snowpipe.
Go to the "Trust relationships" tab and click on "Edit trust relationship", which opens the role's JSON trust policy. Update the AWS principal ARN with the STORAGE_AWS_IAM_USER_ARN and the external ID with the STORAGE_AWS_EXTERNAL_ID noted from `desc integration` in Snowflake.
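After the update, the trust policy should look roughly like the sketch below, with the two placeholders replaced by the values copied from `desc integration`:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "<STORAGE_AWS_IAM_USER_ARN>"
      },
      "Action": "sts:AssumeRole",
      "Condition": {
        "StringEquals": {
          "sts:ExternalId": "<STORAGE_AWS_EXTERNAL_ID>"
        }
      }
    }
  ]
}
```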
Click on "Update Trust Policy".
--Create Stage
Create or Replace STAGE "SNOWFLAKE_DEMO"."PUBLIC"."CUSTOMERSTAGEAWS"
  url = 's3://snowpipeawsbucket/'
  storage_integration = s3_int;

show stages;
--Create Pipe
Create or Replace PIPE "SNOWFLAKE_DEMO"."PUBLIC"."CUSTOMERAWSPIPE"
  auto_ingest = true
as
copy into "SNOWFLAKE_DEMO"."PUBLIC"."EMPLOYEE"
from @"SNOWFLAKE_DEMO"."PUBLIC"."CUSTOMERSTAGEAWS"
file_format = (type = 'CSV');

show pipes;

Note: `auto_ingest = true` is what makes the pipe listen for the bucket event notifications configured in the next section.
On running `show pipes`, copy the value of the notification_channel column.
4 Adding a Notification Event in AWS
In AWS, navigate to your bucket, go to Properties → Event notifications, and select "Create event notification".
Enter an event name and select "All object create events" under Object creation.
Under Destination, select "SQS queue" as the destination and enter the notification_channel value from the Snowflake pipe description into the "SQS queue" field.
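The same notification can be set up from the AWS CLI. The sketch below assumes a file named notification.json, with the QueueArn placeholder standing in for the notification_channel value:

```json
{
  "QueueConfigurations": [
    {
      "Id": "snowpipe-object-created",
      "QueueArn": "<notification_channel value from SHOW PIPES>",
      "Events": ["s3:ObjectCreated:*"]
    }
  ]
}
```

Apply it with `aws s3api put-bucket-notification-configuration --bucket snowpipeawsbucket --notification-configuration file://notification.json`.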
6 Test Snowpipe.
Create a sample CSV file with some sample data and upload it to the AWS bucket.
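As a sketch, a sample file can be generated with Python. The column layout here mirrors the illustrative EMPLOYEE columns (id, name, department) and is an assumption; match it to the table you actually created:

```python
import csv

# Illustrative rows matching a simple (id, name, department) layout;
# the real columns depend on the EMPLOYEE table you defined.
rows = [
    (1, "Alice", "Engineering"),
    (2, "Bob", "Sales"),
]

with open("employee_sample.csv", "w", newline="") as f:
    csv.writer(f).writerows(rows)

# Upload the file to the bucket, e.g. with the AWS CLI:
#   aws s3 cp employee_sample.csv s3://snowpipeawsbucket/
```

Once the object lands in the bucket, the S3 event notification triggers the pipe and the rows appear in the table after a short delay.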
Run the below command to check that you can list the contents of the AWS bucket:

--To view contents of stage
list @"SNOWFLAKE_DEMO"."PUBLIC"."CUSTOMERSTAGEAWS";
Below are some useful commands:

--To view status of Pipe
select SYSTEM$PIPE_STATUS('SNOWFLAKE_DEMO.PUBLIC.CUSTOMERAWSPIPE');

--Load Historic Data
Alter pipe "SNOWFLAKE_DEMO"."PUBLIC"."CUSTOMERAWSPIPE" refresh;

--To see details of any errors encountered
select * from table(information_schema.copy_history(table_name=>'EMPLOYEE', start_time=> dateadd(hours, -1, current_timestamp())));
7 Clean Up
--Drop all objects
Drop Integration s3_int;
DROP STAGE "SNOWFLAKE_DEMO"."PUBLIC"."CUSTOMERSTAGEAWS";
DROP pipe "SNOWFLAKE_DEMO"."PUBLIC"."CUSTOMERAWSPIPE";
DROP Database Snowflake_Demo;