CDC made Easy with KTable, Debezium and Kafka Connect

High-level view of this blog

“Timely insights” have become the norm for modern businesses: effectively leveraging valuable data to derive outcomes that ultimately drive value-focused business decisions.

So, to pull data from a database without hurting its performance, we need a tool that reads the database's transaction logs and translates them into usable messages. A wide variety of such tools are available in the market, like Attunity, Debezium, IBM InfoSphere CDC and many more.

In this blog we are going to look at the open source CDC tool Debezium and show how to implement SCD Type 1 (overwriting the old value) with a KTable.

Using Kafka Connect and an open source CDC tool such as Debezium, we can stream the source data to a Kafka topic, along with any changes made to it, in real time. In most streaming use cases, a KTable is created on top of the slowly changing data in the Kafka topic and joined to KStreams to derive valuable insights. If something changes in the database, it is reflected straight away in the Kafka topic (and thus in the KSQL table too).

Using a CDC engine like Debezium together with KTables, we can easily build data pipelines that bridge traditional data stores and new event-driven architectures.

This approach is completely configuration-driven and runs on top of Kafka Connect, the streaming integration platform based on Kafka.

The CDC process must run as a single thread to maintain ordering. Since Debezium records the log offset asynchronously, any final sink of these changes must be idempotent.

Sample Use Case

The telecom industry does business in multiple countries, and tariffs change based on the exchange rates in those countries. We have a real-time Kafka streaming application that depends heavily on this slowly changing data. The telecom industry usually maintains three exchange rates per day (start, mid and end rate). In our use case, this data keeps changing in a MySQL database. Using Debezium and Kafka Connect, these changes will be tracked in a KTable and used in KStreams. This blog covers only the CDC and KTable part; we won't focus on KStreams here.

Steps

  1. Load data from Fixer API to MySQL
  2. Create Kafka Connector using Debezium
  3. Create KTable
  4. Simulate Changes in MySQL to see the near real time data changes in KTable using Python Dash App

Load data from Fixer API to MySQL

For demo purposes, currency exchange rates are pulled from the Fixer API (free plan) and loaded into the MySQL database. Fixer is a simple and lightweight API for current and historical foreign exchange (forex) rates.

The code base for loading data from the Fixer API into MySQL can be found here.
Note: A Fixer API key is required to run this code; you can subscribe to the Fixer free plan here.
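
The loader writes the rates into a MySQL table. The rest of this post assumes a minimal schema along these lines; the database and table names come from the Kafka topic shown later, while the column names are illustrative and the linked code base defines the actual layout:

CREATE DATABASE IF NOT EXISTS currency_rate;

-- One row per currency and rate type (start/mid/end), overwritten whenever the rate changes
CREATE TABLE IF NOT EXISTS currency_rate.exchange_rates (
    currency_code VARCHAR(3)    NOT NULL,   -- e.g. 'EUR', 'INR'
    rate_type     VARCHAR(10)   NOT NULL,   -- 'start', 'mid' or 'end'
    exchange_rate DECIMAL(18,6) NOT NULL,   -- rate against the base currency
    updated_at    TIMESTAMP     NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    PRIMARY KEY (currency_code, rate_type)
);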

Create Kafka Connector using Debezium

There are various ways to load data from MySQL into Kafka, but in recent times it has been made easy with Kafka Connect.

KSQL introduced a new feature which allows us to both transform and load data through a SQL API.

Debezium captures change data from MySQL in two stages: it first loads the current snapshot of the MySQL table, and then it starts tracking data changes using the binlog.
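
Because the connector reads the binary log, the binlog must be enabled with row-based logging. A quick sanity check from the MySQL CLI:

-- log_bin should be ON and binlog_format should be ROW
SHOW VARIABLES LIKE 'log_bin';
SHOW VARIABLES LIKE 'binlog_format';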

Before creating the connector, grant the permissions required to read from MySQL using the command below in the MySQL CLI:

GRANT SELECT,
      RELOAD,
      SHOW DATABASES,
      REPLICATION SLAVE,
      REPLICATION CLIENT ON *.*
TO 'kafka_sql'@'localhost' IDENTIFIED BY '**password**';
FLUSH PRIVILEGES;

More details on the MySQL Debezium connector can be found here.

Create Kafka Connector using KSQL
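
The connector can be created straight from the KSQL CLI with a statement along the following lines. The hostname, port, credentials, server id and history topic are placeholders to adapt to your environment, the property names follow the older Debezium releases current at the time of writing, and the unwrap transform class can differ between Debezium versions; the server name and table match the topic used later in this post:

CREATE SOURCE CONNECTOR `mysql_exchange_rates_cdc` WITH (
    "connector.class"                   = 'io.debezium.connector.mysql.MySqlConnector',
    "tasks.max"                         = '1',
    "database.hostname"                 = 'localhost',
    "database.port"                     = '3306',
    "database.user"                     = 'kafka_sql',
    "database.password"                 = '**password**',
    "database.server.id"                = '1',
    "database.server.name"              = 'MYSQL_cur_server',
    "database.whitelist"                = 'currency_rate',
    "table.whitelist"                   = 'currency_rate.exchange_rates',
    "decimal.handling.mode"             = 'double',
    "database.history.kafka.bootstrap.servers" = 'localhost:9092',
    "database.history.kafka.topic"      = 'dbhistory.currency_rate',
    "transforms"                        = 'unwrap',
    "transforms.unwrap.type"            = 'io.debezium.transforms.ExtractNewRecordState',
    "transforms.unwrap.drop.tombstones" = 'false'
);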

If the connector is successfully deployed, its status will show as RUNNING when you run the command below in the KSQL CLI.
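
For example (the connector name is the one used in the sketch above):

SHOW CONNECTORS;

-- Task-level status of a single connector
DESCRIBE CONNECTOR `mysql_exchange_rates_cdc`;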

Connector Properties Explanation

  • connector.class - the Debezium connector class for the source DB (here, io.debezium.connector.mysql.MySqlConnector). Download the corresponding connector jar and place it alongside the other connector jars on the Connect classpath (existing jar locations under a Confluent installation can be found with find $CONFLUENT_HOME -name kafka-connect-jdbc\*jar)
  • tasks.max - a single thread/task to maintain data order
  • table.whitelist - the table(s) whose changes need to be tracked
  • decimal.handling.mode - defaults to precise; change it to double to handle decimal values as doubles
  • transforms.unwrap.drop.tombstones - set this to false to handle deletes in the source

The Kafka connector pulls data from the source table into a Kafka topic named after the database.server.name property set in the connector, followed by the database and table names. Verify it using the list topics command as shown below:
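
From the KSQL CLI, the new topic should appear under the name built from database.server.name, the database and the table:

SHOW TOPICS;

-- Expected entry (among others): MYSQL_cur_server.currency_rate.exchange_rates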

Now that the data is streamed in real time into the Kafka topic, use the print command to view it:

PRINT 'MYSQL_cur_server.currency_rate.exchange_rates' FROM BEGINNING;

Create KTable

The Kafka topic will contain both inserts and updates for the same data. To implement SCD Type 1 we create a KTable abstraction, which maintains only the latest version of each record.

How does a KTable work?

A KTable is an abstraction of a changelog stream from a primary-keyed table. Each record in this changelog stream is an update on the table, with the record key as the primary key.

KTables are, in that sense, equivalent to DB tables. Use a KTable only when the latest state of a row is needed, which means that any previous states can be safely thrown away.

Create a KTable on top of the topic ingested from the connector as shown below.
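
A create statement along these lines works; the column names follow the schema assumed earlier, and VALUE_FORMAT and the KEY column should be adjusted to match how your connector serializes records:

CREATE TABLE exchange_rates_tbl (
    currency_code VARCHAR,
    rate_type     VARCHAR,
    exchange_rate DOUBLE,
    updated_at    BIGINT
) WITH (
    KAFKA_TOPIC  = 'MYSQL_cur_server.currency_rate.exchange_rates',
    VALUE_FORMAT = 'JSON',
    KEY          = 'currency_code'
);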

KTable create statement

Once the table is created, SQL statements such as SELECT can be used on top of the KTable to query the latest data.
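
For example, a continuous query that keeps emitting the latest value per key as changes arrive (column names as assumed above):

SELECT currency_code, rate_type, exchange_rate
FROM exchange_rates_tbl;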

Select Query on top of KTable

Monitor Real-Time Data Changes in a Python Dash App (for demo purposes)

For demo purposes, we manually simulate more updates in MySQL (a sample update statement is shown after this list) to demonstrate two things:

  1. how fast the changes are tracked by the connector and pushed to the Kafka topic
  2. how the KTable handles the latest events
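
A hypothetical update issued against MySQL; any change to a tracked row is captured the same way:

UPDATE currency_rate.exchange_rates
SET    exchange_rate = exchange_rate * 1.02
WHERE  currency_code = 'INR'
  AND  rate_type     = 'mid';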

The KTable reflects data changes in near real time: once the data is updated in MySQL, the changes are captured and reflected in the KTable within seconds.

The Python Dash app is connected to the KTable. Below is the dashboard backed by the Dash app.

Dash App connected to KTable

Data changes are reflected within a matter of seconds, with minimal effort from the developer.

Below are sample KTable rows (before and after updates):

KTable

Wrapping Up

A KTable provides mutable data: new rows can be inserted, and existing rows can be updated or deleted. An event's key, a.k.a. the row key, identifies which row is being mutated. Using a KTable, SCD Type 1 is easily achievable with minimal effort.

KTables are persistent, durable, and fault tolerant. Today, a KTable behaves much like an RDBMS materialized view.

Happy Change Capturing !!
