CDC from zero to hero

Andrei Tserakhau
Plumbers Of Data Science
15 min read · Oct 13, 2023

In modern distributed systems, various types of databases are needed, such as caches, search indexes, and analytical replicas, with reactive interactions between them. Addressing each of these tasks individually can be quite challenging, but it turns out that a single mechanism can solve all of these tasks, and its name is Change Data Capture.

What is Change Data Capture (CDC)

Change Data Capture (CDC) is a set of software development patterns that allow you to establish a reactive infrastructure, simplify microservices architecture, and break down a monolith into microservices.

Typically, in scenarios like these, you start with an OLTP (Online Transaction Processing) database. With CDC, you obtain a stream of events regarding the addition, modification, and deletion of rows, and then process them. This transforms the database into an event-driven system.

A typical use case for Change Data Capture (CDC)

A properly configured CDC pipeline can react to events within milliseconds. This is why CDC is often used when part of the data has to be moved to external systems and analyzed in real time.

What does a CDC stream actually look like?

Below are examples of SQL commands for a PostgreSQL database and the CDC events they generate. For this illustration, the exact format or protocol is not important. In this notation, “op” stands for operation, which can be “c” for create (insert), “u” for update, and “d” for delete.

Database

CREATE TABLE my_little_pony (
id INT PRIMARY KEY,
name TEXT,
description TEXT
);
INSERT INTO my_little_pony (id, name, description)
VALUES
(1, 'Twilight Sparkle', ''),
(2, 'Rainbow Dash', ''),
(3, 'Spike', 'Spike goes BRRR');

Serialized CDC

{
"op":"c",
"after":{
"id":1,
"name":"Twilight Sparkle",
"description":""
}
}
{
"op":"c",
"after":{
"id":2,
"name":"Rainbow Dash",
"description":""
}
}
{
"op":"c",
"after":{
"id":3,
"name":"Spike",
"description":"Spike goes BRRR"
}
}

Database

UPDATE my_little_pony 
SET description='_'
WHERE id<3;

Serialized CDC

{
"op":"u",
"after":{
"id":1,
"name":"Twilight Sparkle",
"description":"_"
}
}
{
"op":"u",
"after":{
"id":2,
"name":"Rainbow Dash",
"description":"_"
}
}

Database

DELETE FROM my_little_pony 
WHERE name='Rainbow Dash';

Serialized CDC

{
"op":"d",
"before":{
"id":2
}
}

As seen from the example, CDC events have the following properties:

  1. Database-specific details disappear from the CDC stream (in practice they are reduced to almost zero, although some information can still be recovered).
  2. Any command is transformed into a set of modified rows, including the new state. Depending on the settings, the previous state can also be obtained.

When CDC shines

The ability of CDC to move data swiftly and efficiently in small portions makes it valuable for real-time responsiveness to changes in the source database. There are two main areas where CDC is applied:

  1. Enabling reactive interactions between infrastructure components.
  2. Implementing various patterns for constructing microservices architecture.

Let’s look at these two areas separately.

Reactive interaction between infrastructure components

In modern architectures, it’s common to work with only a portion of the database, and CDC is typically applied to specific tables. By having a sequence of modifying events for a table, you can obtain an asynchronous replica of that table. And since the events have already been decoupled from the source database in a format independent of the database itself, the receiving end can be anything: OLTP/OLAP/Cache/Full-Text Search.
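As a minimal sketch of that idea, the snippet below applies events in the illustrative op/after/before notation used earlier to an in-memory dictionary keyed by primary key. The `apply_event` helper and the dictionary target are assumptions made for illustration; a real receiver would write to a database, a cache, or a search index.

```python
import json

def apply_event(replica: dict, event: dict) -> None:
    """Apply one CDC event to a replica keyed by primary key (`id`)."""
    op = event["op"]
    if op in ("c", "u"):
        row = event["after"]
        replica[row["id"]] = row                  # upsert the new row state
    elif op == "d":
        replica.pop(event["before"]["id"], None)  # delete by key
    else:
        raise ValueError(f"unknown op: {op!r}")

# The event stream from the pony example, one JSON document per line.
stream = """
{"op":"c","after":{"id":1,"name":"Twilight Sparkle","description":""}}
{"op":"c","after":{"id":2,"name":"Rainbow Dash","description":""}}
{"op":"c","after":{"id":3,"name":"Spike","description":"Spike goes BRRR"}}
{"op":"u","after":{"id":1,"name":"Twilight Sparkle","description":"_"}}
{"op":"u","after":{"id":2,"name":"Rainbow Dash","description":"_"}}
{"op":"d","before":{"id":2}}
"""

replica = {}
for line in stream.strip().splitlines():
    apply_event(replica, json.loads(line))

print(replica)
```

Because the events carry whole rows and a key, the same loop works regardless of what kind of store sits on the receiving end.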

If everything is set up correctly, you can, for instance, achieve low-latency caching without the need to make changes to the business logic of your services. Let’s explore how this works with specific examples of replication.

Replication into DWH

By applying a stream of changes to a Data Warehouse (DWH), you can obtain an asynchronous replica of your production OLTP database. This solution addresses several issues at once:

  • It allows analysts to work without impacting the production OLTP database.
  • It solves the challenge of transferring data from the transactional database to the analytical one.
  • It enables analysts to work with analytical databases.

For example, you can automatically obtain an asynchronous replica of your production PostgreSQL database in ClickHouse, Elasticsearch/OpenSearch, or S3.

Here’s an example of delivering a replication stream from MySQL to ClickHouse.

Cache invalidation

By applying a stream of changes to a cache, you can obtain a cache directly from your production OLTP database that is automatically updated (optionally aggregated/transformed). As a practical example, I can mention the RedisCDC project for Redis. This is a distributed platform that simplifies the delivery of CDC streams to Redis.
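The invalidation side of this can be sketched in a few lines. The example below is an assumption-laden illustration, not the RedisCDC API: a dictionary stands in for the cache, `loader` stands in for a query against the source database, and `on_cdc_event` is a hypothetical handler that drops the cached entry for any changed row.

```python
class ReadThroughCache:
    """A read-through cache whose entries are invalidated by CDC events."""

    def __init__(self, loader):
        self.loader = loader      # function that fetches a row from the source
        self.entries = {}
        self.loads = 0            # how many times the source was queried

    def get(self, row_id):
        if row_id not in self.entries:
            self.entries[row_id] = self.loader(row_id)
            self.loads += 1
        return self.entries[row_id]

    def on_cdc_event(self, event: dict) -> None:
        # Any change to a row (create, update, delete) drops its cached copy.
        row = event.get("after") or event.get("before")
        self.entries.pop(row["id"], None)

source = {1: "Twilight Sparkle"}              # stands in for the OLTP database
cache = ReadThroughCache(source.get)

first = cache.get(1)                          # loaded from the source
source[1] = "Princess Twilight"               # the source changes...
stale = cache.get(1)                          # ...but the cache still has the old value
cache.on_cdc_event({"op": "u", "after": {"id": 1}})
fresh = cache.get(1)                          # reloaded after invalidation
```

The business logic that writes to the source never touches the cache; the change stream alone keeps it consistent.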

Updating search indexes (Elasticsearch, Solr, etc.)

By sending the relevant portion of the change stream to a full-text search engine, you can obtain an automatically updated full-text search index. An example of delivering a replication stream from MySQL to Elasticsearch.

Replication into the same OLTP storage

Having a stream of changes in the database allows you to obtain an asynchronous logical replica, for example, for a development environment (dev stand). Since events are usually transmitted in a logical form, you can also perform logical transformations on them along the way. For instance, you can exclude columns with certain sensitive data from the replica and encrypt other sensitive data.
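A transformation of this kind could look roughly like the sketch below. The column names, the key, and the helper functions are all hypothetical, and the example masks values with a keyed hash (HMAC-SHA256) as a simple stand-in for real encryption.

```python
import hashlib
import hmac

DROP_COLUMNS = {"password_hash"}     # hypothetical columns to exclude entirely
MASK_COLUMNS = {"email"}             # hypothetical columns to mask
SECRET = b"demo-key-do-not-use"      # placeholder key for the example only

def mask(value: str) -> str:
    """Replace a value with a keyed hash (a stand-in for real encryption)."""
    return hmac.new(SECRET, value.encode("utf-8"), hashlib.sha256).hexdigest()

def transform_row(row: dict) -> dict:
    """Return a copy of the row without dropped columns and with masked values."""
    out = {}
    for column, value in row.items():
        if column in DROP_COLUMNS:
            continue
        if column in MASK_COLUMNS and value is not None:
            value = mask(str(value))
        out[column] = value
    return out

def transform_event(event: dict) -> dict:
    """Apply the row transformation to both sides of a CDC event."""
    result = dict(event)
    for side in ("before", "after"):
        if result.get(side) is not None:
            result[side] = transform_row(result[side])
    return result

event = {"op": "c", "after": {"id": 1, "email": "a@example.com", "password_hash": "x"}}
safe_event = transform_event(event)
```

The transformation runs on the stream, so the dev replica never receives the raw values in the first place.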

Replication into other OLTP storage

If, for any reason, you want to replicate your data to another database, such as your production data in MySQL to be used in PostgreSQL, or vice versa, this can also be achieved using CDC. Such replication might be necessary due to certain tool limitations.

Example of delivering data from MySQL to PostgreSQL.

Auditing what happens in your storage

CDC provides the ability to conduct a comprehensive audit of what’s happening in the database. By preserving events of changes in the database “as-is” in some storage, you can obtain audit logs of everything happening with your production database. And when processing the message stream, you can achieve real-time auditing.

Example with a scenario of such an audit.

Sharing change events between services

By dispatching events to a queue, you can notify other components or teams about the occurring events. While a more systematic approach to this is discussed in the section about microservices patterns, you can inform your colleagues about events without relying on specific patterns.

Providing cross-region failovers

If you have a cross-region queue, by dispatching CDC to the queue, you can duplicate readers in two different data centers and thereby survive the failure of a region (in case one region goes offline).

Reducing the load on the main OLTP storage

CDC allows for reducing the load on the master transactional database or service, especially when dealing with a microservices database. Serializing the CDC stream in a queue allows multiple receivers to work with the master’s information, burdening it only once. You can activate all the mentioned receivers, and even double them to include copies in each data center, without changing the load on the master — they will all be reading the message queue.
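The fan-out effect can be illustrated with a toy model. In the sketch below, `ChangeLog` is an in-memory list standing in for a queue topic, and each `Consumer` keeps its own offset; both classes are hypothetical and only show why adding readers does not add load on the source.

```python
class ChangeLog:
    """An append-only in-memory log standing in for a message queue topic."""

    def __init__(self):
        self.events = []

    def append(self, event: dict) -> None:
        self.events.append(event)

class Consumer:
    """Each consumer tracks its own offset, so readers do not affect each other."""

    def __init__(self, name: str, log: ChangeLog):
        self.name = name
        self.log = log
        self.offset = 0

    def poll(self) -> list:
        batch = self.log.events[self.offset:]
        self.offset = len(self.log.events)
        return batch

source_reads = 0
log = ChangeLog()
for event in [{"op": "c", "after": {"id": 1}}, {"op": "d", "before": {"id": 1}}]:
    source_reads += 1        # the source database is read once per change
    log.append(event)

consumers = [Consumer(name, log) for name in ("dwh", "cache", "search")]
delivered = {c.name: len(c.poll()) for c in consumers}
```

Adding a fourth consumer changes `delivered` but leaves `source_reads` untouched, which is the whole point of putting the queue in between.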

CDC patterns for microservices

CDC is an approach that allows for the simultaneous use of multiple microservices architecture patterns. From the perspective of microservices design patterns, CDC enables the concurrent implementation of event-driven processing between microservices, decoupling of microservices, and assists in breaking down monoliths into microservices. I won’t delve into the subtle nuances of the differences between these patterns here, but for those interested, I’ll share a list of useful references on this topic at the end.

Outbox pattern

The outbox pattern (also known as Application events) prescribes combining state changes with the sending of notifications to other services. This can be achieved either by saving messages for sending in adjacent tables within the same transaction (see transactional outbox) or by subsequently sending messages through CDC.
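The transactional variant can be sketched with SQLite, used here only because it ships with Python. The `orders` and `outbox` tables and the `place_order` function are hypothetical names; the point is that the state change and the message are committed or rolled back together.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE orders (id INTEGER PRIMARY KEY, status TEXT NOT NULL);
    CREATE TABLE outbox (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        topic TEXT NOT NULL,
        payload TEXT NOT NULL
    );
""")

def place_order(order_id: int, fail: bool = False) -> None:
    """Write the state change and its notification in one transaction."""
    with conn:  # commits on success, rolls back if an exception escapes
        conn.execute(
            "INSERT INTO orders (id, status) VALUES (?, 'placed')", (order_id,)
        )
        conn.execute(
            "INSERT INTO outbox (topic, payload) VALUES (?, ?)",
            ("order-events", json.dumps({"order_id": order_id, "status": "placed"})),
        )
        if fail:
            raise RuntimeError("simulated failure before commit")

place_order(1)
try:
    place_order(2, fail=True)   # neither the order nor the message is stored
except RuntimeError:
    pass

orders = conn.execute("SELECT COUNT(*) FROM orders").fetchone()[0]
messages = conn.execute("SELECT COUNT(*) FROM outbox").fetchone()[0]
```

A separate relay, or CDC on the `outbox` table itself, would then deliver the stored messages to the queue.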

Writing the CDC stream to a queue has a side effect: you get event sourcing with domain events on a bus like Apache Kafka, and service interaction through Kafka. This partly enables the decoupling of microservices: services no longer need to actively notify others; those needing events can subscribe to them, and the service works only with its own database. Additional benefits come for free: no need for service discovery, reproducibility, and the ease of storing the entire stream, for example, in Elasticsearch.

CQRS pattern

Command and Query Responsibility Segregation (CQRS) is a pattern in which an application is conceptually divided into two parts: one that modifies the state and one that reads the state. CDC allows you to direct the part that modifies the state through a queue to another database, from which reading operations are performed.

Strangler pattern

Strangler is a pattern for breaking down a monolith into microservices. Properly prepared CDC is completely transparent to the legacy application and doesn’t require any changes to the inherited data model, which greatly facilitates the decomposition of the monolith and makes integration into the monolith as non-invasive as possible.

How to implement CDC

In theory, there are three ways to implement CDC: timestamps on rows, triggers on tables, and logical replication. In this section, I will explain these methods, their advantages, and disadvantages.

Query-based CDC

For example, you might have a table in your database where there’s a dedicated column (let’s say named “updated_at”) that gets filled with the value of now() on every insert/update.

In this case, you can regularly retrieve new or updated rows through polling with a straightforward query:

SELECT ... FROM my_table WHERE updated_at > saved_val;

However, in this basic setup, you won’t be able to distinguish between an insert and an update, and you won’t receive notifications about deleted rows. By complicating the schema, you can learn to differentiate between inserts and updates (for example, by introducing a “created_at” field) and receive notifications for row deletions (for instance, by replacing deletions with a flag like “is_deleted”).
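A polling loop with both extensions might look like the sketch below. It uses SQLite and integer "timestamps" so that the example is deterministic; the column names follow the text, while `poll_changes` and the event shape are assumptions made for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE my_table (
        id INTEGER PRIMARY KEY,
        name TEXT,
        created_at INTEGER NOT NULL,
        updated_at INTEGER NOT NULL,
        is_deleted INTEGER NOT NULL DEFAULT 0
    )
""")

def poll_changes(saved_val: int):
    """Return (events, new_saved_val) for rows changed after saved_val."""
    rows = conn.execute(
        "SELECT id, name, created_at, updated_at, is_deleted "
        "FROM my_table WHERE updated_at > ? ORDER BY updated_at, id",
        (saved_val,),
    ).fetchall()
    events, new_val = [], saved_val
    for row_id, name, created_at, updated_at, is_deleted in rows:
        if is_deleted:
            op = "d"                    # soft delete flag
        elif created_at > saved_val:
            op = "c"                    # row did not exist at the previous poll
        else:
            op = "u"
        events.append({"op": op, "id": row_id, "name": name})
        new_val = max(new_val, updated_at)
    return events, new_val

# Integer timestamps keep the example deterministic; real tables would use now().
conn.execute("INSERT INTO my_table VALUES (1, 'Twilight Sparkle', 1, 1, 0)")
conn.execute("INSERT INTO my_table VALUES (2, 'Rainbow Dash', 2, 2, 0)")
first_batch, saved = poll_changes(0)

conn.execute("UPDATE my_table SET name = 'Twilight', updated_at = 3 WHERE id = 1")
conn.execute("UPDATE my_table SET is_deleted = 1, updated_at = 4 WHERE id = 2")
conn.execute("INSERT INTO my_table VALUES (3, 'Spike', 5, 5, 0)")
second_batch, saved = poll_changes(saved)
```

Note what this cannot see: several updates between two polls collapse into one event, and a row created and soft-deleted between polls surfaces only as a delete.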

Pros

  1. It works.
  2. It does not store the history of changes separately (unlike the trigger-based approach discussed below).

Cons

  1. Non-obvious interaction requirements with the database.
  2. Schema and business logic modifications are needed (for timestamp storage).
  3. Involves polling, which requires an external process with regular queries, potentially slowing down the database.
  4. Simple implementations lack the capability to capture row deletion events.

There are modifications to this approach that allow for mitigating some drawbacks at the cost of complicating the schema, but the essence remains the same. Here’s what you can add:

  • Version numbers in rows.
  • State indicators in rows.
  • Time/version/status in rows.

Table triggers

In this case, you create a dedicated table for history and set up triggers that activate when rows are modified or deleted.
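Here is a minimal sketch of such a setup, using SQLite instead of PostgreSQL so that it runs with the Python standard library alone. The history table layout and trigger names are assumptions; an insert trigger is included as well so that all three operations are captured.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE my_little_pony (id INTEGER PRIMARY KEY, name TEXT, description TEXT);

    -- Dedicated history table; seq preserves the order of changes.
    CREATE TABLE my_little_pony_history (
        seq INTEGER PRIMARY KEY AUTOINCREMENT,
        op TEXT NOT NULL,
        row_id INTEGER NOT NULL,
        name TEXT,
        description TEXT
    );

    CREATE TRIGGER pony_ins AFTER INSERT ON my_little_pony BEGIN
        INSERT INTO my_little_pony_history (op, row_id, name, description)
        VALUES ('c', NEW.id, NEW.name, NEW.description);
    END;

    CREATE TRIGGER pony_upd AFTER UPDATE ON my_little_pony BEGIN
        INSERT INTO my_little_pony_history (op, row_id, name, description)
        VALUES ('u', NEW.id, NEW.name, NEW.description);
    END;

    CREATE TRIGGER pony_del AFTER DELETE ON my_little_pony BEGIN
        INSERT INTO my_little_pony_history (op, row_id)
        VALUES ('d', OLD.id);
    END;
""")

conn.execute("INSERT INTO my_little_pony VALUES (1, 'Twilight Sparkle', '')")
conn.execute("UPDATE my_little_pony SET description = '_' WHERE id = 1")
conn.execute("DELETE FROM my_little_pony WHERE id = 1")

# An external process would read this table to ship the changes elsewhere.
history = conn.execute(
    "SELECT op, row_id, description FROM my_little_pony_history ORDER BY seq"
).fetchall()
```

Every write to the table now costs an extra write to the history table, which is where the load mentioned in the cons below comes from.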

Pros

  1. It works.
  2. Changes are captured instantly, enabling real-time processing.
  3. Triggers can capture all types of events, including deletions.
  4. Triggers can add valuable metadata.

Cons

  1. Triggers burden and slow down the database.
  2. Triggers modify the database.
  3. Requires an external process to read change history.
  4. Creating and maintaining triggers can be a complex task, especially considering schema evolution.

Logical replication

To provide ACID guarantees, almost every database keeps a write-ahead log or an equivalent change log encoded in a binary format: in PostgreSQL it’s called the WAL, in MySQL the binlog, in Oracle the redo log, in MongoDB the oplog, and in MSSQL the transaction log.

Essentially, such a log already contains what we need; we just need to decode it and send it to a message queue.
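The "decode and send" step can be illustrated with a toy model. The log format below (a 4-byte length prefix followed by a JSON body) is invented for this sketch and is not the format of any real database; the list `queue` stands in for a message queue.

```python
import io
import json
import struct

def append_record(log: io.BytesIO, record: dict) -> None:
    """Append one length-prefixed binary record to the toy log."""
    body = json.dumps(record).encode("utf-8")
    log.write(struct.pack(">I", len(body)))   # 4-byte big-endian length
    log.write(body)

def decode_from(log: io.BytesIO, position: int):
    """Yield (next_position, event) for each record starting at position."""
    data = log.getvalue()
    while position < len(data):
        (length,) = struct.unpack_from(">I", data, position)
        start = position + 4
        event = json.loads(data[start:start + length].decode("utf-8"))
        position = start + length
        yield position, event

# The "database" appends to its log as part of normal operation.
wal = io.BytesIO()
append_record(wal, {"op": "c", "after": {"id": 1, "name": "Spike"}})
append_record(wal, {"op": "d", "before": {"id": 1}})

# The CDC reader decodes sequentially and remembers how far it got.
queue = []
position = 0
for position, event in decode_from(wal, position):
    queue.append(event)
```

The saved `position` plays the role of a replication offset: after a restart, the reader resumes from it instead of rereading the whole log.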

However, there are occasional unexpected limitations. For example, in the case of PostgreSQL, this approach is not compatible with pg_repack.

Pros

  1. Real-time processing is achieved.
  2. All types of changes are captured: insert, update, and delete.
  3. It doesn’t require an additional data storage mechanism, unlike triggers; all data is stored in the WAL.
  4. It’s mostly not polling, so it doesn’t create a significant additional load on the database. All that’s needed is to sequentially read the WAL from disk, decode it, and deliver it. This is typically not noticeable.

Cons

  1. It’s complicated.
  2. Often requires a separate role.
  3. This mechanism may not exist in very old database versions.
  4. If replication is not configured correctly (e.g., WAL growth is not limited), it can be dangerous (logs can fill the entire disk).

How we do CDC at DoubleCloud Transfer

At the very beginning, DoubleCloud Transfer tried to solve the cloud migration problem. It solves the problem as follows: first, a database snapshot is transferred, and then all changes that occurred on the source database since the snapshot was taken are applied to the target database. As a result, an asynchronous replica of the database is created in the cloud, and all that’s left is to disable write operations on the source database, wait a few seconds for the replica to catch up with the master, and then enable operations on the cloud-based database.
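The snapshot-then-catch-up sequence can be sketched with a toy source that keeps both its current rows and an ordered change log. The `Source` class and `catch_up` function are hypothetical and do not reflect how the service is implemented; they only show why remembering the log position at snapshot time is enough to converge.

```python
class Source:
    """A toy source database: current rows plus an ordered change log."""

    def __init__(self):
        self.rows = {}
        self.log = []            # list of (key, value); value None means delete

    def write(self, key, value) -> None:
        if value is None:
            self.rows.pop(key, None)
        else:
            self.rows[key] = value
        self.log.append((key, value))

    def snapshot(self):
        """Return a copy of the rows and the log position it corresponds to."""
        return dict(self.rows), len(self.log)

def catch_up(target: dict, source: Source, position: int) -> int:
    """Apply every change recorded after `position` to the target."""
    for key, value in source.log[position:]:
        if value is None:
            target.pop(key, None)
        else:
            target[key] = value
    return len(source.log)

source = Source()
source.write(1, "Twilight Sparkle")
source.write(2, "Rainbow Dash")

target, position = source.snapshot()           # step 1: transfer the snapshot
source.write(3, "Spike")                       # writes continue during the copy
source.write(2, None)
position = catch_up(target, source, position)  # step 2: apply later changes
```

Once writes on the source stop, one more `catch_up` call brings the target to the same state, which is the short cutover window described above.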

This approach allows for migrating to the cloud with minimal downtime.

Over time, in addition to migration tasks, the service began to be used for data transfer between different databases, such as from transactional to analytical databases, as well as between message queues and databases. If you put all supported transfer options in one matrix, you can see that Data Transfer can work with snapshots and data replication streams, scaling horizontally and sharding.

In the service’s terminology, there are two main entities:

  1. Endpoint — these are connection settings along with additional configurations. An endpoint can either be a source from which data is extracted or a target where data is loaded.
  2. Transfer — this connects a source endpoint to a target endpoint. It includes its own settings, primarily the type of transfer: snapshot, replication, snapshot + replication.

Once a transfer is created, it can be activated either as a one-time action or scheduled for activation. In the case of a table snapshot, the data is transferred, and then the transfer is automatically deactivated. In the case of replication, an ongoing process is initiated to continuously move new data from the source to the target.

The Transfer is written in Go and operates as a cloud-native solution. It has a control plane responsible for the API with a work scheduler and for data-plane management. When user transfers are activated, data planes are created in runtimes, for example in Kubernetes (k8s), where pods are launched to run the transfer. Thus, it’s a standalone service running on top of a specific runtime. In addition to Kubernetes, anything that can run Docker may be supported, such as EC2 and GCP runtimes. When a transfer is activated, Data Transfer creates virtual machines in the runtime as needed and starts moving data inside. It only takes a few clicks to configure data delivery at gigabytes per second.

The service has been successfully used within multiple teams for several years in production with substantial data volumes. Besides its inherent horizontal scalability, we’ve parallelized it wherever possible. Currently, nearly two thousand data streams run in our internal infrastructure, moving gigabytes per second.

We’ve also implemented the most frequently requested ELT transformations, and ELT capabilities will continue to expand. Configuration can be done through a user-friendly UI, API, and Terraform.

All of this has made Double Cloud Transfer a versatile data transfer service from any source to any destination (with ELT processing capabilities) — both for snapshots and data streams. Users have found numerous creative ways to utilize the service beyond its primary purpose. CDC is just one of many applications of the Data Transfer service.

How do we do it in more detail?

Since Double Cloud Transfer has been receiving replication streams at the logical replication level since its inception, the natural step was to make this available to users. This is what transformed Double Cloud Transfer into a CDC solution, working through the processing of the wal-log.

We began implementing the shipping of logical replication events to a queue based on user requests, and it was decided to generate events in the Debezium format so that we could become a drop-in replacement for Debezium. This way, it would be possible to replace one CDC product with another in a configured pipeline, keeping the pipeline operational. This approach would provide useful integrations without introducing a new data format.
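To give a feel for what such a conversion involves, here is a rough sketch that maps a hypothetical internal change object to a simplified envelope with Debezium's core payload fields (`before`, `after`, `source`, `op`, `ts_ms`). The `Change` class is invented for this example and is not Data Transfer's real type; the actual Debezium format carries more `source` metadata and an optional schema section.

```python
import json
from dataclasses import dataclass
from typing import Optional

@dataclass
class Change:
    """A hypothetical internal change object, invented for this example."""
    kind: str                     # "insert", "update" or "delete"
    database: str
    table: str
    old_row: Optional[dict]
    new_row: Optional[dict]
    commit_time_ms: int

OP_CODES = {"insert": "c", "update": "u", "delete": "d"}

def to_debezium_like(change: Change, processed_ms: int) -> dict:
    """Build a simplified envelope with Debezium's core payload fields."""
    return {
        "before": change.old_row,
        "after": change.new_row,
        "source": {
            "db": change.database,
            "table": change.table,
            "ts_ms": change.commit_time_ms,   # when the change was committed
        },
        "op": OP_CODES[change.kind],
        "ts_ms": processed_ms,                # when the event was processed
    }

change = Change(
    kind="update",
    database="ponyville",
    table="my_little_pony",
    old_row={"id": 1, "name": "Twilight Sparkle", "description": ""},
    new_row={"id": 1, "name": "Twilight Sparkle", "description": "_"},
    commit_time_ms=1697155200000,
)
envelope = to_debezium_like(change, processed_ms=1697155200123)
serialized = json.dumps(envelope)
```

Any consumer that reads only these fields would not need to know which product emitted the event, which is what makes a drop-in replacement possible in principle.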

As a result, we had to create converters from our internal objects to the serialized Debezium format and cover it with extensive testing, which we did.

Currently, in Data Transfer, the serializer is implemented for PostgreSQL and MySQL sources, and recently, MongoDB source support was added. Now you can easily configure CDC delivery from your YDB tables to Apache Kafka, EventHub or Kinesis in Debezium format in just a few clicks.

In this way, for MySQL and PostgreSQL, Data Transfer became a drop-in replacement for Debezium. Some notable differences include Data Transfer’s ability to handle master migration in PostgreSQL (when a replica becomes the master) with the pg_tm_aux plugin enabled. Additionally, the service can transfer user-defined types in PostgreSQL.

Data Transfer also implemented the ability to organize query-based CDC, referred to as “incremental tables” in the documentation, or informally, “increments.”

Real-world use cases of Double Cloud Transfer

Let’s talk about popular use cases.

OLTP into DWH

The most common use case is using CDC to create replicas of transactional databases (PostgreSQL, MySQL, MongoDB) in analytical data stores (ClickHouse, S3). Because events are stored in a queue, data is read from the transactional database only once, and there are typically two analytical recipient databases, one in each data center, to survive regional outages.

Problem

Production processes are running in transactional databases, and the business is interested in receiving analytical reports, which are conveniently generated through analytical databases. Since HTAP databases are still considered exotic, there is a need to somehow transfer data from the transactional database to the analytical one.

Solution

Before CDC

Usually, teams transfer data from the transactional database to the analytical one using their own ad-hoc scripts or in-house solutions with snapshots, and in the best case, query-based CDC (Change Data Capture). Typically, this is not formalized into a product and is inconvenient to use.

After CDC

CDC (at Double Cloud Transfer) allows for convenient replication of the transactional database to the analytical one, for example, through a user interface, with a replication lag of just a few seconds.

Regional analytics

The same as above, but with an extra requirement to survive regional outages.

Problem

The team needed to move its production data into regional analytical DWHs with sub-second delay.

Solution

Before CDC

A custom Java service was used, which would export a snapshot once a day.

Afterward, a series of scripts were written for exporting, and then another internal service was used, which could perform query-based CDC. However, introducing a new data delivery process was challenging, and delays were on the order of a day.

After CDC

The team set up the delivery of incremental tables to a cross-datacenter queue with regular execution — this is one transfer, while two other transfers with the “replication” type extract data from the queue to DWH. Two transfers are necessary to ensure regional compliance — each of these two transfers delivers data to a specific regional DWH installation in a separate data center.

The queue itself provides an additional benefit: only one trip is made to the transactional database, but the data ends up in two analytical databases.

As a result, the team has three transfers: one that runs automatically on a schedule and exports new rows, and two that run continuously in replication mode, extracting data from the queue into analytical storage.

CDC into search engine

A customer from another team started using CDC to update Elasticsearch search indexes, then applied CDC for reactive interaction between components, and ultimately, CDC became an integral part of the project.

Problem

When data in MySQL was modified, it was necessary to update the Elasticsearch indexes.

Solution

Before CDC

Before CDC, it would have been necessary to implement the transactional outbox pattern. However, since Double Cloud Transfer already had CDC when the task appeared, we first implemented event shipping to the queue and created a script to process events from the queue.

After CDC

Double Cloud Transfer allowed us to organize the process of updating search indexes, after which our colleagues, having assessed the convenience of setting up a reactive infrastructure, found numerous applications for CDC. Now, they use CDC to: send push notifications, proxy data to an external CRM, schedule tasks, reactively respond to promotional codes, and much more.

Analysis of raw CDC stream

A customer transformed MySQL/PostgreSQL into a streaming database using CDC events, operating on top of DWH. The tool they created is called “incremental materialized views.”

Problem

The customer’s system has a MySQL database that stores settings, and many services access it to retrieve their configuration. The team also needs a full trail of configuration changes for observability purposes.

Solution

Before CDC

Pointing all services directly at MySQL was not an option, as it created a significant load.

Initially, caching services were set up — caching services periodically fetched settings from MySQL, and all other services accessed the caching services.

However, over time, the caching services began to consume a lot of resources and started lagging significantly, sometimes by up to several tens of minutes. Meanwhile, services that accessed the settings began taking too long to restart because they took a long time to initialize their caches.

After CDC

After the CDC stream was set up with Double Cloud Transfer, a streaming database was developed: a streaming equivalent of the Airflow + dbt combination, where DAGs recalculate derivative data only for the changed rows, reactively responding to CDC events.

As a result, caches started lagging by an average of 5 seconds instead of tens of minutes as they did before. Services were able to get rid of local caches, saving a lot of RAM, and services started restarting quickly. Hopefully, someday, colleagues will provide more detailed information about this solution.
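The idea of recalculating derived data only for changed rows can be shown on a tiny aggregate. The sketch below is not the customer's tool; it is an assumed, minimal example that maintains the equivalent of `COUNT(*) ... GROUP BY description` from CDC events in the illustrative op/after/before notation, touching only the groups affected by each event.

```python
from collections import Counter

class CountByDescription:
    """Maintains COUNT(*) GROUP BY description incrementally from CDC events."""

    def __init__(self):
        self.descriptions = {}    # id -> last known description
        self.counts = Counter()   # description -> number of rows

    def on_event(self, event: dict) -> None:
        row = event["before"] if event["op"] == "d" else event["after"]
        row_id = row["id"]
        # Retract the row's previous contribution, if it had one.
        if row_id in self.descriptions:
            old = self.descriptions.pop(row_id)
            self.counts[old] -= 1
            if self.counts[old] == 0:
                del self.counts[old]
        # Add the new contribution unless the row was deleted.
        if event["op"] != "d":
            new = row["description"]
            self.descriptions[row_id] = new
            self.counts[new] += 1

events = [
    {"op": "c", "after": {"id": 1, "description": ""}},
    {"op": "c", "after": {"id": 2, "description": ""}},
    {"op": "c", "after": {"id": 3, "description": "Spike goes BRRR"}},
    {"op": "u", "after": {"id": 1, "description": "_"}},
    {"op": "u", "after": {"id": 2, "description": "_"}},
    {"op": "d", "before": {"id": 2}},
]

view = CountByDescription()
for event in events:
    view.on_event(event)
result = dict(view.counts)
```

Each event costs a constant amount of work regardless of table size, whereas a periodic full recalculation would rescan every row.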

Summary

In this article, we explored Change Data Capture from many angles: its history, theory, use cases, open-source practices, corporate practices, and real user stories.

To learn more about CDC, read Martin Kleppmann’s article, and experiment with Debezium and Double Cloud Transfer.

It’s worth noting that Double Cloud Transfer is a fully managed cloud-native solution that is easy to onboard and master. Feel free to come and use it, leave feedback, and request features.

More articles to read

CDC, caches, and Redis:

  1. RedisCDC: Seamless database migrations and continuous changed-data replication
  2. Cache Prefetching: Building an efficient and consistent cache with RedisCDC
  3. CDC Replication to Redis from Oracle and MySQL
  4. Automating cache invalidation with CDC
  5. Designing an evergreen cache with CDC

Microservice patterns:

  1. Distributed Data for Microservices — Event Sourcing vs. Change Data Capture
  2. Reliable Microservices Data Exchange With the Outbox Pattern
  3. DDD Aggregates via CDC-CQRS Pipeline using Kafka & Debezium
  4. CqrsWithCDC on GitHub
  5. Building CQRS views with Debezium Kafka Materialize and Apache Pinot: part 1
  6. Building CQRS views with Debezium Kafka Materialize and Apache Pinot: part 2
  7. Application modernization patterns with Apache Kafka, Debezium, and Kubernetes

A good article about PostgreSQL caveats: PostgreSQL Change Data Capture (CDC): The Complete Guide

More insight into the so-called “incremental snapshots” approach, a non-trivial way to handle huge historical data alongside replication:

  1. Incremental Snapshots in Debezium
  2. A Watermark Based Change-Data-Capture Framework
