Change Data Capture with Debezium: How To (Episode 1)

Abhishek Giri · Published in DataPebbles · 4 min read · Apr 1, 2022

Change Data Capture (CDC) has emerged as an ideal solution for near real-time movement of data from relational databases to data warehouses, data lakes, or other data stores. Business transactions captured in a relational database are critical to understanding the state of business operations.

In this post, we will see why change data capture is needed for real-time business analysis. Change Data Capture is a software process that identifies and tracks changes to data in a database. CDC enables real-time or near-real-time movement of data by moving and processing data continuously as new database events occur.

We will be using Debezium as a CDC tool to monitor a MySQL database. As data in the database changes, we can see the result in the event stream.

Debezium is a distributed platform that turns your existing databases into event streams, so applications can see and respond immediately to each row-level change in the databases.

Let’s Start

We will be using Debezium with Kafka Connect in this blog to monitor MySQL data change events.

What do we need to start?
Zookeeper
Kafka
Kafka-Connect
MySQL

Download the Kafka binary from this link

Step 1:
Starting ZooKeeper

Start ZooKeeper: bin/zookeeper-server-start.sh config/zookeeper.properties

Step 2:
Starting the Kafka Server

Start Kafka: bin/kafka-server-start.sh config/server.properties

Step 3:
Install MySQL server and keep it up and running

A Debezium MySQL connector requires a MySQL user account. This MySQL user must have appropriate permissions on all databases for which the Debezium MySQL connector captures changes.

Create a MySQL user:

CREATE USER 'user'@'localhost' IDENTIFIED BY 'password';

Grant the permissions to the user:

GRANT ALL PRIVILEGES ON *.* TO 'user'@'localhost';

Finalize the user’s permissions:

FLUSH PRIVILEGES;
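Granting all privileges is fine for a local demo, but the Debezium documentation lists a narrower set of privileges that is sufficient for the connector. A tighter alternative for the same account:

GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'user'@'localhost';
FLUSH PRIVILEGES;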

Enabling the Binlog

You must enable binary logging for MySQL replication. The binary logs record transaction updates for replication tools to propagate changes.

1: Check whether the log-bin option is already on

SELECT variable_value AS "BINARY LOGGING STATUS (log-bin) ::" FROM performance_schema.global_variables WHERE variable_name='log_bin';

2: If the above query fails (for example, because the performance schema is disabled), add the property below to my.cnf and restart MySQL

performance_schema=ON

3: Confirm your changes by checking the binlog status once more

SELECT variable_value AS "BINARY LOGGING STATUS (log-bin) ::" FROM performance_schema.global_variables WHERE variable_name='log_bin';
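
If the status comes back OFF (binary logging is on by default only from MySQL 8.0), it can be enabled in my.cnf with settings along the lines below, as described in the Debezium documentation; the server-id value is an arbitrary example. Restart MySQL after the change.

[mysqld]
server-id        = 223344
log_bin          = mysql-bin
binlog_format    = ROW
binlog_row_image = FULL
expire_logs_days = 10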

Step 4:
Start the Kafka-connect

First, download the Debezium MySQL connector jar from this link and point plugin.path in the Kafka Connect worker properties file to the directory that contains it.
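
For example, if the connector was extracted under /opt/connectors (the path is just an assumption; use wherever you unpacked it), the worker properties file would contain:

plugin.path=/opt/connectors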

bin/connect-distributed.sh config/connect-distributed.properties

NOTE:

Kafka Connect can also run in standalone mode using the configuration below.

Step 1: Start Kafka Connect

bin/connect-standalone.sh config/connect-standalone.properties customer-connector.properties

Config for customer-connector.properties
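
A minimal sketch of that file, assuming the local MySQL instance and the user created in Step 3 (the server id, credentials, and the debezium database name are placeholders to adapt):

name=customer-connector
connector.class=io.debezium.connector.mysql.MySqlConnector
tasks.max=1
database.hostname=localhost
database.port=3306
database.user=user
database.password=password
database.server.id=184054
database.server.name=dbserver1
database.include.list=debezium
database.history.kafka.bootstrap.servers=localhost:9092
database.history.kafka.topic=schema-changes.debezium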

Now send a GET request to localhost:8083 to check that the Kafka Connect worker is up.
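
For example:

curl --location --request GET 'localhost:8083/'

A running worker answers with its version information, along the lines of {"version": "3.1.0", "commit": "...", "kafka_cluster_id": "..."} (the exact values depend on your Kafka build).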

Check the list of connectors registered with Kafka Connect:

curl --location --request GET 'localhost:8083/connectors/'

You will see the output below, as there are no connectors registered yet:

Result:  [ ]

Let's add a new connector configuration for Debezium by POSTing the MySQL connector config to the Kafka Connect REST API, as shown below.
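
A minimal registration request, mirroring the standalone properties sketch above (credentials, server id, and database name remain placeholders):

curl --location --request POST 'localhost:8083/connectors' \
--header 'Content-Type: application/json' \
--data '{
  "name": "customer-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "localhost",
    "database.port": "3306",
    "database.user": "user",
    "database.password": "password",
    "database.server.id": "184054",
    "database.server.name": "dbserver1",
    "database.include.list": "debezium",
    "database.history.kafka.bootstrap.servers": "localhost:9092",
    "database.history.kafka.topic": "schema-changes.debezium"
  }
}'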

Now check localhost:8083/connectors and you will be able to see the running connectors:

[ "customer-connector" ]

Review the connector status

curl --location --request GET 'localhost:8083/connectors/customer-connector'

https://gist.githubusercontent.com/agiri23/15f0a6a30529110ef86691c3001e5ee5/raw/ee2ad638fdb2612ce8b503a4427408821e8140ef/connector-status.json
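
If you only need the run state rather than the full connector definition, Kafka Connect also exposes a status sub-resource:

curl --location --request GET 'localhost:8083/connectors/customer-connector/status'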

Viewing the created topics

bin/kafka-topics.sh --bootstrap-server localhost:9092 --list

You should be able to see the list of topics:

__consumer_offsets
connect-configs
connect-offsets
connect-status
dbserver1
dbserver1.debezium.customers
dbserver1.debezium.orders
schema-changes.debezium

Viewing a create event

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic schema-changes.debezium --from-beginning

https://gist.githubusercontent.com/agiri23/7f352d5df375cce6fcf5c698caf4c4f0/raw/ddd59689da9dc305c7d342b33995df6da69bc6d6/schema-changes.json

Inserting a record into the database and viewing the inserted event

INSERT INTO `debezium`.`customers` (`id`,`first_name`,`last_name`,`email`,`mobile`) VALUES (3,'nobody','nobody','nobody@datapebbles.com',123);

Viewing the created event

bin/kafka-console-consumer.sh — bootstrap-server localhost:9092 — topic dbserver1.debezium.customers — from-beginning

This create event has two sections: the schema, which describes the table definition, and the payload, which carries the changed row data.

{
  "payload": {
    "before": null,
    "after": {
      "id": 3,
      "first_name": "nobody",
      "last_name": "nobody",
      "email": "nobody@datapebbles.com",
      "mobile": 123
    },
    "source": {
      "version": "1.8.1.Final",
      "connector": "mysql",
      "name": "dbserver1",
      "ts_ms": 1647876062000,
      "snapshot": "false",
      "db": "debezium",
      "sequence": null,
      "table": "customers",
      "server_id": 1,
      "gtid": null,
      "file": "binlog.000004",
      "pos": 2282,
      "row": 0,
      "thread": null,
      "query": null
    },
    "op": "c",
    "ts_ms": 1647876062716,
    "transaction": null
  }
}

The whole JSON can be seen here.

Similarly, we can see change events for all the tables in the monitored database: when the Debezium connector detects a change in any table, it writes the event to a topic named as follows:

<server-name>.<database>.<table>

For example, if there is a change in the product table, the topic would be

dbserver1.debezium.product

Conclusion

In this article, we learned about CDC with MySQL and Debezium.
We also learned that the Debezium MySQL connector monitors the table structure, takes snapshots, converts binlog events into Debezium change events, and keeps track of where those events are stored in Kafka.

In the next blog, we will see more on CDC using LinkedIn Databus.
Till then, keep learning.

For any queries, please reach out to me @ agiri@datapebbles.com
