Change data capture with Debezium and Kafka

Phuong Hung
7 min read · Jul 8, 2023

Change Data Capture (CDC) refers to capturing change events (inserts, updates, and deletes) from a source database and making them available to downstream systems. While there are commercial solutions available in the market, Debezium is a popular open-source option. In this blog post, I am going to show you how to install the Debezium MySQL Connector on Ubuntu machines running on Google Compute Engine VM instances. So let’s get started!

Step 1 — Getting Ready

To follow the steps in this blog post, you will need a Google Cloud account with billing enabled and some familiarity with creating and connecting to VM instances. Google’s documentation covers setting up instances and connecting to a VM in detail.

We will create two Linux machines as Google Compute Engine VM instances. One will act as the MySQL server and the other will host Debezium. Please follow the steps below:

1. Log in to the Google Cloud Console and open the VM instances page

2. From the Create an instance menu, select New VM instance

3. In the Machine configuration section, select the N1 series (n1-standard-2; 2 vCPUs, 7.5 GB memory). In the Boot disk section, I choose Ubuntu 20.04 LTS as the operating system with a 50 GB SSD. Then, under Advanced options, you can create one VPC network with one subnetwork

4. Leave everything else at the defaults and click Create

5. Wait for the instance to finish launching, until its status is Running

6. Next, we will access the VM instance. There are several ways to access the Ubuntu machine created above. For the purpose of a simple demo, I opt to connect via the gcloud command and SSH into the VM:

gcloud compute ssh --zone <your_zone_here> <your_vm_instance_name_here> --project <project_id>

Step 2 — Installing and Configuring MySQL Server

Connect to your MySQL server using SSH and follow the steps below to create a source database in MySQL for our purpose:

1. Update the package index:

sudo apt-get update

2. Install MySQL Server:

sudo apt-get install mysql-server

3. Login to MySQL:

sudo mysql -u root -p

4. Create a user 'debezium' with password 'dbz' for our demo purpose. Note that on MySQL 8.0, which ships with Ubuntu 20.04, GRANT ... IDENTIFIED BY is no longer valid, so the user must be created first:

CREATE USER 'debezium'@'%' IDENTIFIED BY 'dbz';
GRANT ALL PRIVILEGES ON *.* TO 'debezium'@'%';
FLUSH PRIVILEGES;

5. Check the binary logging status with the following command:

SHOW VARIABLES LIKE 'log_bin';

#If the query result is OFF, binary logging cannot be switched on at runtime;
#it must be enabled in the MySQL configuration file, followed by a restart.

Log out of the MySQL shell and edit the MySQL configuration file:

sudo nano /etc/mysql/mysql.conf.d/mysqld.cnf
#This is the default path for MySQL on Ubuntu; on other operating systems it may differ

And change these properties:

[mysqld]

log-bin = mysql-bin
binlog_row_image = FULL
binlog-format = ROW
server-id = 223344

binlog_rows_query_log_events = ON
expire_logs_days = 90
gtid_mode = ON
enforce_gtid_consistency = ON
performance_schema = ON

Then search for the line that contains bind-address, change it as follows (or comment it out), and save the file. This allows our MySQL service to be reached from other hosts.

bind-address = 0.0.0.0
port = 3306
mysqlx-bind-address = 127.0.0.1
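As a quick sanity check of the settings above, the snippet below is a small sketch (not official MySQL tooling) that parses a [mysqld] fragment and confirms the options Debezium relies on are present. The fragment and required values mirror the configuration shown above.

```python
# Sketch: verify that a mysqld config fragment contains the
# binlog settings Debezium needs for row-based change capture.

REQUIRED = {
    "log-bin": "mysql-bin",       # binary logging enabled with this basename
    "binlog-format": "ROW",       # row-level events, required by Debezium
    "binlog_row_image": "FULL",   # full before/after row images
}

def parse_cnf(text: str) -> dict:
    """Parse key = value lines, skipping comments and section headers."""
    settings = {}
    for line in text.splitlines():
        line = line.strip()
        if "=" in line and not line.startswith(("#", "[")):
            key, _, value = line.partition("=")
            settings[key.strip()] = value.strip()
    return settings

snippet = """
[mysqld]
log-bin = mysql-bin
binlog_row_image = FULL
binlog-format = ROW
server-id = 223344
"""

missing = {k: v for k, v in REQUIRED.items() if parse_cnf(snippet).get(k) != v}
assert not missing, f"missing or wrong settings: {missing}"
```
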

6. Now, we need to restart the MySQL service for our changes to take effect:

sudo service mysql restart

7. Log in to the MySQL shell again and verify that binary logging is now ON:

SHOW VARIABLES LIKE 'log_bin';

Our MySQL server is now set up for Change Data Capture. When we first connect to it through the Debezium connector, the entire database is snapshotted and every existing row is loaded into Kafka as a change event. Therefore, I suggest you start with a small set of test data.

Step 3 — Installing Zookeeper

Connect to your Debezium-Server and follow the steps below:

1. Install Java 8:

sudo apt-get install openjdk-8-jdk

2. Download Zookeeper from the Apache Zookeeper site:

wget https://archive.apache.org/dist/zookeeper/zookeeper-3.6.2/apache-zookeeper-3.6.2-bin.tar.gz

3. Extract the Zookeeper file

tar -xvzf apache-zookeeper-3.6.2-bin.tar.gz

4. Rename the extracted Zookeeper directory for ease of use:

mv apache-zookeeper-3.6.2-bin zookeeper

Step 4 — Installing Kafka

Download and extract Kafka from the Apache Kafka site. I will use version 2.6.3 built for Scala 2.12:

wget https://archive.apache.org/dist/kafka/2.6.3/kafka_2.12-2.6.3.tgz
tar -xvzf kafka_2.12-2.6.3.tgz

mv kafka_2.12-2.6.3 kafka

Step 5 — Setting Up the Debezium MySQL Connector

1. Download the Debezium connector jar files from Maven Central. I will use stable version 1.8 for this demo:

cd kafka
wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/1.8.0.Final/debezium-connector-mysql-1.8.0.Final-plugin.tar.gz
tar -xvzf debezium-connector-mysql-1.8.0.Final-plugin.tar.gz

#The Debezium connector acts as a Kafka connector; we will keep its jars in a dedicated plugin directory, /kafka/plugins.
#The archive extracts into a directory named debezium-connector-mysql.
mv debezium-connector-mysql plugins

2. Next, we make the required configuration changes on the Kafka side.

We need to let Kafka know that its Connect service should load our Debezium MySQL connector from /kafka/plugins. Edit the connect-standalone.properties file in the kafka/config directory: remove the # from the plugin.path line at the end of the file and set the plugin path:

nano kafka/config/connect-standalone.properties

plugin.path=/kafka/plugins

3. Copy all jar files from /kafka/plugins to /kafka/libs so the connector classes are on Kafka's classpath:

cp -p plugins/*.jar libs/

4. Configure Debezium MySQL connector

Create a new properties file named connect-debezium-mysql.properties and paste the contents below into it. Change the values to suit your sample data and save the file in the kafka/config directory.

name=mysql-connector-02
connector.class=io.debezium.connector.mysql.MySqlConnector
tasks.max=1
database.hostname=localhost
database.port=3306
database.user=debezium
database.password=dbz
database.server.id=223344
database.server.name=mysql-connector-02
database.include.list=classicmodels
database.history.kafka.bootstrap.servers=localhost:9092
database.history.kafka.topic=msql.history
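To make the shape of this file concrete, here is a small sketch (a hypothetical helper, not part of Kafka or Debezium) that renders a flat dict of connector settings into Java .properties format. The values mirror the example above and are placeholders for this demo.

```python
# Hypothetical helper: render connector settings as a .properties file.
# The keys mirror the connect-debezium-mysql.properties example above;
# hostnames, credentials, and topic names are demo placeholders.

def to_properties(settings: dict) -> str:
    """Serialize a flat dict into .properties format, one key=value per line."""
    return "\n".join(f"{key}={value}" for key, value in settings.items()) + "\n"

connector_settings = {
    "name": "mysql-connector-02",
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "localhost",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "223344",
    "database.server.name": "mysql-connector-02",
    "database.include.list": "classicmodels",
    "database.history.kafka.bootstrap.servers": "localhost:9092",
    "database.history.kafka.topic": "msql.history",
}

if __name__ == "__main__":
    print(to_properties(connector_settings))
```

Writing the output of `to_properties(connector_settings)` to kafka/config/connect-debezium-mysql.properties produces the file described above.
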

Note: for this demo to work correctly, you need to create the database history Kafka topic beforehand. For example, I create a “msql.history” topic:

kafka/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic msql.history

Step 6 — Operation

We are now ready to test the Debezium MySQL connector. For the best learning experience, I suggest you open multiple terminals (say, 5) and connect to your Debezium server instance from each of them:

Terminal 1 — Start Zookeeper Service

kafka/bin/zookeeper-server-start.sh kafka/config/zookeeper.properties

Terminal 2 — Start Kafka Service

kafka/bin/kafka-server-start.sh kafka/config/server.properties

Terminal 3 — Monitor Topics and Messages

Run the following commands to see if any Kafka Topics are created:

kafka/bin/kafka-topics.sh --list --bootstrap-server localhost:9092

#Use the following command to watch messages published to a topic:

kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic <topic-name> --from-beginning

You can press CTRL-C at any time to exit the console consumer. Omit the --from-beginning option if you want to see only the latest messages. Initially, you will not see any topics, but once we start the Debezium MySQL connector in the next step, you will see one topic per table and messages flowing into them.
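The messages the console consumer prints are JSON change events. As an illustration of how a consumer might interpret them, the sketch below decodes a trimmed, hand-written Debezium-style envelope (real events carry much more metadata) and maps the "op" code to a human-readable operation.

```python
import json

# Sketch: interpret the "op" field of a Debezium-style change event.
# The sample message is hand-written and trimmed for illustration.

OP_NAMES = {"c": "create", "u": "update", "d": "delete", "r": "snapshot read"}

def describe_event(raw_message: str) -> str:
    """Return a short description of a Debezium JSON change event."""
    envelope = json.loads(raw_message)
    # With schemas disabled the payload is the top-level object.
    payload = envelope.get("payload", envelope)
    op = OP_NAMES.get(payload["op"], "unknown")
    table = payload["source"]["table"]
    return f"{op} on table {table}"

sample = json.dumps({
    "payload": {
        "op": "d",
        "before": {"customerNumber": 103, "customerName": "Atelier graphique"},
        "after": None,
        "source": {"db": "classicmodels", "table": "customers"},
    }
})

print(describe_event(sample))  # delete on table customers
```
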

Terminal 4 — Start Kafka Connect Service with Debezium-MySQL-Connector

kafka/bin/connect-standalone.sh kafka/config/connect-standalone.properties kafka/config/connect-debezium-mysql.properties

#You can check whether or not the connector is running:

curl -s localhost:8083/connectors/mysql-connector-02/status | jq

The output should show the connector and its task in the RUNNING state.

You can now go back to Terminal 3 and check that new topics have been created. You can watch the messages in a topic to see that your data has been loaded into Kafka. Your specific changes appear under the 'payload' tag.

kafka/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic <your_topic_name> --from-beginning

Consider, for example, a message generated from a single delete event: in a delete event, the 'before' field holds the values of the deleted row and the 'after' field is null.

Terminal 5 — Login to MySQL To Make Database Changes

In the messages above, you will notice that a lot of schema metadata is sent with each message. This is the default behavior of the Kafka Connect service. If we wish to strip the metadata and keep only the payload, make the following changes in the connect-debezium-mysql.properties and connect-standalone.properties files:


key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
schemas.enable=false

Go to Terminal 4 and press CTRL-C to terminate the Kafka Connect Service and then restart it with the same command.
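As an illustration (this is a sketch, not Kafka Connect itself) of what the converter change does: with schemas.enable=true the JSON converter wraps every value in a {"schema": ..., "payload": ...} envelope, and with it set to false only the payload is emitted. The row values below are made up.

```python
import json

# Sketch: the same change event with and without the schema envelope
# that the JSON converter adds when schemas.enable=true.

def unwrap(message: str) -> dict:
    """Return just the payload, whether or not a schema envelope is present."""
    value = json.loads(message)
    if isinstance(value, dict) and set(value) == {"schema", "payload"}:
        return value["payload"]
    return value

with_schema = json.dumps({
    "schema": {"type": "struct", "name": "Envelope"},
    "payload": {"op": "c", "after": {"customerNumber": 496}},
})
without_schema = json.dumps({"op": "c", "after": {"customerNumber": 496}})

# Both forms carry the same change data; only the wrapping differs.
assert unwrap(with_schema) == unwrap(without_schema)
```
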


You can now see an insert event, with the 'before' field as null and the 'after' field holding the new values. If you then modify some values in the same record in MySQL, you will see an update event in which both 'before' and 'after' hold the respective values.
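A downstream consumer can exploit exactly this structure. The sketch below (illustrative only, using made-up column values from the sample classicmodels schema) compares the 'before' and 'after' row images of an update event to find which columns changed.

```python
# Sketch: compute which columns changed in an update event by comparing
# the "before" and "after" row images. Column names are illustrative.

def changed_columns(before: dict, after: dict) -> dict:
    """Map each changed column to its (old, new) value pair."""
    return {
        col: (before.get(col), after.get(col))
        for col in set(before) | set(after)
        if before.get(col) != after.get(col)
    }

before = {"customerNumber": 103, "creditLimit": 21000}
after = {"customerNumber": 103, "creditLimit": 30000}

print(changed_columns(before, after))  # {'creditLimit': (21000, 30000)}
```
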

Final Thoughts

This was a quick-start guide to give you an idea of how Debezium works. A production setup of Debezium and Kafka, however, would be significantly different from this lab setup in the following ways:

  1. The use of Avro instead of JSON, which significantly reduces the amount of metadata wrapped around each message.
  2. The implementation of consumers for a terminal sink. Since Kafka is only temporary storage for messages, we need to consume these messages and apply the changes to materialized tables in a terminal sink, such as Snowflake or BigQuery.

Next Steps:

This post covers just one part of an end-to-end streaming data pipeline. In a follow-up post, I will build a small project that reads data from Kafka using Spark Structured Streaming and writes the streaming data to storage layers (HDFS, Google Cloud Storage).

Thank you for reading!!
