Simple CDC with Debezium + Kafka

Matheus Tramontini · Published in Nagoya Foundation · Mar 27, 2022 · 10 min read

Data teams in companies are getting bigger and more complex, and real-time data and information have become crucial for business development. With this growing need, it has become a common challenge for data engineers to find solutions that capture huge volumes of data with the lowest possible latency. Because of that, in this post I will walk through the components of such a structure and, at the end, leave a GitHub repository that you can use to build this pipeline and test it yourself.

Simplified architecture of the post's final product

So what's CDC?

CDC (Change Data Capture) is a process that identifies changes to data in databases, typically using one of two approaches, logs or triggers, and makes that information available in real time or near real time.

You can check some use-cases in this Medium post.

More and more logs with Kafka!

Created by LinkedIn to handle the MASSIVE volume of data they generated, Kafka is one of the most popular tools for streaming messages and is used by some of the biggest companies in the world. Its architecture is fully based on LOGS, which allows it to perform asynchronous, high-performance transmissions as a distributed system.

But before seeing its architecture we need to name a few things!

  • Broker: A Kafka server responsible for receiving and sending the messages; it stores the data organized into topics and partitions.
  • Topics: Identifiers used to organize the messages into categories.
  • Partitions: Subdivisions of a topic used to organize and balance the data (see the example right after this list).
  • Kafka cluster: A set of brokers working together, the main instance of Kafka.
  • Producers: The sources of the data; they send and distribute the logs.
  • Consumers: Consumers subscribe to a topic and listen to it all the time, receiving logs.
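
To make topics and partitions more concrete, here is a minimal sketch using the kafka-topics CLI; it assumes a single broker reachable at localhost:9092 (like the one we will run later in this post), and the topic name customers-demo is purely illustrative:

# create a topic with 3 partitions (replication factor 1, since there is only one broker)
kafka-topics --bootstrap-server localhost:9092 --create --topic customers-demo --partitions 3 --replication-factor 1

# describe it to see how the partitions are distributed
kafka-topics --bootstrap-server localhost:9092 --describe --topic customers-demo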

Now that we are familiar with some concepts, we can simplify its architecture into a simple image.

Kafka’s simplified architecture

Some of the benefits of using Kafka:

  • Really really high scalability.
  • Low latency.
  • Fault-tolerant.
  • A lot of different connectors (thanks community!).

But like any other software, it has its cons:

  • Does not ship with a complete set of monitoring tools.
  • Tweaking messages after they are written is problematic, since the log is immutable.
  • Lacks some messaging paradigms, such as point-to-point queues.
  • Complexity in its initial configuration.

Zookeeper

ZooKeeper is a centralized service used to keep configuration and naming information for distributed services, providing synchronization and integrity; in this stack, Kafka uses it to coordinate brokers and store cluster metadata.

Schema Registry

In a simple way, it is a service layer for the messages' metadata: it exposes a RESTful interface to store and retrieve schemas, and it uses Kafka itself as the storage layer.
Note: Schema Registry is mainly needed for schema-based formats such as Avro; if you intend to use plain JSON you might not need it.

Schema Registry architecture
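
As a quick illustration of that RESTful interface, once the stack from this post is up you can query it with curl. This is just a sketch: the subject name below assumes Avro serialization is in use, so with the default JSON converter used later in this post the list may simply come back empty.

# list all registered subjects (schemas)
curl -s http://localhost:8081/subjects

# fetch the latest version of a hypothetical subject
curl -s http://localhost:8081/subjects/dbserver1.inventory.customers-value/versions/latest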

Kafka Connect:

The main integration component, responsible for connecting sources and sinks to Kafka through connector JARs plus configuration (database addresses, user credentials, etc.).
Connect's architecture is divided into three main components: Connectors, Transforms and Converters.

Example of capture and send to Kafka
Example of Kafka sending messages for a sink

Connectors
Connectors are the link between Kafka and external components; they can be built by companies or by the community and are usually packaged as JARs.
There's a LOT of connectors made by the community, but you can also build your own using this link as a reference.
A point of attention is how you send the configuration to the connectors, because there are two ways to do so: using Kafka Connect's REST API or ksqlDB.
In this example we will use the REST API, but I will leave an example of ksqlDB for anyone interested.

Example of ksqlDB:

CREATE SINK CONNECTOR sink-elastic-01 WITH (
  'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
  'topics' = 'orders',
  'connection.url' = 'http://elasticsearch:9200',
  'type.name' = '_doc',
  'key.ignore' = 'false',
  'schema.ignore' = 'true'
);
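
And since we will be using the REST API, for reference these are the kinds of management calls you can make against it once the worker is running (a sketch assuming it is exposed on localhost:8083, as in the docker-compose later in this post):

# list the connectors currently registered on the worker
curl -s http://localhost:8083/connectors

# remove a connector by name (the name used later in this post is medium_debezium)
curl -s -X DELETE http://localhost:8083/connectors/medium_debezium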

Transforms
Transforms (Single Message Transforms, SMTs) are totally optional; their function is to intercept the data before it reaches Kafka or the sink connector and apply lightweight transformations, for example converting a date type or adding a custom field (see the sketch after the Converters description).

Converters
Responsible for data serialization and deserialization between Kafka and the connectors; the most common converters are JSON, Avro and Protobuf.
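
To make Transforms and Converters concrete, here is a hedged sketch of configuration keys you could add to the Debezium connector registered later in this post: the unwrap transform (io.debezium.transforms.ExtractNewRecordState) flattens Debezium's before/after envelope, and the key.converter/value.converter keys override the worker's defaults. The pipeline in this post does not use these overrides; they are shown only to illustrate where transforms and converters plug in.

curl -i -X PUT -H "Content-Type: application/json" http://localhost:8083/connectors/medium_debezium/config -d '{
  "connector.class": "io.debezium.connector.mysql.MySqlConnector",
  "database.hostname": "mysql",
  "database.port": "3306",
  "database.user": "root",
  "database.password": "debezium",
  "database.server.id": "184054",
  "database.server.name": "dbserver1",
  "database.include.list": "inventory",
  "database.history.kafka.bootstrap.servers": "kafka:9092",
  "database.history.kafka.topic": "schema-changes.inventory",
  "table.include.list": "inventory.customers, inventory.orders",
  "column.exclude.list": "inventory.customers.email",
  "snapshot.mode": "initial",
  "snapshot.locking.mode": "none",
  "transforms": "unwrap",
  "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": "false"
}'

PUT /connectors/<name>/config creates the connector if it doesn't exist yet and updates it otherwise, which makes it handy for iterating on transform and converter settings.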

Debezium:

Debezium is an open-source CDC platform built on top of Kafka Connect, providing source connectors (producers) for several databases. The main reason to use Debezium here is the way it captures the state of the data before and after each change (we will see that further on), but Debezium has other advantages like filtering, snapshots, masking of sensitive data and others that you can check here.

Finally, hands-on!

First of all, you need a basic understanding of Docker and its components to run this pipeline. I will create the docker-compose file, but I won't explain every Docker detail step by step.

Building our docker-compose file

The first part of our docker-compose is the database, maybe the most important part of the whole pipe, because it will contain the data we want to capture.
This part is pretty simple because Debezium provides an image of a pre-configured MySQL for study cases like ours. We only need to choose which port will be exposed and the database credentials.

mysql:
  image: debezium/example-mysql:1.8
  container_name: mysql
  hostname: mysql
  ports:
    - '3306:3306'
  environment:
    MYSQL_ROOT_PASSWORD: debezium
    MYSQL_USER: mysqluser
    MYSQL_PASSWORD: mysqlpw

You can test your database by accessing the container and running mysql with:

mysql -u mysqluser -p

This image already has a database and some tables about an inventory; you can check them with the commands:

use inventory;
show tables;
select * from customers limit 10;
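
Since Debezium's MySQL connector reads the binary log, it may also be worth confirming that binlog is enabled and in ROW format (the debezium/example-mysql image already ships configured this way). A quick check from the host, assuming the container name and root credentials defined above:

docker exec -it mysql mysql -uroot -pdebezium -e "SHOW VARIABLES LIKE 'log_bin'; SHOW VARIABLES LIKE 'binlog_format';"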

Zookeeper:

Now that we have our database up and running, we need to build our Kafka structure, starting with ZooKeeper (newer Kafka versions can run without ZooKeeper thanks to KRaft, but this setup still relies on it).

zookeeper:
  image: confluentinc/cp-zookeeper:5.5.3
  container_name: zookeeper
  environment:
    ZOOKEEPER_CLIENT_PORT: 2181
    ZOOKEEPER_TICK_TIME: 2000
  ports:
    - '2181:2181'

The environment is very simple: we only set the client port and the tick time, which is used for heartbeats; the minimum session timeout will be double this value.

Schema-registry:

Before configuring the main component, let's set up our Schema Registry. It's very simple because we only need to configure the ZooKeeper connection URL, the host name and the listener that Schema Registry will use.

schema-registry:
  container_name: schema-registry
  image: confluentinc/cp-schema-registry:4.0.3
  environment:
    - SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=zookeeper:2181
    - SCHEMA_REGISTRY_HOST_NAME=schema-registry
    - SCHEMA_REGISTRY_LISTENERS=http://schema-registry:8081
  ports:
    - '8081:8081'

Kafka:

Finally, let's build the main component of this architecture. In this case the environment variables are pretty simple (thanks, Docker images!), but if you need some specific configuration you can find it here.

kafka:
  image: confluentinc/cp-enterprise-kafka:5.5.3
  container_name: kafka
  depends_on:
    - zookeeper
  environment:
    KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
    KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  links:
    - zookeeper
  ports:
    - '9092:9092'

Kafka variables glossary:

  • KAFKA_ZOOKEEPER_CONNECT: The address Kafka will use to communicate with ZooKeeper.
  • KAFKA_ADVERTISED_LISTENERS: How the host name can be reached by clients (see the quick check after this glossary).
  • KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: The default value of this parameter is 3, but here we are running a single-node setup, so we need to set it to 1; if you are running a multi-broker cluster you can keep the default value.
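
To confirm the broker registered itself in ZooKeeper and is reachable through its advertised listener, a couple of hedged checks run from inside the containers (both CLIs ship with the Confluent images; the exact output will vary):

# list the broker ids currently registered in ZooKeeper
docker exec kafka zookeeper-shell zookeeper:2181 ls /brokers/ids

# talk to the broker through its advertised listener
docker exec kafka kafka-broker-api-versions --bootstrap-server kafka:9092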

Kafka Connect:

With the main architecture up and running, we need to set up the connector that will get the information from the database. For this case we are using debezium/connect:1.8, but you can use confluentinc/cp-kafka-connect if you want to install custom connectors.

kafka-connect:
  container_name: kafka-connect
  image: debezium/connect:1.8
  ports:
    - '8083:8083'
  links:
    - kafka
    - zookeeper
  environment:
    - BOOTSTRAP_SERVERS=kafka:9092
    - GROUP_ID=medium_debezium
    - CONFIG_STORAGE_TOPIC=my_connect_configs
    - OFFSET_STORAGE_TOPIC=my_connect_offsets
    - STATUS_STORAGE_TOPIC=my_connect_statuses
    - CONFIG_STORAGE_REPLICATION_FACTOR=1
    - OFFSET_STORAGE_REPLICATION_FACTOR=1
    - STATUS_STORAGE_REPLICATION_FACTOR=1

Kafka connect variables glossary:

  • BOOTSTRAP_SERVERS: URL used to connect to the Kafka cluster.
  • GROUP_ID: Unique string that identifies the Connect cluster group this worker belongs to.
  • CONFIG_STORAGE_TOPIC, OFFSET_STORAGE_TOPIC, STATUS_STORAGE_TOPIC: The names of the topics where connector and task configuration, offsets and status are stored. They must be the same for all workers with the same GROUP_ID.
  • CONFIG_STORAGE_REPLICATION_FACTOR, OFFSET_STORAGE_REPLICATION_FACTOR, STATUS_STORAGE_REPLICATION_FACTOR: The replication factor used when Kafka Connect creates those topics. For a production system it should be higher than 1, but it cannot be larger than the number of Kafka brokers in the cluster; here we use 1 because of the single-broker setup.

For more Kafka Connect variables use this link.
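
Once the worker is up, you can also verify it over its REST API and confirm that the Debezium MySQL connector plugin is installed (assuming port 8083 is exposed as above):

# worker version and Kafka cluster id
curl -s http://localhost:8083/

# installed connector plugins; io.debezium.connector.mysql.MySqlConnector should appear in the list
curl -s http://localhost:8083/connector-plugins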

Connector sender:

As I said before, we need to use ksqlDB or the REST API to send the connector configuration to Kafka Connect. Because of that, I use a "workaround" container that waits for Connect to come up and then executes a curl sending the file to the REST API.

connector-sender:
  container_name: connector-sender
  image: confluentinc/cp-kafka-connect:5.5.3
  volumes:
    - ./conf:/conf
  depends_on:
    - kafka-connect
  command:
    - bash
    - -c
    - |
      echo "Waiting for Kafka Connect to start..."
      until [[ "$$(curl -s -o /dev/null -w %{http_code} kafka-connect:8083/connectors)" -eq 200 ]]; do
        sleep 1
      done
      echo -e "Sending connector"
      curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" kafka-connect:8083/connectors/ -d @/conf/mysql_config.json

Final docker-compose file:

version: '2'
services:
  mysql:
    image: debezium/example-mysql:1.8
    container_name: mysql
    hostname: mysql
    ports:
      - '3306:3306'
    environment:
      MYSQL_ROOT_PASSWORD: debezium
      MYSQL_USER: mysqluser
      MYSQL_PASSWORD: mysqlpw

  zookeeper:
    image: confluentinc/cp-zookeeper:5.5.3
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - '2181:2181'

  kafka:
    image: confluentinc/cp-enterprise-kafka:5.5.3
    container_name: kafka
    depends_on:
      - zookeeper
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
    links:
      - zookeeper
    ports:
      - '9092:9092'

  schema-registry:
    container_name: schema-registry
    image: confluentinc/cp-schema-registry:4.0.3
    environment:
      - SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=zookeeper:2181
      - SCHEMA_REGISTRY_HOST_NAME=schema-registry
      - SCHEMA_REGISTRY_LISTENERS=http://schema-registry:8081
    ports:
      - '8081:8081'

  kafka-connect:
    container_name: kafka-connect
    image: debezium/connect:1.8
    ports:
      - '8083:8083'
    links:
      - kafka
      - zookeeper
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=medium_debezium
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - CONFIG_STORAGE_REPLICATION_FACTOR=1
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - OFFSET_STORAGE_REPLICATION_FACTOR=1
      - STATUS_STORAGE_TOPIC=my_connect_statuses
      - STATUS_STORAGE_REPLICATION_FACTOR=1
      - REST_ADVERTISED_HOST_NAME=medium_debezium

Database configuration file

Now that the docker-compose file is built, the last step is to configure our database connector file. Luckily the variable names are pretty self-explanatory, so I will only comment on the ones with important values.

{
  "name": "medium_debezium",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "root",
    "database.password": "debezium",
    "database.server.id": "184054",
    "database.server.name": "dbserver1",
    "database.include.list": "inventory",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.inventory",
    "table.include.list": "inventory.customers, inventory.orders",
    "column.exclude.list": "inventory.customers.email",
    "snapshot.mode": "initial",
    "snapshot.locking.mode": "none"
  }
}

You can access a more detailed list of configurations here.

Important observations:

  • The parameters table.include.list and column.exclude.list are optional; I left them in to show how to select specific tables and exclude columns that won't be useful, like passwords and other sensitive information. If you don't set these parameters, the connector will capture all tables and columns.
  • snapshot.mode has its default value in the file, but I left it there because it's useful to know it exists. snapshot.locking.mode, on the other hand, is a critical value in my opinion because it defines how the initial snapshot will lock your database. You can see more about snapshot configurations here.

Running the project

To run the project you only need to have docker installed, go to the folder and run:

docker-compose up -d

If everything went well, we have five containers up and running; remember that connector-sender will send the connector configuration file and then its container will stop.

Containers up and running

You can use send_connector.sh to execute the curl that sends the database config file to the Kafka Connect worker.
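
After the configuration is sent, it is worth confirming that the connector and its task are in the RUNNING state, for example with a quick call to the Connect REST API:

curl -s http://localhost:8083/connectors/medium_debezium/status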

If you want to navigate through Kafka you can use this as a reference; in this post I will only check whether the topics have been created.

First, entering Kafka’s container:

docker exec -it container_id bash

Now, inside the container, run the following to list all topics:

kafka-topics --bootstrap-server=localhost:9092 --list

After this we can see these topics:

Topics listed by Kafka-topics command

You can see that we have internal and "external" topics mixed; the two topics for the tables we set in the config file are dbserver1.inventory.customers and dbserver1.inventory.orders. If you want to see the messages you can use the command below:

kafka-console-consumer --bootstrap-server localhost:9092 --topic dbserver1.inventory.customers --from-beginning

Debezium change structure

In this part of the post I will only show what the change event that Debezium builds looks like. Inside the repo I left three examples in the files folder, but I will leave a snippet here too.

The structure has two main parts, schema and payload: schema is the data structure of the table, and payload is the information about the data itself, like how it was before the change, how it became after the change, and the source of the data. I will focus on the payload here, but you can see everything in the repository.

Insert example:

In this case I ran an INSERT into the customers table; you can run any query by accessing the MySQL container.
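
For example, a statement like the one below would produce the event shown next. It is only a sketch run from the host: the email value is made up, and the column is required by the sample customers table even though the connector excludes it through column.exclude.list.

docker exec -it mysql mysql -uroot -pdebezium inventory -e "INSERT INTO customers (id, first_name, last_name, email) VALUES (9999, 'matheus', 'tramontini', 'matheus@example.com');"

The resulting change event carries this payload: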

"payload":
{
"before": null,
"after":
{
"id": 9999,
"first_name": "matheus",
"last_name": "tramontini"
},
"source":
{
"version": "1.8.1.Final",
"connector": "mysql",
"name": "dbserver1",
"ts_ms": 1647491136000,
"snapshot": "false",
"db": "inventory",
"sequence": null,
"table": "customers",
"server_id": 223344,
"gtid": null,
"file": "mysql-bin.000003",
"pos": 791,
"row": 0,
"thread": null,
"query": null
},
"op": "c",
"ts_ms": 1647491136579,
"transaction": null
}

When an insert is executed, before is null, which is intuitive because the data didn't exist before, and after holds the values of the new row.

Update example:

All the names in the table have a capital first letter in first_name and last_name, so let's fix my mistake from the insert and capitalize the names.
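
A hedged example of the statement behind this change, again run from the host:

docker exec -it mysql mysql -uroot -pdebezium inventory -e "UPDATE customers SET first_name = 'Matheus', last_name = 'Tramontini' WHERE id = 9999;"

Which produces the following payload: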

"payload":
{
"before":
{
"id": 9999,
"first_name": "matheus",
"last_name": "tramontini"
},
"after":
{
"id": 9999,
"first_name": "Matheus",
"last_name": "Tramontini"
},
"source":
{
"version": "1.8.1.Final",
"connector": "mysql",
"name": "dbserver1",
"ts_ms": 1647491205000,
"snapshot": "false",
"db": "inventory",
"sequence": null,
"table": "customers",
"server_id": 223344,
"gtid": null,
"file": "mysql-bin.000003",
"pos": 1152,
"row": 0,
"thread": null,
"query": null
},
"op": "u",
"ts_ms": 1647491205582,
"transaction": null
}

This time we have a before block because the data already existed inside the table; we only changed its values.

Delete example:

To end this case, we will get rid of the row I inserted earlier.
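
The corresponding statement, once more just a sketch run from the host:

docker exec -it mysql mysql -uroot -pdebezium inventory -e "DELETE FROM customers WHERE id = 9999;"

After running it, the delete event's payload looks like this: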

"payload":
{
"before":
{
"id": 9999,
"first_name": "Matheus",
"last_name": "Tramontini"
},
"after": null,
"source":
{
"version": "1.8.1.Final",
"connector": "mysql",
"name": "dbserver1",
"ts_ms": 1647491261000,
"snapshot": "false",
"db": "inventory",
"sequence": null,
"table": "customers",
"server_id": 223344,
"gtid": null,
"file": "mysql-bin.000003",
"pos": 1556,
"row": 0,
"thread": null,
"query": null
},
"op": "d",
"ts_ms": 1647491261810,
"transaction": null
}

This case is the inverse of the insert process: now it is after that has a null value.

That’s all folks

I hope this post helps you speed up your Kafka studies. I intend to write more about CDC data pipelines, like Kafka integration with GCS and using Trino + Hive + Metabase to consume and visualize this data from Kafka. If you have any questions you can open an issue in the repository and I will try to help you! :)

Link to the repository
