Change Data Capture Magic: Streaming with Debezium, Kafka, and Docker

MEHMET ARİF EMRE ŞEN
Yazilim VIP
Published in
5 min readNov 18, 2023

Introduction

In software engineering, seamless data synchronization across platforms is paramount, and Change Data Capture (CDC) stands out as a solution. While Debezium, Kafka, and Docker are powerful in their own right, orchestrating them together can be intricate. This guide delves into the specifics of configuration and integration, ensuring you can harness the full potential of CDC. Ready to step up your CDC game? Dive in!

Step 1: Create a Container with Docker-Compose

Kick-start your journey by launching the necessary services using Docker. Your docker-compose.yml should establish PostgreSQL, Kafka, Zookeeper, and Debezium Connect services.

The volume binding for the database container is solely used to pre-initialize the database with a table. Refer to the ‘init-scripts’ folder in my GithubRepo for additional details.
See my GitHub repo: init-scripts

The wal_level=logical configuration in PostgreSQL is pivotal for enabling logical decoding, a requisite for Change Data Capture (CDC) with Debezium.
See official documentation: Debezium Setting Up PostgreSQL

version: '3.8'
services:
database:
image: postgres:14.1-alpine
environment:
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgres
- POSTGRES_DB=db_local
ports:
- 5432:5432
volumes:
- ./init-scripts:/docker-entrypoint-initdb.d
command: [ "postgres", "-c", "wal_level=logical" ]
zookeeper:
image: zookeeper:3.9.0
kafka:
image: docker.io/bitnami/kafka:3.4
depends_on:
- zookeeper
environment:
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
debezium-connect:
image: debezium/connect:2.4.0.Final
links:
- kafka
- database
environment:
- BOOTSTRAP_SERVERS=kafka:9092
- GROUP_ID=debezium-events
- CONFIG_STORAGE_TOPIC=debezium_configs
- OFFSET_STORAGE_TOPIC=debezium_offsets
- STATUS_STORAGE_TOPIC=debezium_statuses
ports:
- 8083:8083

Step 2: Connector Management Using Debezium’s API

In this phase, we’ll establish a solid bridge between our database and Kafka by precisely setting up and validating Debezium connectors.

A. Establishing the Debezium Connector

These connectors act as agents that sync your database changes to Kafka topics.

Creating connectors using REST API
To create the connector, invoke a POST request:

debezium-connector-config.json

{
"name": "postgresql-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"connector.displayName": "PostgreSQL",
"topic.prefix": "pg-changes",
"database.user": "postgres",
"database.dbname": "db_local",
"table.exclude.list": "audit",
"database.hostname": "database",
"database.password": "postgres",
"name": "postgresql-connector",
"connector.id": "postgres",
"plugin.name": "pgoutput"
}
}

Create Connector request via curl

curl http://localhost:8083/connectors \
-H 'Content-Type: application/json' \
-X POST \
-d '@debezium-connector-config.json'

Critical Elements in the Configuration
For this guide, we’ve outlined a reference configuration in debezium-connector-config.json Key attributes include:

  • Name: A unique identifier for the connector.
  • Connector.class: The Debezium connector type.
  • Database: Parameters like hostname, user, password, and dbname to authenticate and locate the database.
  • Table: Directives like include.list or exclude.list to filter which tables to monitor.

See official API Documentation: Connector Create

B. Health Check for the Connector

Post-deployment it’s a software engineering best practice to ensure our connectors are up and functioning as intended.

Fetching Connector Status
Invoke the following command to retrieve the status

# Command Template 
# curl -H "Accept:application/json" localhost:8083/connectors/[Your-Connector-Name]/status
# Example
curl -H "Accept:application/json" localhost:8083/connectors/postgresql-connector/status

A typical operational response will look like:

{
"name": "inventory-connector",
"connector": {
"state": "RUNNING",
"worker_id": "192.168.1.8:8083"
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "192.168.1.8:8083"
}
],
"type": "source"
}

The key is to look for "state": "RUNNING" under both connector and tasks, indicating optimal performance.

See official API Documentation: Check Connector Status

With these steps, we’ve ensured our data flow from the database to Kafka is healthy.

Step 3: Verifying Topic Creation and Streaming in Kafka

In this step, we aim to check if the topics are created in Kafka and whether the database changes are streamed into them.

see Kafka Topic Management if you are not familiar with Kafka scripts

Important Note

Ensure to make data manipulations (INSERT, UPDATE, DELETE) in your database after setting up Debezium and while checking Kafka topics. Without these activities, there will be no change events to capture and stream into Kafka

A. Verify Topic Creation:

Execute the following command to list all available topics in Kafka. Be sure to replace [Kafka Container ID] with the actual ID of your Kafka container.

docker exec -it [Kafka Container ID] /opt/bitnami/kafka/bin/kafka-topics.sh \
--list \
--bootstrap-server localhost:9092
Example output

B. Data Activity for Streaming Verification:

There should be database activities to observe the data change events in Kafka topics. Let’s insert, update, or delete some records in the database, thereby triggering Debezium to capture these changes and send them to Kafka topics.

Execute the following command to consume messages from a Kafka topic.

docker exec -it [Kafka Container ID] /opt/bitnami/kafka/bin/kafka-console-consumer.sh \
--topic [Your-Topic-Name] \
--bootstrap-server localhost:9092 \
--from-beginning

Replace [Your-Topic-Name] with the relevant topic name. Adding --from-beginning will fetch all messages from the start of the log; omit to bring only subsequent messages.

Example Consumed Message

A Typical message be like:

{
"schema": {
// ....
},
"payload": {
"before": null,
"after": {
"id": 1,
"first_name": "Emre",
"last_name": "Sen"
},
"source": {
"name": "pg-changes", // connector name
"ts_ms": 1697351918939, // timestamp of the transaction
"db": "db_local",
"schema": "public",
"table": "person",
"txId": 739, // transaction id
"lsn": 24277320, // log sequence number
// ...
},
"op": "c", // operation type
}
}

Check Debezium PostgreSQL Connector

Points to Note:

  • Understanding Message Output: Your console will stream messages, each representing a change in your database. They encode details like the operation type, source information, and the state of the changed record.

Troubleshooting

  • No Topics Listed: Ensure your connector is running and verify database changes are triggering events.
  • No Messages: Check for potential network issues or misconfigurations in Debezium connector settings.

Upcoming next…

So, you’ve ventured into the world of Change Data Capture with Debezium, Kafka, and Docker, and you’re feeling confident. But wait! Need some clarity on configuring Spring Boot for Kafka consumption? Or maybe a quick primer on managing Debezium connectors via a Spring Boot REST API? Don’t fret; the upcoming articles are designed to be your beacon. Stay tuned!

--

--