Seamless Data Pipeline: Integrating Kinesis Streams with Snowflake for Real-Time Analytics
Amazon Kinesis is a managed, scalable AWS service for ingesting large volumes of data in real time, allowing organizations to process and analyze data as it arrives. Snowflake, also a managed service, works closely with Amazon to let customers take advantage of both platforms by integrating various AWS services with the Snowflake Data Cloud. Snowflake has one of the largest partner ecosystems, with integrations across many products and technologies, and Kinesis is one of them.
Many Snowflake customers use Kinesis Data Streams as the streaming platform for near real-time ingestion of clickstream, telemetry, and similar data from upstream application sources, and that data needs to be processed in Snowflake for real-time analytics. Until now, data engineers used Kinesis Data Firehose to land the data in blob storage (S3) and then loaded it into Snowflake with Snowpipe or in batch. As a result, data pipelines were more complex, data was stored twice (which is expensive), and near real-time processing carried extra latency.
To overcome these challenges, the Snowflake partner team worked with Amazon on a Kinesis integration built on Snowpipe Streaming that reduces cost, reduces latency, and keeps the data pipeline architecture simple. The integration is easy to use, which is one of the reasons Snowflake is so popular in the world of data.
Let’s explore how to set up such a pipeline using Kinesis. First, go to the AWS console, open Kinesis, and on the Data streams tab click the name of the data stream you want to stream into Snowflake. Then click the “Process with the delivery stream” button, which takes you to the Firehose setup.
The following screen shows the configuration of the new delivery stream using Snowflake. Your source is the Kinesis Data Stream and your destination is “Snowflake”, as seen below:
Now you have to provide the Snowflake credentials and targets: account URL, username, private key, the Snowflake role you want to use (make sure the role is granted to the user), database, schema, and table name. Please note that at this point you must create the table yourself in Snowflake (for example in Snowsight), and the role you use here must have access to that database, schema, and table; the sample script later in this post sets this up. An enhancement is coming that will create the table automatically if it does not exist. If you have a PrivateLink setup, that is supported as well.
If you want to create a Snowflake user with an RSA key, follow the instructions here: https://docs.snowflake.com/en/user-guide/key-pair-auth#configuring-key-pair-authentication
The private key must be converted into a single line before it can be entered on this setup screen; the sketch below shows one way to do that.
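If you do not already have a key pair, here is a minimal sketch, assuming Python with the cryptography package, that generates one and prints the public key body (to assign to the user with ALTER USER ... SET RSA_PUBLIC_KEY) as well as the private key body as a single line. The file name and key size are only examples.
# Sketch: generate an RSA key pair for Snowflake key-pair authentication and
# print the key bodies without PEM headers. Assumes 'pip install cryptography'.
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa

def pem_body_single_line(pem: str) -> str:
    # Drop the BEGIN/END lines and join the base64 body into one line
    return ''.join(l for l in pem.strip().splitlines() if not l.startswith('-----'))

key = rsa.generate_private_key(public_exponent=65537, key_size=2048)

# Unencrypted PKCS#8 private key; keep this file safe (file name is an example)
private_pem = key.private_bytes(
    encoding=serialization.Encoding.PEM,
    format=serialization.PrivateFormat.PKCS8,
    encryption_algorithm=serialization.NoEncryption(),
).decode()
with open('rsa_key.p8', 'w') as f:
    f.write(private_pem)

# Public key body to assign in Snowflake, for example:
#   ALTER USER JSMITH SET RSA_PUBLIC_KEY='<public key body>';
public_pem = key.public_key().public_bytes(
    encoding=serialization.Encoding.PEM,
    format=serialization.PublicFormat.SubjectPublicKeyInfo,
).decode()

print('Public key body:', pem_body_single_line(public_pem))
print('Single-line private key:', pem_body_single_line(private_pem))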
For the source settings, make sure the Kinesis Data Stream ARN is correct. For the destination settings, see below:
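If you would rather look up the stream ARN from code than hunt for it in the console, a small boto3 sketch like the following can fetch it (the stream name is the example one used in the producer script later in this post, and AWS credentials are assumed to be configured):
# Sketch: look up the ARN and status of a Kinesis data stream with boto3.
import boto3

kinesis = boto3.client('kinesis')
summary = kinesis.describe_stream_summary(StreamName='upatel-stream')
summary = summary['StreamDescriptionSummary']
print('Stream ARN:   ', summary['StreamARN'])
print('Stream status:', summary['StreamStatus'])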
Then provide the remaining settings such as the backup S3 bucket, buffer size, and buffer interval. That is all you need to do, and data from your Amazon Kinesis Data Stream will be ingested into the Snowflake table, where further analytics and machine learning can be done.
If you want to test your integration, use the following sample Snowflake script for the DDLs:
use role accountadmin;
create database if not exists kinesis_db;
create schema if not exists kinesis_db.kinesis_schema;
-- Create a Snowflake role with the privileges to work with Kinesis.
create role if not exists kinesis_stream_role;
grant role kinesis_stream_role to role sysadmin;
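-- Grant usage on a warehouse (this assumes a warehouse named kinesis_wh already exists).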
grant usage on warehouse kinesis_wh to role kinesis_stream_role;
-- Grant privileges on the database.
grant usage on database kinesis_db to role kinesis_stream_role;
-- Grant privileges on the schema.
grant usage on schema kinesis_db.kinesis_schema to role kinesis_stream_role;
-- The role needs CREATE TABLE to run the DDL below.
grant create table on schema kinesis_db.kinesis_schema to role kinesis_stream_role;
grant usage on schema kinesis_db.public to role kinesis_stream_role;
grant all on future tables in schema kinesis_db.kinesis_schema to role kinesis_stream_role;
grant all on future streams in schema kinesis_db.public to role kinesis_stream_role;
-- assume you have created user JSMITH using the instructions in the link above
grant role kinesis_stream_role to user JSMITH;
alter user JSMITH set default_role=kinesis_stream_role;
use role kinesis_stream_role;
use schema kinesis_db.kinesis_schema;
CREATE OR REPLACE TABLE yellow_taxi_fh (
VendorID number,
tpep_pickup_datetime varchar ,
tpep_dropoff_datetime varchar ,
passenger_count number,
trip_distance FLOAT,
RatecodeID number,
store_and_fwd_flag TEXT,
PULocationID number,
DOLocationID number,
payment_type number,
fare_amount FLOAT,
extra FLOAT,
mta_tax FLOAT,
tip_amount FLOAT,
tolls_amount FLOAT,
improvement_surcharge FLOAT,
total_amount FLOAT,
congestion_surcharge FLOAT
);
Here is the sample data file that you can download and read in Python to post messages into the Kinesis stream. The following Python code can be used to test streaming data into the Kinesis Data Stream from your computer:
import json
import csv
import boto3

# Read the sample taxi trip data into a list of dictionaries
taxiCabRides = []
with open('yellow_tripdata.csv', encoding='utf-8') as csvf:
    csvReader = csv.DictReader(csvf)
    for row in csvReader:
        taxiCabRides.append(row)

# Send each ride as a JSON message to the Kinesis data stream
client = boto3.client('kinesis')
counter = 0
for ride in taxiCabRides:
    response = client.put_record(
        StreamName="upatel-stream",
        Data=json.dumps(ride),
        PartitionKey=str(hash(ride['tpep_pickup_datetime']))
    )
    counter = counter + 1
    print('Message sent #' + str(counter))
    # If the message was not successfully sent, print an error message
    if response['ResponseMetadata']['HTTPStatusCode'] != 200:
        print('Error!')
        print(response)
# Install the AWS CLI:
# https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html
# Configure the AWS CLI - run 'aws configure'
# Set up the Python environment - run 'pip install boto3'
# Run the program above.
# Make sure the data file is downloaded into the same directory as your Python code.
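The script above sends one record per API call, which is fine for a quick test but slow for larger files. As a follow-up, here is a minimal sketch, reusing the taxiCabRides list and example stream name from above, that batches the same records with put_records (Kinesis accepts up to 500 records per call):
# Sketch: send the records in batches with put_records instead of one at a time.
BATCH_SIZE = 500
for i in range(0, len(taxiCabRides), BATCH_SIZE):
    batch = taxiCabRides[i:i + BATCH_SIZE]
    response = client.put_records(
        StreamName="upatel-stream",
        Records=[
            {
                'Data': json.dumps(ride),
                'PartitionKey': str(hash(ride['tpep_pickup_datetime']))
            }
            for ride in batch
        ]
    )
    # put_records is not all-or-nothing, so check for partial failures
    if response['FailedRecordCount'] > 0:
        print('Failed records in this batch:', response['FailedRecordCount'])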
Once you create the delivery stream in AWS, within a minute you will see data flowing into Snowflake; a quick way to verify this is shown below. With configuration alone, and no code, you can implement a data pipeline for near real-time analytics and machine learning use cases.
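To confirm that rows are landing in the table, you can query it from Snowsight, or from Python with the snowflake-connector-python package as in this minimal sketch (the account identifier is a placeholder, the key file is the one generated earlier, and the private_key_file parameter assumes a reasonably recent connector version):
# Sketch: verify rows are arriving in the target table.
# Assumes 'pip install snowflake-connector-python'; the account identifier and
# key file path are placeholders.
import snowflake.connector

conn = snowflake.connector.connect(
    account='<your_account_identifier>',
    user='JSMITH',
    private_key_file='rsa_key.p8',
    role='KINESIS_STREAM_ROLE',
    warehouse='KINESIS_WH',
    database='KINESIS_DB',
    schema='KINESIS_SCHEMA',
)
try:
    cur = conn.cursor()
    cur.execute('select count(*) from yellow_taxi_fh')
    print('Rows loaded so far:', cur.fetchone()[0])
finally:
    conn.close()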
Summary:
Snowflake works very closely with cloud service providers to make integration easier; in fact, this integration requires zero code! It also helps you reduce data ingestion latency and the overall cost of doing near real-time analytics in Snowflake.
All the best for your real-time insight projects!
Disclaimer: The opinions expressed in this post are my own and not necessarily those of my employer (Snowflake).