Unlocking the Power of Debezium

Tomer Guttman
Published in payu-engineering · 14 min read · Aug 20, 2023

This blog post shares our experience and insights with Debezium (a CDC tool), and the development phases we went through to improve reliability and observability while capturing data changes in our system.

Why Debezium?

What reasons might drive you to integrate this tool as a part of your system?

In our platform, we use Cassandra as our main database to achieve scalability and low latency. While Debezium does offer CDC capabilities for Cassandra, we decided against using it due to Cassandra’s distributed nature: it would mean installing Debezium on every separate Cassandra node.

However, as we started implementing our Treasury platform, which demanded ACID support, we shifted to MySQL. Subsequently, we re-evaluated the suitability of a CDC solution, leading us to select Debezium.

Debezium offers a solution for capturing and reacting to real-time changes in your system’s databases. By seamlessly connecting to various database engines, such as MySQL, PostgreSQL, and MongoDB, Debezium enables you to monitor and capture every database transaction as a stream of events. This provides valuable insight into your application’s data changes, making it ideal for use cases like real-time analytics, data synchronization, monitoring, and building event-driven, performant architectures.

Introduction

Let’s begin by explaining what Debezium is, and getting a grasp of the concept of CDC.

CDC, which stands for Change Data Capture, is a software pattern used to monitor and capture data changes, allowing other software to respond accordingly.

This process involves capturing changes at the row level within database tables and then streaming these changes to a data-streaming platform like Kafka. This enables consumers to access these streams of change events in the chronological order in which they occurred, empowering them to take appropriate actions based on the changes.
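To make this concrete, below is a hedged sketch of what a single change event might look like for an INSERT into a hypothetical packages row, trimmed to the fields most relevant here. The envelope shape follows Debezium’s before/after/source/op structure; the column names are made up purely for illustration. Note that the ExtractNewRecordState transform we apply later in our connector configuration flattens this envelope so that consumers only see the after state.

{
  "before": null,
  "after": {
    "id": 1004,                  // hypothetical columns, for illustration only
    "name": "starter-package",
    "status": "ACTIVE"
  },
  "source": {
    "connector": "mysql",
    "name": "packages_sandbox",
    "db": "payouts_sandbox_qa",
    "table": "packages",
    "file": "mysql-bin.000123",  // binlog file and position of this change
    "pos": 1435109
  },
  "op": "c",                     // c = create (INSERT), u = update, d = delete
  "ts_ms": 1692531810123
}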

Before diving into the flow when using Debezium, there are several terms that require our understanding:

  1. Binlog — Binary log (binlog) in MySQL is a chronological record of data changes, capturing all write operations performed on the database, essential for replication, point-in-time recovery, and data integrity. Binlog also has a retention setting, which determines how long logs are kept, balancing recovery needs with storage efficiency (see the example queries after this list)
  2. Debezium Server — Debezium provides a ready-to-use application that streams change events from a source database to messaging infrastructure like Kafka. For streaming change events to Kafka, we deploy Debezium connectors via Kafka Connect
  3. Debezium Connector — The Debezium connector reads the binlog, produces change events for row-level INSERT, UPDATE, and DELETE operations, and emits the change events to Kafka topics
  4. Kafka — High-throughput, distributed, publish-subscribe messaging queue system that efficiently handles real-time data streams, making it ideal for event-driven architectures and big data processing
  5. Kafka Consumers — Kafka consumers process real-time data within Kafka, enabling transformations through stream processing while integrating seamlessly with Kafka’s model. They simplify building scalable, fault-tolerant data pipelines
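If you want a feel for what the binlog looks like on your own MySQL server, a few statements are enough. This is a hedged sketch for MySQL 8.0, where retention is controlled by binlog_expire_logs_seconds (older versions expose expire_logs_days instead):

-- Confirm the binary log is enabled and see the current write position
SHOW VARIABLES LIKE 'log_bin';
SHOW MASTER STATUS;

-- List the binlog files currently kept on the server
SHOW BINARY LOGS;

-- Check how long binlog files are retained before rotation (in seconds)
SHOW VARIABLES LIKE 'binlog_expire_logs_seconds';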

Now that we’re familiar with the core components, let’s see how data flows through our system when using Debezium.

Data pipeline flow when using Debezium

Our MySQL database has a binlog that stores data changes from all the tables in it. Within these changes lies data from a specific table that we want to monitor.

To capture its changes, we’ve set up a Debezium connector specifically configured to capture changes occurring within this table.

Once a change is detected, the connector captures it and sends that data to a dedicated Kafka topic. Later, our Kafka streams/consumers retrieve this data from the designated topics, enhancing and modifying it as required, before exporting it to other components such as Snowflake, Elasticsearch, etc.

Configurations, and more Configurations

Once Debezium is deployed within the environment, this is where the fun starts.

To begin capturing changes in a database, you will need to create a Debezium connector.

It’s worth noting that the implementation of each connector type varies based on the database in use. While we’re using the MySQL connector, the key takeaway is understanding the concept of a “connector” rather than the specific implementation details.

The following is an example of a basic connector configuration we had when initially adopting Debezium (v1.6).

{
  "name": "debezium-packages-connector-sandbox",
  "connector.class": "io.debezium.connector.mysql.MySqlConnector",
  "include.schema.changes": "false",
  "decimal.handling.mode": "precise",
  "transforms": "Reroute,unwrap",
  "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
  "transforms.Reroute.type": "io.debezium.transforms.ByLogicalTableRouter",
  "transforms.Reroute.topic.regex": "(.*)payouts_sandbox_qa(.*)",
  "transforms.Reroute.topic.replacement": "packages_sandbox",
  "transforms.unwrap.add.fields": "",
  "database.server.name": "packages_sandbox",
  "database.user": "**********",
  "database.server.id": "**********",
  "database.port": "**********",
  "database.hostname": "**********",
  "database.password": "**********",
  "database.history.kafka.bootstrap.servers": "**********",
  "database.history.kafka.topic": "debezium_history_packages_sandbox",
  "database.whitelist": "payouts_sandbox_qa",
  "table.whitelist": "payouts_sandbox_qa.packages",
  "snapshot.mode": "initial",
  "snapshot.new.tables": "parallel"
}

Since most of the fields are self-explanatory, we’ll focus on explaining the significant ones:

  1. database.whitelist — Comma-separated list of regular expressions that match the names of the databases for which to capture changes. The connector does not capture changes in any database whose name is not in database.whitelist
  2. table.whitelist — Comma-separated list of regular expressions that match fully-qualified table identifiers of tables whose changes you want to capture
  3. transforms.Reroute.topic.regex — Regular expression that the transformation applies to each change event record to determine if it should be routed to a particular topic
  4. transforms.Reroute.topic.replacement — Regular expression that represents the destination topic name. The transformation routes each matching record to the topic identified by this expression

Put simply, we are configuring the packages-connector to establish a connection to the payouts_sandbox_qa database and track changes in the payouts_sandbox_qa.packages table. When it identifies changes, it checks whether they originated from a table matching the regex pattern (.*)payouts_sandbox_qa(.*), and if they do, these changes are then exported to a Kafka topic named packages_sandbox.

More detailed explanations about the remaining fields can be found in the Debezium documentation (https://debezium.io/documentation).
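For completeness, connector configurations like the one above are registered on the Kafka Connect worker through its REST API. Here is a minimal sketch, assuming Kafka Connect listens on its default port 8083 and the configuration is saved locally as packages-connector.json (both of these are assumptions; your deployment may look different):

# Create or update the connector with the configuration shown above
curl -X PUT -H "Content-Type: application/json" \
  --data @packages-connector.json \
  http://localhost:8083/connectors/debezium-packages-connector-sandbox/config

# Verify the connector and its task are running
curl http://localhost:8083/connectors/debezium-packages-connector-sandbox/status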

What can we do with all that data?

With Debezium up and running, along with a configured connector exporting events to Kafka from our table, we are now able to consume these events and take actions as needed.

In our case, we export our data to two primary destinations:

  1. Elasticsearch 🔍 — We consume the events, enrich them with additional data from different services in our system, and then export them to Elasticsearch. This enables us to efficiently search and access them through our UI.
  2. Snowflake ❄️ — We consume the events and then export the data to Snowflake, our cloud-based big data tool. Snowflake mainly serves as a platform for generating a variety of reports for our merchants, such as settlement, payment, and other types of reports.

These are just two examples from our system, but the possibilities for what can be done with the data are endless.

Not as easy as it seems

Our company was among the early adopters of Debezium, which led us to face several challenges throughout the process. However, over time, we grew more confident and gained a deeper understanding of the necessary steps we had to make in order to improve our Debezium implementation.

We encountered a few minor issues, often originating from misconfigured connectors. However, the most important topic we’ll discuss is when a connector loses its binlog offset.

As mentioned earlier, every connector can be configured to capture changes for specific tables within the database. The way this works is that the connector captures changes associated with that particular table within the binlog.

After capturing the changes, the connector stores the most recent offset in a dedicated Kafka topic, in our case we named it debezium_connect_offsets.

As seen in the Kafka UI screenshot below, within the specific topic dedicated to Debezium offsets, we can see that the most recent offset message for the packages_sandbox connector is listed as 1435109 (pos).

Offset message of “packages” connector, storing its latest recorded offset
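Roughly, the record stored in that topic looks like the following. The field names come from the MySQL connector’s offset structure, but the values here are illustrative and trimmed:

// Key: identifies the connector and its logical server name
["debezium-packages-connector-sandbox", {"server": "packages_sandbox"}]

// Value: the binlog coordinates the connector will resume from
{"ts_sec": 1692531810, "file": "mysql-bin.000123", "pos": 1435109, "row": 1, "server_id": 223344}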

It is crucial to understand the following:

  1. A binlog contains changes from all the tables within MySQL, not only from the tables we’ve configured the connector to capture changes for
  2. MySQL also has a binlog retention setting, which means that after a specified period of time, such as 7 days, the binlog rotates and the changes that were recorded within it are no longer available.

How do these two factors come together? Let me share an example.

Consider the packages table. We created a dedicated connector for it, named packages-connector, and configured it to capture changes from it. However, the packages table is characterized by low traffic. In other words, records aren’t added to the table on a daily basis, maybe a few times a month.

Despite being infrequent, these changes, when they occur, are extremely important for us, and it’s imperative that we avoid missing any of them.

So, packages-connector runs over the course of a few days, successfully capturing changes from the packages table as expected, and stores the most recent offset it recorded as 1435109. Now, let’s assume that the activity in the packages table comes to a halt.

During this period of inactivity, a complication arises from the continuous progression of binlog offsets. This progression is occurring due to changes taking place in other tables, which are also being logged to the binlog, and the binlog data is starting to rotate.

Two weeks go by without any updates or inserts to the packages table.

Suddenly, our MySQL connection is disrupted, and Debezium restarts due to the loss of its connection to MySQL.

After Debezium restarts successfully, the packages-connector tries to determine the last point at which it captured a change, to see if it potentially missed data changes during the downtime. Instead, it throws an unexpected error.

io.debezium.DebeziumException: The connector is trying to read binlog starting at SourceInfo [...], but this is no longer available on the server.

It attempts to locate the offset 1435109 within the binlog, but discovers that this offset does not exist as a result of the binlog’s rotation during the two-week period.

Not knowing what to do in this situation, the connector throws an exception and goes down.

This presents a major issue. It means that events occurring right now in the packages table remain unrecorded during the period when packages-connector is down.

The only way to recover from this is to manually run a full snapshot of the database (using snapshot mode when_needed; additional information about snapshots can be found in the Debezium documentation).

As a snapshot is generated, the connector locks all tables it has been configured to monitor for changes. This is something we try to avoid due to the possible performance impact it could introduce on our database.

As a result, customers might miss out on data associated with new resources created/updated in the packages table. This is a situation that we cannot allow, and what makes it even worse is that we might not even be aware that this issue is occurring, due to a lack of observability.

Improving Reliability and Observability

As time went by, we began to notice connectors losing offsets in our production environment.

This drove us to search for a solution that could improve the reliability and observability of our Debezium components.

Following thorough research, we eventually discovered the necessary steps to resolve the issues we encountered.

Observability

Logging

Coralogix serves as our primary logging platform. The original structure of Debezium logs was composed of lengthy, unparseable strings.

To enable effective searching in Coralogix we had to transform the logs into JSON objects.

Debezium uses the log4j library (don’t worry, this was after the vulnerability was addressed). To configure its logger, we needed to override the log4j.properties file, which defines the log format.

# This file will override the default configuration of logs format

kafka.logs.dir=logs

log4j.rootLogger=INFO, stdout, appender

# Disable excessive reflection warnings - KAFKA-5229
log4j.logger.org.reflections=ERROR

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.threshold=INFO
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern={"timestamp":"%d{ISO8601}","level":"%p","thread":"%t","logger":"%c","message":"%m","connector_type":"%X{dbz.connectorType}","connector_name":"%X{dbz.connectorName}","connector_context":"%X{dbz.connectorContext}"}%n


log4j.appender.appender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.appender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.appender.File=${kafka.logs.dir}/connect-service.log
log4j.appender.appender.layout=org.apache.log4j.PatternLayout
log4j.appender.appender.layout.ConversionPattern={"timestamp":"%d{ISO8601}","level":"%p","thread":"%t","logger":"%c","message":"%m","connector_type":"%X{dbz.connectorType}","connector_name":"%X{dbz.connectorName}","connector_context":"%X{dbz.connectorContext}"}%n

As you can see, the ConversionPattern modification is the main and only change. Once this file was overridden, our logs became searchable through Coralogix. This simplified our debugging and made our lives much easier.

Additional information about logging can be found in the Debezium documentation.

Below, you can observe our latest JSON logs being displayed in Coralogix.

Debezium logs in JSON format, as presented in Coralogix

— —

Metrics

We also discovered that Debezium had introduced in its documentation a way to monitor it. This involves using the JMX metrics exposed through Kafka Connect, which can later be exported to Prometheus and Grafana.

Since we were already using Prometheus and Grafana, we immediately jumped at the chance.

We then set up alerts for all of our connectors and also updated our Grafana Debezium dashboards. This allowed us to have a real-time overview of our connector statuses, and in the event of any downtime, we immediately know about it.

Debezium dashboard in Grafana showing all of our connectors’ statuses

Below is an example of a PromQL (Prometheus Query Language) definition for an indicator of one of our connectors. It’s straightforward yet effective: if the connector is up and running it returns one; if it’s down or doesn’t even exist, it returns zero.

With this indicator in place, we could easily create an alert by checking if its query result is not equal to one.

sum(debezium_metrics_Connected{context="streaming", env="eks-prd-apps", name=~"balances_test_incoming_operation.*"}) or vector(0)
The result of the query above after being executed
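The alert itself can then live in a Prometheus alerting rule. Below is a minimal sketch, assuming the rules are managed in Prometheus and routed to Slack through Alertmanager; the rule name, duration, and labels are ours to pick and purely illustrative:

groups:
  - name: debezium-connectors
    rules:
      - alert: DebeziumConnectorDown
        # The indicator returns 1 while the connector is streaming; anything else means trouble
        expr: (sum(debezium_metrics_Connected{context="streaming", env="eks-prd-apps", name=~"balances_test_incoming_operation.*"}) or vector(0)) != 1
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "Debezium connector balances_test_incoming_operation is not connected"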

Now that the alerts are in place, if one of our connectors triggers an alert, we receive a notification immediately in our Slack channel. An example of an alert in our channel can be seen below.

Slack notification we received due to the alert

Reliability

This is where the advanced concepts come into play.

We continued to face the problem of connectors losing binlog offsets. This issue occurred in both production and test environments, particularly when we configured connectors to capture changes from low-traffic tables.

Debezium already had a heartbeat logic in place (v1.6), but it didn’t help us. This is because the existing heartbeat was focused on Kafka, rather than the database itself.

However, luck was on our side, as in later versions a new field was introduced: heartbeat.action.query, and this is what it does —

“Specifies a query that the connector executes on the source database when the connector sends a heartbeat message”

This is exactly what we needed for our low-traffic connectors. However, in order to make use of it, we needed to upgrade, since this field wasn’t supported in our current version.

We decided to upgrade Debezium and go through the breaking changes. Despite encountering multiple challenges during that process, eventually, with sweat and tears, we managed to do it and upgraded to ✨ v2.3.0 ✨, which is the most recent release at the time of writing this blog post.

Needless to say, we were thrilled with the outcome!

Now, in order to solve the offset problem, we decided on the following.

For each microservice with a table from which we were capturing changes, we created a dedicated table named debezium_heartbeat. This table is reserved for the heartbeat.action.query that we planned to configure for each connector.

The MySQL definition of debezium_heartbeat table is as follows:

CREATE TABLE IF NOT EXISTS debezium_heartbeat (
  id BIGINT AUTO_INCREMENT PRIMARY KEY,
  connector_name VARCHAR(255) NOT NULL,
  last_heartbeat TIMESTAMP(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6),
  UNIQUE KEY connector_name_index (connector_name)
);

When the heartbeat is triggered, it will initiate the execution of the following query:

INSERT INTO <database_name>.debezium_heartbeat (connector_name, last_heartbeat) VALUES ('<connector_name>', NOW()) ON DUPLICATE KEY UPDATE last_heartbeat = NOW()

At the same time, we reconfigured our connectors to capture changes from two tables: the original table, and the debezium_heartbeat table.

With the heartbeat set to occur once an hour, we can be certain that the connector’s offset will remain up-to-date. This is because the connector captures changes from both tables, effectively storing the latest binlog offset it has seen across the two.

Our connector configuration now looks as such in v2.3.0.

{
  "name": "debezium-packages-connector-sandbox",
  "connector.class": "io.debezium.connector.mysql.MySqlConnector",
  "topic.heartbeat.prefix": "debezium_heartbeat",
  "topic.prefix": "packages_sandbox", // previously "database.server.name"
  "include.schema.changes": "false",
  "decimal.handling.mode": "precise",
  "transforms": "Reroute,unwrap",
  "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
  "transforms.Reroute.type": "io.debezium.transforms.ByLogicalTableRouter",
  "transforms.Reroute.topic.regex": "(.*)payouts_sandbox_qa.(.*)",
  "transforms.Reroute.topic.replacement": "$2_sandbox",
  "transforms.unwrap.add.fields": "",
  "database.user": "**********",
  "database.server.id": "**********",
  "database.port": "**********",
  "database.hostname": "**********",
  "database.password": "**********",
  "database.include.list": "payouts_sandbox_qa", // previously "database.whitelist"
  "table.include.list": "payouts_sandbox_qa.packages,payouts_sandbox_qa.debezium_heartbeat", // previously "table.whitelist"
  "schema.history.internal.kafka.bootstrap.servers": "**********", // previously "database.history.kafka.bootstrap.servers"
  "schema.history.internal.kafka.topic": "debezium_history_packages_sandbox", // previously "database.history.kafka.topic"
  "heartbeat.interval.ms": "3600000",
  "heartbeat.action.query": "INSERT INTO payouts_sandbox_qa.debezium_heartbeat (connector_name, last_heartbeat) VALUES ('debezium-packages-connector-sandbox', NOW()) ON DUPLICATE KEY UPDATE last_heartbeat = NOW()",
  "snapshot.mode": "initial",
  "snapshot.new.tables": "parallel"
}

The graph below offers a better grasp of how the connector offsets are updated within Kafka as a result of the hourly execution of the heartbeat query.

The execution of the heartbeat query triggers a capturing event on the “debezium_heartbeat” table. Consequently, this event leads to an update in the dedicated offset topic.

We can see that we are now capturing changes from two tables that are part of the same database:

  1. payouts_sandbox_qa.packages
  2. payouts_sandbox_qa.debezium_heartbeat

Those who have sharp eyes out there will notice that adjustments were made to transforms.Reroute.topic.regex and transforms.Reroute.topic.replacement.

In our current configuration for capturing changes from two separate tables, our goal is to ensure that the connector exports events from each table to its dedicated Kafka topic. This can be achieved by utilizing a group regex pattern structured as follows: (.*)payouts_sandbox_qa.(.*).

This means that when a change is captured within one of the tables, the following scenarios will take place:

  1. Change in payouts_sandbox_qa.packages — $1 will be set as an empty string, and $2 will be set as packages, resulting in the event being routed to the packages_sandbox topic
  2. Change in payouts_sandbox_qa.debezium_heartbeat — $1 will be set as an empty string, and $2 will be set as debezium_heartbeat, resulting in the event being routed to the debezium_heartbeat_sandbox topic

Given that the packages_sandbox topic is already created in Kafka, it’s important to understand that the heartbeat topics must be created prior to this change. Otherwise, Debezium will throw an error.

Furthermore, additional topics must be created, as Debezium also sends Kafka messages to the following topic:

__debezium-heartbeat.<topic.prefix>
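As a hedged example, these topics can be created up front with the stock Kafka CLI. The partition and replication values below are illustrative; pick whatever matches your cluster:

# Topic receiving the heartbeat change events routed by the Reroute transform
kafka-topics.sh --bootstrap-server <broker:9092> --create \
  --topic debezium_heartbeat_sandbox --partitions 1 --replication-factor 3

# Internal heartbeat topic named __debezium-heartbeat.<topic.prefix>
kafka-topics.sh --bootstrap-server <broker:9092> --create \
  --topic __debezium-heartbeat.packages_sandbox --partitions 1 --replication-factor 3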

Summarizing the reliability section, we’ve introduced a connector designed to capture changes from two tables. Its offset is updated hourly through the heartbeat logic we’ve put in place, ensuring that the offset remains up-to-date.

Moreover, this connector exports events to two distinct Kafka topics: one for the heartbeats and the other for the packages.

We’ve implemented this approach across all our connectors, guaranteeing the retention of offsets for an extended duration.

This assurance not only maintains the seamless operation of our connectors, but also enables them to effortlessly recover even in the event of an unexpected Debezium restart.

Conclusion

I’ll make an effort to provide you with some of the valuable insights I’ve acquired during this journey:

  1. Debezium is a complex component that should not be underestimated. I’ve touched on only a fraction of the topics; there is much more to Debezium, such as snapshot modes, transformations, security features, etc. It requires attention and care, and the learning curve takes time. However, once you master it, you’ll realize how powerful it is
  2. The community plays a vital role. I discovered a dedicated Debezium forum where knowledge is freely shared, and questions are addressed daily. This resource proved invaluable to us along the journey (link to the forum — https://debezium.zulipchat.com).
  3. The Debezium documentation is comprehensive, so it’s important to conduct thorough searches across the various sections. Make sure to select the version you’re using and the specific connector type related to your database. Many times, we confronted challenges that were eventually covered in the documentation (link to the documentation — https://debezium.io/documentation)
  4. In addition, maintaining close relationships with your peers is crucial, especially with the operations and data teams. Our journey wouldn’t have been possible without their expertise and guidance, especially during the upgrade process and when understanding data pipelines. Their insights helped us avoid mistakes and were truly invaluable
  5. I strongly recommend using a Monorepo that consolidates all your connectors within a single project. If you’re dealing with multiple connectors, adopting a Monorepo approach can significantly simplify your workflow — take my word for it!

I hope you’ve made it this far through the blog post. I understand that it’s been quite an extensive read, but I’m mindful that there could be someone out there on our beautiful planet who is currently wrestling with Debezium, perhaps attempting to address challenges similar to the ones we’ve explored here.

If this blog post helps them overcome these obstacles, it would be genuinely satisfying to help a fellow engineer.

Meanwhile, if you found this blog post intriguing or beneficial, don’t hesitate! Feel free to leave a like, comment, and subscribe. Of course, if any questions come to mind after reading this, I’ll be sure to answer them.
