Streaming Data from Snowflake to Kafka


️❄️ I got the news of my selection as Snowflake Data Superhero 2023! Thank you!❄️

In one of my previous blogs, I covered step-by-step how to move data from Kafka to Snowflake.


But how about a reverse pattern? Moving data from Snowflake to Kafka. There can be a number of use cases for this, but the overarching one could be propagating Snowflake table (change) data to the downstream systems.

By the nature of Apache Kafka, it is purpose-built for a performant pub-sub architecture. This means if you have data from Snowflake that needs to be streamed to multiple sinks, you can have Kafka do this routing for you.

Surprisingly, there are not many ‘how to’ guides for this pattern. The few I stumbled upon (1, 2) either involved a cloud-storage hop or an EL tool. In fact, I found this StackOverflow question from 2020 with no answer yet!

I took it as a challenge, and ended up achieving this directly via JDBC! This is the story of ̶m̶y̶ ̶l̶i̶f̶e̶ my weekend! ❄️❄️❄️❄️


What are we going to achieve?

We want to create a flow that streams changes in one or more Snowflake tables to one or more Kafka topics. These topics can then be consumed by downstream applications.

The guide below walks through this step by step. It assumes working knowledge of Snowflake and Kafka, and concentrates on the pattern itself. This means we will skip some concepts that are not core to this tutorial (Docker, WSL2, etc.), but you can easily google them. The objective is to get this setup up and running!

Some Kafka terminology:

  1. Kafka Source: A system from which data flows into Kafka (it can push, or Kafka can pull).
  2. Kafka Sink: The target system to which Kafka feeds the data.
  3. Kafka Connect: A free, open-source component of Apache Kafka® that works as a centralized data hub for simple data integration between databases and Kafka.
  4. Kafka Topic: A log of events. A topic can have multiple publishers and multiple subscribers.

High-Level Steps

  1. Create and set up a Snowflake Instance.
  2. Create a Kafka environment (this involves setting up Kafka, ZooKeeper, Kafka Connect, etc.).
  3. Set up the Confluent Kafka Connect JDBC Source connector.
  4. Install the Snowflake JDBC driver.
  5. Run the Kafka Connector and do a sanity check.
  6. Final step: to see if change data is getting captured!

Setting up Snowflake Account

You will need an account on Snowflake. A trial account works. You can sign up for one at https://signup.snowflake.com/ (no credit card is required, but always keep a tab on how much Snowflake credit you are spending).

Once you have an account, using appropriate roles and permissions, execute the statements below in Snowsight.


--Setup the context for the database and schema
use database demo;
use schema public;

--Create a small table
create or replace table "snowflake2kafka"
(
"id" integer not null,
"created_at" timestamp not null,
"updated_at" timestamp not null,
"first_name" varchar(255),
"address" varchar(255)
);

--Add some data to this table

INSERT INTO "snowflake2kafka" ("id", "created_at", "updated_at", "first_name", "address") VALUES (11, '2023-01-28 10:00:00.400121', '2023-01-28 11:00:19.400121', 'Sudhendu', 'Somewhere');
INSERT INTO "snowflake2kafka" ("id", "created_at", "updated_at", "first_name", "address") VALUES (12, '2023-01-29 10:00:00.400121', '2023-01-29 11:00:19.400121', 'Prakash', 'Mumbai');

--Verify your data
select * from "snowflake2kafka";
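Note: if you are on a brand-new account, the demo database or a running warehouse may not exist yet. Here is a minimal sketch to create them first (the names and sizing below are my assumptions, adjust to your setup):

-- Only needed if these objects do not already exist (names/sizes are examples)
create database if not exists demo;

create warehouse if not exists demo_wh
  warehouse_size = 'XSMALL'
  auto_suspend = 60      -- suspend quickly to save credits between connector polls
  auto_resume = true;

use warehouse demo_wh;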

Setting up Kafka

There are multiple ways to set up Kafka. You can either go with plain Apache Kafka (which I used in my last blog) or with the Confluent Kafka offering. There are ready-made Docker images available!

For this blog, we are using landoop/fast-data-dev. This is a Kafka distribution with Apache Kafka, Kafka Connect, Zookeeper, Confluent Schema Registry, and REST Proxy. And it’s amazing!

1. Install Docker Desktop on your machine. Since I was on Windows, this was just a quick setup for me.

2. Once you have Docker Desktop installed, open the command prompt and run:

docker pull landoop/fast-data-dev

You should see something like the below under Images in Docker Desktop.

3. Update the docker-compose configuration. The easiest way to do this is to click ‘Run’ on the image in the Images tab, which will open a configuration box.

4. In the optional settings, specify the following values (the names and folder path need not be the same):

snowflake-to-kafka-cluster:
  container_name: snowflake-stream-kafka
  image: landoop/fast-data-dev
  environment:
    ADV_HOST: 127.0.0.1
    RUNTESTS: 0
  ports:
    - 2181:2181
    - 3030:3030
    - 8081-8083:8081-8083
    - 9581-9585:9581-9585
    - 9092:9092
  volumes:
    # This can be any path on your system. Be sure to specify an absolute path.
    - C:\Users\pandey\Documents\Documents\Projects\snowflake-kafka:/my-data
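If you prefer the command prompt to the Docker Desktop ‘Run’ dialog, the same container can also be started with docker run. A sketch using the values from the compose snippet above (adjust the volume path to your machine):

docker run --rm --name snowflake-stream-kafka ^
  -e ADV_HOST=127.0.0.1 -e RUNTESTS=0 ^
  -p 2181:2181 -p 3030:3030 -p 8081-8083:8081-8083 ^
  -p 9581-9585:9581-9585 -p 9092:9092 ^
  -v C:\Users\pandey\Documents\Documents\Projects\snowflake-kafka:/my-data ^
  landoop/fast-data-dev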

5. Once you have added all of this, you should see your container start.

Tip: You can click on the container and view the logs.

6. Since the landoop image comes with an amazing UI on top of Kafka, head over to localhost:3030 (provided you did not change the port number). A beautiful interface awaits you.
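You can also do a quick command-line sanity check that the Kafka Connect worker is up (it is exposed on port 8083 in the mapping above) and that the JDBC source connector plugin is loaded:

curl -s http://localhost:8083/connector-plugins

You should see io.confluent.connect.jdbc.JdbcSourceConnector somewhere in the JSON response.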

Setting up the Confluent Kafka Connect JDBC Source connector

We will now set up the Kafka Connect JDBC Source Connector. This is the generic JDBC source connector we are going to use. As of this writing, there is no dedicated Snowflake source connector (as opposed to the Snowflake sink connector, which is available here). Good for us, the generic JDBC connector works for Snowflake connectivity.

7. Click on KAFKA CONNECT UI in the lenses screen as shown below.

8. Click on New Connector and then select JDBC Source Connector.

io.confluent.connect.jdbc.JdbcSourceConnector

9. In the properties tab below, add the following properties.

The properties file should be specific to your Snowflake setup.

  1. You can follow Snowflake guidelines for adding the JDBC URL.
  2. Snowflake is a little picky about case. Be diligent about upper and lower case.
  3. Do not change the connector class. It has to be io.confluent.connect.jdbc.JdbcSourceConnector
  4. Don’t forget to specify the role name in the JDBC URL.
  5. All the properties below are discussed in detail here.
name=snowflake.to.kafka.connector
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector

connection.url=jdbc:snowflake://ACCOUNT.snowflakecomputing.com/?warehouse=warehousename&db=test&schema=public&role=WHATEVEROLE
connection.user=yoursnowflakeusername
connection.password=PUTYOURPASS
connection.attempts=100
connection.backoff.ms=300000

query=select * from "snowflake2kafka"
table.types=table
mode=incrementing
incrementing.column.name=id
timestamp.column.name=updated_at
timestamp.delay.interval.ms=3000
poll.interval.ms=1500
numeric.mapping=best_fit

topic.prefix=snowflake-kafka-etl

transforms=AddNamespace,createKey,AddKeyNamespace
transforms.createKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.createKey.fields=id
transforms.AddNamespace.type=org.apache.kafka.connect.transforms.SetSchemaMetadata$Value
transforms.AddNamespace.schema.name=inc.evil.coursecatalog.InstructorAggregate
transforms.AddKeyNamespace.type=org.apache.kafka.connect.transforms.SetSchemaMetadata$Key
transforms.AddKeyNamespace.schema.name=inc.evil.coursecatalog.Key
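If you would rather not use the UI, the same connector can also be created through the Kafka Connect REST API (from a bash shell such as WSL2). A trimmed-down sketch, using only the essential properties from the block above; add the remaining properties to the config object in the same way:

curl -s -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
        "name": "snowflake.to.kafka.connector",
        "config": {
          "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
          "connection.url": "jdbc:snowflake://ACCOUNT.snowflakecomputing.com/?warehouse=warehousename&db=test&schema=public&role=WHATEVEROLE",
          "connection.user": "yoursnowflakeusername",
          "connection.password": "PUTYOURPASS",
          "query": "select * from \"snowflake2kafka\"",
          "mode": "incrementing",
          "incrementing.column.name": "id",
          "topic.prefix": "snowflake-kafka-etl",
          "poll.interval.ms": "1500"
        }
      }'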

10. If you start the connector now, you will get an error that says

kafka java.sql.SQLException: No suitable driver found for jdbc:snowflake://...

11. We need to add the Snowflake JDBC driver to the library path of Kafka Connect. For that, you have to download the Snowflake JDBC driver from here. Feel free to download the latest version.

12. Find the \kafka-connect-jdbc\ directory for your Kafka setup (this was the most time-consuming part for me). The easiest way to do it for Docker on WSL2 is to look for it under the \\wsl.localhost directory (how).

13. You should be able to find docker-desktop-data. Inside this folder, search for kafka-connect-jdbc*

14. Once you find the directory, place the downloaded jar file (snowflake-jdbc-3.13.27.jar) into the \kafka-connect-jdbc\ directory. Here is how it looks for me:
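If hunting through the WSL filesystem does not work out for you, an alternative (a sketch; the exact directory inside the container may differ) is to locate the folder from inside the running container and copy the driver in with docker cp:

:: find the kafka-connect-jdbc directory inside the running container
docker exec snowflake-stream-kafka find / -type d -name "kafka-connect-jdbc*"

:: copy the downloaded driver into the directory printed above
docker cp snowflake-jdbc-3.13.27.jar snowflake-stream-kafka:/path/printed/above/

Depending on how the worker caches its plugin path, you may need to restart the container for the new jar to be picked up.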

15. Start the Connector by clicking on Start. You should see something like the one below.

Note: the Kafka Connect log is available at this location: http://localhost:3030/logs/connect-distributed.log
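You can also ask the Kafka Connect REST API for the connector's state, which is handy when the UI is ambiguous:

curl -s http://localhost:8083/connectors/snowflake.to.kafka.connector/status

A healthy connector reports RUNNING for both the connector and its task.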

Final step: To see the changes captured!


16. Let us first add some records to our snowflake2kafka table. Under the hood, Kafka Connect queries Snowflake on each poll interval and looks for changes. In our case we are only tracking the incrementing id column, so if a new row is inserted with an id greater than the last one seen, our connector should pick it up.

INSERT INTO "snowflake2kafka" ("id", "created_at", "updated_at", "first_name",  "address") VALUES (13, '2023-01-29 10:00:00.400121', '2023-01-29 11:00:19.400121', 'Vicky',  'Chennai');

17. Once the record is added, you should see the Kafka topic created. Head over to the homepage and click on Topics (for us, the topic is snowflake-kafka-etl).
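If you prefer a terminal over the Topics UI, you can also tail the topic with the console consumer bundled in the fast-data-dev container. A sketch, assuming the Connect worker is using the image's default Avro converters (if you configured JSON converters, use kafka-console-consumer instead):

docker exec -it snowflake-stream-kafka kafka-avro-console-consumer ^
  --bootstrap-server localhost:9092 ^
  --topic snowflake-kafka-etl ^
  --from-beginning ^
  --property schema.registry.url=http://localhost:8081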

18. Let us now add one more record. The Fetched count should be 2, and we should see the new ID (14) coming our way in Kafka.

INSERT INTO "snowflake2kafka" ("id", "created_at", "updated_at", "first_name",  "address") VALUES (14, '2023-01-29 10:00:00.400121', '2023-01-29 11:00:19.400121', 'Vicky',  'Chennai');

Code Repository:

All the above code is available in the repo below.

Considerations:

  1. The polling interval defines how frequently Snowflake is queried and, in turn, how long your warehouse stays up. This is where the dollars go, so be careful not to poll continuously.
  2. To make the solution flexible, you can use one connector for all your tables and topics. To do this, remove the query property from the config file and instead specify a list of tables and a topic prefix. For example:
    topic.prefix=snowflake.etl.
    table.whitelist=snowflake2kafka, othertable

    For any change in the above two tables, this creates two topics:
    snowflake.etl.snowflake2kafka
    snowflake.etl.othertable
  3. The Kafka JDBC Connect is very well-built and the properties are worth fiddling with.
  4. In our example, mode was set to incrementing, but you can set it to timestamp+incrementing. That way, the watermark column (for example, updated_at) is also considered and you will receive updates as well.
  5. Since this is not log-based CDC, you suffer from all the usual drawbacks of query-based change capture (missed deletes, missed intermediate updates, etc.). For those, you can check other options such as Debezium on Kafka Connect.
  6. You can write any SQL in the query property. For example, if you don't want the connector to build its own incremental filter on id and timestamp, you can supply a more complex SQL expression yourself (see the sketch after this list).
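For consideration 6, a sketch of what a custom query could look like (columns are from our demo table; note that in the incremental modes the connector appends its own WHERE clause to whatever you supply, so wrap any query that has its own WHERE in a sub-select):

# stream only a projection of the table; the connector still adds the incremental filter
query=select "id", "updated_at", "first_name" from "snowflake2kafka"

# if you need your own filtering, wrap it so the appended WHERE clause stays valid
query=select * from (select * from "snowflake2kafka" where "address" is not null) t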
