Streaming Use Cases for Snowflake With Kafka

Snowflake is a great platform for many Kafka streaming use cases. You can use the Snowflake Kafka Connector, or any Kafka connector that writes files to cloud storage, for general streaming. You can also write your own Kafka code that uses JDBC batch inserts for the lowest latency and the most in-flight transformation flexibility.

This will be the first in a series of articles covering techniques for streaming data into Snowflake. We introduce and discuss three primary approaches:

  1. The Snowflake Kafka Connector.
  2. Snowflake Auto Ingest with Kafka.
  3. Custom Kafka using JDBC batch inserts.

However, we will focus on the third approach in more detail here, offering code and some benchmarking. We will cover the other two in subsequent articles.

Snowflake is well known for batch processing

Snowflake has long been associated with classic batch processing ETL/ELT pipelines, applying its well-known elastically scalable storage and computing cloud infrastructure. Fundamentally, you PUT files into an internal or external stage, then COPY to some kind of table. You use tasks, stored procedures, and streams to process the data as needed in however simple or complex a pipeline you need.
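For reference, here is a minimal sketch of that PUT-then-COPY pattern driven through the Snowflake JDBC driver. The account URL, warehouse, database, stage, file, and table names are all placeholders, and the raw_orders table is assumed to already exist:

  import java.sql.Connection;
  import java.sql.DriverManager;
  import java.sql.Statement;
  import java.util.Properties;

  // Minimal batch-load sketch: PUT a local file into a named internal stage, then COPY it
  // into a raw table. All object names and the account URL are placeholders.
  public class BatchLoadSketch {
      public static void main(String[] args) throws Exception {
          Properties props = new Properties();
          props.put("user", System.getenv("SNOWFLAKE_USER"));
          props.put("password", System.getenv("SNOWFLAKE_PASSWORD"));
          props.put("db", "DEMO_DB");
          props.put("schema", "PUBLIC");
          props.put("warehouse", "LOAD_WH");

          String url = "jdbc:snowflake://myaccount.snowflakecomputing.com/";
          try (Connection con = DriverManager.getConnection(url, props);
               Statement stmt = con.createStatement()) {
              // Upload (and auto-compress) a local file into a named internal stage.
              stmt.execute("CREATE STAGE IF NOT EXISTS raw_stage");
              stmt.execute("PUT file:///tmp/orders.csv @raw_stage AUTO_COMPRESS=TRUE");
              // Copy the staged file into a raw table; tasks, streams, and stored procedures
              // can take the pipeline from there.
              stmt.execute("COPY INTO raw_orders FROM @raw_stage " +
                           "FILE_FORMAT = (TYPE = CSV SKIP_HEADER = 1)");
          }
      }
  }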

Here is a picture nicely depicting this classic batch pattern. Don’t mind that it happens to be AWS-centric; it could equally be Azure- or Google Cloud Platform-centric.

But Snowflake also supports many stream processing use cases

Stream processing, in contrast to batch processing, is characterized by:

  • Lower data volumes arriving at more frequent intervals.
  • Data that lends itself to partitioning, as is often done with streaming technologies like Kafka.
  • A need to process data at real-time to near real-time latency.
  • A distinction between event-driven and micro-batch processing.
  • A publish-and-subscribe model and architecture, as with Kafka.

The streaming sweet spot use case for Snowflake

Currently, the sweet spot for streaming use cases in Snowflake is where:

  • Polling intervals by Kafka consumers are 10+ seconds.
  • Each poll produces 1,000+ records, ideally even more.
  • Near real-time latency is acceptable: data becoming available in 20 or more seconds is fine.
  • Micro-batch rather than event-driven processing.
  • Incoming data can be processed in sets, rather than as a single atomic event.

Some of this is nuanced. An ideal use case is one where transactions stream into Snowflake to be processed by some kind of rules engine, with an email, text, or report-type alert generated within 20 or so seconds.

3 Flavors of Kafka Streaming into Snowflake

a. The Snowflake Kafka Connector

The Snowflake Kafka Connector requires no custom coding. Typically it is configured to map one topic to one table. The Kafka message is passed to Snowflake as JSON or Avro; it is not parsed and split into multiple columns in the Snowflake table. The target table is assumed to have the following schema (sketched in code after the list):

  • RECORD_CONTENT column:
    - This will contain the Kafka message.
  • RECORD_METADATA column:
    - Stores important information like topic, partition, etc.

The Snowflake Kafka Connector implicitly uses an internal stage and Snowpipe. It writes files into a temporary stage on which a temporary Snowpipe is defined. Consequently, there is a 6–60 second delay between the connector landing files and the data appearing in the target table, because Snowpipe is essentially a queue against which a process awakens to COPY new files into tables.

An example reference architecture for a Snowflake Kafka Connector pipeline follows:

Note: the Snowflake Kafka Connector has new functionality and enhancements coming in future releases. These will feature lower end-to-end latency and offer cost savings over the existing implementation, and exactly-once semantics will guarantee no duplicates.

b. Snowflake Auto Ingest with an alternate Kafka Connector (ex: Confluent Storage Cloud Connector)

A simple and efficient way to use Kafka with Snowflake is to write files into cloud storage, for example an S3 bucket (best practice is to create files that are approximately 150 MB+ compressed), and then use a traditional Snowflake ETL/ELT pipeline that COPYs from the S3 bucket via Snowpipe. Note that because this approach uses Snowpipe, there is a 6–60 second delay before a COPY to the target table commences. A reference architecture for this can be depicted as follows:
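On the Snowflake side, this pattern boils down to an external stage over the bucket plus a pipe with auto-ingest enabled, so that cloud storage event notifications trigger the COPY as files land. A minimal sketch over JDBC follows; the bucket, storage integration, stage, pipe, and table names are all placeholders:

  import java.sql.Connection;
  import java.sql.SQLException;
  import java.sql.Statement;

  // Sketch: Snowpipe auto-ingest over an S3 external stage. The bucket, storage
  // integration, stage, pipe, and table names are placeholders.
  public class AutoIngestSketch {
      static void createPipe(Connection con) throws SQLException {
          try (Statement stmt = con.createStatement()) {
              stmt.execute(
                  "CREATE STAGE IF NOT EXISTS kafka_s3_stage " +
                  "URL = 's3://my-kafka-landing-bucket/topics/' " +
                  "STORAGE_INTEGRATION = my_s3_integration " +
                  "FILE_FORMAT = (TYPE = JSON)");
              // AUTO_INGEST = TRUE lets S3 event notifications drive the COPY as files land.
              stmt.execute(
                  "CREATE PIPE IF NOT EXISTS kafka_pipe AUTO_INGEST = TRUE AS " +
                  "COPY INTO raw_kafka_events FROM @kafka_s3_stage");
          }
      }
  }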

c. Custom Kafka using JDBC batch insert statements

It’s common for development teams to write their own Kafka code, often in Java or Scala. There may be a need to structure incoming data and do some reasonable in-flight transformations, or a need for faster availability of data than with the Snowflake Kafka Connector. In these cases, using the Snowflake JDBC driver with batch INSERT statements is the best way to go.

The following diagram illustrates a stream processing pipeline. Note how a JDBC PreparedStatement.executeBatch() does an automatic PUT of an optimal number of files into a Snowflake stage and an INSERT into a target raw table. Data is immediately committed and available with this call.

A code snippet simulating a Kafka Sink Connector: How to implement a JDBC SQL INSERT Prepared Statement

The following code snippet implements this in a simulated Kafka sink connector. The code should be familiar to anybody who has written a Kafka consumer and worked with JDBC. It is not production code; it is written for illustration.

It basically does the following, driven by a configuration file (see the sketch after this list):

  • Polls to get an org.apache.kafka.clients.consumer.ConsumerRecords<Long, String> collection of the specified RECORDS_PER_POLL size.
  • For each org.apache.kafka.clients.consumer.ConsumerRecord:
    - Parse the JSON message into its individual values.
    - Bind these values to a hard-coded, parameterized SQL INSERT prepared statement.
    - Call addBatch() on the parameterized statement.
  • Flush the accumulated SQL INSERT statements with an executeBatch() on the parameterized statement per the specified batch size, either within the above loop or after the loop if any accumulated records remain.
    - This is where the PUT and INSERT happen as described in the previous stream processing pipeline.
  • Return only when the COPY is successful, so that all the rows are committed and the Kafka offsets can also be committed.
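Since that snippet is the heart of this article, here is a minimal, self-contained sketch of the same pattern. Everything specific in it is an illustrative placeholder rather than the actual benchmark code: the topic name, the raw_events table and its columns, the JSON field names, the account URL, and the batch size. Jackson is assumed for JSON parsing.

  import com.fasterxml.jackson.databind.JsonNode;
  import com.fasterxml.jackson.databind.ObjectMapper;
  import org.apache.kafka.clients.consumer.ConsumerRecord;
  import org.apache.kafka.clients.consumer.ConsumerRecords;
  import org.apache.kafka.clients.consumer.KafkaConsumer;

  import java.sql.Connection;
  import java.sql.DriverManager;
  import java.sql.PreparedStatement;
  import java.sql.SQLException;
  import java.time.Duration;
  import java.util.Collections;
  import java.util.Properties;

  // Illustrative sketch of a Kafka consumer sinking micro-batches into Snowflake via
  // JDBC batch inserts. Topic, table, column, and JSON field names are placeholders.
  public class SnowflakeSinkSketch {

      private static final int BATCH_SIZE = 200_000;  // flush threshold; tune per workload
      private static final ObjectMapper MAPPER = new ObjectMapper();

      public static void main(String[] args) throws Exception {
          Properties kafkaProps = new Properties();
          kafkaProps.put("bootstrap.servers", "localhost:9092");
          kafkaProps.put("group.id", "snowflake-sink-sketch");
          kafkaProps.put("enable.auto.commit", "false");  // commit offsets only after the rows land
          kafkaProps.put("key.deserializer", "org.apache.kafka.common.serialization.LongDeserializer");
          kafkaProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

          String sql = "INSERT INTO raw_events (event_id, event_type, payload) VALUES (?, ?, ?)";

          try (KafkaConsumer<Long, String> consumer = new KafkaConsumer<>(kafkaProps);
               Connection con = DriverManager.getConnection(
                       "jdbc:snowflake://myaccount.snowflakecomputing.com/", snowflakeProps());
               PreparedStatement ps = con.prepareStatement(sql)) {

              consumer.subscribe(Collections.singletonList("events"));
              int pending = 0;

              while (true) {
                  ConsumerRecords<Long, String> records = consumer.poll(Duration.ofMillis(1000));

                  for (ConsumerRecord<Long, String> record : records) {
                      // Parse the JSON message and bind its values to the parameterized INSERT.
                      JsonNode json = MAPPER.readTree(record.value());
                      ps.setLong(1, json.get("event_id").asLong());
                      ps.setString(2, json.get("event_type").asText());
                      ps.setString(3, json.get("payload").asText());
                      ps.addBatch();
                      if (++pending >= BATCH_SIZE) {
                          flush(ps);              // implicit PUT + bulk INSERT on the Snowflake side
                          consumer.commitSync();  // commit Kafka offsets only after the rows are in
                          pending = 0;
                      }
                  }
                  if (pending > 0) {              // flush any remainder accumulated after the loop
                      flush(ps);
                      consumer.commitSync();
                      pending = 0;
                  }
              }
          }
      }

      private static void flush(PreparedStatement ps) throws SQLException {
          // For large batches the Snowflake JDBC driver stages the bound rows as compressed
          // files (an implicit PUT) and loads them with a single bulk INSERT, returning only
          // once the rows are committed and queryable.
          ps.executeBatch();
      }

      private static Properties snowflakeProps() {
          Properties p = new Properties();
          p.put("user", System.getenv("SNOWFLAKE_USER"));
          p.put("password", System.getenv("SNOWFLAKE_PASSWORD"));
          p.put("db", "DEMO_DB");
          p.put("schema", "PUBLIC");
          p.put("warehouse", "KAFKA_WH");
          return p;
      }
  }

The key design choice is committing Kafka offsets only after executeBatch() returns, which keeps the consumer group's offsets in step with what has actually been committed in Snowflake.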

A small benchmark to demonstrate throughput and latency

The following landing table was used by our Kafka simulation program:

The following configuration file was used to set up this benchmark. The parameter names should be self-explanatory to Snowflake users and, generally, to anyone familiar with the following Kafka idioms (a hypothetical configuration sketch follows the list):

  • Parallel Kafka Consumers (Sink Connectors).
  • Getting records within a Kafka Consumer poll().
  • Batch size driving when to sink records within/after a poll().
  • Note: on every poll, our simulator program actually creates records dynamically according to the configured records_per_poll, so record generation itself consumes some time.
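For illustration only, a configuration of roughly this shape would capture those parameters. The key names are hypothetical, not the simulator's actual parameter names; the values match the benchmark run described below:

  import java.util.Properties;

  // Hypothetical benchmark configuration; key names are illustrative only.
  public class BenchmarkConfigSketch {
      static Properties benchmarkConfig() {
          Properties cfg = new Properties();
          cfg.setProperty("num.consumers", "10");        // parallel Kafka Consumers (sink connectors)
          cfg.setProperty("polls.per.consumer", "10");   // each consumer polls 10 times
          cfg.setProperty("poll.interval.ms", "1000");   // poll every 1,000 ms
          cfg.setProperty("records.per.poll", "200000"); // records generated and retrieved per poll
          cfg.setProperty("batch.size", "200000");       // rows accumulated before executeBatch()
          return cfg;
      }
  }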

The basic topology of the test was to use an AWS Lightsail VM. The Snowflake account used an XS virtual warehouse. This picture also illustrates how both the Kafka and Snowflake computing instances could be scaled depending on the required throughput and latency.

We specified 10 Kafka Consumers, each polling every 1,000 ms, 10 times, with each poll retrieving and writing 200,000 records to Snowflake. A total of 20M records were written in 144 seconds, which comes to an average throughput of about 138,888 records/second, at about 4.5–7.5 seconds of latency per 200,000-record batch write to Snowflake.

This is more impressive than it first appears, because most of the batch write latency was networking time. Once files were PUT into the Snowflake internal stage, an INSERT of 200,000 records took only about 1.5 seconds. This can be illustrated by a 200,000-record Kafka sink, which looks like the following in the Snowflake GUI:

This is a nice depiction of what takes place on the Snowflake side when our Kafka Consumer calls executeBatch() (the flush step described above).

The following is done automatically on the Snowflake side:

  • Create a temporary internal stage (this is done once, when the connection is opened).
  • Optimally create as many stage files as needed and PUT them into the internal stage.
    - In this case, 6 files were created.
  • INSERT the staged files into the target table.

Note how it took only 1.6s to insert 200k rows of 7.7MB! This is for a single Kafka Consumer payload. Note we tested 10 in parallel, each achieving the same latency and performance. This is pretty compelling, demonstrating how Snowflake can work in a clean, scalable, and schema-driven way to make data available as fast as possible for micro-batches with Kafka. All this with a single XS virtual warehouse…think of the possibilities of scaling many Kafka Consumers doing such JDBC batch inserts using as many elastically scalable virtual warehouses as needed.

The icing on the cake is that all this comes with ACID transaction semantics.

Next steps:

  • Learn more about Snowflake’s Kafka Connector in our documentation here, and read our official announcement here.
  • We will publish a working demo of the Kafka batch insert approach in GitHub soon. Stay tuned!
  • Give it a try and let us know what you think in the comments.

The Snowflake Team Behind this Article

  • Mate Radalj, Principal Sales Engineer
  • Jason Snyder, Senior Solutions Architect
  • Andrew Kim, Senior Sales Engineer
  • Rob Horbelt, Director of Sales Engineering
