Consuming and Storing Kafka Messages in Snowflake Using Kafka Connect (Part 1)

Darko Kojović
Atlantbh Engineering
8 min readMay 16, 2024

Every year, zettabytes of data are transferred over the Internet. Managing, processing, and storing the data can be a really complex task and requires cutting-edge tools. Many tools can be used, but two technologies that are almost essential to every modern data pipeline: Kafka and Snowflake.

If you are planning to stream a large amount of data, Kafka is usually the best choice. Scalability, reliability, low latency and popularity are reasons why Kafka stands out. Kafka is by far the most popular event streaming platform and a go-to for many cases when it comes to streaming data.

Since you need to process and store the data somewhere, Snowflake is one of the top contenders for that. If you are wondering why Snowflake is so popular, look at the user satisfaction — it’s over the roof. Snowflake took most of the drawbacks and oversights that other data storage solutions had and addressed it. Probably the most important feature is having the storage and compute separately scaled.

What is Kafka Connect, and why did we decide to use it?

Kafka Connect is a tool for scalably and reliably streaming data between Apache Kafka and other data systems. It provides streaming integration between Kafka and other data sources or sinks, enabling seamless data movement and transformation. Kafka Connect uses connectors to ingest data into Kafka topics or export data from Kafka topics to external systems.

Kafka Connect is made for use cases like this.

Installing Kafka Connect and choosing the correct mode

If you search for Kafka Connect, you will see that there is no download link. That is because Kafka Connect is not a standalone tool; it is bundled with the main Kafka installation. If you communicated with Kafka from the command line before, you probably used one of the scripts from the directory like kafka-topics.sh . Two more scripts that are used for Kafka Connect are in the same directory as the kafka-topics.sh script. Those two scripts are for two different modes, standalone and distributed:

1. connect-standalone.sh — Only runs one node of Kafka Connect and saves all the configs, offsets, and statuses locally. If the node goes down, all Kafka Connect processes are down.
This is okay for testing but should not be used in production as it is neither reliable nor fault-tolerant.

2. connect-distributed.sh — Runs a cluster of Kafka Connect nodes. Saves all configs, offsets and statuses in specific Kafka topics designated for Kafka Connect. If one node goes down, another node will take over the workload from that node.

The connect-standalone.sh script can be used for testing, but connect-distributed.sh should be used for production.

Kafka Connect components

There are a few Kafka Connect components, but the most important for us are:

  1. Connectors (sink and source)
  2. Converters (including transforms)
  3. Dead Letter Queue (DLQ)

If you are new to Kafka Connect, you may be confused about the difference between connectors and converters.

Connectors

Connectors, as the name says, connect your Kafka Connect cluster to a specific source/destination. In our case, that will be but it can also be HDFS, S3, Elasticsearch, and many more.

There are two types of connectors:

  1. Source Connectors — Take the data from a source and send it to Kafka topic(s). In the diagram below, we can see how it would look if we wanted to take files from S3 and publish them to a Kafka topic.

2. Sink Connectors — Take the data from Kafka topic(s) and send it to a destination. In the diagram below, we can see how it would look if we wanted to take messages from a Kafka topic and store them in Snowflake.

Converters

Converters are the most confusing component to set up because it’s easy to misunderstand and misconfigure them. Since Kafka messages are just bytes, converters translate data between the internal data format used by Kafka Connect and the format specific to the source or sink connector. When we store messages in Kafka topics they will always get serialized, even if we do not specify the serialization format. The Kafka producer will use the String serializer by default to serialize the message and save it to a topic as an array of bytes. Usually, you will use serialization formats like Avro or Protobuf for more complex pipelines. Since we are using both connectors and converters, let’s refer to them together as a Kafka Connect system to make it less confusing.

We have two systems: a source system and a sink system. You may see that in some places, those two components are both referred to as connectors, but that is why many people get confused. We will expand both previous diagrams to demonstrate how both source and sink systems look.

Source system

As you can see in the diagram, the source connector (in our case S3 connector) establishes the connection to S3 and starts polling the desired file. After the S3 connector gets a file, it will hand out the contents of that file to the converter. The converter will take that content and serialize it with the specified serializer to the specified format. After serialization, it will publish the serialized message to a Kafka topic.

