Change Data Capture (CDC) with Kafka Connect
Introduction
Change Data Capture (CDC) is a set of software design patterns used to detect and capture changes made to data in a database and subsequently deliver those changes in real-time to various downstream systems. CDC is essential for keeping data synchronized across multiple systems and ensuring data consistency. One of the popular tools for implementing CDC is Apache Kafka, specifically using Kafka Connect.
Pros of Using CDC Over Traditional ETL
Traditional Extract, Transform, Load (ETL) processes involve periodic bulk data transfers from source systems to target systems. While ETL has been the backbone of data integration for many years, CDC offers several advantages over traditional ETL:
- Real-Time Data Processing: Unlike traditional ETL, which typically operates in batch mode, CDC captures and streams data changes in real-time. This ensures that downstream systems receive updates as soon as they occur, enabling more timely and accurate data processing.
- Reduced Latency: With CDC, there is minimal lag between the time a change occurs in the source database and the time it is available in the target system. This low latency is crucial for applications that require up-to-date information, such as real-time analytics and monitoring.
- Efficiency: CDC captures only the changes made to the data, rather than transferring entire datasets. This reduces the volume of data being moved, resulting in lower network and processing overhead compared to traditional ETL processes.
- Data Consistency: By continuously monitoring and capturing changes, CDC helps maintain data consistency between source and target systems. Traditional ETL processes can introduce inconsistencies due to the time gap between data extractions.
- Scalability: CDC is inherently more scalable than traditional ETL. It can handle high-velocity data streams and large volumes of data without requiring extensive resources or causing significant performance degradation.
- Event-Driven Architecture: CDC integrates seamlessly with event-driven architectures, where database changes are treated as events that can trigger downstream actions. This enables more responsive and flexible system designs.
- Minimal Impact on Source Systems: Since CDC captures changes incrementally, it imposes less load on source systems compared to traditional ETL, which often involves heavy read operations on the entire dataset.
What is Kafka Connect?
Kafka Connect is a robust and scalable tool for streaming data between Apache Kafka and other systems. It is part of the Apache Kafka ecosystem and provides a framework to move large collections of data in and out of Kafka. Kafka Connect simplifies the integration of various data sources and sinks with Kafka, making it easier to build real-time data pipelines.
Image taken from Confluent's learning resources: From Zero to Hero with Kafka Connect [4]
Key Concepts in Kafka Connect
Before diving into CDC with Kafka Connect, it’s important to understand some key concepts:
- Connectors: These are plugins that enable Kafka Connect to interact with external systems. There are two types of connectors: source connectors (which import data into Kafka) and sink connectors (which export data from Kafka).
- Tasks: Tasks are the actual units of work that perform the data transfer. A connector can be configured to run multiple tasks for parallel processing.
- Workers: Workers are the JVM processes that execute the connectors and tasks. Kafka Connect can run in standalone mode (single worker) or distributed mode (multiple workers).
- Transforms: Simple data transformations that can be applied to the data as it passes through Kafka Connect.
- Converters: These handle the serialization and deserialization of data, enabling Kafka Connect to read and write data in various formats such as JSON, Avro, or Protobuf (see the sample worker configuration after this list).
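To make these concepts concrete, here is a minimal sketch of a distributed worker configuration (connect-distributed.properties). The broker address, group ID, topic names, and plugin path are placeholders and should match your environment.

# connect-distributed.properties - minimal sketch, values are placeholders
bootstrap.servers=kafkabroker0:9092

# Workers that share the same group.id form one distributed Connect cluster
group.id=cdc-connect-cluster

# Converters control how record keys and values are (de)serialized to and from Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Embed the schema in each JSON message (the default for JsonConverter)
key.converter.schemas.enable=true
value.converter.schemas.enable=true

# Internal topics used by distributed mode to store offsets, connector configs, and status
offset.storage.topic=connect-offsets
config.storage.topic=connect-configs
status.storage.topic=connect-status
# Replication factor 1 is only suitable for a single-broker development setup
offset.storage.replication.factor=1
config.storage.replication.factor=1
status.storage.replication.factor=1

# Directory containing connector plugins (e.g. Debezium, the MongoDB connector)
plugin.path=/usr/share/kafka/plugins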
Implementing a CDC System for a MySQL Database with a MongoDB Sink
CDC involves capturing changes (inserts, updates, deletes) from a database and streaming those changes to Kafka. In this article we will build a CDC system using the Debezium source connector and the MongoDB sink connector, as shown in the picture below.
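For orientation, below is a minimal docker-compose sketch of such a stack. The service names (mysql_cdc, mongodb1, kafkabroker0) match the hostnames used in the connector configurations later in this article, but the image tags, passwords, and topic names are assumptions; adjust them for your environment.

# docker-compose.yml - architecture sketch only, not a production setup
services:
  zookeeper:
    image: quay.io/debezium/zookeeper:2.6
  kafkabroker0:
    image: quay.io/debezium/kafka:2.6
    environment:
      ZOOKEEPER_CONNECT: zookeeper:2181
  connect:
    # Ships the Debezium connectors; the MongoDB sink connector still has to be added (Step 3)
    image: quay.io/debezium/connect:2.6
    ports:
      - "8083:8083"   # Connect REST API used in Steps 4 and 5
    environment:
      BOOTSTRAP_SERVERS: kafkabroker0:9092
      GROUP_ID: "1"
      CONFIG_STORAGE_TOPIC: connect_configs
      OFFSET_STORAGE_TOPIC: connect_offsets
      STATUS_STORAGE_TOPIC: connect_statuses
  mysql_cdc:
    # Binlog must be enabled as described in Step 1
    image: mysql:8.0
    environment:
      MYSQL_ROOT_PASSWORD: your-root-password
  mongodb1:
    # Started as a single-node replica set, see Step 2
    image: mongo:7.0
    command: ["--replSet", "rs0"]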
Step 1: Set up MySQL so that Kafka Connect can tap into the binlog for CDC
Debezium uses MySQL's binlog to track all change events, so the binlog must be enabled in MySQL. Here are the steps to enable the binlog for Debezium.
A. Check the current state of binlog activation:
$ mysqladmin variables -uroot|grep log_bin
| log_bin | OFF
B. Enable the binlog by modifying the my.cnf config file as below:
[mysqld]
server-id = 42
log_bin = mysql-bin
binlog_format = row
C. Restart MySQL and verify the binlog status with the following command:
$ mysqladmin variables -uroot|grep log_bin
| log_bin | ON
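Besides the binlog, Debezium needs a MySQL user with replication-related privileges (as listed in the Debezium MySQL connector documentation). A minimal sketch is shown below; the user name and password are placeholders and correspond to the database.user / database.password values configured in Step 4.

-- Run as root in the mysql client; user name and password are placeholders
CREATE USER 'your-database-user'@'%' IDENTIFIED BY 'your-database-password';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT
    ON *.* TO 'your-database-user'@'%';
FLUSH PRIVILEGES;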
Step 2: Set up a MongoDB replica set, as the MongoDB connector can only be used with a MongoDB cluster
Link: set up a replica set from a single MongoDB container
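If you only have a single MongoDB container, a minimal sketch of turning it into a one-node replica set looks like this; the host name mongodb1 and replica set name rs0 are assumptions and should match your connection URI.

# Start mongod with a replica set name (e.g. via the container's command)
mongod --replSet rs0 --bind_ip_all

# From a mongosh session, initiate the single-node replica set
mongosh --host mongodb1 --eval 'rs.initiate({_id: "rs0", members: [{_id: 0, host: "mongodb1:27017"}]})'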
Step 3: Set Up Kafka and Kafka Connect
You need to have a running Kafka cluster and Kafka Connect. You can use tools like Confluent Platform or Docker to quickly set up Kafka and Kafka Connect. Connectors need to be installed in Kafka Connect. This can be done by downloading the connector plugins and placing them in the Kafka Connect plugins directory.
For a step-by-step installation, refer to this and this.
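As a rough sketch (paths and versions are assumptions), installing the MongoDB connector plugin on a plain Kafka Connect worker boils down to placing the connector jar on the worker's plugin.path; on Confluent Platform the confluent-hub CLI can do this for you.

# Copy the MongoDB Kafka connector "all" jar (downloaded from Maven Central or
# Confluent Hub) into a directory listed in the worker's plugin.path, then restart the worker
mkdir -p /usr/share/kafka/plugins
cp mongo-kafka-connect-<version>-all.jar /usr/share/kafka/plugins/

# Alternatively, on Confluent Platform:
confluent-hub install mongodb/kafka-connect-mongodb:latest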
Step 4: Configure the Connector
Configure the Debezium source connector and the MongoDB sink connector to connect to their databases. Kafka Connect exposes a REST API for setting connector configuration. Below is the curl command for configuring the Debezium source connector:
curl --location 'localhost:8083/connectors/' \
--header 'Accept: application/json' \
--header 'Content-Type: application/json' \
--data '{
  "name": "hcis-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql_cdc",
    "database.port": "3306",
    "database.user": "your-database-user",
    "database.password": "your-database-password",
    "database.server.id": "777",
    "topic.prefix": "hcis_cdc",
    "database.include.list": "hcis",
    "schema.history.internal.kafka.bootstrap.servers": "kafkabroker0:9092",
    "schema.history.internal.kafka.topic": "schemahistory.hcis",
    "include.schema.changes": "false"
  }
}'
And below is the curl command for configuring the MongoDB sink connector:
curl --location 'http://localhost:8083/connectors' \
--header 'Content-Type: application/json' \
--data '{
  "name": "new-hcis-sink-connector",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "tasks.max": "1",
    "connection.uri": "mongodb://mongodb1:27017/cdc_from_dbz",
    "topics": "hcis_cdc.hcis.faq",
    "database": "cdc_from_dbz",
    "collection": "hcis.faq",
    "max.num.retries": 5,
    "errors.tolerance": "all",
    "mongo.errors.tolerance": "all",
    "mongo.errors.log.enable": true,
    "errors.log.include.messages": true,
    "errors.deadletterqueue.topic.name": "hcis.faq.deadletter",
    "errors.deadletterqueue.context.headers.enable": true,
    "change.data.capture.handler": "com.mongodb.kafka.connect.sink.cdc.debezium.rdbms.mysql.MysqlHandler"
  }
}'
Step 5: Ensure the connectors are configured and running
curl --location 'http://localhost:8083/connectors'
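The command above only lists connector names. To verify that a connector and its tasks are actually in the RUNNING state, you can also query the Connect REST API's status endpoint (the connector names are the ones registered in Step 4):

curl --location 'http://localhost:8083/connectors/hcis-connector/status'
curl --location 'http://localhost:8083/connectors/new-hcis-sink-connector/status'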
Step 6: Perform CDC
A. Perform DML operations on the MySQL tables that are already set to be captured (see the example commands after this list).
B. Check the corresponding topic for the CDC events.
Note the change state as captured in the "before" and "after" keys. The "op" key with value "u" indicates the operation performed, which in this case is an update.
C. Ensure that the sink connector performs as it should by checking that the documents in the MongoDB collection reflect the changes made in the MySQL tables.
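A rough sketch of steps A-C is shown below. It assumes the faq table in the hcis database has id and answer columns; the column names, credentials, and CLI locations are assumptions.

# A. Perform a DML operation on a captured table (column names are assumptions)
mysql -uroot -p hcis -e "UPDATE faq SET answer = 'updated answer' WHERE id = 1;"

# B. Inspect the resulting CDC event on the corresponding topic
kafka-console-consumer --bootstrap-server kafkabroker0:9092 \
  --topic hcis_cdc.hcis.faq --from-beginning

# C. Verify that the change is reflected in the MongoDB sink collection
mongosh "mongodb://mongodb1:27017/cdc_from_dbz" \
  --eval 'db.getCollection("hcis.faq").find()'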
Conclusion
Kafka Connect is a robust framework that simplifies the integration of various data sources and sinks with Apache Kafka. Its architecture ensures scalability, fault tolerance, and ease of use, making it an essential tool for building real-time data pipelines. By leveraging Kafka Connect, organizations can streamline their data integration processes, ensuring seamless data flow across diverse environments.
References
[1] https://kafka.apache.org/documentation/#connect
[2] https://debezium.io/documentation/reference/2.6/
[3] https://www.mongodb.com/docs/kafka-connector/current/
[4] https://www.confluent.io/online-talks/from-zero-to-hero-with-kafka-connect-on-demand/