Streaming Data From MySQL with Kafka Connect JDBC Source Connector

Batu Kargili
Published in ÇSTech
9 min read · Dec 24, 2021

Kafka Connect is an essential component of the Çiçeksepeti Data Engineering Team’s streaming pipelines. It allows us to import data from any data source to our Kafka topics. Using both source and sink connectors we can put Kafka at the center of our pipelines.

Most of our streaming pipelines start with a CDC connector that captures changes in our operational tables as they occur. By capturing these changes, we can process our ever-increasing data volume in near real-time, feed our data warehouses continuously, and meet our modernizing business requirements. Log-based CDC approaches like Debezium are our first choice when implementing any change-data-capture system because of their better data fidelity and useful before-after schema, but it is not always possible to integrate log-based CDC. Database logs are very low-level components, and we use many different types of databases in different shapes, so even enabling CDC in these databases may create problems. In such cases the JDBC Source Connector, a relatively primitive but easy-to-integrate method that can connect to almost any relational database, comes to our aid.

In this blog post, we’ll talk about the features of the JDBC Source Connector and the tricky points that come up while implementing it. We are going to stream data from MySQL tables and views using different incremental query modes, and set the initial values from which streaming starts.

TL;DR

You can stream data from any JDBC-compatible database into Kafka with JDBC Source Connector. All the configs for JDBC Source Connectors and the docker-compose files for deploying MySQL and Kafka Connect to your local environment are shared in this GitHub repository.

JDBC Source Connector

JDBC Source Connector is an open-source Kafka connector developed, tested, and supported by Confluent for loading data from JDBC-compatible databases into Kafka. It uses JDBC drivers and periodically sends SQL queries to fetch data from the specified tables. It guarantees at-least-once delivery, and the latest offsets, from which the connector starts in each cycle, are stored in the Kafka Connect offsets topic.

Example SQL query generated by JDBC Source Connector in incrementing mode:

Streaming from Products table with incrementing mode
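As a rough illustration of the query described above, the sketch below builds the approximate shape of the statement the connector issues in incrementing mode. The column name id is an assumption, and the real connector also quotes identifiers in the database's dialect, so treat this as an approximation rather than the exact generated SQL.

```python
def incrementing_query(table: str, incrementing_column: str) -> str:
    """Approximate shape of the polling query used in incrementing mode.

    The '?' placeholder is bound to the last offset stored for the table,
    so each poll only returns rows with a larger id.
    """
    return (
        f"SELECT * FROM {table} "
        f"WHERE {incrementing_column} > ? "
        f"ORDER BY {incrementing_column} ASC"
    )


# "id" is an assumed column name, not taken from the original schema.
query = incrementing_query("test_db.Products", "id")
print(query)
```

Because the filter is only "greater than the last id seen", a row whose id does not change is never read again, which is why this mode cannot see updates.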

However, because of its query-based nature, the connector can only see the latest values in the table, so we can only catch INSERT and UPDATE events. DELETE events can only be streamed by a log-based CDC platform. If you need DELETE events, you should look at Debezium on Kafka Connect instead.

With query-based approaches like JDBC, you can also miss some updates if the same row is updated more than once within a polling cycle. You can see how JDBC skips an update event between t1 and t4 in the timeline table below.

This table is just an example showing what results we can get with different methods. In this simplified example, it is assumed that the polling times of the two connectors are the same.

query vs log based capturing data
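The difference can also be shown with a small simulation. This is a toy model, not connector code: the change history, the poll times, and both function names are made up for illustration. A query-based poller only sees the row's state at each poll, while a log-based reader sees every change.

```python
def query_based_capture(changes, poll_times):
    """Return the row states a poller sees, given (time, value) changes sorted by time."""
    captured = []
    last_seen = None
    for poll_time in poll_times:
        # Only the latest state of the row is visible at poll time.
        visible = [value for changed_at, value in changes if changed_at <= poll_time]
        if visible and visible[-1] != last_seen:
            last_seen = visible[-1]
            captured.append(last_seen)
    return captured


def log_based_capture(changes):
    """A log-based reader receives every change in order."""
    return [value for _, value in changes]


# Hypothetical history of one row: inserted at t0, updated at t2 and t3.
changes = [(0, "price=10"), (2, "price=12"), (3, "price=15")]
query_events = query_based_capture(changes, poll_times=[1, 4])
log_events = log_based_capture(changes)
print(query_events)
print(log_events)
```

With polls at t1 and t4, the intermediate value written at t2 is overwritten before the next poll and never reaches Kafka.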

For more comparison between query-based and log-based approaches check here

Deployment

We are going to implement a streaming pipeline that listens to changes in source tables and views, then streams them to the related Kafka topics, similar to the architecture below. (The sink and processing parts after the Kafka topics are not implemented in this post.)

JDBC Source Connector Streaming Pipeline Example

MySQL Deployment: If you don’t have a test environment, you can deploy locally with this docker-compose file.

Kafka Deployment: Once your MySQL is ready to go, it’s time to build a Kafka cluster. I use Confluent Cloud in the rest of this post to keep my mind clear of Kafka infrastructure work, but you can also create your own Kafka cluster on-premises with this docker-compose file.

MySQL on docker container, Kafka topics on Confluent Cloud

Kafka Connect Deployment: Now we have our MySQL tables and Kafka topics, but they are still separate. To connect the two and catch the transactions on our tables, we’re going to deploy a Kafka Connect cluster. The nodes in Kafka Connect are called workers, and we run our connectors on them. We use the cp-kafka-connect image with the given configs, then install the JDBC connector from Confluent Hub.

The JDBC connector comes with JDBC drivers for a few database systems like SQL Server and PostgreSQL but we have to download MySQL Driver manually.

After setting up our environment, we will launch the Kafka Connect worker.

You can use this docker-compose file to deploy your Kafka Connect worker with JDBC Source Connector.

Important Worker Configurations: If your worker doesn’t behave the way you want, the config values are probably set incorrectly. As you can see in the docker-compose file above, there are a lot of configs to understand. I’ll explain the most crucial ones, but if you want to dive deeper into worker configurations you should check this documentation.

CONNECT_BOOTSTRAP_SERVERS: URL of your Kafka brokers
CONNECT_REST_PORT: port of Kafka Connect's REST API (8083)
CONNECT_REST_ADVERTISED_HOST_NAME: the hostname under which other workers see this one
CONNECT_GROUP_ID: a unique string that identifies the Connect cluster group this worker belongs to. Workers with the same group id work together in a distributed cluster
CONNECT_CONFIG_STORAGE_TOPIC: internal topic that holds connector configs (1 partition, cleanup policy: compact)
CONNECT_OFFSET_STORAGE_TOPIC: internal topic that holds the connectors' latest offsets for tables (5 or more partitions, cleanup policy: compact)
CONNECT_STATUS_STORAGE_TOPIC: internal topic that holds connector status (3 or more partitions, cleanup policy: compact)
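To relate these environment variables to the worker properties in the Kafka Connect documentation, here is a small sketch of the naming convention the Confluent images follow as I understand it: drop the CONNECT_ prefix, lowercase the rest, and turn underscores into dots. The helper name is hypothetical and the sketch ignores any escaping rules for properties that contain literal underscores or dashes.

```python
def env_to_worker_property(env_name: str) -> str:
    """Map a CONNECT_* environment variable to its worker property name."""
    prefix = "CONNECT_"
    if not env_name.startswith(prefix):
        raise ValueError(f"{env_name} is not a Kafka Connect worker variable")
    # Simplified rule: lowercase and replace every underscore with a dot.
    return env_name[len(prefix):].lower().replace("_", ".")


for variable in (
    "CONNECT_BOOTSTRAP_SERVERS",
    "CONNECT_GROUP_ID",
    "CONNECT_OFFSET_STORAGE_TOPIC",
):
    print(variable, "->", env_to_worker_property(variable))
```

This makes it easier to look up a variable from the docker-compose file in the worker configuration reference, where settings are listed by their dotted names.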

Plugin and Driver Installation Docker Compose Command:

  1. Installing JDBC Source Connector plugin: You can manually install the desired plugins to the plugin path or you can automate this in your docker-compose file.
  2. Installing JDBC driver: The MySQL JDBC driver isn’t included in the JDBC Source Connector, so we have to install it with another bash command. Read this awesome article for more about JDBC drivers, and watch this tutorial if you keep getting errors.
  3. Launch Worker and Wait: Once we have our plugin and driver, we can launch our worker.

Our Kafka Connect worker, connector and task figure

Finally, our Kafka Connect worker is up. Our deployment process is over and we are ready to play with JDBC Source Connector. But before that, let’s check Schema Registry and Kafka Connect's REST API.

Kafka Connect REST API: Kafka Connect comes with a REST API for managing connectors. Using the REST API we can list our connectors, check their status, add new connectors, update existing ones, and delete them. In the next part of this post, we use the REST API to add new connectors to our workers.

Check whether we installed our JDBC plugin successfully:

curl --location --request GET 'http://localhost:8083/connector-plugins'

You’ll see the JDBC plugins and some pre-installed plugins in the response.
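If you would rather check the response programmatically than read it by eye, a minimal sketch could look like the following. It assumes the worker listens on localhost:8083 as in the curl call above; the sample payload and its version numbers are made up for illustration, and fetch_plugins is only defined here, not called.

```python
import json
from urllib.request import urlopen

JDBC_SOURCE_CLASS = "io.confluent.connect.jdbc.JdbcSourceConnector"


def fetch_plugins(base_url="http://localhost:8083"):
    """Call GET /connector-plugins on a running worker and return the parsed list."""
    with urlopen(f"{base_url}/connector-plugins") as response:
        return json.load(response)


def has_jdbc_source(plugins):
    """True if the JDBC source connector class appears in the plugin list."""
    return any(plugin.get("class") == JDBC_SOURCE_CLASS for plugin in plugins)


# Illustrative response body; the versions are placeholders, not real output.
sample_plugins = [
    {"class": "io.confluent.connect.jdbc.JdbcSinkConnector", "type": "sink", "version": "x.y.z"},
    {"class": "io.confluent.connect.jdbc.JdbcSourceConnector", "type": "source", "version": "x.y.z"},
]
print(has_jdbc_source(sample_plugins))
```

Against a live worker you would call has_jdbc_source(fetch_plugins()) instead of using the sample list.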

For more about the Kafka Connect REST API, check here.

Schema Registry: We want to send our data in Avro format, so we need a serving layer for our metadata. We chose Confluent Cloud for this, as we did for our Kafka cluster deployment. Defining a few properties in the docker-compose file is enough to connect to Schema Registry.

CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: ${SCHEMA_REGISTRY_URL}
CONNECT_VALUE_CONVERTER_BASIC_AUTH_CREDENTIALS_SOURCE: "USER_INFO"
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO: ${SCHEMA_REGISTRY_BASIC_AUTH_USER_INFO}

You can take schema_registry_url and basic_auth_user_info from the Confluent Cloud UI or, if you don’t use Confluent Cloud, from your on-premises Schema Registry’s config file.

For more about Schema Registry, check the documentation and this Medium blog.

Connector Implementation

We deployed our clusters successfully and are finally ready to do the actual work. We are going to create two different JDBC connectors: the first one reads from a table with timestamp+incrementing mode, and the second one reads from a view with incrementing mode.

JDBC Source Connector configs:

We should implement our config parameters correctly to meet our business needs without causing any trouble in our source databases.

Streaming from a Table with Timestamp+incrementing mode: This is the most accurate mode, combining an incrementing column with a timestamp column. With this mode, we can catch both UPDATE and INSERT events.

The connector config for the Products table:

configs with descriptions
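Since the config itself was shared as an embedded snippet, here is a hedged sketch of what a timestamp+incrementing config can look like, built as a Python dictionary. The property keys are standard JDBC Source Connector settings, but the connector name, connection URL, credentials, and the column names id and updated_at are all placeholders I assume for illustration, not the author's actual values.

```python
import json


def build_table_connector(name, table, incrementing_column, timestamp_column):
    """Build a request body for a timestamp+incrementing JDBC source connector."""
    return {
        "name": name,
        "config": {
            "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
            "connection.url": "jdbc:mysql://mysql:3306/test_db",  # placeholder host
            "connection.user": "<user>",  # placeholder credentials
            "connection.password": "<password>",
            "mode": "timestamp+incrementing",
            "incrementing.column.name": incrementing_column,
            "timestamp.column.name": timestamp_column,
            "table.whitelist": table,
            "table.types": "TABLE",
            "topic.prefix": "JDBC.test_db.",
            "poll.interval.ms": "5000",  # how often the table is queried
            "tasks.max": "1",
        },
    }


# Hypothetical connector name and column names.
products_connector = build_table_connector(
    "mysql-jdbc-products-00", "test_db.Products", "id", "updated_at"
)
print(json.dumps(products_connector, indent=2))
```

A short poll.interval.ms lowers latency but increases query load on the source database, so it is worth tuning against the size of the table.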

For all JDBC Source Connector configuration properties, check here.

Before pushing this connector to Kafka Connect, we should create our target topic. The topic name is <topic.prefix> + <table name>. In this example it’s JDBC.test_db.Product.

Push Connector: You can use any method to send the request. I’ll continue with curl 👇.
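The post sends this request with curl. As an equivalent sketch in Python, the snippet below prepares the same kind of POST to the /connectors endpoint of the REST API. The base URL matches the worker used earlier, while the connector body shown is a shortened placeholder rather than the full config.

```python
import json
from urllib.request import Request, urlopen


def build_create_request(connector, base_url="http://localhost:8083"):
    """Prepare a POST /connectors request carrying the connector definition."""
    body = json.dumps(connector).encode("utf-8")
    return Request(
        f"{base_url}/connectors",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Shortened placeholder body; a real request needs the complete config.
connector = {"name": "mysql-jdbc-products-00", "config": {"mode": "timestamp+incrementing"}}
request = build_create_request(connector)
print(request.get_method(), request.full_url)

# To actually send it against a running worker:
# with urlopen(request) as response:
#     print(response.status)
```

Building the request separately from sending it keeps the example safe to run without a worker.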

Connector Status:

Connector Status is running

Our status is RUNNING and our task has started working. We can also send this request periodically to check our connectors' status.

If we check our Kafka Connect logs, we can see that it is flushing new messages and committing offsets. Now you can insert new records and try some updates to check whether they appear in the Kafka topic in near real-time.
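For periodic checks, a small helper can reduce the status response to a yes/no answer. This sketch assumes the usual shape of the GET /connectors/<name>/status response, with a connector state and a list of task states. The sample payload, including its worker ids, is invented for illustration.

```python
def is_running(status):
    """True when the connector and all of its tasks report RUNNING."""
    connector_ok = status.get("connector", {}).get("state") == "RUNNING"
    tasks = status.get("tasks", [])
    # A connector with no tasks is not doing any work, so treat it as unhealthy.
    tasks_ok = bool(tasks) and all(task.get("state") == "RUNNING" for task in tasks)
    return connector_ok and tasks_ok


# Illustrative status payload; the worker ids are placeholders.
sample_status = {
    "name": "mysql-jdbc-products-00",
    "connector": {"state": "RUNNING", "worker_id": "kafka-connect:8083"},
    "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "kafka-connect:8083"}],
    "type": "source",
}
print(is_running(sample_status))
```

Checking task states matters because a connector can report RUNNING while one of its tasks has failed.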

Kafka Connect Logs after new Connector Added

Streaming from a View with Incrementing mode

In some business cases, we would like to stream only INSERT events, such as audit logs or transactions. It is also possible that our tables or views have no timestamp column at all. In such cases, we can use incrementing mode, which uses a unique id column to pick up new rows only. If we really need to catch updates from our views, we should add a timestamp column to them or use a log-based CDC connector like Debezium.

The connector config for view: We will make minor changes to the above timestamp+incrementing example. Changing table name to our new View and mode to “incrementing”, removing timestamp properties, and changing the table type to “VIEW” would be enough.

"mode": "incrementing",
"table.types": "VIEW",
"table.whitelist": "test_db.ProductsAboveAveragePrice"

JDBC Source Connector doesn’t have an incrementing.initial property, so if we start our connector with this config, it will do a bulk load starting from the first row. When we have a large table that we don’t want to read in full, we must give our connector an initial incrementing value.

Setting incrementing column initial (resetting the point): We can manipulate Kafka Connect’s offsets topic by writing the id value we want. When the connector starts, it takes the latest offset value recorded in the offsets topic and starts querying from that id. Let’s produce these records and manipulate the connector using the ccloud CLI or kafkacat.

We produce to our Kafka Connect offsets topic, providing the connector name, table name, and incrementing value.

with ccloud cli:

ccloud kafka topic produce _kafka-connect-jdbc-mysql-offsets --parse-key --delimiter '#'
> ["mysql-jdbc-product-view-00",{"protocol":"1","table":"test_db.ProductsAboveAveragePrice"}]#{"incrementing":5}

with kafkacat:

echo '["mysql-jdbc-product-view-00",{"protocol":"1","table":"test_db.ProductsAboveAveragePrice"}]#{"incrementing":10}' | \
kafkacat -b kafka:29092 -t _kafka-connect-jdbc-mysql-offsets -P -Z -K#
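The record format used in both commands is easy to get wrong by hand, so here is a minimal sketch that builds the key#value line from its parts. The function name is hypothetical. The key and value layout simply mirrors the records shown above.

```python
import json


def offset_record(connector_name, table, incrementing_value, delimiter="#"):
    """Build the 'key#value' line for the Kafka Connect offsets topic."""
    # Key: connector name plus the partition info the JDBC connector uses per table.
    key = [connector_name, {"protocol": "1", "table": table}]
    # Value: the last id considered as already read.
    value = {"incrementing": incrementing_value}
    compact = (",", ":")  # no spaces, matching the records above
    return json.dumps(key, separators=compact) + delimiter + json.dumps(value, separators=compact)


record = offset_record("mysql-jdbc-product-view-00", "test_db.ProductsAboveAveragePrice", 10)
print(record)
```

The connector name in the key has to match the name of the connector you push afterwards, otherwise the stored offset is never picked up.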

After setting our incrementing value (check by consuming the topic), we can push our connector with the config we created. 👇

from view with incrementing mode config
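As a sketch of the changes described for the view connector, the helper below derives an incrementing-mode config from a timestamp+incrementing one: it drops the timestamp properties, switches the mode, and points the whitelist at the view. The starting config and its column names are placeholders assumed for illustration.

```python
def to_incrementing_view_config(table_config, view_name):
    """Derive a view connector config in incrementing mode from a table config."""
    # Timestamp settings have no meaning in incrementing mode, so remove them.
    config = {key: value for key, value in table_config.items() if not key.startswith("timestamp.")}
    config.update(
        {
            "mode": "incrementing",
            "table.types": "VIEW",
            "table.whitelist": view_name,
        }
    )
    return config


# Placeholder starting point; column names are assumptions.
table_config = {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "mode": "timestamp+incrementing",
    "incrementing.column.name": "id",
    "timestamp.column.name": "updated_at",
    "table.types": "TABLE",
    "table.whitelist": "test_db.Products",
}
view_config = to_incrementing_view_config(table_config, "test_db.ProductsAboveAveragePrice")
print(view_config)
```

The original table config is left untouched, so both connectors can be created from the same base settings.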

Now we can see new events coming to our target topic 🦾.

Conclusion

We have created a data pipeline piece by piece, from the MySQL deployment to the JDBC Source Connector configs. I hope you find this blog post instructive and that it accelerates your development process.

Thank you for your time, you can always reach me on twitter 👋

References

Lastly, I would like to share my favorite content about Kafka Connect and JDBC Source Connector.

https://debezium.io/documentation/reference/connectors/mysql.html

https://rmoff.net/2018/03/24/streaming-data-from-mysql-into-kafka-with-kafka-connect-and-debezium/
