How to do CDC with Debezium and Kafka
Why CDC
An event-driven microservice is a pattern that experts often recommend. Change Data Capture (CDC) is one approach you can use to build an event-driven microservice. CDC works by capturing every change in the database and pushing those changes to an event log, for example Kafka. Any service or tool that needs these data changes can then subscribe to the event log.
When my team was given the task of decomposing a monolith into microservices, we stumbled upon a problem that CDC could solve: the classic microservices problem of data spread across services. It makes building basic features like search, sort, and filter more painful than it was on the monolith. There are other approaches, such as database replication, but CDC is a more future-proof solution, as I will explain in the next section.
With CDC, we can capture a database change as an event and store it in a Kafka topic. Any service can consume this topic and sink the event log into its own database, which makes building features like search, sort, and filter much easier. The consuming service still needs to be mindful of which columns it sinks, because recklessly storing data in the sink database can drive up cost.
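To make the idea concrete, here is a minimal sketch of such a sink consumer in Python. It assumes the kafka-python package, a broker reachable on localhost, and a JSON-serialized change topic (the demo later in this article uses the Avro converter, which would need a schema-registry-aware deserializer instead); the topic name mirrors the one created in the demo below.

import json
from kafka import KafkaConsumer

# Subscribe to the change topic that the CDC pipeline publishes to.
consumer = KafkaConsumer(
    "postgres.public.warehouse",
    bootstrap_servers="localhost:9092",
    auto_offset_reset="earliest",
    value_deserializer=lambda v: json.loads(v) if v else None,
)

for message in consumer:
    event = message.value
    if event is None:
        continue  # tombstone record left behind after a delete
    # With Debezium's default envelope, the new row state lives under "after";
    # a real sink would upsert only the columns it needs into its own database.
    print(event.get("after"))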
Advantages and Disadvantages of CDC with Debezium and Kafka
As I mentioned before, we use Kafka and Debezium as our CDC tech stack, so the data captured by Debezium is stored in Kafka. Because Kafka provides log compaction, redundant events for the same key are deleted once the cleanup policy kicks in, so only the latest event per key remains in Kafka. That saves cost, because we don't keep useless events around.
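Note that compaction is a per-topic setting, so you have to enable it yourself. As a sketch (assuming the change topic from the demo below, postgres.public.warehouse, already exists), you could switch it to compaction with the kafka-configs CLI that ships inside the Kafka container:

docker exec -it {replace with kafka container name} kafka-configs --bootstrap-server kafka:9092 \
  --alter --entity-type topics --entity-name postgres.public.warehouse \
  --add-config cleanup.policy=compact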
Here are the advantages:
- Low latency and overhead
- Versatile integrations, for example with a data warehouse
Disadvantages:
- Storing database changes in the event log requires a lot of disk space
- Maintenance effort
Demonstration
First, we need to have Zookeeper, Kafka, and PostgreSQL running before turning on the Debezium connector. For visualization, we will use Kafka-UI to confirm whether database changes were captured or not.
version: "3.7"
services:
  postgres:
    image: debezium/postgres:13
    ports:
      - 5432:5432
    environment:
      - POSTGRES_USER=docker
      - POSTGRES_PASSWORD=docker
      - POSTGRES_DB=profile_service_dev
  zookeeper:
    image: confluentinc/cp-zookeeper:5.5.3
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: confluentinc/cp-enterprise-kafka:5.5.3
    depends_on: [zookeeper]
    environment:
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_BROKER_ID: 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9991
    ports:
      - 9092:9092
  kafka-ui:
    container_name: kafka-ui
    image: provectuslabs/kafka-ui:latest
    ports:
      - 8080:8080
    depends_on:
      - zookeeper
      - kafka
      - schema-registry
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
      KAFKA_CLUSTERS_0_ZOOKEEPER: zookeeper:2181
      KAFKA_CLUSTERS_0_JMXPORT: 9997
      KAFKA_CLUSTERS_0_SCHEMAREGISTRY: http://schema-registry:8081
  debezium:
    image: debezium/connect:1.4
    environment:
      BOOTSTRAP_SERVERS: kafka:9092
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: connect_configs
      OFFSET_STORAGE_TOPIC: connect_offsets
      KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
    depends_on: [kafka]
    ports:
      - 8083:8083
  schema-registry:
    image: confluentinc/cp-schema-registry:5.5.3
    environment:
      - SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=zookeeper:2181
      - SCHEMA_REGISTRY_HOST_NAME=schema-registry
      - SCHEMA_REGISTRY_LISTENERS=http://schema-registry:8081,http://localhost:8081
    ports:
      - 8081:8081
    depends_on: [zookeeper, kafka]
Save the script above as docker-compose.yaml, then run it in the background.
docker-compose up -d
Enter the PostgreSQL container with the command below.
docker exec -it {replace with container name} psql -d profile_service_dev -U docker
Create a new table in your PostgreSQL database if you don't have one already. In this demo, I will use the example table below.
psql (13.7 (Debian 13.7-1.pgdg110+1))
Type "help" for help.
profile_service_dev=# CREATE TABLE WAREHOUSE(
ID INT PRIMARY KEY NOT NULL,
NAME TEXT NOT NULL,
ADDRESS TEXT,
CITY CHAR(50)
);
CREATE TABLE
profile_service_dev=#
profile_service_dev=# SELECT * FROM warehouse;
 id | name | address | city
----+------+---------+------
(0 rows)
profile_service_dev=#
Create a Debezium connector configuration and save it as connector.json. Please make sure the database configuration is correct so the connection succeeds.
- name is an identifier for your Debezium connector configuration.
- plugin.name is the logical decoding plugin that we use (pgoutput here).
- snapshot.mode configures how the connector behaves when taking a snapshot of the database.
- table.include.list tells the connector which tables to capture changes from. It can contain multiple tables.
{
"name": "warehouse-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"plugin.name": "pgoutput",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "docker",
"database.password": "docker",
"database.dbname": "profile_service_dev",
"database.server.name": "postgres",
"snapshot.mode": "always",
"table.include.list": "public.warehouse"
}
}
Then, send your configuration to the Debezium Connect service so a new connector is created.
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" 127.0.0.1:8083/connectors/ --data "@connector.json"
If the connector is created successfully, you will receive status code 201. Debezium will start capturing data and sending it as messages to a Kafka topic.
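If you want to double-check from the command line, Kafka Connect's REST API also exposes the connector status, for example:

curl -s 127.0.0.1:8083/connectors/warehouse-connector/status

A healthy connector reports both the connector state and its task state as RUNNING.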
You can use Kafka-UI to see the data on the topics that the CDC pipeline publishes. Open Kafka-UI at http://localhost:8080, go to the Topics menu, and check whether postgres.public.warehouse exists. This topic will store all the database changes that Debezium captures.
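If you prefer the command line, you could also read the topic with the Avro console consumer that ships in the schema-registry image (a sketch, assuming the container names from the compose file above):

docker exec -it {replace with schema-registry container name} kafka-avro-console-consumer \
  --bootstrap-server kafka:9092 \
  --topic postgres.public.warehouse \
  --from-beginning \
  --property schema.registry.url=http://schema-registry:8081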
Now, start inserting data into the warehouse table by executing an insert command.
profile_service_dev=# INSERT INTO warehouse (id, name, address, city) VALUES (1, 'warehouse dummy', 'Random Street', 'Random City');
INSERT 0 1
Next, check Kafka-UI again to see whether the change was captured. Go to Topics > postgres.public.warehouse > Messages. The message value will look roughly like the example below.
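This is an illustrative, trimmed-down Debezium change event for the insert above; the exact fields (especially under source) depend on the connector version and converter settings:

{
  "before": null,
  "after": {
    "id": 1,
    "name": "warehouse dummy",
    "address": "Random Street",
    "city": "Random City"
  },
  "source": {
    "connector": "postgresql",
    "db": "profile_service_dev",
    "table": "warehouse"
  },
  "op": "c",
  "ts_ms": 1660000000000
}

The op field is "c" for an insert (create); updates use "u" and deletes use "d".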
You can also execute the SQL update command in the warehouse table. The changes will also be captured in the Topic.
profile_service_dev=# UPDATE warehouse SET name='Warehouse Empty' WHERE id=1;
UPDATE 1
profile_service_dev=#
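One aside that is not required for this demo: with PostgreSQL's default replica identity, the before field of an update event may not contain the full old row. If your consumers need it, you can change the table's replica identity, for example:

profile_service_dev=# ALTER TABLE warehouse REPLICA IDENTITY FULL;
ALTER TABLE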
Summary
In summary, you should consider the CDC approach if you have a problem sourcing or aggregating data from multiple microservices. If the Kafka ecosystem is already in your tech stack, it becomes even easier because you can use Debezium, a powerful CDC tool with many features already implemented: incremental snapshots, capturing DDL changes like CREATE and ALTER in your database so the message structure automatically follows your tables, and single message transformations (SMTs) to remove unnecessary data from captured events. These and many other Debezium features remove a lot of the pain of adopting the CDC approach for your microservices.
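As an example of an SMT, Debezium's ExtractNewRecordState transformation (often registered as unwrap) flattens the change envelope so consumers only receive the new row state. A sketch of the extra keys you could add to the "config" block of connector.json above:

"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones": "false"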
Remember, don’t reinvent the wheel.
Thank you for reading this article. Cheers.