Enhance Your Data Workflow: Debesync - Debezium Companion for Simplifying Data Streaming

C. R. Raja Vignesh · Published in Javarevisited · May 25, 2024

The Debesync Consumer is an open-source project built on top of Quarkus that provides a low-latency streaming platform for reading the CDC events published by the Debezium source connector to destinations such as Kafka, Kinesis, etc. It utilizes some functionalities of Debezium to provide customization and error tolerance, ensuring that there is no data loss.

This can be used with Debezium, jointly or independently, for several use cases including data replication, schema generation, cache management, and data integration.

Basic Architecture

The Debesync Consumer is a consumer application built on top of the Kafka sink connectors (Kafka Connect) that runs them in a standalone format and forms a complete pipeline when used with the Debezium source connector. It can be used to set up a consumer application that replicates data from a Kafka topic to the desired target database with minimal code changes, while providing the additional features of durability, reliability, and fault tolerance. Multiple instances of the connectors can be deployed listening to the same Kafka topic to achieve higher throughput and performance.

Debesync Consumer Functionality

The Debesync consumer utilizes the Kafka implementation of the Change Emitter in the debesync-server-kafka module. The change events published to the Kafka topic by the Debezium producer are read by the Change Emitter. These messages are parsed for the key, value, and headers based on the configuration for the Debesync consumer under the prefix debezium.sink.converters.*. Further documentation for this is provided under Sink Configuration.
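For illustration, such a converter setup might look like the snippet below. The exact key layout under debezium.sink.converters.* is an assumption here (JsonConverter and schemas.enable are standard Kafka Connect converter settings); refer to the Sink Configuration documentation for the supported keys.

debezium.sink.converters.key=org.apache.kafka.connect.json.JsonConverter
debezium.sink.converters.key.schemas.enable=false
debezium.sink.converters.value=org.apache.kafka.connect.json.JsonConverter
debezium.sink.converters.value.schemas.enable=false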

The Debesync consumer applies the custom transformations to these messages based on the application configuration and predicates. The transformed messages are then pushed to the Sink Task.

These sink tasks are created and managed based on the implementations and settings specified in the application configuration.

Thus, the Kafka sink connector takes care of retrying as well as the insertion, update, or deletion of the data based on the type of the CDC event.
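For reference, a simplified Debezium change event envelope is shown below; its op field ('c' for create, 'u' for update, 'd' for delete) is what drives this behavior. The field values here are purely illustrative.

{
  "before": { "id": 1, "name": "old-value" },
  "after": { "id": 1, "name": "new-value" },
  "source": { "db": "infotrends", "table": "users" },
  "op": "u",
  "ts_ms": 1716600000000
}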

Specifying the type of the sink connector and passing additional properties to the Kafka connector, in this case for a MySQL database:

debezium:
  sink:
    connector:
      class: io.confluent.connect.jdbc.JdbcSinkConnector
      type: mysql
    mysql:
      topics: mysql.infotrends.users
      connection:
        url: jdbc:mysql://localhost:3306/TargetDB1
        port: 3306
        user: root
        password: root45
      auto:
        create: false
      insert:
        mode: upsert
      delete:
        enabled: true
      pk:
        mode: record_key

The debezium.sink.connector.class specifies the instance of the Sink Connector to be used, while all the configuration under debezium.sink.mysql.* is passed on to the Kafka sink connector.
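In the equivalent flat properties form, the same YAML maps directly to prefixed keys, for example (derived from the configuration above):

debezium.sink.connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
debezium.sink.mysql.topics=mysql.infotrends.users
debezium.sink.mysql.connection.url=jdbc:mysql://localhost:3306/TargetDB1
debezium.sink.mysql.insert.mode=upsert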

Further documentation about the configuration used here can be found in the official documentation of the Kafka MySQL Sink Connector.

Transformations and Predicates

The Debesync consumer also supports the Transformation and Predicate implementations specified by Kafka Connect, and is hence compatible with all the transformations provided by Debezium. This was done to improve the compatibility of the consumer application when utilized with Debezium. The transformation configuration is also similar to that of Debezium, except that it is configured under the debezium.sink prefix instead of the source.

Example transformation used with MySQL to extract the topic details:

debezium.sink.transforms=unwrap,extractTopic
debezium.sink.transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
debezium.sink.transforms.unwrap.drop.tombstones=false
debezium.sink.transforms.unwrap.delete.handling.mode=none
debezium.sink.transforms.extractTopic.type=com.infotrends.in.debesync.mysql.transforms.ExtractTopicName
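Predicates follow the standard Kafka Connect convention; a minimal sketch is shown below, assuming the keys mirror the transforms above under the debezium.sink prefix. TopicNameMatches and its pattern property are part of Kafka Connect, while the predicate name onUsersTopic is hypothetical.

debezium.sink.predicates=onUsersTopic
debezium.sink.predicates.onUsersTopic.type=org.apache.kafka.connect.transforms.predicates.TopicNameMatches
debezium.sink.predicates.onUsersTopic.pattern=mysql\.infotrends\..*
debezium.sink.transforms.extractTopic.predicate=onUsersTopic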

Error Handling

The Debesync consumer provides a custom implementation of, as well as a specification for, the Error Reporter provided by Kafka Connect. This implementation is also utilized by the Kafka sink connector specification and inserts the failed records into the event store being used. The configurations for these are specified under the prefix debezium.sink.errors.deadletterqueue.*. In the case of a Kafka-based DLQ, this in turn writes the failed records to Kafka DLQ topics.
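From a sink task's perspective, failed records reach whatever reporter is configured (such as the KafkaDlqErrantRecordReporter listed further below) through the standard Kafka Connect ErrantRecordReporter API. A minimal sketch, with a hypothetical ExampleSinkTask and write method:

import java.util.Collection;
import java.util.Map;

import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.ErrantRecordReporter;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;

public class ExampleSinkTask extends SinkTask {

    private ErrantRecordReporter reporter;

    @Override
    public void start(Map<String, String> props) {
        // Available since Kafka 2.6; null when no error reporter/DLQ is configured.
        reporter = context.errantRecordReporter();
    }

    @Override
    public void put(Collection<SinkRecord> records) {
        for (SinkRecord record : records) {
            try {
                write(record); // hypothetical write to the target database
            } catch (Exception e) {
                if (reporter != null) {
                    // Hands the failed record to the configured reporter,
                    // e.g. a Kafka DLQ topic in a Kafka-based setup.
                    reporter.report(record, e);
                } else {
                    throw new ConnectException("Failed to process record", e);
                }
            }
        }
    }

    private void write(SinkRecord record) {
        // target-specific insert/update/delete logic
    }

    @Override
    public void stop() { }

    @Override
    public String version() { return "1.0"; }
}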

A sample configuration for the error handler is shown below:

debezium:
  sink:
    errors:
      deadletterqueue:
        topic:
          name: $topic-dlq
        context:
          headers:
            enable: true
        producer:
          bootstrap:
            servers: localhost:9093
          security:
            protocol: SASL_PLAINTEXT
          sasl:
            jaas:
              config: org.apache.kafka.common.security.plain.PlainLoginModule required username='admin' password='admin29!';
            mechanism: PLAIN
          dns:
            lookup: use_all_dns_ips
          session:
            timeout:
              ms: 60000
          acks: all
      retry:
        timeout: 30000
        delay:
          max:
            ms: 30000
      tolerance: all

Currently, the Debesync Consumer contains the implementations for the variations below:

  • Emitter/Event Manager
    Event Manager: Kafka
    Implementation Class: KafkaChangeEmitter in debesync-server-kafka
    DLQ Implementation Class: KafkaDlqErrantRecordReporter
  • Sink Implementation
    Database: MySQL
    Connector: debesync-mysql-connector
    Implementation Class: com.infotrends.in.debesync.mysql.sink.connectors.EmbeddedMysqlConnector
    Transforms: com.infotrends.in.debesync.mysql.transforms.ExtractTopicName

    Database: MongoDB
    Connector: debesync-mongo-connector
    Implementation Class: com.infotrends.in.debesync.mongo.sink.connectors.EmbeddedMongoConnector
    Namespace Mappings: com.infotrends.in.debesync.mongo.namespace.mappings.TopicNameSpaceMapper
    Post Processors: com.infotrends.in.debesync.mongo.processors.LoggingPostProcessor

Implementing Debesync Connector

All the dependencies for the Debesync Consumer are provided and managed using a custom BOM - debesync-quarkus-bom.

<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>com.infotrends.in</groupId>
      <artifactId>debesync-quarkus-bom</artifactId>
      <version>${debesync.version}</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>

To use the Debesync consumer, its core module needs to be added to the POM of our application.

<dependency>
  <groupId>com.infotrends.in</groupId>
  <artifactId>debesync-server-core</artifactId>
  <version>${debesync.version}</version>
</dependency>

We would also need to add the dependencies containing the implementations for the emitter and the sink connectors, as sketched below. Further details can be found under the Using the Debesync Consumer section.
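For example, a pipeline pairing the Kafka emitter with the MySQL sink might declare the following dependencies. The artifact IDs are assumptions derived from the module names listed above (debesync-server-kafka, debesync-mysql-connector); verify them against the project README.

<dependency>
  <groupId>com.infotrends.in</groupId>
  <artifactId>debesync-server-kafka</artifactId>
  <version>${debesync.version}</version>
</dependency>
<dependency>
  <groupId>com.infotrends.in</groupId>
  <artifactId>debesync-mysql-connector</artifactId>
  <version>${debesync.version}</version>
</dependency>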

For further details, refer to the detailed documentation provided here: https://github.com/Vicky-cmd/debe-sync-consumer/blob/main/README.md

A sample configuration for the Debesync application with the corresponding debezium configuration as well as implementations can be found here: https://github.com/Vicky-cmd/debe-sync-consumer/tree/main/examples.

The full source code of the project can be found here: https://github.com/Vicky-cmd/debe-sync-consumer/tree/main
