Streaming on Snowflake

Photo by Les Anderson on Unsplash

Disclaimer: I am a Senior Solution Architect at Snowflake with 16 years of data strategy, architecture, and engineering experience. The views expressed here are mine alone and do not necessarily reflect the views of my current, former, or future employers. The source code used in this post can be found on GitHub.

Time to insight

The holy grail of analytics for most organizations is access to data for real-time insights. Getting there continues to challenge data teams, who face difficult architectural decisions and legacy architectures that can't scale to match the pace of incoming data. The cloud eases these challenges with nearly limitless storage and compute. But even in the cloud, solutions can be complex to architect given the patchwork of technologies needed to support data streaming: networking, data security, governance, and the pipes and connectors between the various upstream and downstream systems.

Snowflake Stream Processing

Snowflake makes connecting to streaming technologies as simple as any other Snowflake feature, abstracting away much of the complexity so developers can focus on delivering value from data, not spending time implementing and maintaining complex architectures. Snowflake provides a few different options for streaming data in near real-time. Let's review the architecture and intended uses below.

Snowflake Streaming Architecture Patterns
  1. Snowpipe: Cloud storage (e.g., S3, Azure Blob Storage) sends Snowflake a serverless notification when a file arrives, and Snowflake then picks up and auto-ingests that file. Snowpipe is typically used for micro-batch file transfer, not real-time message ingestion, but since the Kafka connector uses it natively, I wanted a baseline for comparison.
  2. Kafka Connector: This connector provides a simple yet elegant way to connect Kafka topics with Snowflake, abstracting the complexity through Snowpipe. The connector writes messages from the Kafka topics to a Snowflake-managed internal stage, and Snowpipe auto-ingests them into the table. The internal stage and pipe objects are created automatically as part of the process.
  3. Kafka with Snowpipe Streaming (Private Preview): This builds upon the first two approaches and allows a more native connection between Snowflake and Kafka through a new channel object. This object streams message data into Snowflake without first writing it to a stage. The data is also stored in an optimized format to support low-latency loading. This method uses the Snowpipe Streaming API, which is also in Private Preview. Note: The Snowpipe Streaming API opens up data streaming to any Java-based application.

Technical Deep Dive

Let's see how to configure and utilize these features in Snowflake and compare the results. And if the technical specs of deriving these metrics don't get you excited, jump down to "The Results" section.

But first, some background on the business challenge.

A large retailer works with multiple vendors and needs to process and aggregate transactions from various Point of Sale (POS) systems. The current architecture is aging in two ways. First, the storage volumes can only keep 12 months of history due to the large volume of data. Second, processing and aggregating this volume of data takes too long. Batches of data are slow to arrive in the environment, and once they arrive, re-processing them together with the 12 months of historical data is slow as well.

To address this challenge, we will be using Snowflake's Data Cloud to stream and analyze the data in three parts:

  • Snowflake Objects: We need a Snowflake account to create tables, integrations, and Snowpipe objects.
  • Infrastructure Setup: We need an S3 bucket to store our data and a Kafka cluster to stream data in. We will use AWS S3 for storage and a Confluent Docker container to represent our Kafka cluster.
  • Data Creation: Once we have the above, we will generate synthetic data using Python scripts.

Snowflake Objects

For each pattern, we will create a unique table. In addition, we need to manually create an external stage and PIPE object for pattern #1.

CREATE DATABASE IF NOT EXISTS SNOW_SANDBOX;
CREATE SCHEMA IF NOT EXISTS SNOW_SANDBOX.STREAMING;
USE SNOW_SANDBOX.STREAMING;
--create table to test snowpipe data loads
CREATE TABLE PRODUCT_SNOWPIPE (PRODUCT_JSON VARIANT);
--create table to test kafka connector loads
CREATE TABLE PRODUCT_KAFKA_CONNECT (RECORD_METADATA VARIANT, RECORD_CONTENT VARIANT);
--create table to test kafka connector with streaming
CREATE TABLE PRODUCT_KAFKA_STREAMING (RECORD_METADATA VARIANT, RECORD_CONTENT VARIANT);
--create vendor lookup table
CREATE TABLE VENDORS
(VENDOR_ID NUMBER,
VENDOR_NAME VARCHAR);
--populate with vendors
INSERT INTO VENDORS VALUES
(1, 'Home Improvement USA'),
(2, 'Big Box Ltd' ),
(3, 'Electronic Center' ),
(4, 'Organic Market' ),
(5, 'Furniture Depot' ),
(6, 'Music Streaming Unlimited');
--create a stage to an S3 bucket where we will pick up data from snowpipe
create stage SNOW_SANDBOX.STREAMING.PRODUCT_DATA_S3
url='<path to your bucket>'
storage_integration = <your storage integration>;

--create a Snowpipe pipe that loads data when S3 event notifications arrive on its SQS queue
--this will automatically move files from the stage location to a Snowflake
--table (PRODUCT_SNOWPIPE) using serverless features.
create or replace pipe PRODUCT_PIPE auto_ingest=true as
copy into SNOW_SANDBOX.STREAMING.PRODUCT_SNOWPIPE
from @PRODUCT_DATA_S3/pipe_data
file_format = (type = 'JSON');
--once created, get the value of "notification_channel" (an SQS queue ARN) to use
--in the event notification of your S3 bucket (see the Infrastructure Setup instructions)
--In AWS S3 bucket Properties:
-- |_Create event notification
-- |_Specify SQS queue
-- |_Choose from your SQS queues
-- |_Enter SQS queue ARN
DESC PIPE PRODUCT_PIPE;
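
The console steps in the comments above can also be scripted. The sketch below only builds the notification payload in the shape that boto3's put_bucket_notification_configuration accepts. The queue ARN is a made-up placeholder, the helper name is mine, and the pipe_data/ prefix is assumed from the pipe definition above.

```python
import json


def build_snowpipe_notification(queue_arn: str, prefix: str = "pipe_data/") -> dict:
    """Build an S3 notification config that forwards object-created events to an SQS queue."""
    return {
        "QueueConfigurations": [
            {
                "QueueArn": queue_arn,
                "Events": ["s3:ObjectCreated:*"],  # "All object create events"
                "Filter": {"Key": {"FilterRules": [{"Name": "prefix", "Value": prefix}]}},
            }
        ]
    }


# Placeholder ARN: use the "notification_channel" value returned by DESC PIPE.
config = build_snowpipe_notification("arn:aws:sqs:us-east-1:123456789012:sf-snowpipe-EXAMPLE")
print(json.dumps(config, indent=2))

# To apply it (requires boto3 and AWS credentials; the bucket name is hypothetical):
# import boto3
# boto3.client("s3").put_bucket_notification_configuration(
#     Bucket="my-bucket", NotificationConfiguration=config)
```

Keep in mind that this S3 call replaces the bucket's existing notification configuration, so merge in any rules you already have before applying it.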

The Kafka connector requires key pair authentication. You can set this up in your environment by following these instructions. Once you have your public and private keys, add the public key to your Snowflake user and the private key to the Kafka connector configuration. The basic steps are below:

#generate unencrypted Private Key --> rsa_key.p8
openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt
#generate Public Key from Private Key --> rsa_key.pub
openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub
#add the Public Key to your Snowflake user (run this statement in Snowflake, not the shell).
#Be sure not to include the header/footer lines, line breaks, or whitespace
alter user MYUSER set rsa_public_key='MIIBIjANBgkqh...';
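
Stripping the PEM delimiters and line breaks by hand is easy to get wrong, so here is a minimal sketch of a helper that does it. The function name is my own, and the sample key body is a shortened fake, not a real key.

```python
def pem_to_single_line(pem_text: str) -> str:
    """Drop the BEGIN/END delimiter lines and join the base64 body into one string."""
    body_lines = []
    for raw in pem_text.splitlines():
        line = raw.strip()
        # Skip blank lines and the "-----BEGIN ...-----" / "-----END ...-----" delimiters.
        if line and not line.startswith("-----"):
            body_lines.append(line)
    return "".join(body_lines)


# Shortened, fake key body used only to show the transformation.
sample_pem = """-----BEGIN PUBLIC KEY-----
MIIBIjANBgkqh
kiG9w0BAQEFAA
-----END PUBLIC KEY-----
"""
key_body = pem_to_single_line(sample_pem)
print(f"alter user MYUSER set rsa_public_key='{key_body}';")
```

For a real key you would read the file first, for example pem_to_single_line(open("rsa_key.pub").read()), and paste the printed statement into a Snowflake worksheet.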

Infrastructure Setup

  • Python 3.7+ installed locally
  • AWS S3 with Event Notification (this could alternatively be configured with Azure Blob Storage or GCP Cloud Storage): A prerequisite is an S3 bucket configured with Snowflake through a Storage Integration. Follow the complete Snowpipe configuration instructions, or follow here to configure your bucket for event notification under your S3 bucket Properties > Create event notification. Be sure to select "All object create events." The SQS queue ARN becomes available once you create your PIPE object; retrieve it with DESC PIPE PRODUCT_PIPE.
  1. Download and install Docker
  2. git clone https://github.com/confluentinc/cp-all-in-one.git
  3. cd cp-all-in-one/cp-all-in-one
  4. git checkout 7.2.1-post
  5. docker-compose up -d
  6. Wait a few minutes and navigate to http://localhost:9021

Once it is up and running, we now need to configure Kafka to use the Snowflake Kafka Connector v1.8.1.

  • Step 1: Modify the infa_setup/docker_compose.yml configuration file to use the Snowflake Kafka Connector. I have added my configuration file in git for reference. This file can be used instead of the default provided by the cp-all-in-one image.
  • Step 2: Install the JAR files. Installation instructions are here; you will need to download three JAR files and put them in your Docker directory cp-all-in-one/cp-all-in-one/jars/. They are also in infa_setup/jars/*
  • Step 3: Create a Kafka topic in the "Topics" tab within the Confluent Control Center. I've called mine "product_stream."
  • Step 4: Configure the Kafka connector to talk to Snowflake. Here is a sample config file to be added in the Confluent Control Center "Connect" tab. This file is also located in infa_setup/snowflake_sink_connector.json. Just click "Upload connector config file."

The great thing about Kafka with Snowpipe Streaming is that if you're already running the Kafka connector, it's a one-line change to the config. Note that this change requires v1.8.1 of the Snowflake Kafka connector JAR.

snowflake.ingestion.method: Required only if using the Kafka connector as the streaming ingest client. Specifies whether to use Snowpipe Streaming or standard Snowpipe to load your Kafka topic data. The supported values are SNOWPIPE_STREAMING or SNOWPIPE (default).
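
To make the one-line change concrete, here is a rough sketch that builds a connector config for each ingestion method and shows which keys differ. It is not the config file from the repo: the connector name, account URL, and key value are placeholders, the property names are the connector settings as I understand them (check the docs for your version), and converter and role settings are left out because they depend on your message format and setup.

```python
import json


def make_connector_config(ingestion_method: str = "SNOWPIPE") -> dict:
    """Return a Snowflake sink connector config; all account values are placeholders."""
    if ingestion_method not in ("SNOWPIPE", "SNOWPIPE_STREAMING"):
        raise ValueError(f"unsupported ingestion method: {ingestion_method}")
    return {
        "name": "snowflake_sink",  # hypothetical connector name
        "config": {
            "connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
            "tasks.max": "2",
            "topics": "product_stream",
            "snowflake.topic2table.map": "product_stream:PRODUCT_KAFKA_CONNECT",
            "buffer.count.records": "10000",   # max messages per batch
            "buffer.size.bytes": "5000000",    # max batch size (5 MB)
            "snowflake.url.name": "<account>.snowflakecomputing.com:443",
            "snowflake.user.name": "MYUSER",
            "snowflake.private.key": "<contents of rsa_key.p8 without header/footer>",
            "snowflake.database.name": "SNOW_SANDBOX",
            "snowflake.schema.name": "STREAMING",
            "snowflake.ingestion.method": ingestion_method,
        },
    }


snowpipe = make_connector_config("SNOWPIPE")["config"]
streaming = make_connector_config("SNOWPIPE_STREAMING")["config"]

# Which keys differ between the two setups?
changed = {key for key in snowpipe if snowpipe[key] != streaming[key]}
print(changed)

# JSON body that could be saved to a file and uploaded in the "Connect" tab.
print(json.dumps(make_connector_config("SNOWPIPE_STREAMING"), indent=2))
```

In my tests each pattern also wrote to its own table, so in practice the topic-to-table mapping would change along with the ingestion method.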

Data Creation

We use two Python scripts to generate data. One pushes data directly to an S3 bucket for pattern #1, while the other uses the Producer client from Confluent's confluent_kafka library for patterns #2 and #3. The scripts create 10 messages a second for 15 minutes, or 9,000 messages in total. These parameters can be altered in the Python scripts to better mimic your use case.
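
For readers who want a feel for the payload without opening the repo, here is a minimal sketch of a message generator. It is not the script from GitHub: the field names are assumed from the columns of the history table used later in this post, and the value ranges are illustrative.

```python
import json
import random
from datetime import datetime, timezone

VENDOR_IDS = range(1, 7)  # matches the six rows in the VENDORS table


def make_message(transaction_id: int, rng: random.Random) -> str:
    """Return one synthetic point-of-sale transaction as a JSON string."""
    record = {
        "transaction_id": f"{transaction_id:09d}",  # zero-padded to 9 digits
        "transaction_date": datetime.now(timezone.utc).isoformat(),
        "vendor_id": rng.choice(VENDOR_IDS),
        "product_id": rng.randint(10000000, 99999999),
        "product_price": rng.randint(9, 2000),
        "quantity": rng.randint(1, 20),
    }
    return json.dumps(record)


MESSAGES_PER_SECOND = 10
DURATION_SECONDS = 15 * 60
total_messages = MESSAGES_PER_SECOND * DURATION_SECONDS  # 9,000 messages per run

rng = random.Random(42)  # fixed seed so runs are repeatable
print(total_messages)
print(make_message(1, rng))
```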

First, we need to create a small kafka_config file that points to our server. If you use username and password credentials, add them here as well. For the basic setup, you only need to include the host and port of your Kafka broker (9092 is the default).

[default]
bootstrap.servers=localhost:9092
# start from the beginning of the topic if no committed offsets exist.
auto.offset.reset=earliest
#session.timeout.ms=45000
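
A file in this format can be read with Python's standard configparser module. The sketch below parses the same settings from an inline string so it runs on its own; the helper name is mine, and the real script may load the file differently.

```python
from configparser import ConfigParser

KAFKA_CONFIG_TEXT = """
[default]
bootstrap.servers=localhost:9092
auto.offset.reset=earliest
#session.timeout.ms=45000
"""


def load_kafka_config(text: str, section: str = "default") -> dict:
    """Parse INI-style text and return one section as a plain dict."""
    parser = ConfigParser()
    parser.read_string(text)
    return dict(parser[section])


config = load_kafka_config(KAFKA_CONFIG_TEXT)
print(config)
# {'bootstrap.servers': 'localhost:9092', 'auto.offset.reset': 'earliest'}
```

The resulting dict has the key/value shape that the confluent_kafka client constructors take, and the commented-out line is skipped by the parser.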

The script to push data to S3 for pattern #1 requires CLI access to S3 and the boto3 python library. You can set up AWS CLI access by following these instructions. The boto3 library will make use of the CLI credentials. To run this file, just run: python streaming_s3_datagen.py

To publish data to your Kafka topic, you can use this script. You need to provide two arguments: [configuration file from above] and [topic_name], for example: python streaming_kafka_datagen.py confluent_python.config product. This will start sending messages to the "product" topic, where the Snowflake Kafka connector picks them up. I created two topics, one for "product" and one for "product_streaming," so I ran this script twice, once per topic:table mapping.
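
The pacing logic behind "10 messages a second" can be sketched as a small loop with a pluggable send function. This is an illustration under my own naming, not the script from the repo; the dry run below collects messages in a list instead of talking to a broker.

```python
import time
from typing import Callable, Iterable


def publish_at_rate(
    messages: Iterable[str],
    send: Callable[[str], None],
    rate_per_sec: float = 10.0,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Send each message through `send`, pausing 1/rate seconds after each one."""
    interval = 1.0 / rate_per_sec
    sent = 0
    for message in messages:
        send(message)
        sent += 1
        sleep(interval)
    return sent


# Dry run: collect messages in a list and skip the real pauses.
outbox = []
count = publish_at_rate((f"msg-{i}" for i in range(5)), outbox.append, sleep=lambda s: None)
print(count, outbox)

# With a real broker (requires confluent_kafka and a running Kafka cluster):
# from confluent_kafka import Producer
# producer = Producer({"bootstrap.servers": "localhost:9092"})
# publish_at_rate(messages, lambda m: producer.produce("product", value=m))
# producer.flush()
```

A fixed pause does not account for the time each send takes, so a loop like this runs slightly slower than the nominal rate; the slower the send, the larger the drift.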

The Results

Statistics from each pattern are listed below. Each run generated 9,000 messages, simulating 10 messages a second for 15 minutes.

Load Stats from the various Snowflake Streaming Patterns

My observations:

  • Snowpipe: The overall data generation time was significantly higher than the 15 minutes it should have run for. This seems to come from the overhead of transferring 9,000 individual small ~200k files to S3. Despite not being the recommended way to move message data into Snowflake, Snowpipe did well, moving a large number of files into the table within a minute or less.
  • Kafka Connector: The averages were significantly better (50%) than regular Snowpipe. I surmise this comes down to two things. 1) The connector has properties to batch rows, in this case at a maximum of 10,000 messages or 5,000,000 bytes (5 MB). In this example, 188 micro-batch files were generated. These properties can be tailored to your use case so that Snowpipe receives larger or smaller micro-batches of messages. 2) The topic and connector were set to use two partitions, which automatically created two pipe objects for data loading. That is twice as many as with Snowpipe alone, and it can be configured higher if needed.
  • Snowpipe Streaming: Even in Private Preview, this pattern seems to perform best, since no data is written to an internal stage before the batch of messages is shipped to Snowflake. The new channel object streams data into Snowflake within seconds, and I'm sure the stability and consistency will only improve once it becomes generally available (GA). For consistency, I also used two partitions, resulting in the creation of two channel objects. You can view them using SHOW CHANNELS; Note: There are currently no load stats available for Snowpipe Streaming, so the codebase contains a custom procedure that logs a record every second to detect inserts. The numbers above should serve as a good baseline, but they came from this custom procedure rather than from Snowflake's native log tables.
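
A quick back-of-the-envelope check on the Kafka connector numbers above shows how far the micro-batches were from the configured record cap. The even split of files across partitions is my assumption; the other figures come from the run described above.

```python
TOTAL_MESSAGES = 9_000           # messages per run
FILES_GENERATED = 188            # micro-batch files from the Kafka connector run
MAX_RECORDS_PER_BATCH = 10_000   # configured record cap per batch
PARTITIONS = 2

avg_messages_per_file = TOTAL_MESSAGES / FILES_GENERATED
files_per_partition = FILES_GENERATED / PARTITIONS  # assumes an even split

print(f"{avg_messages_per_file:.1f} messages per file on average")          # 47.9
print(f"{files_per_partition:.0f} files per partition")                     # 94
print(f"{avg_messages_per_file / MAX_RECORDS_PER_BATCH:.2%} of the record cap")  # 0.48%
```

At roughly 48 messages per file, the batches were flushed long before reaching the 10,000-message cap, so at this message rate the cap itself was not the limiting factor.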

Scripts to reproduce these results can be found on GitHub under results\. In addition, I've created a table with 1 billion rows, simulating roughly 3 years of historical data (~315M messages a year).

--create an XL warehouse to gen 1B records
CREATE WAREHOUSE COMPUTE_XL_WH WAREHOUSE_SIZE ='X-Large';
USE WAREHOUSE COMPUTE_XL_WH;
--create a history table with 1B records
CREATE TABLE PRODUCT_HIST AS
SELECT
LPAD(seq4()+1, 9, '0') TRANSACTION_ID,
dateadd(millisecond, -100 * seq4(), current_timestamp()) as transaction_date, --one row per 100 ms (10 messages a second), going back about 3 years
uniform(1, 6, random(15)) as VENDOR_ID,
uniform(10000000, 99999999, random(21)) as Product_id,
uniform(9, 2000, random(22)) as Product_price,
uniform(1, 20, random(23)) as quantity,
randstr(uniform(4,10,random(2)),uniform(1,10000,random(2)))::varchar(10) as product_name,
randstr(uniform(6,20,random(4)),uniform(1,200000,random(4)))::varchar(20) as product_desc
from table(generator(rowcount=>1000000000));
--turn off the XL warehouse
ALTER WAREHOUSE COMPUTE_XL_WH SUSPEND;
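
The row count for the history table follows from the message rate used in the tests. A short check of that arithmetic, assuming a constant 10 messages a second and a 365-day year:

```python
MESSAGES_PER_SECOND = 10
SECONDS_PER_YEAR = 60 * 60 * 24 * 365
HISTORY_ROWS = 1_000_000_000

messages_per_year = MESSAGES_PER_SECOND * SECONDS_PER_YEAR
years_covered = HISTORY_ROWS / messages_per_year

print(f"{messages_per_year:,} messages per year")   # 315,360,000
print(f"{years_covered:.2f} years in 1B rows")      # 3.17
```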

With this historical table, you can easily union it with your incoming streaming data for real-time predictions, trending, and analysis.

Conclusion

Snowflake continues to provide additional value to the platform without customers having to deal with complicated integrations or infrastructure changes. In the example above, Snowflake's native Snowpipe Streaming for the Kafka connector (Private Preview) only requires a single-line change from "SNOWPIPE" to "SNOWPIPE_STREAMING." This small change shortens the time before new data can be queried, bringing it to near real time and supporting real-time analytical insights. In addition, Snowflake's native elastic storage and compute provide the scale needed to combine near real-time data with legacy data for historical trending or predictive analytics on a single platform.

Because Snowflake pushes automatic updates, customers receive these and other newly released features, providing additional value through a future-proof architecture. If you want to try out Snowpipe Streaming, please get in touch with your Snowflake account team.
