Streaming SQL Server CDC with Apache Kafka using Debezium.

Ganesh Ankulwar
6 min read · Jan 12, 2020


Overview:

Gone are the days of batch processing for capturing changes in databases. In the new era of data engineering, changes are captured in real time. Nowadays, most RDBMS databases ship with CDC (Change Data Capture) support, and we can leverage it to build a data pipeline that fetches changes from the database in real time.

There are mainly two platforms/frameworks used to create a streaming data pipeline —

  1. Kinesis — an AWS managed service.
  2. Kafka — open source and the most widely used. Recently, AWS also launched a managed Kafka service called Amazon MSK.

Kinesis:

Using AWS DMS (Database Migration Service) and Kinesis, one can create a real-time data ingestion pipeline to stream CDC events from a database. If you are interested in creating one, check out this article.

Kafka:

With Kafka, there are many commercial CDC tools available in the market that can do the job.

So, now that you know the two platforms, you can choose one according to your use case.

Based on my short research, I was able to find only one open-source way to stream CDC events into Kafka, and that was using the Debezium connector with the Kafka Connect API.

What is Debezium?

It is an open-source project that offers connector plugins to fetch data from a database. It captures row-level changes in your databases so that your applications can see and respond to those changes. Debezium reads the database's transaction log and records every row-level change committed to each database table.

What is Kafka Connect?

Kafka Connect is a tool for scalably and reliably streaming data between Apache Kafka and other data systems. It makes it simple to quickly define connectors that move large data sets into and out of Kafka. Kafka Connect can ingest entire databases or collect metrics from all your application servers into Kafka topics, making the data available for stream processing with low latency. An export connector can deliver data from Kafka topics into secondary indexes like Elasticsearch or batch systems such as Hadoop for offline analysis.

Without further ado, let’s start creating a streaming pipeline to ingest SQL Server CDC events into Kafka using Kafka Connect + Debezium.

Streaming SQL Server CDC to Apache Kafka Architecture

For ease of understanding, I’ll be using Kafka Connect in a standalone mode.

Requirements:

  1. Debezium SQL Server plugin (version 0.10) — download from here.
  2. Kafka libs (version 2.2.1).
  3. A client machine (VM/EC2) to set up Debezium and Kafka Connect.
  4. Kafka 2.2.1 cluster.
  5. SQL Server with CDC support (2016 Enterprise Edition).

Steps to configure SQL Server for CDC:

  1. Enable CDC for Database —
USE MyDB 
GO
EXEC sys.sp_cdc_enable_db
GO

2. Enable CDC for the table —

USE MyDB 
GO
EXEC sys.sp_cdc_enable_table
@source_schema = N'dbo',
@source_name = N'MyTable',
@role_name = N'MyRole',
@supports_net_changes = 1
GO

3. To view the CDC configuration for your tables —

USE MyDB
GO
EXEC sys.sp_cdc_help_change_data_capture
GO
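
Optionally, you can also confirm CDC is enabled by querying the catalog views directly. This is a minimal check, assuming the same MyDB/MyTable names used above:

USE MyDB
GO
-- Database-level CDC flag
SELECT name, is_cdc_enabled FROM sys.databases WHERE name = 'MyDB'
GO
-- Table-level CDC flag
SELECT name, is_tracked_by_cdc FROM sys.tables WHERE name = 'MyTable'
GO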

For more details, check here.

Steps to setup Client Machine:

  1. Download and Untar the Kafka libs-
mkdir kafka/
cd kafka
wget https://archive.apache.org/dist/kafka/2.2.1/kafka_2.12-2.2.1.tgz
tar -xzf kafka_2.12-2.2.1.tgz

Note: Since the Kafka libs are based on Java, if Java is not installed on this machine, please install it and set the JAVA_HOME environment variable.

sudo yum install java-1.8.0
export JAVA_HOME=/usr/lib/jvm/jre-1.8.0

2. Download the Debezium libs, untar them, and move the extracted folder into the plugins folder.

wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-sqlserver/0.10.0.Final/debezium-connector-sqlserver-0.10.0.Final-plugin.tar.gz
tar -xzf debezium-connector-sqlserver-0.10.0.Final-plugin.tar.gz
mkdir plugins
mv debezium-connector-sqlserver plugins/

Configure Kafka Connect worker and connector properties:

  1. Create a file “worker.properties” for Kafka Connect worker properties. Paste the below properties into the file —
offset.storage.file.filename=/tmp/connect.offsets
bootstrap.servers=<bootstrap servers list>
offset.flush.interval.ms=10000
rest.port=10082
rest.host.name=<hostname>
rest.advertised.port=10082
rest.advertised.host.name=<hostname>
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
plugin.path=/home/plugins
#If kafka is TLS authenticated, uncomment below lines.
#security.protocol=SSL
#ssl.truststore.location=/tmp/kafka.client.truststore.jks
#producer.security.protocol=SSL
#producer.ssl.truststore.location=/tmp/kafka.client.truststore.jks

Check this link for more Kafka Connect properties.

2. Create a file “connector.properties” for Kafka Connect connector properties. Paste the below properties into the file —

name=sql-server-connection
connector.class=io.debezium.connector.sqlserver.SqlServerConnector
database.hostname=<DB Hostname>
database.port=<DB Port>
database.user=<DB User>
database.password=<DB Password>
database.dbname=<DB Name>
database.server.name=<Assign any name>
table.whitelist=<schema_name.table_name>
database.history.kafka.bootstrap.servers=<bootstrap servers list>
database.history.kafka.topic=dbhistory.history
#If kafka is TLS authenticated, uncomment below lines.
#database.history.producer.security.protocol=SSL
#database.history.producer.ssl.truststore.location=/tmp/kafka.client.truststore.jks

Check this link for more connector properties.

The final step is to create a history topic in the Kafka cluster. The connector uses this topic to store the database schema history.

cd kafka/
bin/kafka-topics.sh --create --zookeeper <ZooKeeperConnectionString> --replication-factor 1 --partitions 1 --topic dbhistory.history

We have successfully set up SQL Server and the client machine for streaming. Now, we can start streaming by starting Kafka Connect.

bin/connect-standalone.sh worker.properties connector.properties

After running the above command, you will see logs like the ones below on the screen —

[2019-12-06 15:04:56,316] INFO No previous offset has been found (io.debezium.connector.sqlserver.SqlServerSnapshotChangeEventSource:61)
[2019-12-06 15:04:56,316] INFO According to the connector configuration both schema and data will be snapshotted (io.debezium.connector.sqlserver.SqlServerSnapshotChangeEventSource:63)
[2019-12-06 15:04:56,316] INFO Snapshot step 1 - Preparing (io.debezium.relational.RelationalSnapshotChangeEventSource:112)
[.....][2019-12-06 15:04:56,504] INFO Snapshot step 7 - Snapshotting data (io.debezium.relational.RelationalSnapshotChangeEventSource:155)
[2019-12-06 15:04:56,523] INFO Snapshot step 8 - Finalizing (io.debezium.relational.RelationalSnapshotChangeEventSource:184)
[2019-12-09 13:06:06,331] INFO WorkerSourceTask{id=sql-server-connection} Committing offsets
[2019-12-09 13:06:06,331] INFO WorkerSourceTask{id=sql-server-connection} flushing 0 outstanding messages for offset commit

This indicates that the snapshot has been taken and the streaming pipeline has started.

Hurray! We have successfully set up a real-time streaming pipeline.

Now go ahead and change some data in the tables in SQL Server, and you will be able to see those changes streaming into the Kafka topic.
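
For example, an insert followed by an update roughly like the following produced the two sample change events shown later in this post. The test_db/dbo.test_table names and columns are the ones from the sample records; adjust them to your own schema:

USE test_db
GO
INSERT INTO dbo.test_table (ssid, pwd, name) VALUES (130, 'try stream', 'kafka please work')
GO
UPDATE dbo.test_table SET pwd = 'Updated password' WHERE ssid = 130
GO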

The names of the topics are of the form “ServerName.SchemaName.TableName”. ServerName is taken from the connector property “database.server.name”; SchemaName and TableName are self-explanatory. For example, with database.server.name=test_server, schema dbo, and table test_table, the topic is test_server.dbo.test_table.

“Each table from the SQL Server becomes a separate topic in Kafka”
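
To confirm that the per-table topics (and the history topic) were created, you can list the topics in the cluster with the Kafka CLI downloaded earlier:

bin/kafka-topics.sh --list --zookeeper <ZooKeeperConnectionString>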

Now let’s see what the Debezium output in a Kafka topic looks like. To see the messages in a Kafka topic, we can start a Kafka console consumer, which consumes the messages and prints them to stdout.

bin/kafka-console-consumer.sh --bootstrap-server <BootstrapConnectionString> --topic test_server.dbo.test_table --from-beginning

Understanding the JSON data structure from Debezium:

Below are two sample records from the Kafka topic in JSON form — the first from an insert, the second from an update:

{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"ssid"},{"type":"string","optional":false,"field":"pwd"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"test_server.dbo.test_table.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"ssid"},{"type":"string","optional":false,"field":"pwd"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"test_server.dbo.test_table.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"string","optional":true,"field":"change_lsn"},{"type":"string","optional":true,"field":"commit_lsn"},{"type":"int64","optional":true,"field":"event_serial_no"}],"optional":false,"name":"io.debezium.connector.sqlserver.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"test_server.dbo.test_table.Envelope"},"payload":{"before":null,"after":{"ssid":130,"pwd":"try stream","name":"kafka please work"},"source":{"version":"0.10.0.Final","connector":"sql-server","name":"poc-test","ts_ms":1575627452687,"snapshot":"false","db":"test_db","schema":"dbo","table":"test_table","change_lsn":"0000003a:00000056:0002","commit_lsn":"0000003a:00000056:0003","event_serial_no":1},"op":"c","ts_ms":1575627453646}}{"schema":{"type":"struct","fields":[{"type":"struct","fields":[{"type":"int32","optional":false,"field":"ssid"},{"type":"string","optional":false,"field":"pwd"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"test_server.dbo.test_table.Value","field":"before"},{"type":"struct","fields":[{"type":"int32","optional":false,"field":"ssid"},{"type":"string","optional":false,"field":"pwd"},{"type":"string","optional":true,"field":"name"}],"optional":true,"name":"test_server.dbo.test_table.Value","field":"after"},{"type":"struct","fields":[{"type":"string","optional":false,"field":"version"},{"type":"string","optional":false,"field":"connector"},{"type":"string","optional":false,"field":"name"},{"type":"int64","optional":false,"field":"ts_ms"},{"type":"string","optional":true,"name":"io.debezium.data.Enum","version":1,"parameters":{"allowed":"true,last,false"},"default":"false","field":"snapshot"},{"type":"string","optional":false,"field":"db"},{"type":"string","optional":false,"field":"schema"},{"type":"string","optional":false,"field":"table"},{"type":"string","optional":true,"field":"change_lsn"},{"type":"string","optional":true,"field":"commit_lsn"},{"type":"int64","optional":true,"field":"event_serial_no"}],"optional":false,"name":"io.debezium.connector.sqlserver.Source","field":"source"},{"type":"string","optional":false,"field":"op"},{"type":"int64","optional":true,"field":"ts_ms"}],"optional":false,"name":"test_server.dbo.test_table.Envelope"},"payload":{"before":{"ssid":130,"pwd":"try stream","name":null},"after":{"ssid":130,"pwd":"Updated password","name":"kafka please 
work"},"source":{"version":"0.10.0.Final","connector":"sql-server","name":"poc-test","ts_ms":1575627568613,"snapshot":"false","db":"test_db","schema":"dbo","table":"test_table","change_lsn":"0000003a:00000070:0002","commit_lsn":"0000003a:00000070:0003","event_serial_no":2},"op":"u","ts_ms":1575627569145}}
  • If you observe, you can see two parts in a record: schema and payload. The schema part gives you the schema information of the record — for example, the data type of each column. The payload part carries the actual data.
  • The “before” and “after” keys give you the data before the change and the data after the change, respectively.
  • The “op” key tells you the type of event: c - create (or insert), u - update, d - delete, r - snapshot (meaning the record was captured during the snapshot process).
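
For readability, here is the payload of the second (update) record from above, pretty-printed and with a few source fields trimmed; the values are taken directly from the raw record:

{
  "before": { "ssid": 130, "pwd": "try stream", "name": null },
  "after": { "ssid": 130, "pwd": "Updated password", "name": "kafka please work" },
  "source": {
    "version": "0.10.0.Final",
    "connector": "sql-server",
    "db": "test_db",
    "schema": "dbo",
    "table": "test_table",
    "change_lsn": "0000003a:00000070:0002",
    "commit_lsn": "0000003a:00000070:0003"
  },
  "op": "u",
  "ts_ms": 1575627569145
}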

To understand more about the data generated by Debezium, you can check out Debezium documentation.

Conclusion:

We can set up a simple streaming pipeline to ingest CDC events from SQL Server into Kafka using Debezium and Kafka Connect. I would say it was a fairly easy setup, but it does a very complex job.

To go further, you can try to set up this architecture using Kafka Connect in distributed mode, as sketched below.
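
As a rough sketch (an assumption based on the standalone configuration above, not a tested setup): in distributed mode, the worker is started with its own properties file — which additionally needs group.id and the config/offset/status storage topic settings — and the connector is registered through the Kafka Connect REST API instead of a connector.properties file.

# Start the Kafka Connect worker in distributed mode
# (worker-distributed.properties is a hypothetical file name)
bin/connect-distributed.sh worker-distributed.properties

# Register the same Debezium connector via the Connect REST API
# (port 10082 matches rest.port from worker.properties above)
curl -X POST -H "Content-Type: application/json" http://<hostname>:10082/connectors -d '{
  "name": "sql-server-connection",
  "config": {
    "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "database.hostname": "<DB Hostname>",
    "database.port": "<DB Port>",
    "database.user": "<DB User>",
    "database.password": "<DB Password>",
    "database.dbname": "<DB Name>",
    "database.server.name": "<Assign any name>",
    "table.whitelist": "<schema_name.table_name>",
    "database.history.kafka.bootstrap.servers": "<bootstrap servers list>",
    "database.history.kafka.topic": "dbhistory.history"
  }
}'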

That’s it! Thank you!
