Streaming data from MSSQL Server to Azure Data Lake Storage Gen2 (ADLS Gen2) using Apache Kafka, Kafka Connect + Debezium and Apache NiFi in Docker containers

Lucas Rocha
Oct 9, 2023

Introduction

Data integration lies at the heart of data engineering, serving as the linchpin for effective data processing. This article demonstrates how to stream data from an MSSQL Server database to Azure Data Lake Storage Gen2 using a powerful combination of technologies: Apache Kafka, Kafka Connect + Debezium, and Apache NiFi.

Tech Stack

  • Docker Desktop v4.23.0;
  • Microsoft SQL Server 2022 Developer Edition (64-bit);
  • Apache Kafka;
  • Kafka Connect + Debezium (MSSQL Server Connector);
  • Apache NiFi;
  • Azure Data Lake Storage Gen2.

Environment Setup

Before we begin implementing the solution, we need to perform some configurations:

01 — Docker: The first step is to clone the project from GitHub to obtain the docker-compose.yml and Dockerfile, using the following command:

git clone https://github.com/lucasjrocha/streaming-sqlserver-cdc-adls.git

After cloning the project, open the terminal, navigate to the root folder where the docker-compose.yml and Dockerfile files are located, and execute the following command so that the image can be created:

docker build . -t cdc:latest

Once completed, execute the command below so that we can fetch the other images and create the containers:

docker compose up -d

You will have an output similar to this one:

01 — After the docker-compose up -d command

You can also check the containers in the Docker Desktop:

Or check from the terminal with the docker ps command:
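Either of the commands below works; docker compose ps is a handy alternative that limits the output to the containers defined in this project's docker-compose.yml:

docker ps
docker compose ps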

02 — Microsoft SQL Server: As you can see, a container named “sqlserver-1” was created, responsible for running MSSQL Server. We will create a database on this server, create a table, and enable CDC on both the database and the table. Open the terminal and execute the following commands:

docker exec -it streaming-sqlserver-cdc-adls-sqlserver-1 bash -c '/opt/mssql-tools/bin/sqlcmd -U sa -P P@ssw0rd'

With the command above, you will open sqlcmd and be able to execute SQL commands. Then, execute the following:

CREATE DATABASE CDC_2023;
GO
USE CDC_2023
GO
CREATE TABLE t1 (c1 INT IDENTITY, c2 VARCHAR(50), CONSTRAINT pk PRIMARY KEY(c1))
GO
USE CDC_2023
GO
EXEC sys.sp_cdc_enable_db
GO
USE CDC_2023
GO
EXEC sys.sp_cdc_enable_table @source_schema = N'dbo', @source_name = N't1', @role_name = NULL, @filegroup_name = N'PRIMARY'
GO

After that, you’re going to receive an output like the one below:

Job 'cdc.CDC_2023_capture' started successfully.
Job 'cdc.CDC_2023_cleanup' started successfully.

Done: the database and table have been created, and CDC is enabled. From now on, any changes made to this table will be captured by CDC.
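If you want to double-check that CDC is really active before moving on, an optional verification is to query the system catalog in the same sqlcmd session; both queries should return 1 in the enabled/tracked column:

SELECT name, is_cdc_enabled FROM sys.databases WHERE name = 'CDC_2023'
GO
SELECT name, is_tracked_by_cdc FROM sys.tables WHERE name = 't1'
GO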

03 — Apache Kafka, Kafka Connect + Debezium: Now, let’s configure Kafka Connect with the Debezium SQL Server Connector, which is responsible for reading CDC events from SQL Server and making them accessible through Kafka topics. To do this, you’ll need to execute the following command, which contains various configurations. You can find detailed explanations of these configurations in the Debezium Connector for SQL Server documentation.

Configuration Explanation:

{
  "name":"db-connector", # connector name
  "config":{
    "connector.class":"io.debezium.connector.sqlserver.SqlServerConnector",
    "tasks.max":"1",
    "database.server.name":"localhost", # the logical server name; "localhost" works for this example
    "database.hostname":"sqlserver", # the hostname, in this case the sqlserver container
    "database.port":"1433", # the port you set when creating the sqlserver container
    "database.user":"sa", # the user you set when creating the sqlserver container
    "database.password":"P@ssw0rd", # the password you set when creating the sqlserver container
    "database.names":"CDC_2023", # the database you created earlier via sqlcmd
    "topic.prefix":"CDC_T1", # as it says, this prefix is prepended to the topic names the connector creates
    "database.encrypt":"false",
    "table.include.list":"dbo.t1", # restricts capture to the tables you want; in this example we limit it to "dbo.t1"
    "schema.history.internal.kafka.bootstrap.servers":"kafka:9092",
    "schema.history.internal.kafka.topic":"schema-changes.db"
  }
}

The configuration above is condensed into the single-line command below. After running it, you will have a Kafka topic that receives the changes made to the table:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d '{"name": "db-connector", "config": {"connector.class" : "io.debezium.connector.sqlserver.SqlServerConnector", "tasks.max" : "1", "database.server.name" : "localhost", "database.hostname" : "sqlserver", "database.port" : "1433", "database.user" : "sa", "database.password" : "P@ssw0rd", "database.names" : "CDC_2023", "topic.prefix" : "CDC_T1", "database.encrypt":"false", "table.include.list": "dbo.t1","schema.history.internal.kafka.bootstrap.servers": "kafka:9092", "schema.history.internal.kafka.topic": "schema-changes.db"}}'

Output:

04 — Kafka Connect + Debezium configuration
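Before moving on, you can optionally confirm that the connector registered correctly through the Kafka Connect REST API; both the connector and its task should report a RUNNING state:

curl -s http://localhost:8083/connectors/db-connector/status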

After that, you will be able to see the topics that were created by running the following command:

kafka-topics --list --bootstrap-server kafka:9092

Output:

05 — Kafka Topic created
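If you want to peek at the raw change events before setting up NiFi, you can read the topic with the console consumer, run the same way as the command above. With this configuration, Debezium names the table topic <topic.prefix>.<database>.<schema>.<table>, which here should be CDC_T1.CDC_2023.dbo.t1; adjust the name if your listed topics differ:

kafka-console-consumer --bootstrap-server kafka:9092 --topic CDC_T1.CDC_2023.dbo.t1 --from-beginning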

04 — Apache NiFi: Now, let's create some processors to read data from the Kafka topic and write the data to Azure Data Lake Storage Gen2. First, open the Apache NiFi UI at http://localhost:9443/nifi/; you will see a page like the following:

06 — Apache Nifi

Now, let’s add the following Processors:

  • ConsumeKafkaRecord_2_6 1.23.2: This processor consumes messages from Apache Kafka. You will need to configure the following settings for this processor:
07.1 — ConsumeKafka configuration
  • LogAttribute 1.23.2: This processor emits the FlowFile's attributes at the specified log level; the standard configuration is as follows:
07.2 — LogAttribute
  • PutAzureDataLakeStorage 1.23.2: This processor writes the contents of a FlowFile as a file on Azure Data Lake Storage Gen2. Here is the configuration:
07–3–1 PutADLS

To configure the ADLS Credentials property, you will need to create a new Controller Service (ADLSCredentialsControllerService 1.23.2). For this example, I used the “Storage Account Name” and “Storage Account Key” properties, as follows:

07–3–2 ADLSConfig

After that, let's configure the “File Name” property. You can set the file name in various ways; for this example, I've chosen to use the current timestamp (year-month-day hour:minute:second.millisecond) minus 3 hours, i.e. 10,800,000 milliseconds.

07–3–3 ADLSConfigFileName

Here is the code:

${now():toNumber():minus(10800000):format("yyyy-MM-dd HH:mm:ss.SSS'Z'", "GMT")}.json
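Breaking the expression down: now() returns the current time, toNumber() converts it to epoch milliseconds, minus(10800000) subtracts three hours, and format(...) renders it as a GMT string. If you prefer not to hard-code the offset, an alternative sketch (assuming your target zone is UTC-3, for example America/Sao_Paulo) is to pass the time zone directly to format():

${now():toNumber():format("yyyy-MM-dd HH:mm:ss.SSS", "America/Sao_Paulo")}.json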

Done: you have created the three processors for this example. Now you just need to connect them, as follows:

08 - Processors

Now, let's start the processors. The first time they run, the “Consume” processor will read all the data in the topic from the beginning, because the “Offset Reset” property was set to “earliest”:

08–1 Processors Running

As I mentioned earlier, the first time the “Consume” processor starts, it reads all the messages in the Kafka topic from the beginning. So, the file below contains all the table data produced by the initial snapshot executed by Kafka Connect + Debezium:

08–2 Kafka Messages

Now let’s see the messages written by the PutADLS Processor:

08–3 PutADLS Processor — Messages Written

And if you navigate to your “Azure Portal > Storage Account > Data Lake Storage / Containers > tmp > db > schema > table” you will find the messages written by the PutADLS Processor, as follows:

Each message contains a field named “op” that describes the operation captured by that event. The possible values are “r” for read, “c” for create, “u” for update, and “d” for delete. The first messages, for example, come from the initial snapshot of the specified table, so the operation is “r” for each existing row. Here are some examples:

Read:

08–3–2 Read Operation

I did not mention this before, but when you use Kafka Connect + Debezium to read data changes (CDC) from MSSQL Server or other databases, it generates a payload that describes what happened in each event. In addition to the operation, it also carries the row data before the change in the “before” field and the row data after the change in the “after” field. Keep in mind that with MSSQL Server, columns of type “text” and “ntext” are always reported as NULL, so the previous value is not available in the “before” field.
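To make that concrete, here is a trimmed, illustrative example of what the value of an update event for dbo.t1 might look like (the field values are placeholders, the source block is abbreviated, and the exact payload depends on your connector version and converter settings):

{
  "before": { "c1": 1, "c2": "old value" },
  "after": { "c1": 1, "c2": "new value" },
  "source": { "connector": "sqlserver", "db": "CDC_2023", "schema": "dbo", "table": "t1" },
  "op": "u",
  "ts_ms": 1696860000000
}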

Update:

08–3–3 Update Operation

Delete:

08–3–4 Delete Operation

Create:

Done: from now on, every change that occurs in the database table will be streamed as an individual event to Azure Data Lake Storage Gen2. You can now work with these changes to keep your downstream tables up to date.
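To exercise the whole pipeline end to end, you can trigger a few changes manually. The statements below are a minimal sketch to run in the same sqlcmd session used earlier (the values are placeholders; adjust the c1 filter to match an existing row). Each one should land in ADLS Gen2 as an event with “op” set to “c”, “u”, and “d” respectively:

USE CDC_2023
GO
INSERT INTO t1 (c2) VALUES ('first row')
GO
UPDATE t1 SET c2 = 'updated row' WHERE c1 = 1
GO
DELETE FROM t1 WHERE c1 = 1
GO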

Future Work

After completing this initial phase of event ingestion, my aim is to write another article illustrating how to consume and transform this streaming data using Spark Streaming on Azure Databricks.

Conclusion

In this article, we've demonstrated how to stream data from an MSSQL Server database through a pipeline built with Apache Kafka, Kafka Connect + Debezium, and Apache NiFi, landing in Azure Data Lake Storage Gen2. This approach enables real-time capture, transformation, and efficient storage of data from diverse sources. Beyond CDC, you can use these topics for various other purposes and write the events to the Data Lake as needed. I genuinely hope this article proves valuable on your journey through the fascinating Data World. Please feel free to provide feedback. Best regards.
