Automating Snowpipe with Amazon EventBridge

A more powerful and flexible pipeline for auto-loading data into Snowflake

Snowflake has (rather quietly!) released support for Snowpipe auto-ingest with Amazon EventBridge, in public preview. This option provides an alternative to existing approaches that require configuring an S3 Event Notification.

This blog will dive deep into this new option, help you decide whether it suits your current Snowflake + AWS architecture, and guide you through the process of setting it up.

Background

Since its release into GA way back in December 2018, Snowpipe has provided a mechanism for Data Engineers to set up low-latency, event-driven ingestion pipelines from cloud storage services, such as Amazon S3, into Snowflake.

Snowflake has previously offered three main options for automating Snowpipe on AWS:

  1. Route S3 Event Notifications directly to a Snowflake-managed Amazon Simple Queue Service (SQS) queue
  2. Route S3 Event Notifications to a Snowflake-managed Amazon Simple Queue Service (SQS) queue via a customer-managed Amazon Simple Notification Service (SNS) topic
  3. Hand-roll your own automation via the Snowpipe REST API

Challenges with Existing Methods

The first two options remain the most common and are, to this day, a robust solution with minimal set-up and configuration required to get up and running.

However, at larger organisational scales where multiple teams or services require access to the same events, some challenges may pop up, primarily due to limitations with Amazon S3 Event Notifications:

  • Amazon S3 Notifications cannot route events from overlapping prefixes/suffixes. Teams often need to work around this by introducing a broker service between S3 notifications and the Snowpipe SQS queue, typically using services such as SNS or Lambda.
  • The PUT notification configuration API call is atomic and overwrites any existing configuration on a bucket. Therefore, if notifications are deployed by multiple services, each service needs to ensure its configuration updates don’t overwrite the others (see the read-merge-write sketch after this list).
  • S3 Notifications are not deployed as a discrete resource by some Infrastructure-as-Code tools such as CloudFormation, and are instead handled as a property of the S3 bucket itself. This makes automated deployments with CI/CD challenging if the team/org deploys services that use these notifications from multiple repositories.
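
As a concrete illustration of the second point, the usual workaround is a read-merge-write pattern rather than a blind PUT. The following is a hypothetical sketch (the bucket name, queue ARN and prefix are placeholders) and assumes the AWS CLI and jq are available:

# Hypothetical read-merge-write sketch: fetch the current notification configuration,
# append our own queue configuration with jq, and write the combined result back.
# A plain put-bucket-notification-configuration would silently replace whatever
# another team had already deployed on the bucket.
existing=$(aws s3api get-bucket-notification-configuration --bucket my-shared-bucket)
existing=${existing:-"{}"}   # the call returns nothing if no configuration exists yet

merged=$(echo "${existing}" | jq '.QueueConfigurations += [{
  "QueueArn": "arn:aws:sqs:us-east-1:123456789012:my-team-queue",
  "Events": ["s3:ObjectCreated:*"],
  "Filter": { "Key": { "FilterRules": [ { "Name": "prefix", "Value": "my-team/" } ] } }
}]')

aws s3api put-bucket-notification-configuration \
  --bucket my-shared-bucket \
  --notification-configuration "${merged}"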

If any of these points resonate with your current team and architecture, read on!

Benefits of Amazon EventBridge

Amazon Web Services, in the meantime, has made numerous strides in helping customers build powerful event-driven architectures at scale, most notably through the release of Amazon EventBridge in mid-2019 and the ability to send S3 events directly to it from late 2021.

This new method of handling and processing S3 events addresses many of the challenges above: events can be filtered on overlapping prefixes, fanned out to multiple targets from a single rule, and managed as discrete resources with Infrastructure-as-Code tooling.

Naturally, many customers have requested a direct integration between Snowpipe and EventBridge. In response, Snowflake now supports automating Snowpipe with S3 events routed through EventBridge, doing away with S3 Event Notifications altogether!

Setup

We will create a simple pipeline consisting of an Amazon S3 bucket with EventBridge notifications enabled, an EventBridge rule, an SNS topic and a Snowpipe. We will then test it with a simple data generator that uploads JSON records with randomised data into S3 under the landing/ prefix.

0. Prerequisites

Note: Amazon EventBridge for S3 events is not part of the free tier. Please see pricing for more details. At $1 per million events triggered it won’t break the bank, but it’s not $0 either!

  • A Snowflake account with ACCOUNTADMIN privileges (Consider signing up for a free trial account — highly recommended you select the AWS edition for this exercise)
  • An AWS account with sufficient privileges to create S3 buckets, SNS topics, EventBridge rules and IAM roles.
  • AWS CLI with access keys set up.

This set-up takes multiple steps across a number of services, so the process will be demonstrated via AWS CLI commands for simplicity.
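
Before starting, it’s worth confirming the CLI is authenticated and pointing at the AWS account you intend to use:

# Confirm the CLI credentials and the account they belong to
aws sts get-caller-identity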

1. Create S3 bucket with EventBridge Notifications

Note: We recommend you set up your S3 buckets in the same region as your Snowflake account.

We can create an S3 bucket and enable S3 EventBridge notifications from the CLI. This can alternatively be done easily via the Management Console, with all other settings left at their defaults.

aws s3api create-bucket \
--bucket <your-bucket-name> \
--region <your-region> \
--create-bucket-configuration LocationConstraint=<your-region>

aws s3api put-bucket-notification-configuration \
--bucket <your-bucket-name> \
--notification-configuration='{ "EventBridgeConfiguration": {} }'
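
To confirm the setting took effect, you can read the configuration back; the output should include an (empty) EventBridgeConfiguration block:

# Sanity check: confirm EventBridge notifications are enabled on the bucket
aws s3api get-bucket-notification-configuration --bucket <your-bucket-name>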

2. Set up secure access between Amazon S3 and Snowflake

I’d recommend following the excellent step-by-step official documentation on setting up secure access between AWS and Snowflake via a Storage Integration.

After following the guide, you should have the following set up which you will need for later steps:

  • An IAM role with an IAM policy granting access to your S3 bucket, and a trust relationship set up to allow the Snowflake IAM user to assume the role
  • A Snowflake STORAGE INTEGRATION configured for the S3 path and IAM role (a rough sketch follows below)
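
For reference, the storage integration created by that guide looks roughly like the following. This is only a sketch; the integration name, role ARN and bucket path are placeholders that come out of your own IAM set-up:

--Rough sketch of the storage integration from the official guide
--(the IAM role ARN and bucket path are placeholders)
create or replace storage integration eventbridge_s3_int
  type = external_stage
  storage_provider = 'S3'
  enabled = true
  storage_aws_role_arn = 'arn:aws:iam::<your-account-no>:role/<your-snowflake-access-role>'
  storage_allowed_locations = ('s3://<your-bucket-name>/landing/');

--Retrieve the Snowflake IAM user and external ID to paste into the role's trust policy
desc integration eventbridge_s3_int;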

3. Set up Simple Notification Service

We will now create an Amazon Simple Notification Service (SNS) topic for EventBridge to send events to.

aws sns create-topic --name eventbridge-to-snowpipe-topic

Snowpipe subscribes to the SNS topic directly. This means we need to grant the Snowflake IAM user permission to subscribe to this topic. Snowflake provides a function that generates the required IAM policy statement for you:

select system$get_aws_sns_iam_policy('arn:aws:sns:<your-aws-region>:<your-account-no>:eventbridge-to-snowpipe-topic');

Which returns the following:

{
  "Version" : "2012-10-17",
  "Statement" : [
    {
      "Sid" : "1",
      "Effect" : "Allow",
      "Principal" : {
        "AWS" : "arn:aws:iam::<snowflake-aws-account-no>:user/<snowflake-aws-iam-username>"
      },
      "Action" : ["sns:Subscribe"],
      "Resource" : ["arn:aws:sns:<your-aws-region>:<your-account-no>:eventbridge-to-snowpipe-topic"]
    }
  ]
}

We then merge this into the existing SNS resource policy as an additional element under the "Statement" list. This can be done easily from the Management Console, or via the CLI:

aws sns set-topic-attributes \
  --topic-arn arn:aws:sns:<your-aws-region>:<your-account-no>:eventbridge-to-snowpipe-topic \
  --attribute-name Policy \
  --attribute-value '{
    "Version": "2008-10-17",
    "Id": "__default_policy_ID",
    "Statement": [
      {
        "Sid": "__default_statement_ID",
        "Effect": "Allow",
        "Principal": { "AWS": "*" },
        "Action": [
          "sns:GetTopicAttributes",
          "sns:SetTopicAttributes",
          "sns:AddPermission",
          "sns:RemovePermission",
          "sns:DeleteTopic",
          "sns:Subscribe",
          "sns:ListSubscriptionsByTopic",
          "sns:Publish"
        ],
        "Resource": "arn:aws:sns:<your-aws-region>:<your-account-no>:eventbridge-to-snowpipe-topic",
        "Condition": {
          "StringEquals": {
            "AWS:SourceOwner": "<your-account-no>"
          }
        }
      },
      {
        "Sid": "1",
        "Effect": "Allow",
        "Principal": {
          "AWS": "arn:aws:iam::<snowflake-aws-account-no>:user/<snowflake-aws-iam-username>"
        },
        "Action": ["sns:Subscribe"],
        "Resource": ["arn:aws:sns:<your-aws-region>:<your-account-no>:eventbridge-to-snowpipe-topic"]
      }
    ]
  }'
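
To double-check that the merged policy was applied, you can read the topic attributes back:

# Verify that the topic policy now contains the Snowflake subscribe statement
aws sns get-topic-attributes \
  --topic-arn arn:aws:sns:<your-aws-region>:<your-account-no>:eventbridge-to-snowpipe-topic \
  --query 'Attributes.Policy' \
  --output text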

4. Set up EventBridge Rule & Target

All AWS-generated events are routed to the default event bus, which should already exist in your AWS account. Therefore, we only need to create an EventBridge rule and target that filters on S3 events from our new bucket and sends them to the SNS topic we’ve just created:

aws events put-rule --name eventbridge-to-sns-rule \
  --event-pattern '{
    "source": [ "aws.s3" ],
    "detail-type": [ "Object Created" ],
    "region": [ "<your-aws-region>" ],
    "resources": [ "arn:aws:s3:::<your-bucket-name>" ],
    "detail": {
      "bucket": { "name": [ "<your-bucket-name>" ] },
      "object": { "key": [ { "prefix": "landing/" } ] }
    }
  }' \
  --state ENABLED \
  --description "EventBridge rule for S3 object creation"

aws events put-targets \
--rule eventbridge-to-sns-rule \
--targets "Id"="1","Arn"="arn:aws:sns:<your-aws-region>:<your-account-no>:eventbridge-to-snowpipe-topic"

EventBridge rules support up to five targets per rule. This means you can leverage the same rule to push the same event to other services as well, which could simplify your overall event-driven architecture on AWS.
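
For example, if you also wanted another consumer to receive the same events via an SQS queue, you could attach both targets to the one rule. The queue below is a hypothetical placeholder, and an SQS target would additionally need a queue policy allowing EventBridge to send messages to it:

# Hypothetical example: fan the same S3 events out to the Snowpipe SNS topic
# and to a second consumer's SQS queue from a single rule
aws events put-targets \
  --rule eventbridge-to-sns-rule \
  --targets \
    "Id"="1","Arn"="arn:aws:sns:<your-aws-region>:<your-account-no>:eventbridge-to-snowpipe-topic" \
    "Id"="2","Arn"="arn:aws:sqs:<your-aws-region>:<your-account-no>:another-consumer-queue"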

5. Set up Snowflake

Now let’s create the landing zone in Snowflake that the data will be ingested into, including the landing table and the Snowpipe itself:

use role ACCOUNTADMIN;

--Create DB & Schema
create or replace database SNOWPIPE_EVENTBRIDGE_DEMO;
create or replace schema SNOWPIPE_EVENTBRIDGE_DEMO.LANDING;

use database SNOWPIPE_EVENTBRIDGE_DEMO;
use schema LANDING;

--Create stage object
--You should already have a STORAGE INTEGRATION created from Step 2.
create or replace stage eventbridge_s3_stage
  url = 's3://<your-bucket-name>/landing/'
  storage_integration = <your-storage-integration-name>;

--Landing table
create or replace table snowpipe_eventbridge_landing (
  record variant
);

--Snowpipe
create or replace pipe eventbridge_pipe
  auto_ingest = true
  aws_sns_topic = 'arn:aws:sns:<your-aws-region>:<your-account-no>:eventbridge-to-snowpipe-topic'
as
copy into snowpipe_eventbridge_landing
from @eventbridge_s3_stage
file_format = ( type = 'JSON' );
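
If any files were already sitting under the landing/ prefix before the pipe was created, they won’t be picked up by auto-ingest. You can ask Snowpipe to queue them retroactively (this covers files staged within the last 7 days):

--Queue any files that were staged before the pipe existed
alter pipe eventbridge_pipe refresh;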

6. Load data into S3

Now with the pipeline set up, we can try loading some data into S3. Here’s a simple bash script that generates a JSON record every 15 seconds and uploads it to S3 (it requires jq and a names.txt file in the working directory with one name per line):

#!/bin/bash

# S3 bucket information
BUCKET_NAME="<your-bucket-name>"
REGION="<your-aws-region>"

# Loop forever
while true; do
  # Generate a random JSON record
  json_record=$(jq -n '{id: $id, name: $name, age: $age}' \
    --arg id "$(uuidgen)" \
    --arg name "$(shuf -n 1 names.txt)" \
    --arg age "$(shuf -i 18-60 -n 1)")

  # Upload the record to S3
  aws s3 cp - "s3://${BUCKET_NAME}/landing/$(date +%Y-%m-%d-%H-%M-%S).json" --region "${REGION}" <<< "${json_record}"

  # Wait 15 seconds before generating the next record
  sleep 15
done

7. Check that data is landing

Whenever new data is loaded into S3, the following sequence should trigger automatically:

  • S3 sends an Object Created event to the default EventBridge event bus
  • The EventBridge rule matches the event and routes it to the SNS topic
  • The SNS topic then pushes the event payload to a Snowflake-managed SQS queue, which informs Snowpipe of the arrival of the new data in S3
  • Snowpipe copies the files into a queue, from which they are loaded into the landing table SNOWPIPE_EVENTBRIDGE_LANDING.

Within a minute or so of starting the bash script, you should be able to see data landing in the SNOWPIPE_EVENTBRIDGE_LANDING table.
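
A quick way to peek at what has arrived; the field names below match the id/name/age keys produced by the generator script:

--Inspect a sample of the ingested records
select
  record:id::string   as id,
  record:name::string as name,
  record:age::string  as age
from snowpipe_eventbridge_landing
limit 10;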

Troubleshooting

On the Snowflake side, you can query the COPY_HISTORY table function in the Information Schema or call the SYSTEM$PIPE_STATUS() function:

--Files loaded into the table in the past hour
select *
from table(
  information_schema.copy_history(
    TABLE_NAME => 'SNOWPIPE_EVENTBRIDGE_LANDING',
    START_TIME => DATEADD(hours, -1, CURRENT_TIMESTAMP())
  )
)
order by last_load_time desc;

--Returns current status of the pipe
select SYSTEM$PIPE_STATUS('EVENTBRIDGE_PIPE');

You can also check permissions between S3 and Snowflake. One way I like to check for any 403 errors is to query the stage directly: select $1 from @eventbridge_s3_stage;

On the AWS side, here is a list of common things to look at if data isn’t flowing into Snowflake and nothing is reported in Snowflake’s COPY_HISTORY output:

  • From CloudWatch, you can check the number of invocations of the EventBridge rule (see the example after this list). If there aren’t any, check that the event pattern associated with the rule is correctly formed and defined (S3 prefix shenanigans are a common culprit!)
  • Ensure that Snowflake has permissions to subscribe to the SNS topic
  • Ensure that the EventBridge rule has permission to publish to the SNS topic. Errors here commonly occur if you overwrite or erase the existing SNS topic resource policy.
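
If you prefer the CLI over the CloudWatch console, the rule’s invocation count can be pulled from the AWS/Events namespace. A rough sketch, assuming GNU date:

# Sum of rule invocations over the last hour (dimension is the rule name)
aws cloudwatch get-metric-statistics \
  --namespace AWS/Events \
  --metric-name Invocations \
  --dimensions Name=RuleName,Value=eventbridge-to-sns-rule \
  --start-time "$(date -u -d '1 hour ago' +%Y-%m-%dT%H:%M:%SZ)" \
  --end-time "$(date -u +%Y-%m-%dT%H:%M:%SZ)" \
  --period 3600 \
  --statistics Sum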

Conclusion

Integrating Amazon EventBridge with Snowpipe enables a more powerful and flexible method of automating the loading of data from Amazon S3 into Snowflake. With flexible event routing, simpler deployments and integration with other services, you can easily fold Snowflake ingestion pipelines into a cost-effective event-driven architecture.
