Beer&Diapers.ai
Published in

Beer&Diapers.ai

Snowflake Data ingestion from AWS using Snowpipe

1. Overview

  1. Using Cloud Storage Notification for aws its the AWS S3 object created event or using the AWS SNS (Simple notification service) notifications. This is used for continuous data loading with cloud storage.
  2. Snowpipe REST API by calling insertFiles . This is more of queuing data by calling the REST API .
  1. An External stage references to a storage integration .
  2. Snowflakes associates the integration with a S# IAM user account .
  3. The IAM user is granted access to the S3 bucket which contains the data which needs to be consumed .

2. AWS Configuration

2a AWS Bucket (Create a new or use an existing bucket)

2b Create policy

2c Create a Role

3 Snow Flake Configuration

Create Database & Table

--Create Database
Create Database Snowflake_Demo;
use Database Snowflake_Demo;
use role ACCOUNTADMIN;
--Create TABLE
CREATE OR REPLACE TABLE "SNOWFLAKE_DEMO"."PUBLIC"."EMPLOYEE" (
id STRING,
name STRING
);

Create Integration

--create Integration
create storage integration s3_int
type = external_stage
storage_provider = s3
enabled = true
storage_aws_role_arn = 'arn:aws:iam::0706816153:role/snowpiperole'
storage_allowed_locations = ('s3://snowpipeawsbucket/');
show integrations;desc integration s3_int;

Grant the IAM User Permissions to Access Bucket Objects

{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"AWS": "arn:aws:iam::0706196153:root"
},
"Action": "sts:AssumeRole",
"Condition": {
"StringEquals": {
"sts:ExternalId": "0000"
}
}
}
]
}

Create Stage

--Create Stage
Create or Replace STAGE "SNOWFLAKE_DEMO"."PUBLIC"."CUSTOMERSTAGEAWS"
url='s3://snowpipeawsbucket/'
storage_integration = s3_int;
show stages;

Create Pipe

--Create PipeCreate or Replace PIPE "SNOWFLAKE_DEMO"."PUBLIC"."CUSTOMERAWSPIPE"
auto_ingest=true
as
copy into "SNOWFLAKE_DEMO"."PUBLIC"."EMPLOYEE"
from @"SNOWFLAKE_DEMO"."PUBLIC"."CUSTOMERSTAGEAWS"
file_format=(type = 'CSV');
show pipes;

4 Adding Notification event alert in AWS .

6 Test Snowpipe.

--To view contents of stage
ls @"SNOWFLAKE_DEMO"."PUBLIC"."CUSTOMERSTAGEAWS";
--To viw status of Pipe
Select system$pipe_status('SNOWFLAKE_DEMO.PUBLIC.CUSTOMERAWSPIPE');
--Load Hsitoric Data
Alter pipe "SNOWFLAKE_DEMO"."PUBLIC"."CUSTOMERAWSPIPE" refresh
--To see details of any error encountered
select *
from table(information_schema.copy_history(table_name=>'EMPLOYEE', start_time=> dateadd(hours, -1, current_timestamp())));

7 Clean Up

-Drop all objects
Drop Integration s3_int;
DROP STAGE "SNOWFLAKE_DEMO"."PUBLIC"."CUSTOMERSTAGEAWS";
DROP pipe "SNOWFLAKE_DEMO"."PUBLIC"."CUSTOMERAWSPIPE";
DROP Database Snowflake_Demo;

--

--

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store