CDC using Debezium in Kubernetes

Introduction


Debezium is an open-source distributed platform for change data capture (CDC). Debezium acts as a real-time CDC engine, capturing inserts, updates, and deletes as they happen.

Debezium offers multiple deployment options, but running it on Kafka Connect is particularly advantageous for data replication. This article covers that approach because it seamlessly streams both existing data and ongoing changes from source databases into Kafka topics. From there, the data can be efficiently replicated into target databases. By leveraging Kafka Connect with Debezium, organizations can build robust, scalable replication pipelines that keep data synchronized across disparate systems.

This demonstration explores the deployment of Debezium within a Kubernetes environment to capture CDC data from a local Postgres database and stream the changes to a Kafka topic.

Refer to this post to deploy Kafka in Kubernetes using Strimzi. That Kafka cluster will be used here to deploy Kafka Connect and run Debezium.

Architecture

Kafka Connect serves as a framework and runtime environment for facilitating data movement within Kafka ecosystems. It supports two key functionalities:

  1. Source Connectors: These connectors, exemplified by Debezium, enable the transmission of records from various sources into Kafka topics.
  2. Sink Connectors: Conversely, sink connectors transfer records from Kafka topics to other target systems.

Kafka Connect operates as an independent service alongside the Kafka broker.

By default, changes from a database table are channeled into a Kafka topic named after the table itself. However, Debezium allows for flexible topic routing via configuration adjustments. For instance, users can:

  • Route records to topics with names distinct from their originating tables.
  • Consolidate change event records from multiple tables into a single topic (see the routing sketch below).
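For instance, here is a hedged sketch of Debezium's topic routing transformation (io.debezium.transforms.ByLogicalTableRouter), which rewrites topic names by regex; the regex and replacement values below are illustrative:

transforms: route
transforms.route.type: io.debezium.transforms.ByLogicalTableRouter
transforms.route.topic.regex: (.*)\.public\.(.*)
transforms.route.topic.replacement: all-tables-topic

This would consolidate change events from every public-schema table into a single topic named all-tables-topic.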

Once change event records reside within Apache Kafka, various connectors within the Kafka Connect ecosystem can stream them to diverse systems and databases, including Elasticsearch, data warehouses, analytics platforms, or caching solutions like Infinispan. Depending on the selected sink connector, configuration of Debezium’s new record state extraction transformation may be necessary.

Source Database

Refer to the official documentation for all the supported Debezium connectors — https://debezium.io/documentation/reference/stable/connectors/index.html

This guide showcases data capture from a Postgres database hosted on the local machine. Postgres can also be deployed within Kubernetes; refer to the guide Postgres on Kubernetes (K8s) for that. Because the database is external to the Kubernetes cluster (which is hosted by Docker Desktop), substitute localhost with the hostname host.docker.internal. More generally, to access a database from outside the Kubernetes cluster, one would employ a service discovery mechanism such as an ExternalName Service, as sketched below.
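For illustration, a minimal ExternalName Service could give the external database a stable in-cluster name (the Service name postgres-external is hypothetical):

apiVersion: v1
kind: Service
metadata:
  name: postgres-external   # hypothetical name
  namespace: kafka
spec:
  type: ExternalName
  # In-cluster lookups of postgres-external.kafka.svc resolve to this host
  externalName: host.docker.internal

The connector's database.hostname could then point at postgres-external instead of the Docker-specific hostname.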

Create Secret for Database Credentials

db-secret.yaml

apiVersion: v1
kind: Secret
metadata:
  name: debezium-secret
  namespace: kafka
type: Opaque
stringData:
  username: dbzuser
  password: dbzpass

Create a Role that grants read access to the above Secret, and a RoleBinding that binds this Role to the Kafka Connect cluster service account, so that Kafka Connect can read the credentials.

db-rbac.yaml

apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: connector-configuration-role
  namespace: kafka
rules:
- apiGroups: [""]
  resources: ["secrets"]
  resourceNames: ["debezium-secret"]
  verbs: ["get"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: connector-configuration-role-binding
  namespace: kafka
subjects:
- kind: ServiceAccount
  name: debezium-connect-cluster-connect
  namespace: kafka
roleRef:
  kind: Role
  name: connector-configuration-role
  apiGroup: rbac.authorization.k8s.io

Strimzi automatically creates a service account during Kafka Connect deployment, named in the format $KafkaConnectName-connect. Since we'll be deploying a Kafka Connect cluster named debezium-connect-cluster, the corresponding service account is debezium-connect-cluster-connect.
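Once the Kafka Connect cluster is deployed later in this guide, you can confirm the service account exists:

kubectl get serviceaccount debezium-connect-cluster-connect -n kafka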

kubectl apply -f debezium/db-secret.yaml -n kafka

kubectl apply -f debezium/db-rbac.yaml -n kafka
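Optionally, confirm the objects were created:

kubectl get secret debezium-secret -n kafka
kubectl get role,rolebinding -n kafka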

Custom Image for Debezium

To deploy a Debezium connector, first set up a Kafka Connect cluster that contains the required connector plug-ins. This involves building a Strimzi-based container image for Kafka Connect with the necessary plug-ins before instantiating the connector itself.

Refer to the official documentation to download the connectors — https://debezium.io/documentation/reference/stable/install.html

For pre-installed connector images, refer to the Debezium official images — https://quay.io/organization/debezium

For this demo, I download the Postgres connector to debezium/plugins.

Note: Use the TimestampConverter jar from https://github.com/oryanmoshe/debezium-timestamp-converter to convert all temporal data types (across all supported databases) into a format of your choice.

Default Values:

"timestampConverter.format.time": "HH:mm:ss.SSS",
"timestampConverter.format.date": "YYYY-MM-dd",
"timestampConverter.format.datetime": "YYYY-MM-dd'T'HH:mm:ss.SSS'Z'",
"timestampConverter.debug": "false"

When using multiple connectors, this jar has to be added to each Debezium connector's plugin directory individually.
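These defaults can be overridden per connector. As an illustrative sketch (property name as listed in the defaults above), forcing a simpler datetime format would look like:

"timestampConverter.format.datetime": "YYYY-MM-dd HH:mm:ss"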

DockerfileDebezium

#Dockerfile

FROM quay.io/strimzi/kafka:0.40.0-kafka-3.7.0
USER root:root
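# Copy the downloaded connector plug-ins into Kafka Connect's plugin path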
COPY debezium/plugins /opt/kafka/plugins/
USER 1001

Build and Push the Image

docker build -t osds-debezium -f debezium/DockerfileDebezium .
docker login
docker tag osds-debezium howdytech01/osds:osds-debezium
docker push howdytech01/osds:osds-debezium

Kafka Connect Cluster

Use the Strimzi operator to create the Kafka Connect cluster.

debezium-connect-cluster.yaml

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: debezium-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "true"
spec:
  version: 3.7.0
  image: howdytech01/osds:osds-debezium
  replicas: 1
  bootstrapServers: osds-cluster-kafka-bootstrap:9092
  config:
    config.providers: secrets
    config.providers.secrets.class: io.strimzi.kafka.KubernetesSecretConfigProvider
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
    # -1 means it will use the default replication factor configured in the broker
    config.storage.replication.factor: -1
    offset.storage.replication.factor: -1
    status.storage.replication.factor: -1

This YAML configuration defines a Kafka Connect resource in a Kubernetes environment using the Strimzi operator. Let’s break it down:

  • apiVersion: Specifies the API version of the resource. In this case, it's kafka.strimzi.io/v1beta2, indicating that it's using Strimzi's custom resource definition for Kafka Connect.
  • kind: Defines the type of Kubernetes resource being created. Here, it's KafkaConnect, indicating it's creating a Kafka Connect resource.
  • metadata: Contains metadata about the resource, such as its name (debezium-connect-cluster) and any annotations.
  • annotations: Additional information about the resource. Here, the annotation strimzi.io/use-connector-resources: "true" tells Strimzi to manage connectors declaratively through KafkaConnector custom resources.
  • spec: Specifies the desired state of the Kafka Connect resource.
  • version: Specifies the version of Kafka Connect to use. Here, it's 3.7.0.
  • image: Specifies the Docker image to use for Kafka Connect. In this case, it's howdytech01/osds:osds-debezium.
  • replicas: Defines the number of Kafka Connect pods to run.
  • bootstrapServers: Specifies the Kafka bootstrap servers to connect to. It's pointing to osds-cluster-kafka-bootstrap:9092.
  • config: Defines various Kafka Connect configuration options.
  • config.providers: Declares an alias (secrets) for a configuration provider.
  • config.providers.secrets.class: Specifies the provider class that resolves configuration values from Kubernetes Secrets.
  • group.id, offset.storage.topic, config.storage.topic, status.storage.topic: Configuration options related to Kafka Connect group ID and storage topics.
  • config.storage.replication.factor, offset.storage.replication.factor, status.storage.replication.factor: Replication factors for the internal storage topics. -1 indicates to use the default replication factor configured in the broker.

This configuration sets up a Kafka Connect cluster named debezium-connect-cluster with specific configurations and points to the Kafka bootstrap servers for connectivity.

kubectl apply -f debezium/debezium-connect-cluster.yaml -n kafka
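Kafka Connect can take a minute to start. Watch for the KafkaConnect resource and its pod to become Ready (the label below is the one Strimzi applies to Connect pods):

kubectl get kafkaconnect debezium-connect-cluster -n kafka
kubectl get pods -n kafka -l strimzi.io/cluster=debezium-connect-cluster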

Note:

The configuration specifies that bootstrapServers is set to osds-cluster-kafka-bootstrap:9092. This indicates that the Kafka cluster created with Strimzi in Kubernetes in the demo referenced earlier is being used here.

It’s important to note that we’ve configured the Strimzi Kubernetes secret config provider. Strimzi automatically generates a service account for the Kafka Connect cluster, which we have already bound to the necessary Role. This setup enables Kafka Connect to securely read the Secret containing the database credentials.

Debezium Postgres Connector

Refer to this page to set up and enable CDC (logical replication) on Postgres — https://debezium.io/documentation/reference/stable/connectors/postgresql.html
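As a minimal sketch of those prerequisites (the statements below are assumptions matching the credentials used in this demo; see the linked page for the authoritative steps), the database needs logical decoding enabled and a replication-capable user with read access to the captured table:

-- Enable logical decoding (requires a Postgres restart)
ALTER SYSTEM SET wal_level = logical;

-- Create the connector user and grant read access to the captured table
CREATE ROLE dbzuser WITH REPLICATION LOGIN PASSWORD 'dbzpass';
GRANT SELECT ON public.actor TO dbzuser;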

The public.actor table from the Postgres database serves as the source table for this demo.

Create a KafkaConnector with the configuration below.

postgres-connector.yaml

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: debezium-connector-postgres
  labels:
    strimzi.io/cluster: debezium-connect-cluster
spec:
  class: io.debezium.connector.postgresql.PostgresConnector
  tasksMax: 1
  config:
    tasks.max: 1
    database.hostname: host.docker.internal
    database.port: 5432
    database.user: ${secrets:kafka/debezium-secret:username}
    database.password: ${secrets:kafka/debezium-secret:password}
    database.dbname: movie_rental_db
    topic.prefix: movie_rental_db
    plugin.name: pgoutput
    publication.autocreate.mode: filtered
    table.include.list: public.actor
    key.converter.schemas.enable: false
    value.converter.schemas.enable: false
    snapshot.mode: always
    message.key.columns: public.actor:actor_id
    transforms: unwrap
    transforms.unwrap.type: io.debezium.transforms.ExtractNewRecordState
    transforms.unwrap.add.fields: op:_meta_op,table:_meta_table,lsn:_meta_lsn,source.ts_ms:_meta_event_ts,schema:_meta_schema
    transforms.unwrap.add.headers: db
    transforms.unwrap.delete.handling.mode: rewrite
    delete.tombstone.handling.mode: rewrite
    converters: timestampConverter
    timestampConverter.type: oryanmoshe.kafka.connect.util.TimestampConverter
    key.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter: org.apache.kafka.connect.json.JsonConverter

This YAML configuration defines a KafkaConnector resource in a Kubernetes environment using the Strimzi operator. Breakdown of the important configs:

  • labels: Additional labels for the resource. In this case, it includes a label strimzi.io/cluster: debezium-connect-cluster, which specifies the Kafka Connect cluster the connector is associated with.
  • class: Specifies the connector class to use. Here, it's io.debezium.connector.postgresql.PostgresConnector, indicating it's a connector for PostgreSQL databases.
  • tasksMax: Specifies the maximum number of tasks (parallel instances) for the connector. Here, it's 1, meaning only one instance of the connector will be running.
  • tasks.max: The same limit expressed as a raw connector configuration option; it mirrors the tasksMax value at the spec level.
  • database.hostname, database.port, database.user, database.password, database.dbname: Configuration options for connecting to the PostgreSQL database. The values for database.user and database.password are provided using Kubernetes Secrets (${secrets:kafka/debezium-secret:username} and ${secrets:kafka/debezium-secret:password} respectively).
  • topic.prefix: Specifies the prefix for Kafka topics created by the connector.
  • plugin.name: Specifies the name of the PostgreSQL logical decoding plugin used for capturing changes. Here, it's pgoutput.
  • publication.autocreate.mode: Controls how the pgoutput publication is created when one doesn't exist. filtered creates a publication covering only the captured tables.
  • table.include.list: Specifies a list of tables to include in change data capture. In this case, it includes only the public.actor table.
  • snapshot.mode: Specifies the mode for initial snapshotting of the database. Here, it's set to always.
  • message.key.columns: Specifies the columns to be used as the key for Kafka messages.
  • transforms, transforms.unwrap.type, transforms.unwrap.add.fields, transforms.unwrap.add.headers, transforms.unwrap.delete.handling.mode: Configuration for the ExtractNewRecordState transformation, which unwraps the change event envelope, adds metadata fields and headers, and rewrites delete events.
  • delete.tombstone.handling.mode: Specifies how tombstone (delete) messages are handled. Here, it's set to rewrite.
  • converters: Specifies additional converters to be used.
  • timestampConverter, timestampConverter.type: Specifies the timestamp converter used to format temporal data types.
  • key.converter, value.converter: Specifies the converters used for message keys and values. Here, JSON converters are used.

This configuration sets up a KafkaConnector named debezium-connector-postgres with specific configurations for connecting to a PostgreSQL database and capturing changes for the public.actor table.
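Apply the connector like the earlier manifests (the file path follows the layout used in this guide) and confirm it becomes Ready:

kubectl apply -f debezium/postgres-connector.yaml -n kafka
kubectl get kafkaconnector debezium-connector-postgres -n kafka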

Kafka Topic Created by Debezium

Verify from the Kafka Cluster terminal

kubectl exec -n kafka -i -t osds-cluster-kafka-0 -- /bin/bash
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
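Debezium names the topic <topic.prefix>.<schema>.<table>, so the changes for public.actor land in movie_rental_db.public.actor. You can tail it from the same broker pod:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic movie_rental_db.public.actor --from-beginning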

Verify the topic using the Offset Explorer tool

References

  1. https://debezium.io/documentation/reference/stable/index.html
