Real-time CDC from SQL Server to Delta Lake using Debezium & Spark Structured Streaming in Kubernetes
Data Engineering Project
This article walks through implementing real-time data replication from SQL Server to Delta Lake using a combination of Debezium, Kafka Connect, and Spark Structured Streaming, all managed within a Kubernetes environment. We will examine the architecture, configuration, and deployment steps required, showing how these technologies work together to enable seamless data replication and synchronization, and how to build scalable, resilient, high-performance data pipelines suited to modern data infrastructure needs.
Pre-requisites
Follow the below guide from official Debezium documentation to set up SQL Server — https://debezium.io/documentation/reference/stable/connectors/sqlserver.html#setting-up-sqlserver
Follow the below post to deploy Spark Operator + External Hive Metastore + Delta Lake — Spark Operator, Delta Lake, and Hive Metastore with Postgres backend on Kubernetes (K8s)
The below post provides details on deploying Kafka in K8s using Strimzi — Strimzi — Deploy Kafka in Kubernetes
Additional reading —
Refer to the below post to use Debezium in K8s to extract and load CDC data from Postgres to Kafka in real-time — CDC using Debezium in Kubernetes
This architecture centers on SQL Server as the source database, which continuously updates transactional data. Debezium, a change data capture tool, monitors SQL Server for changes and streams them to Kafka in real-time. Kafka, acting as a reliable message broker, transports these changes to Spark Structured Streaming.
Spark processes the data from Kafka and writes it to Delta Lake, which provides reliable storage with ACID transactions and supports both streaming and batch data processing. Hive Metastore manages the metadata for Delta Lake, ensuring Spark can efficiently read and write to the tables, maintaining data integrity and usability.
Refer to this official documentation for all the supporting Debezium connectors — https://debezium.io/documentation/reference/stable/connectors/index.html
This guide demonstrates change data capture against a SQL Server database hosted on the local machine. Because the database is external to the Kubernetes cluster (hosted via Docker Desktop), substitute localhost with the hostname host.docker.internal. In a typical production setup, a database outside the Kubernetes cluster would instead be reached through a service discovery mechanism such as an ExternalName Service.
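As an illustration, an ExternalName Service can give the external database a stable in-cluster DNS name that connector configs can point at; a minimal sketch (the service name here is hypothetical, not from this setup):

```yaml
apiVersion: v1
kind: Service
metadata:
  name: sqlserver-external   # hypothetical service name
  namespace: kafka
spec:
  type: ExternalName
  # DNS name of the database host outside the cluster
  externalName: host.docker.internal
```

With this in place, database.hostname could be set to sqlserver-external.kafka.svc.cluster.local instead of the raw external hostname.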
Create Secret for Database Credentials
db-secret.yaml
apiVersion: v1
kind: Secret
metadata:
  name: debezium-secret
  namespace: kafka
type: Opaque
stringData:
  username: dbzuser
  password: dbzpass
Create a Role to refer to the above secret and create a RoleBinding to bind this role to the Kafka Connect cluster service account so that Kafka Connect can access the secret.
db-rbac.yaml
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: connector-configuration-role
  namespace: kafka
rules:
- apiGroups: [""]
  resources: ["secrets"]
  resourceNames: ["debezium-secret"]
  verbs: ["get"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: connector-configuration-role-binding
  namespace: kafka
subjects:
- kind: ServiceAccount
  name: debezium-connect-cluster-connect
  namespace: kafka
roleRef:
  kind: Role
  name: connector-configuration-role
  apiGroup: rbac.authorization.k8s.io
Strimzi automatically creates a service account during Kafka Connect deployment. The service account name follows a specific format: $KafkaConnectName-connect. Since we'll be deploying a Kafka Connect cluster named debezium-connect-cluster, the corresponding service account name is debezium-connect-cluster-connect.
kubectl apply -f debezium/db-secret.yaml -n kafka
kubectl apply -f debezium/db-rbac.yaml -n kafka
Custom Image for Debezium
To deploy a Debezium connector, one must first set up a Kafka Connect cluster with the required connector plug-ins. This process involves creating a Strimzi container image for Kafka Connect containing the necessary plug-ins before proceeding to instantiate the connector itself.
Refer to the official documentation to download the connectors — https://debezium.io/documentation/reference/stable/install.html
For pre-installed connector images, refer to the Debezium official images — https://quay.io/organization/debezium
Download the SQL Server connector to debezium/plugins
Note: Use the TimestampConverter jar from https://github.com/oryanmoshe/debezium-timestamp-converter to convert all temporal data types (across databases) into a format of your choice.
Default Values:
"timestampConverter.format.time": "HH:mm:ss.SSS",
"timestampConverter.format.date": "YYYY-MM-dd",
"timestampConverter.format.datetime": "YYYY-MM-dd'T'HH:mm:ss.SSS'Z'",
"timestampConverter.debug": "false"
This jar has to be added to all the Debezium connectors individually when using multiple connectors.
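To change the output format, the converter's format properties can be overridden in the connector configuration alongside the converter registration; a sketch (the override value below is illustrative):

```yaml
converters: timestampConverter
timestampConverter.type: oryanmoshe.kafka.connect.util.TimestampConverter
# illustrative override of the default datetime format
timestampConverter.format.datetime: "yyyy-MM-dd HH:mm:ss"
```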
DockerfileDebezium
#Dockerfile
FROM quay.io/strimzi/kafka:0.40.0-kafka-3.7.0
USER root:root
COPY debezium/plugins /opt/kafka/plugins/
USER 1001
Build and Push the Image
docker build -t osds-debezium -f debezium/DockerfileDebezium .
docker login
docker tag osds-debezium howdytech01/osds:osds-debezium
docker push howdytech01/osds:osds-debezium
Kafka Connect Cluster
Use Strimzi operator to create the Kafka Connect cluster.
debezium-connect-cluster.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: debezium-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  version: 3.7.0
  image: howdytech01/osds:osds-debezium
  replicas: 1
  bootstrapServers: osds-cluster-kafka-bootstrap:9092
  config:
    config.providers: secrets
    config.providers.secrets.class: io.strimzi.kafka.KubernetesSecretConfigProvider
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
    # -1 means it will use the default replication factor configured in the broker
    config.storage.replication.factor: -1
    offset.storage.replication.factor: -1
    status.storage.replication.factor: -1
kubectl apply -f debezium/debezium-connect-cluster.yaml -n kafka
The configuration sets bootstrapServers to osds-cluster-kafka-bootstrap:9092, which points to the Kafka cluster created with Strimzi inside Kubernetes in this demo.
It’s important to note that we’ve configured the Strimzi secret provider. This provider automatically generates a service account for the Kafka Connect cluster, which has already been associated with the necessary role. This setup enables Kafka Connect to securely access the Secret object containing sensitive information such as database credentials.
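With the secrets provider configured, the connector configuration can reference the Secret created earlier instead of embedding plaintext credentials, using Strimzi's ${secrets:namespace/secret-name:key} syntax:

```yaml
database.user: ${secrets:kafka/debezium-secret:username}
database.password: ${secrets:kafka/debezium-secret:password}
```

The connector below uses plaintext values for simplicity; swapping in these references is the recommended approach once the Role and RoleBinding from earlier are in place.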
sqlserver-connector.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: debezium-connector-sqlserver
  labels:
    strimzi.io/cluster: debezium-connect-cluster
spec:
  class: io.debezium.connector.sqlserver.SqlServerConnector
  tasksMax: 4
  config:
    tasks.max: 4
    database.hostname: host.docker.internal
    database.port: 1433
    database.user: dbzuser
    database.password: dbzpass
    database.names: AdventureWorks2022
    database.encrypt: false
    topic.prefix: AdventureWorks2022_db
    table.include.list: Person.Address,Sales.SalesOrderHeader,Sales.SalesOrderDetail,Production.Product,Sales.Customer
    key.converter.schemas.enable: false
    value.converter.schemas.enable: false
    snapshot.mode: always
    message.key.columns: Person.Address:AddressID;Sales.SalesOrderHeader:SalesOrderID;Sales.SalesOrderDetail:SalesOrderID,SalesOrderDetailID;Production.Product:ProductID;Sales.Customer:CustomerID
    transforms: unwrap
    transforms.unwrap.type: io.debezium.transforms.ExtractNewRecordState
    transforms.unwrap.add.fields: op:_meta_op,table:_meta_table,lsn:_meta_lsn,source.ts_ms:_meta_event_ts,schema:_meta_schema
    transforms.unwrap.add.headers: db
    transforms.unwrap.delete.handling.mode: rewrite
    delete.tombstone.handling.mode: rewrite
    converters: timestampConverter
    timestampConverter.type: oryanmoshe.kafka.connect.util.TimestampConverter
    key.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter: org.apache.kafka.connect.json.JsonConverter
    log4j.logger.io.debezium.connector.sqlserver: DEBUG
    schema.history.internal.kafka.bootstrap.servers: osds-cluster-kafka-bootstrap:9092
    schema.history.internal.kafka.topic: schemahistory.AdventureWorks2022
Important config to note:
- metadata: Contains metadata for the connector.
- name: The name of the connector, debezium-connector-sqlserver.
- strimzi.io/cluster: The Kafka Connect cluster this connector belongs to, debezium-connect-cluster.
- tasksMax: Maximum number of tasks to run in parallel, set to 4.
- database.hostname: Hostname of the SQL Server, host.docker.internal.
- database.port: Port for SQL Server, 1433.
- database.user: Username for connecting to the database, dbzuser.
- database.password: Password for the database user, dbzpass.
- database.names: Name of the SQL Server database, AdventureWorks2022.
- database.encrypt: Whether encryption is used, set to false.
- topic.prefix: Prefix for the Kafka topics where the data will be published, AdventureWorks2022_db.
- table.include.list: Tables to include in the replication, such as Person.Address, Sales.SalesOrderHeader, etc.
- key.converter.schemas.enable: Disable schema inclusion in the key converter, set to false.
- value.converter.schemas.enable: Disable schema inclusion in the value converter, set to false.
- snapshot.mode: Mode for the initial snapshot, set to always so a snapshot of the current state is always taken on startup.
- message.key.columns: Columns to use as message keys for each table.
- transforms.unwrap.delete.handling.mode: How delete operations are handled, set to rewrite.
- delete.tombstone.handling.mode: How tombstone records are handled, set to rewrite.
- converters: Custom converters to use.
- timestampConverter: Custom converter for handling timestamps.
- timestampConverter.type: Class of the timestamp converter.
- key.converter: Converter for the key, org.apache.kafka.connect.json.JsonConverter.
- value.converter: Converter for the value, org.apache.kafka.connect.json.JsonConverter.
- log4j.logger.io.debezium.connector.sqlserver: Logging level for the SQL Server connector, set to DEBUG.
- schema.history.internal.kafka.bootstrap.servers: Kafka bootstrap servers for schema history, osds-cluster-kafka-bootstrap:9092.
- schema.history.internal.kafka.topic: Topic for storing schema history, schemahistory.AdventureWorks2022.
Kafka Topics Created by Debezium
Verify from the Kafka Cluster terminal
kubectl exec -n kafka -i -t osds-cluster-kafka-0 -- /bin/bash
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
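To inspect the change events themselves, a console consumer can be pointed at one of the table topics from the same terminal. The exact topic name depends on the connector's topic naming (typically topic.prefix, database, schema, and table name joined with dots), so the topic below is an assumption to adapt:

```
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic AdventureWorks2022_db.AdventureWorks2022.Person.Address \
  --from-beginning --max-messages 5
```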
Verify the topics using the Offset Explorer tool
Note:
Add the below jars to the Spark image for Spark Structured Streaming to work in K8s:
spark8s/source_files/spark_delta/spark_venv/spark_home/jars/commons-pool2-2.12.0.jar
spark8s/source_files/spark_delta/spark_venv/spark_home/jars/kafka-clients-3.5.0.jar
spark8s/source_files/spark_delta/spark_venv/spark_home/jars/spark-sql-kafka-0-10_2.12-3.5.0.jar
spark8s/source_files/spark_delta/spark_venv/spark_home/jars/spark-streaming-kafka-0-10-assembly_2.12-3.5.0.jar
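On the Spark side, each Kafka record value is a JSON document shaped by the ExtractNewRecordState transform configured above: the flattened row plus the added _meta_* fields, and (with rewrite delete handling) a __deleted flag instead of a tombstone. A foreachBatch handler that merges into Delta typically has to split upserts from deletes first. A minimal sketch of that classification step in plain Python; the function name and sample records are illustrative, not from the article:

```python
import json

def classify_cdc_record(raw_value: str):
    """Classify a Debezium record (post ExtractNewRecordState, with
    delete.handling.mode=rewrite) as ('upsert' | 'delete', row).

    With rewrite mode, a delete arrives as the last row image plus a
    "__deleted": "true" marker rather than as a null tombstone.
    """
    row = json.loads(raw_value)
    if row.get("__deleted") == "true" or row.get("_meta_op") == "d":
        return "delete", row
    return "upsert", row

# Illustrative records as they might appear on a Person.Address topic:
insert_rec = '{"AddressID": 1, "City": "Bothell", "_meta_op": "c", "__deleted": "false"}'
delete_rec = '{"AddressID": 1, "City": "Bothell", "_meta_op": "d", "__deleted": "true"}'

print(classify_cdc_record(insert_rec)[0])  # upsert
print(classify_cdc_record(delete_rec)[0])  # delete
```

Inside a Spark foreachBatch, the same split would drive a Delta MERGE keyed on the message.key.columns configured earlier (e.g. AddressID), applying upserts with WHEN MATCHED/NOT MATCHED clauses and deletes with WHEN MATCHED ... DELETE.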