Real-time CDC from SQL Server to Delta Lake using Debezium & Spark Structured Streaming in Kubernetes

OSDS
6 min read · May 16, 2024


Data Engineering Project

This article walks through implementing real-time data replication from SQL Server to Delta Lake using a combination of Debezium, Kafka Connect, and Spark Structured Streaming, all managed within a Kubernetes environment. We will examine the architecture, configuration, and deployment steps required and show how these components work together to replicate and synchronize data continuously. The goal is a scalable, resilient, and high-performance data pipeline suited to modern data infrastructure needs.

Pre-requisites

Follow this guide from the official Debezium documentation to set up SQL Server: https://debezium.io/documentation/reference/stable/connectors/sqlserver.html#setting-up-sqlserver

Follow this post to deploy the Spark Operator, an external Hive Metastore, and Delta Lake: Spark Operator, Delta Lake, and Hive Metastore with Postgres backend on Kubernetes (K8s)

This post provides details on deploying Kafka in K8s using Strimzi: Strimzi — Deploy Kafka in Kubernetes

Additional reading:

Refer to this post on using Debezium in K8s to extract and load CDC data from Postgres to Kafka in real time: CDC using Debezium in Kubernetes

This architecture centers on SQL Server as the source database, which continuously updates transactional data. Debezium, a change data capture tool, monitors SQL Server for changes and streams them to Kafka in real-time. Kafka, acting as a reliable message broker, transports these changes to Spark Structured Streaming.

Spark processes the data from Kafka and writes it to Delta Lake, which provides reliable storage with ACID transactions and supports both streaming and batch data processing. Hive Metastore manages the metadata for Delta Lake, ensuring Spark can efficiently read and write to the tables, maintaining data integrity and usability.

Refer to the official documentation for all the supported Debezium connectors: https://debezium.io/documentation/reference/stable/connectors/index.html

This guide demonstrates change data capture using a SQL Server database hosted on the local machine. Because the database is external to the Kubernetes cluster (which runs on Docker Desktop), localhost is replaced with the hostname host.docker.internal. In a typical setup, a database outside the Kubernetes cluster would instead be reached through a service discovery mechanism such as an ExternalName Service.
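For illustration, a minimal ExternalName Service could look like the sketch below; the Service name sqlserver-external and the externalName DNS entry are placeholders for your actual database host, and the connector would then use sqlserver-external as database.hostname.

apiVersion: v1
kind: Service
metadata:
  name: sqlserver-external    # placeholder Service name
  namespace: kafka
spec:
  type: ExternalName
  externalName: sqlserver.example.internal    # placeholder DNS name of the SQL Server host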

Create Secret for Database Credentials

db-secret.yaml

apiVersion: v1
kind: Secret
metadata:
  name: debezium-secret
  namespace: kafka
type: Opaque
stringData:
  username: dbzuser
  password: dbzpass

Create a Role that grants read access to the above Secret, and a RoleBinding that binds this Role to the Kafka Connect cluster's service account so that Kafka Connect can access the secret.

db-rbac.yaml

apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: connector-configuration-role
  namespace: kafka
rules:
- apiGroups: [""]
  resources: ["secrets"]
  resourceNames: ["debezium-secret"]
  verbs: ["get"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: connector-configuration-role-binding
  namespace: kafka
subjects:
- kind: ServiceAccount
  name: debezium-connect-cluster-connect
  namespace: kafka
roleRef:
  kind: Role
  name: connector-configuration-role
  apiGroup: rbac.authorization.k8s.io

Strimzi automatically creates a service account during Kafka Connect deployment. The service account name follows a specific format: $KafkaConnectName-connect. Since we'll be deploying a Kafka Connect cluster named debezium-connect-cluster, the corresponding service account name is debezium-connect-cluster-connect.
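Once the Kafka Connect cluster defined later in this post has been deployed, you can confirm that this service account exists:

kubectl get serviceaccount debezium-connect-cluster-connect -n kafka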

kubectl apply -f debezium/db-secret.yaml -n kafka
kubectl apply -f debezium/db-rbac.yaml -n kafka

Custom Image for Debezium

To deploy a Debezium connector, one must first set up a Kafka Connect cluster with the required connector plug-ins. This process involves creating a Strimzi container image for Kafka Connect containing the necessary plug-ins before proceeding to instantiate the connector itself.

Refer to the official documentation to download the connectors: https://debezium.io/documentation/reference/stable/install.html

For pre-installed connector images, refer to the official Debezium images: https://quay.io/organization/debezium

Download the SQL Server connector to debezium/plugins

Note: Use the TimestampConverter jar from https://github.com/oryanmoshe/debezium-timestamp-converter to convert all temporal data types (across all supported databases) into a format of your choosing.

Default Values:

"timestampConverter.format.time": "HH:mm:ss.SSS",
"timestampConverter.format.date": "YYYY-MM-dd",
"timestampConverter.format.datetime": "YYYY-MM-dd'T'HH:mm:ss.SSS'Z'",
"timestampConverter.debug": "false"

This jar has to be added to each Debezium connector's plugin directory individually when using multiple connectors.
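If the default formats above don't fit your needs, the same properties can be overridden in the connector configuration. A sketch of such an override inside the KafkaConnector spec.config (shown in full later in this post); the datetime format value here is only an example:

converters: timestampConverter
timestampConverter.type: oryanmoshe.kafka.connect.util.TimestampConverter
timestampConverter.format.datetime: YYYY-MM-dd HH:mm:ss.SSS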

DockerfileDebezium

#Dockerfile
FROM quay.io/strimzi/kafka:0.40.0-kafka-3.7.0
USER root:root
COPY debezium/plugins /opt/kafka/plugins/
USER 1001

Build and Push the Image

docker build -t osds-debezium -f debezium/DockerfileDebezium .
docker login
docker tag osds-debezium howdytech01/osds:osds-debezium
docker push howdytech01/osds:osds-debezium

Kafka Connect Cluster

Use Strimzi operator to create the Kafka Connect cluster.

debezium-connect-cluster.yaml

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: debezium-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  version: 3.7.0
  image: howdytech01/osds:osds-debezium
  replicas: 1
  bootstrapServers: osds-cluster-kafka-bootstrap:9092
  config:
    config.providers: secrets
    config.providers.secrets.class: io.strimzi.kafka.KubernetesSecretConfigProvider
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
    # -1 means it will use the default replication factor configured in the broker
    config.storage.replication.factor: -1
    offset.storage.replication.factor: -1
    status.storage.replication.factor: -1

kubectl apply -f debezium/debezium-connect-cluster.yaml -n kafka

The bootstrapServers property is set to osds-cluster-kafka-bootstrap:9092, which points to the Kafka cluster created earlier in this demo with Strimzi inside Kubernetes.
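You can confirm that this bootstrap service exists in the kafka namespace:

kubectl get svc osds-cluster-kafka-bootstrap -n kafka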

It’s important to note that we’ve configured the Strimzi Kubernetes secret config provider (config.providers.secrets). Combined with the service account that Strimzi creates for the Kafka Connect cluster, which has already been bound to the Role above, this setup enables Kafka Connect to securely access the Secret object containing sensitive information such as database credentials.
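With the provider in place, the connector configuration can reference the credentials indirectly instead of embedding them in plain text; the values below use Strimzi's ${secrets:<namespace>/<secret>:<key>} syntax (the connector example that follows uses plain values for simplicity):

database.user: ${secrets:kafka/debezium-secret:username}
database.password: ${secrets:kafka/debezium-secret:password}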

sqlserver-connector.yaml

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: debezium-connector-sqlserver
  labels:
    strimzi.io/cluster: debezium-connect-cluster
spec:
  class: io.debezium.connector.sqlserver.SqlServerConnector
  tasksMax: 4
  config:
    tasks.max: 4
    database.hostname: host.docker.internal
    database.port: 1433
    database.user: dbzuser
    database.password: dbzpass
    database.names: AdventureWorks2022
    database.encrypt: false
    topic.prefix: AdventureWorks2022_db
    table.include.list: Person.Address,Sales.SalesOrderHeader,Sales.SalesOrderDetail,Production.Product,Sales.Customer
    key.converter.schemas.enable: false
    value.converter.schemas.enable: false
    snapshot.mode: always
    message.key.columns: Person.Address:AddressID;Sales.SalesOrderHeader:SalesOrderID;Sales.SalesOrderDetail:SalesOrderID,SalesOrderDetailID;Production.Product:ProductID;Sales.Customer:CustomerID
    transforms: unwrap
    transforms.unwrap.type: io.debezium.transforms.ExtractNewRecordState
    transforms.unwrap.add.fields: op:_meta_op,table:_meta_table,lsn:_meta_lsn,source.ts_ms:_meta_event_ts,schema:_meta_schema
    transforms.unwrap.add.headers: db
    transforms.unwrap.delete.handling.mode: rewrite
    delete.tombstone.handling.mode: rewrite
    converters: timestampConverter
    timestampConverter.type: oryanmoshe.kafka.connect.util.TimestampConverter
    key.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter: org.apache.kafka.connect.json.JsonConverter
    log4j.logger.io.debezium.connector.sqlserver: DEBUG
    schema.history.internal.kafka.bootstrap.servers: osds-cluster-kafka-bootstrap:9092
    schema.history.internal.kafka.topic: schemahistory.AdventureWorks2022

Important config to note:

  • metadata: Contains metadata for the connector.
  • name: The name of the connector, “debezium-connector-sqlserver”.
  • strimzi.io/cluster: Indicates the Kafka Connect cluster this connector belongs to, “debezium-connect-cluster”.
  • tasksMax: Specifies the maximum number of tasks to run in parallel, set to 4.
  • database.hostname: Hostname of the SQL Server, host.docker.internal.
  • database.port: Port for SQL Server, 1433.
  • database.user: Username for connecting to the database, dbzuser.
  • database.password: Password for the database user, dbzpass.
  • database.names: Name of the SQL Server database, AdventureWorks2022.
  • database.encrypt: Indicates whether encryption is used, set to false.
  • topic.prefix: Prefix for Kafka topics where the data will be published, AdventureWorks2022_db.
  • table.include.list: List of tables to include in the replication, such as Person.Address, Sales.SalesOrderHeader, etc.
  • key.converter.schemas.enable: Disable schema inclusion in key converter, set to false.
  • value.converter.schemas.enable: Disable schema inclusion in value converter, set to false.
  • snapshot.mode: Mode for initial snapshot, set to always to always take a snapshot of the current state.
  • message.key.columns: Specifies the columns to use as keys for the tables.
  • transforms.unwrap.delete.handling.mode: Handles delete operations, set to rewrite.
  • delete.tombstone.handling.mode: Handles tombstone records, set to rewrite.
  • converters: Specifies custom converters to use.
  • timestampConverter: Custom converter for handling timestamps.
  • timestampConverter.type: Type of the timestamp converter.
  • key.converter: Converter for the key, using org.apache.kafka.connect.json.JsonConverter.
  • value.converter: Converter for the value, using org.apache.kafka.connect.json.JsonConverter.
  • log4j.logger.io.debezium.connector.sqlserver: Sets the logging level for the SQL Server connector to DEBUG.
  • schema.history.internal.kafka.bootstrap.servers: Kafka bootstrap servers for schema history, osds-cluster-kafka-bootstrap:9092.
  • schema.history.internal.kafka.topic: Topic for storing schema history, schemahistory.AdventureWorks2022.
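Apply the connector manifest (assuming it is saved as debezium/sqlserver-connector.yaml, following the layout used for the other manifests in this post) and check that the connector reports a Ready state:

kubectl apply -f debezium/sqlserver-connector.yaml -n kafka
kubectl get kafkaconnector debezium-connector-sqlserver -n kafka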

Kafka Topics Created by Debezium

Verify from the Kafka Cluster terminal

kubectl exec -n kafka -i -t osds-cluster-kafka-0 -- /bin/bash
bin/kafka-topics.sh --list --bootstrap-server localhost:9092

Verify the topics using the Offset Explorer tool
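To spot-check the CDC payloads themselves, you can also consume a few messages directly from the broker pod. Debezium's SQL Server connector typically names topics <topic.prefix>.<database>.<schema>.<table>, so the topic below is an assumption based on this demo's configuration:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic AdventureWorks2022_db.AdventureWorks2022.Person.Address \
  --from-beginning --max-messages 5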

Note:

Add the following jars to the Spark image for Spark Structured Streaming to work in K8s:

spark8s/source_files/spark_delta/spark_venv/spark_home/jars/commons-pool2-2.12.0.jar
spark8s/source_files/spark_delta/spark_venv/spark_home/jars/kafka-clients-3.5.0.jar
spark8s/source_files/spark_delta/spark_venv/spark_home/jars/spark-sql-kafka-0-10_2.12-3.5.0.jar
spark8s/source_files/spark_delta/spark_venv/spark_home/jars/spark-streaming-kafka-0-10-assembly_2.12-3.5.0.jar
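With those jars in place, a minimal PySpark Structured Streaming sketch of the consumer side could look like the following: it reads one CDC topic from Kafka, parses the flattened JSON produced by the ExtractNewRecordState transform, and appends the rows to a Delta table. The topic name, column subset, checkpoint location, and table path are illustrative assumptions, not values from this post.

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

# Assumed bootstrap address and topic name; adjust to your environment.
KAFKA_BOOTSTRAP = "osds-cluster-kafka-bootstrap.kafka:9092"
TOPIC = "AdventureWorks2022_db.AdventureWorks2022.Person.Address"

spark = (
    SparkSession.builder.appName("cdc-person-address-to-delta")
    .enableHiveSupport()
    .getOrCreate()
)

# Subset of Person.Address columns plus the metadata fields added by the
# ExtractNewRecordState transform (add.fields / delete rewrite in the connector config).
schema = StructType([
    StructField("AddressID", IntegerType()),
    StructField("City", StringType()),
    StructField("ModifiedDate", StringType()),
    StructField("_meta_op", StringType()),
    StructField("_meta_event_ts", StringType()),
    StructField("__deleted", StringType()),
])

raw = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP)
    .option("subscribe", TOPIC)
    .option("startingOffsets", "earliest")
    .load()
)

# Kafka delivers the value as bytes; cast to string and parse the JSON payload.
parsed = (
    raw.selectExpr("CAST(value AS STRING) AS json")
    .select(from_json(col("json"), schema).alias("data"))
    .select("data.*")
)

query = (
    parsed.writeStream.format("delta")
    .outputMode("append")
    .option("checkpointLocation", "s3a://osds/checkpoints/person_address")  # assumed path
    .start("s3a://osds/delta/person_address")  # assumed Delta table path
)

query.awaitTermination()

In practice you would usually apply the changes with a foreachBatch MERGE keyed on AddressID, using the __deleted flag to remove rows, rather than a plain append, so that updates and deletes from the source are reflected in the Delta table.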

References

  1. https://debezium.io/documentation/reference/stable/index.html
