Change Data Capture (CDC) with Kafka Connect

Handoko Darmawan
Published in Life at Telkomsel · Jun 25, 2024

Image taken from Confluent's learn-resources: From Zero to Hero with Kafka Connect

Introduction

Change Data Capture (CDC) is a set of software design patterns used to detect and capture changes made to data in a database and deliver those changes in real time to downstream systems. CDC is essential for keeping data synchronized across multiple systems and ensuring data consistency. One of the most popular tools for implementing CDC is Apache Kafka, specifically through Kafka Connect.

Pros of Using CDC Over Traditional ETL

Traditional Extract, Transform, Load (ETL) processes involve periodic bulk data transfers from source systems to target systems. While ETL has been the backbone of data integration for many years, CDC offers several advantages over traditional ETL:

  • Real-Time Data Processing: Unlike traditional ETL, which typically operates in batch mode, CDC captures and streams data changes in real-time. This ensures that downstream systems receive updates as soon as they occur, enabling more timely and accurate data processing.
  • Reduced Latency: With CDC, there is minimal lag between the time a change occurs in the source database and the time it is available in the target system. This low latency is crucial for applications that require up-to-date information, such as real-time analytics and monitoring.
  • Efficiency: CDC captures only the changes made to the data, rather than transferring entire datasets. This reduces the volume of data being moved, resulting in lower network and processing overhead compared to traditional ETL processes.
  • Data Consistency: By continuously monitoring and capturing changes, CDC helps maintain data consistency between source and target systems. Traditional ETL processes can introduce inconsistencies due to the time gap between data extractions.
  • Scalability: CDC is inherently more scalable than traditional ETL. It can handle high-velocity data streams and large volumes of data without requiring extensive resources or causing significant performance degradation.
  • Event-Driven Architecture: CDC integrates seamlessly with event-driven architectures, where database changes are treated as events that can trigger downstream actions. This enables more responsive and flexible system designs.
  • Minimal Impact on Source Systems: Since CDC captures changes incrementally, it imposes less load on source systems compared to traditional ETL, which often involves heavy read operations on the entire dataset.

What is Kafka Connect?

Kafka Connect is a robust and scalable tool for streaming data between Apache Kafka and other systems. It is part of the Apache Kafka ecosystem and provides a framework to move large collections of data in and out of Kafka. Kafka Connect simplifies the integration of various data sources and sinks with Kafka, making it easier to build real-time data pipelines.
Image taken from Confluent's learn-resources: From Zero to Hero with Kafka Connect

Key Concepts in Kafka Connect

Before diving into CDC with Kafka Connect, it’s important to understand some key concepts:

  • Connectors: These are plugins that enable Kafka Connect to interact with external systems. There are two types of connectors: source connectors (which import data into Kafka) and sink connectors (which export data from Kafka).
  • Tasks: Tasks are the actual units of work that perform the data transfer. A connector can be configured to run multiple tasks for parallel processing.
  • Workers: Workers are the JVM processes that execute the connectors and tasks. Kafka Connect can run in standalone mode (single worker) or distributed mode (multiple workers); a minimal distributed-worker configuration is sketched after this list.
  • Transforms: Simple data transformations that can be applied to the data as it passes through Kafka Connect.
  • Converters: These handle the serialization and deserialization of data, enabling Kafka Connect to read and write data in various formats such as JSON, Avro, or Protobuf.
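
To make the worker, converter, and plugin concepts concrete, here is a minimal sketch of a distributed-mode worker configuration (connect-distributed.properties). The broker address matches the one used later in this article; the topic names, replication factors, and plugin path are illustrative assumptions for a single-broker development setup, not values from this article.

# connect-distributed.properties: minimal sketch for a single-broker dev setup
bootstrap.servers=kafkabroker0:9092
group.id=connect-cluster

# Converters control how record keys and values are (de)serialized
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

# Internal topics where distributed mode stores connector configs, offsets, and status
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-status
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

# Directory scanned for connector plugins (Debezium and MongoDB connector JARs go here)
plugin.path=/usr/share/kafka/plugins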

Implementing a CDC system with a MySQL source and a MongoDB sink

CDC involves capturing changes (inserts, updates, deletes) from a database and streaming those changes to Kafka. In this article we will build a CDC system using the Debezium source connector and the MongoDB sink connector, as shown in the picture below.

CDC system with Kafka Connect using MySQL as source and MongoDB as sink

Step 1: Set up MySQL so that Kafka Connect can tap into the binlog for CDC

Debezium uses MySQL's binary log (binlog) to track all change events, so the binlog must be enabled in MySQL. Here are the steps to enable it.

A. Check the current state of binlog activation:

$ mysqladmin variables -uroot|grep log_bin
| log_bin | OFF

B. Enable the binlog by adding the following to the MySQL configuration file (my.cnf):

[mysqld]
server-id     = 42          # unique server ID, required when binary logging is enabled
log_bin       = mysql-bin   # base name for the binlog files
binlog_format = row         # row-based logging, required by Debezium

C. Restart MySQL and verify the binlog status with the following command.

$ mysqladmin variables -uroot|grep log_bin
| log_bin | ON
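
The connector configuration in Step 4 refers to your-database-user and your-database-password. If such a user does not already exist, below is a minimal sketch for creating one with the privileges listed in Debezium's MySQL connector documentation (the user name and password are placeholders):

$ mysql -uroot -p <<'SQL'
CREATE USER 'your-database-user'@'%' IDENTIFIED BY 'your-database-password';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT
  ON *.* TO 'your-database-user'@'%';
FLUSH PRIVILEGES;
SQL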

Step 2: Set Up a MongoDB replica set, as the MongoDB connector can only be used with a MongoDB cluster (replica set)

Link: setting up a replica set for a single MongoDB container.
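
As a minimal sketch (assuming Docker, the official mongo image, and the container name mongodb1 that the sink configuration below connects to; the container must be on the same Docker network as Kafka Connect so that the name resolves), a single-node replica set can be started and initiated like this:

# Start one MongoDB container with a replica set name (rs0 is an assumed name)
$ docker run -d --name mongodb1 -p 27017:27017 mongo:7 --replSet rs0

# Initiate a one-member replica set
$ docker exec mongodb1 mongosh --eval 'rs.initiate({_id: "rs0", members: [{_id: 0, host: "mongodb1:27017"}]})'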

Step 3: Set Up Kafka and Kafka Connect

You need a running Kafka cluster and a Kafka Connect worker. You can use tools like the Confluent Platform or Docker to set up Kafka and Kafka Connect quickly. The connectors also need to be installed in Kafka Connect; this is done by downloading the connector plugins and placing them in the Kafka Connect plugins directory (plugin.path). You can then verify the installation through the Connect REST API, as shown in the sketch after the connector links below.

  • debezium-source-connector
  • mongo-sink-connector

For step-by-step installation instructions, refer to this and this.
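
Once the plugin JARs are in place and the Connect worker has been restarted, you can confirm that both connector classes were picked up via the worker's REST API (the class names match the configurations used in Step 4):

$ curl -s localhost:8083/connector-plugins | grep -E 'MySqlConnector|MongoSinkConnector'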

Step 4: Configure the Connectors

Configure the Debezium source connector and the MongoDB sink connector to connect to their respective databases. Kafka Connect exposes a REST API for this. Below is the curl command for configuring the Debezium source connector (host names, credentials, and the server ID are placeholders for your environment):

curl --location 'localhost:8083/connectors/' \
  --header 'Accept: application/json' \
  --header 'Content-Type: application/json' \
  --data '{
    "name": "hcis-connector",
    "config": {
      "connector.class": "io.debezium.connector.mysql.MySqlConnector",
      "database.hostname": "mysql_cdc",
      "database.port": "3306",
      "database.user": "your-database-user",
      "database.password": "your-database-password",
      "database.server.id": "777",
      "topic.prefix": "hcis_cdc",
      "database.include.list": "hcis",
      "schema.history.internal.kafka.bootstrap.servers": "kafkabroker0:9092",
      "schema.history.internal.kafka.topic": "schemahistory.hcis",
      "include.schema.changes": "false"
    }
  }'
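
After the source connector is registered and has completed its initial snapshot, the change topic for the captured table should exist. A quick way to confirm this is to list topics with the CLI shipped with Kafka (assuming kafka-topics.sh is on your path and kafkabroker0:9092 is reachable from where you run the command; the topic name follows Debezium's topic.prefix.database.table convention):

$ kafka-topics.sh --bootstrap-server kafkabroker0:9092 --list | grep hcis_cdc
hcis_cdc.hcis.faq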

And below is the curl command for configuring the MongoDB sink connector:

curl --location 'http://localhost:8083/connectors' \
  --header 'Content-Type: application/json' \
  --data '{
    "name": "new-hcis-sink-connector",
    "config": {
      "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
      "tasks.max": "1",
      "connection.uri": "mongodb://mongodb1:27017/cdc_from_dbz",
      "topics": "hcis_cdc.hcis.faq",
      "database": "cdc_from_dbz",
      "collection": "hcis.faq",
      "max.num.retries": 5,
      "errors.tolerance": "all",
      "mongo.errors.tolerance": "all",
      "mongo.errors.log.enable": true,
      "errors.log.include.messages": true,
      "errors.deadletterqueue.topic.name": "hcis.faq.deadletter",
      "errors.deadletterqueue.context.headers.enable": true,
      "change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.debezium.rdbms.mysql.MysqlHandler"
    }
  }'

Step 5: Ensure the connectors are configured and running

curl --location 'http://localhost:8083/connectors'
List of active connectors
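
Listing only returns the connector names. To verify that each connector and its tasks are actually in the RUNNING state, query the per-connector status endpoint (connector names as registered in Step 4):

curl --location 'http://localhost:8083/connectors/hcis-connector/status'
curl --location 'http://localhost:8083/connectors/new-hcis-sink-connector/status'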

Step 6: Perform CDC

A. Perform DML operations on the MySQL tables that have already been set up to be captured, for example the update sketched after the figure below.

faq table to be captured by the CDC mechanism
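
As an illustration only (the actual columns of the faq table are not shown in this article, so the answer column and id value below are hypothetical), an update that the connector would capture might look like this:

$ mysql -u your-database-user -p hcis -e "UPDATE faq SET answer = 'Updated answer text' WHERE id = 1;"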

B. Check the corresponding Kafka topic for the CDC events.

Note the change of state captured under the "before" and "after" keys. The "op" key with value "u" indicates the operation performed, which in this case is an update. A console-consumer sketch for inspecting the topic yourself follows the example message below.

Example of a message emitted by the connector to the Kafka topic
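
To inspect these messages yourself, here is a minimal sketch using the console consumer shipped with Kafka (assuming the broker address from the connector configuration is reachable from your machine):

# Each record carries the Debezium envelope with "before", "after", "source", and "op" fields
$ kafka-console-consumer.sh --bootstrap-server kafkabroker0:9092 \
    --topic hcis_cdc.hcis.faq --from-beginning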

C. Ensure that the sink connector performs as it should by checking the document collections in MongoDB, which should reflect the changes made in the MySQL tables; a command-line query sketch follows the figure below.

Data in MongoDB reflecting the changes in the MySQL table
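
The same check can be done from the command line (database and collection names are taken from the sink configuration above; the collection name contains a dot, hence getCollection):

$ docker exec mongodb1 mongosh cdc_from_dbz --quiet --eval 'db.getCollection("hcis.faq").find().limit(5)'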

Conclusion

Kafka Connect is a robust framework that simplifies the integration of various data sources and sinks with Apache Kafka. Its architecture ensures scalability, fault tolerance, and ease of use, making it an essential tool for building real-time data pipelines. By leveraging Kafka Connect, organizations can streamline their data integration processes, ensuring seamless data flow across diverse environments.

References

[1] https://kafka.apache.org/documentation/#connect

[2] https://debezium.io/documentation/reference/2.6/

[3] https://www.mongodb.com/docs/kafka-connector/current/

[4] https://www.confluent.io/online-talks/from-zero-to-hero-with-kafka-connect-on-demand/
