Real-time SQL Server CDC changes to MySQL using Debezium and Kafka Connect, without Docker

Venkata Harish Gollavilli
2 min read · Apr 10, 2019


Data platforms in any enterprise have use cases involving Change Data Capture (CDC). Gone are the days when CDC processes ran as batch ETL jobs once a day; data engineering solutions today require CDC changes to be propagated in real time.

There are many commercial CDC tools on the market that can do the job. But if you are looking for an open-source solution without any commercial software, it can be achieved as shown below using Debezium + Kafka Connect, which works like a charm. This example is implemented without Docker.

Debezium

Debezium is an open-source CDC platform sponsored by Red Hat. It captures database changes in real time and pushes them to Kafka topics using Kafka Connect. I am impressed by the way Debezium works. For complete details, please check https://debezium.io/

Replicate SQL Server CDC changes to MySql in real-time

This walkthrough uses Red Hat Enterprise Linux Server 7.5 with Java 8. Download a JDK if Java is not already installed.
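
Kafka and the Connect worker both run on the JVM, so verify Java 8 is on the PATH before starting:

# Print the installed Java version; it should report 1.8.x
java -version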

1. Download Kafka 2.1.1 from https://kafka.apache.org/downloads and extract it
cd /opt
tar -xvzf kafka_2.12-2.1.1.tgz
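
If you prefer to fetch the archive from the command line, older releases are hosted on the Apache archive (the exact URL is an assumption based on the archive's layout, so verify it against the downloads page):

cd /opt
# Fetch the Kafka 2.1.1 binary built for Scala 2.12
wget https://archive.apache.org/dist/kafka/2.1.1/kafka_2.12-2.1.1.tgz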

2. Download the latest Debezium SQL Server connector plugin from https://debezium.io/docs/install/ and extract it

cd /usr/local/share/kafka/plugins 
tar -xvzf debezium-connector-sqlserver-0.9.0.Alpha1-plugin.tar.gz
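
On a fresh machine the plugins directory may not exist yet; create it before running the commands above, and confirm the connector jars are in place afterwards:

# Create the plugin directory and verify the extracted jars
mkdir -p /usr/local/share/kafka/plugins
ls /usr/local/share/kafka/plugins/debezium-connector-sqlserver/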

3. Add the following jars to the CLASSPATH

export CLASSPATH=$CLASSPATH:/usr/local/share/kafka/plugins/debezium-connector-sqlserver/*
export CLASSPATH=$CLASSPATH:/opt/kafka_2.12-2.1.1/libs/*
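
These exports last only for the current shell session. To persist them across logins, append them to your shell profile (a sketch, assuming bash):

# The quoted heredoc keeps the globs from expanding at write time
cat >> ~/.bash_profile <<'EOF'
export CLASSPATH=$CLASSPATH:/usr/local/share/kafka/plugins/debezium-connector-sqlserver/*
export CLASSPATH=$CLASSPATH:/opt/kafka_2.12-2.1.1/libs/*
EOF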

4. Start Zookeeper

cd /opt/kafka_2.12-2.1.1
bin/zookeeper-server-start.sh config/zookeeper.properties
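
The script above holds the terminal in the foreground; Kafka's start scripts also accept a -daemon flag to run in the background:

# Run Zookeeper as a background daemon instead
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties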

5. Start Kafka

cd /opt/kafka_2.12-2.1.1
bin/kafka-server-start.sh config/server.properties
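
To confirm the broker is up, list the topics (in Kafka 2.1.1 the topics tool still talks to Zookeeper):

# An empty list on a fresh install simply means the broker is reachable
bin/kafka-topics.sh --list --zookeeper localhost:2181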

6. Configure Kafka Connect

Kafka Connect exposes a REST API, and connectors can be created with a POST request to it. In this example, however, the source and sink connectors are supplied to the standalone command itself, as shown below. You may use either approach based on your requirements.
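
For reference, the REST variant looks roughly like this once the worker is running (a sketch; rest.port is set to 10082 in worker.properties below, and the config object takes the same key/value pairs as sqlserver.properties, only the first few are shown):

# Create the source connector through the Connect REST API
curl -X POST -H "Content-Type: application/json" http://<hostname>:10082/connectors \
  -d '{
    "name": "sqlservercon",
    "config": {
      "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
      "database.hostname": "<sqlserver_host>",
      "database.port": "<sqlserver_port>"
    }
  }'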

This example requires three properties files to configure and run the connectors that stream database changes from SQL Server to MySQL. Update all of the properties files with your own values (hosts, users, passwords, etc.).

worker.properties

offset.storage.file.filename=/tmp/connect.offsets
bootstrap.servers=<hostname>:9092
offset.flush.interval.ms=10000
rest.port=10082
rest.host.name=<hostname>
rest.advertised.port=10082
rest.advertised.host.name=<hostname>
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
plugin.path=/usr/local/share/kafka/plugins

sqlserver.properties

name=sqlservercon
connector.class=io.debezium.connector.sqlserver.SqlServerConnector
database.hostname=<sqlserver_host>
database.port=<sqlserver_port>
database.user=<sqlserver_user>
database.password=<sqlserver_password>
database.dbname=<sqlserver_database_name>
database.server.name=<assign_any_name>
table.whitelist=<schema_name>.<table_name>
database.history.kafka.bootstrap.servers=<hostname>:9092
database.history.kafka.topic=oldhistory_tpc
transforms=route
transforms.route.type=org.apache.kafka.connect.transforms.RegexRouter
transforms.route.regex=([^.]+)\\.([^.]+)\\.([^.]+)
transforms.route.replacement=$3
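
One prerequisite worth calling out: the connector only captures tables that have CDC enabled on the SQL Server side. A minimal sketch using sqlcmd (the dbo schema and the placeholders are assumptions; run it with a login that has db_owner rights):

# Enable CDC at the database level, then for the captured table
sqlcmd -S <sqlserver_host> -U <sqlserver_user> -P <sqlserver_password> -d <sqlserver_database_name> \
  -Q "EXEC sys.sp_cdc_enable_db; EXEC sys.sp_cdc_enable_table @source_schema = N'dbo', @source_name = N'<table_name>', @role_name = NULL;"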

sink.properties

name=jdbc-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
topics=<sqlserver_whitelist_table_name>
connection.url=jdbc:mysql://<mysql_host>:3306/<schema>?user=<username>&password=<password>
transforms=unwrap
transforms.unwrap.type=io.debezium.transforms.UnwrapFromEnvelope
auto.create=true
insert.mode=upsert
pk.fields=<primary_key_column>
pk.mode=record_value
table.name.format=<mysql_schema>.<table_name>
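
Note that the Confluent JDBC sink connector and the MySQL JDBC driver do not ship with Apache Kafka; both jars must be available on the worker's plugin path (the file names below are assumptions, match them to the versions you download):

# Drop the JDBC sink connector and MySQL driver jars onto the plugin path
cp kafka-connect-jdbc-<version>.jar /usr/local/share/kafka/plugins/
cp mysql-connector-java-<version>.jar /usr/local/share/kafka/plugins/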

Start Kafka Connect in standalone mode as below

export KAFKA_HOME=/opt/kafka_2.12-2.1.1
$KAFKA_HOME/bin/connect-standalone.sh worker.properties sqlserver.properties sink.properties
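
Once the worker is up, its REST API (port 10082 per worker.properties) can confirm that both connectors were created:

# Expect both connector names in the response: ["sqlservercon","jdbc-sink"]
curl http://<hostname>:10082/connectors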

Make updates to the SQL Server table and you will see the changes replicated to the MySQL table in real time.
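
To watch the raw change events as they flow (the RegexRouter transform above renames topics to the bare table name), a console-consumer sketch:

cd /opt/kafka_2.12-2.1.1
# Each message is a Debezium change-event envelope, roughly:
# {"before": ..., "after": ..., "op": "c|u|d", "ts_ms": ...}
bin/kafka-console-consumer.sh --bootstrap-server <hostname>:9092 --topic <table_name> --from-beginning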