Sink system

In sink systems, the converter first takes the serialized message from a Kafka topic, deserializes it, and hands it over to the sink connector. After that, the sink connector (in our case the Snowflake connector) saves the formatted message into the Snowflake table. If you are still confused, imagine that the connector connects Kafka Connect to a source/destination and acts as a bridge between those two points while the converter does all the data transformations (serialization/deserialization).

Understanding converters is a crucial part of setting up Kafka Connect. I highly recommend spending some time to understand connectors and converters before attempting to set up Kafka Connect for a complex pipeline. It will save you a lot of time in the long run.

Confluent published a great blog about converters here: Kafka Connect Deep Dive — Converters and Serialization Explained | Confluent.

Dead Letter Queue (DLQ)

A Dead letter queue or DLQ is a component that can be easily overlooked since it is not a required component for setting up Kafka Connect. In my opinion, DLQ is a must for any production grade data pipeline. DLQ is a component that is responsible for handling messages that failed to be consumed. In terms of Kafka, DLQ is a separate topic that is only used to catch messages that had some issues and could not be consumed.

The most common issue when this happens is when using the wrong converter. If you have worked with Avro, there is a good chance you saw the “Unknown magic byte!” error. That error occurs when you try to use an Avro converter to deserialize messages that are not Avro. A wrong converter is one example, but there can be more issues. To avoid losing messages, we need a DLQ that will handle those cases. By default, the Kafka Connect cluster will stop if it cannot deserialize a message, and we will need to manually fix the issue and restart it.

In production environments we cannot allow downtime nor lose messages and that’s where DLQ can help us. We can configure Kafka Connect to not crash when a message cannot be consumed or deserialized but instead send it to the DLQ topic.

These are the properties that we need to define if we want to do that:

"errors.tolerance": "all", 
"errors.deadletterqueue.topic.name": "MY_DLQ_TOPIC",
"errors.deadletterqueue.topic.replication.factor": 3,
"errors.deadletterqueue.context.headers.enable": true

Schema registry

Schema registry is an additional service that helps us improve the data quality of our Kafka data. It’s a centralized repository where we can store our schemas for our Kafka messages. You can think of schemas as blueprints for how the message should look like. Schema definition contains all the fields and their types that a message needs to have. When a producer wants to publish a message to a topic, it must first validate whether the message satisfies the schema definition. Only then can the message be published. The same applies for consumers. This helps us have standards, and establishes better collaboration across different teams, and helps prevent mistakes and inconsistencies.

This is an example how a schema definition looks like:

{
"doc": "Sample schema",
"fields": [
{
"doc": "ID of the user. Type Int.",
"name": "id",
"type": "int"
},
{
"doc": "Users full name. Type String.",
"name": "name",
"type": "string"
}
],
"name": "sampleRecord",
"namespace": "com.dk.mynamespace",
"type": "record"
}

Kafka Connect is made by Confluent and has many integrations with the Confluent Platform, and also different licenses. To reduce the complexity of the blog, in part 2 we will use the Confluent Platform schema registry.

In case you cannot use the Confluent Platform, Confluent provides the schema registry under Community License and in 99% of the cases you will be fine to host your own instance of the schema registry. The only exception is if you want to offer it as a service and compete with Confluent. In those cases, feel free to use alternatives like Karapace.

Conclusion

When it comes to streaming messages from Kafka to Snowflake, Kafka Connect was a clear winner for us. It provided us with high reliability, scalability and fault tolerance while also having out of the box integration with schema registry and dead letter queues.

In this blog, we learned about Kafka Connect, how to use it, and which features are necessary for a production-grade big data pipeline.

For a practical guide on how to set up Kafka Connect; stay tuned for part 2 of the blog where we configure and deploy three Kafka Connect clusters (avro, protobuf and string) on Kubernetes and demonstrate the process how we can utilize Schema Registry and DLQ (Dead Letter Queue) to establish a production grade data pipeline.

The architecture we will set up is shown here:

Originally published at https://www.atlantbh.com on May 16, 2024.

--

--