Change Data Capture with Debezium: How To (Episode 1)
Change Data Capture (CDC) has emerged as an ideal solution for near real-time movement of data from relational databases to data warehouses, data lakes, and other data platforms. Business transactions captured in a relational database are critical to understanding the state of business operations.
In this post, we will see why change data capture is required for real-time business analysis. Change Data Capture is a software process that identifies and tracks changes to data in a database. CDC provides real-time or near-real-time movement of data by moving and processing data continuously as new database events occur.
We will be using Debezium as a CDC tool to monitor a MySQL database. As data in the database changes, the changes appear in the event stream.
Debezium is a distributed platform that turns your existing databases into event streams, so applications can see and respond immediately to each row-level change in the databases.
Let’s Start
We will be using Debezium with Kafka Connect in this blog to monitor MySQL data change events.
What do we need to start?
Zookeeper
Kafka
Kafka-Connect
MySQL
Download the Kafka binary from this link
Step 1:
Starting the Zookeeper
Start Zookeeper: bin/zookeeper-server-start.sh config/zookeeper.properties
Step 2:
Starting the Kafka Server
Start Kafka: bin/kafka-server-start.sh config/server.properties
Step 3:
Install MySQL server and keep it up and running
A Debezium MySQL connector requires a MySQL user account. This MySQL user must have appropriate permissions on all databases for which the Debezium MySQL connector captures changes.
Create a MySQL user:
CREATE USER 'user'@'localhost' IDENTIFIED BY 'password';
Grant the permissions to the user:
GRANT ALL PRIVILEGES ON *.* TO 'user'@'localhost';
Finalize the user’s permissions:
FLUSH PRIVILEGES;
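Granting all privileges keeps the demo simple, but in practice the connector needs only a handful of them. A narrower grant, following the privileges listed in the Debezium documentation, would be:

```sql
-- Minimal privileges for the Debezium MySQL connector:
-- SELECT for snapshots, RELOAD to flush/lock tables during snapshots,
-- SHOW DATABASES to discover schemas, and the two replication
-- privileges to read the binlog. Replication privileges are global,
-- so the grant must be ON *.*.
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT
    ON *.* TO 'user'@'localhost';
FLUSH PRIVILEGES;
```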
Enabling the Binlog
You must enable binary logging for MySQL replication. The binary logs record transaction updates for replication tools to propagate changes.
1: Check whether the log-bin option is already on
SELECT variable_value as "BINARY LOGGING STATUS (log-bin) ::" FROM performance_schema.global_variables WHERE variable_name='log_bin';
2: If the query above fails because performance_schema is disabled, enable it in my.cnf
performance_schema=ON
3: Confirm your changes by checking the binlog status once more
SELECT variable_value as "BINARY LOGGING STATUS (log-bin) ::" FROM performance_schema.global_variables WHERE variable_name='log_bin';
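For reference, on MySQL 8.0 binary logging is enabled by default, while on older versions it must be turned on explicitly. A my.cnf fragment with the binlog settings Debezium expects (the server-id value below is arbitrary, but it must be unique and non-zero) might look like:

```ini
[mysqld]
server-id        = 223344      # any unique, non-zero id
log_bin          = mysql-bin   # enables binary logging
binlog_format    = ROW         # Debezium requires row-level binlog events
binlog_row_image = FULL        # include full before/after row images
expire_logs_days = 10          # optional: retain binlogs for 10 days
```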
Step 4:
Start Kafka Connect
First, download the Debezium MySQL connector plugin archive from this link, extract it, and point the plugin.path property in the Kafka Connect worker configuration at the directory containing it.
bin/connect-distributed.sh config/connect-distributed.properties
NOTE:
In standalone mode, Kafka Connect can also be run using the below configuration
Step 1: Start the Kafka connect
bin/connect-standalone.sh config/connect-standalone.properties customer-connector.properties
Config for customer-connector.properties
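The embedded configuration file is not shown here; a minimal customer-connector.properties consistent with the server name (dbserver1) and database (debezium) used later in this post might look like the following. The database.server.id value is an arbitrary unique number, and the credentials are the ones created in Step 3:

```properties
name=customer-connector
connector.class=io.debezium.connector.mysql.MySqlConnector
database.hostname=localhost
database.port=3306
database.user=user
database.password=password
database.server.id=184054
database.server.name=dbserver1
database.include.list=debezium
database.history.kafka.bootstrap.servers=localhost:9092
database.history.kafka.topic=schema-changes.debezium
```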
Now send a GET request to localhost:8083 to check the status of Kafka Connect
Check the list of connectors registered with Kafka Connect:
curl --location --request GET 'localhost:8083/connectors/'
Since no connectors are registered yet, you will see an empty list
Result: [ ]
Let's add a new connector configuration for debezium
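In distributed mode, connectors are registered through the Kafka Connect REST API. A request that registers a connector named customer-connector for the dbserver1 server and the debezium database (matching the topics we will see later; the host, credentials, and server id are the ones from the earlier steps) might look like:

```shell
curl --location --request POST 'localhost:8083/connectors' \
  --header 'Content-Type: application/json' \
  --data '{
    "name": "customer-connector",
    "config": {
      "connector.class": "io.debezium.connector.mysql.MySqlConnector",
      "database.hostname": "localhost",
      "database.port": "3306",
      "database.user": "user",
      "database.password": "password",
      "database.server.id": "184054",
      "database.server.name": "dbserver1",
      "database.include.list": "debezium",
      "database.history.kafka.bootstrap.servers": "localhost:9092",
      "database.history.kafka.topic": "schema-changes.debezium"
    }
  }'
```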
Once you have passed the MySQL connector config, check localhost:8083/connectors and you will be able to see the running connectors
[ "customer-connector" ]
Review the connector status
curl --location --request GET 'localhost:8083/connectors/customer-connector'
Viewing the created topic
bin/kafka-topics.sh --bootstrap-server localhost:9092 --list
You should be able to see the list of topics
__consumer_offsets
connect-configs
connect-offsets
connect-status
dbserver1
dbserver1.debezium.customers
dbserver1.debezium.orders
schema-changes.debezium
Viewing schema change events
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic schema-changes.debezium --from-beginning
Inserting a record in the database and viewing the inserted event
INSERT INTO `debezium`.`customers` (`id`, `first_name`, `last_name`, `email`, `mobile`) VALUES (3, 'nobody', 'nobody', 'nobody@datapebbles.com', 123);
Viewing the created event
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic dbserver1.debezium.customers --from-beginning
This create event has two sections: a schema section, which describes the table definition, and a payload section, which carries the actual row data. The payload is shown below.
{
"payload": {
"before": null,
"after": {
"id": 3,
"first_name": "nobody",
"last_name": "nobody",
"email": "nobody@datapebbles.com",
"mobile": 123
},
"source": {
"version": "1.8.1.Final",
"connector": "mysql",
"name": "dbserver1",
"ts_ms": 1647876062000,
"snapshot": "false",
"db": "debezium",
"sequence": null,
"table": "customers",
"server_id": 1,
"gtid": null,
"file": "binlog.000004",
"pos": 2282,
"row": 0,
"thread": null,
"query": null
},
"op": "c",
"ts_ms": 1647876062716,
"transaction": null
}
}
The schema section is omitted from the JSON above for brevity.
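A downstream application typically reads these events and acts on the payload. A minimal sketch of that parsing step with the standard-library json module (the actual topic consumption, e.g. via a Kafka client library, is assumed and not shown):

```python
import json

# A Debezium create event as consumed from the
# dbserver1.debezium.customers topic (schema section and some
# source fields trimmed for brevity, as above).
event = json.loads("""
{
  "payload": {
    "before": null,
    "after": {"id": 3, "first_name": "nobody", "last_name": "nobody",
              "email": "nobody@datapebbles.com", "mobile": 123},
    "source": {"db": "debezium", "table": "customers"},
    "op": "c",
    "ts_ms": 1647876062716
  }
}
""")

payload = event["payload"]
# "op" describes the change: "c" = create, "u" = update,
# "d" = delete, "r" = snapshot read.
if payload["op"] == "c":
    row = payload["after"]  # row state after the insert; "before" is null
    print(f"New customer {row['id']}: {row['email']}")
    # -> New customer 3: nobody@datapebbles.com
```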
Similarly, we can see change events for all the tables in the monitored database: the Debezium connector creates one topic per table when it detects changes, named as follows
<server-name>.<database>.<table>
For example, if there is any change in the product table, the topic would be
dbserver1.debezium.product
Conclusion
In this article, we have learned about CDC with MySQL and Debezium.
We also learned that the Debezium MySQL connector monitors the table structure, takes snapshots, converts binlog events into Debezium change events, records those events in Kafka topics, and keeps track of its position in the binlog so it can resume where it left off.
In the next blog, we will see more on CDC using LinkedIn Databus.
Till then keep learning.
For any queries, please reach out to me @ agiri@datapebbles.com