Streaming Data to RDBMS by Kafka (Part 2)

Bünyamin Şentürk
VakıfBank Teknoloji
6 min readNov 23, 2022

In this article, which is the continuation of the first part written by my colleague Ahmet GÜL, I will be talking about how we transfer data from Kafka to a relational database. The first part can be found here:

As mentioned in Part 1, the data was successfully produced to Kafka. While receiving the data, Kafka was in a passive position. Next step is to sink (transfer/write) these records to a MSSQL table.

Simple representation of the mentioned system

To do that we need some additional Kafka components called Kafka Connect and Schema Registry. What are these components, what is the purpose of using them? In the sections below I will explain these questions and how we use it in our system. Let’s start with the Schema Registry.

Schema Registry

Included in the installation package of Confluent Kafka, Schema Registry may come handy in many situations. Actually it is a need in our case since the produced data is in Avro format (avro data type needs a schema). This part of the Confluent doc of Schema Registry gives a brief idea of the component:

Confluent Schema Registry provides a serving layer for your metadata. It provides a RESTful interface for storing and retrieving your Avro®, JSON Schema, and Protobuf schemas. It stores a versioned history of all schemas based on a specified subject name strategy, provides multiple compatibility settings and allows evolution of schemas according to the configured compatibility settings and expanded support for these schema types. It provides serializers that plug into Apache Kafka® clients that handle schema storage and retrieval for Kafka messages that are sent in any of the supported formats.

Schema Registry lives outside of and separately from your Kafka brokers. Your producers and consumers still talk to Kafka to publish and read data (messages) to topics. Concurrently, they can also talk to Schema Registry to send and retrieve schemas that describe the data models for the messages.

If the records in Kafka are going to be inserted in a relational database, value and key schemas of this table are needed. This is because a relational database consists of columns and all the records must satisfy the requirities of these columns. So, first step is creating a schema according to table we are going to use, with Schema Registry.

Create script of the mentioned table will help us to create its value schema:

We need to decide which type will be used in the schema for each corresponding type in SQL. Long for bigint, int and datetime, string for varchar should do it. Mind that we are using long for datetime as a timestamp conversion will be done in connector configuration. According to these columns, its value schema will be created with this command:

We also need its key schema. In our setup only field in key schema is the primary key of table, which is Id.

To check if both schemas are deployed properly, below commands can be used:

Now we are ready to continue with the connector part.

Kafka Connect

A simple explanation can be found in Confluent docs:

Kafka Connect is a tool for scalably and reliably streaming data between Apache Kafka® and other data systems. It makes it simple to quickly define connectors that move large data sets in and out of Kafka. Kafka Connect can ingest entire databases or collect metrics from all your application servers into Kafka topics, making the data available for stream processing with low latency. An export connector can deliver data from Kafka topics into secondary indexes like Elasticsearch, or into batch systems–such as Hadoop for offline analysis.

Kafka Connect allows us to use Connectors. A connector, as the name suggests, acts as a middle-man between both producer (where data comes from) and consumer (where data goes to) side. Below figure explains this structure briefly:

image source: https://developer.confluent.io/learn-kafka/kafka-connect/intro/

Connector Setup

As I explained before, we didn’t use a connector to produce data. It’s written directly to Kafka and now the same data will be written to destination database by using a connector. First of all, a suitable driver is needed. Kafka Connect can establish connections with many types of databases (oracle, couchbase, rabbitmq etc.) but plugins (.jar files) of these databases don’t come with the default installation. In our case we will be using JDBC connector.

JDBC connectors allow us to exchange data between Kafka and relational databases. For more information and download links, visit

After downloading the zip file, we need to extract it to plugin path. By default, the plugin path is given as ~/share/java/. We could put the files in this location or give another plugin path in connect-avro-distributed.properties file and use that path. To make it more organized, it is better to use another path and create folders for each plugin under that new location. We created a new path and put all the files there. It should be remembered that a restart is needed after defining a new plugin path or installing a new plugin.

At this point JDBC plugin should be installed correctly. To check that we will use a simple REST command:

If JDBC source and sink connector informations pop up, everything is installed correctly and it is ready to use. We can continue with the configuration of the connector.

Connector Configuration

To create a connector, its properties must be defined. While some of these properties are common for each connector, some of them are type-specific. Since we use JDBC connector it is better to check JDBC connector docs:

To keep things simple I will show you a basic sink connector configuration. This connector will consume data from the topic we defined and insert them to MSSQL table. Explanation of each parameter is given below.

“name” is the name of our connector.

“tasks.max” is the number of tasks running for this connector. Best choice is giving this number as same as the partition count of the topic.

“connector.class” defines which type of connector will be used, in our case it is JDBC Sink Connector.

“connection.url” is the connection string of the database we want to connect.

“connection.user” is the username needed to connect MSSQL. As you realized we didn’t give it directly in configuration because of security concerns. It is possible to make it read from a file.

“connection.password” is the password of given user. Just like the username we also made connector to get this info from a file.

“table.name.format” is the name of the table we want to insert on.

“topics” is the name of topic we want to read data from.

“transforms” all these parameters are used for date transformation. In our case, OpTime column will be used for timestamp conversion.

“auto.create” is given as false. By doing it we make sure a new table will not be created if the given table name is incorrect.

“insert.mode” could be insert, upsert or update. We stay with “insert”.

The connector is ready to be created. Again, we use REST interface to create it. Below command will create the connector:

To check if it is created successfully and in RUNNING state, another REST command can be used:

Double Check

Connector plugin is installed, schemas are created and the connector is running. To make sure the connector is consuming data from the topic, we can use one of Kafka’s scripts that come with the installation.

These scripts are located under the path ~/confluent-7.0.1/bin/. The script we are going to use is named as kafka-consumer-groups. It can be used to get information about consumer groups, skip some of the records by giving a determined offset etc. Before using the command we need to know what we named our connector. Below command will give us the current number of records and how many of them are consumed successfully. We put “connect-” before the connector name since consumer group name of connectors are given as “connect-{connector_name}” by default.

If Lag is 0 or Current Offset is increasing as you run the command again, that means everything is working as intended. Mind that lag can be way bigger than 0 if there is a high incoming traffic.

Thanks for your time, see you in another article.

--

--