Implementing Change Data Capture in practice — Part 1

Miguel Gamallo
Published in Fever Engineering
13 min read · Sep 4, 2023


An elegant and well-organized library
Photo by Tobias Fischer on Unsplash

As software systems move towards models of distributed data, more and more copies of the same information are required, each of them reformatted for a specific purpose. The copying process takes time, but users expect a unified view of the system as if the source of all information was the same.

One of the solutions to address this replication challenge in a scalable way is known as Change Data Capture (CDC): capturing and publishing changes made to the primary source of the data, known as the system of record; and then listening to those changes in the derived systems to keep a near-real-time up-to-date copy of the information.

This post explores the reasons why we decided to adopt this technology at Fever and how we implemented it. It’s the first part of a series on the topic. The first one will focus on the problem and the approach for the solution, and the second one will present the implementation details and the explanation of how we tackled some of the challenges found along the way.

What we do at Fever

As the leading global experience discovery tech platform, we help millions of people every week to find unique thrilling activities in over 500 cities in more than 10 languages, spanning over 6 continents.

Our platform receives tens of thousands of requests per minute and as such, we’re always looking to continue growing while offering the best user experience, which ultimately, is what drives us to use techniques such as Change Data Capture.

Initial needs

Materialized views scalability

As a platform offering a huge variety of experiences, ranging from art exhibitions or beer tasting sessions to big stadium-filling events, there is no one-size-fits-all alternative for the purchase flow. Over time, we’ve developed different kinds of session selectors to streamline the user experience so it’s as easy as possible to select when, where and how an experience will be enjoyed.

Some of Fever’s session selectors

For the different selectors to work, we need aggregated information about all the sessions of the experience, which can number in the thousands depending on the kind of event. At first, the aggregations were computed each time a client visited the page, but shortly after, we decided to use PostgreSQL materialized views as a simple way to pre-compute them. They were refreshed every minute, and worked perfectly fine for our needs at the time. That is, until we started growing in multiple dimensions:

  • Number of sessions per experience
  • Number of cities where Fever is present
  • Number of experiences per city
  • Number of different selectors

After a certain point, our database was struggling to compute the materialized views, and this is a use case where near real-time is important because it has a direct impact on sales. To put this into numbers, the total database execution time of the refresh process for the calendar materialized view was 7.05 minutes per hour. Today, this would be within the top 10 queries by execution time in our main database for a process that is not directly serving a client’s request, and this is just one of the materializations.

The need for real time data

As a data-driven company in the business of experiences, information constitutes one of our most valuable assets. At the heart of every data use case, from recommendation algorithm execution to partner-facing dashboards, lies the Data Warehouse (DWH). This is where information from multiple sources is gathered, curated and organized using Apache Airflow.

Airflow is based on the concept of DAGs (Directed Acyclic Graphs): collections of tasks that constitute a batch data processing job, most of which are used as ETLs (Extract-Transform-Load) and scheduled to run at a configured interval. These processes are heavy by nature, and even though a lot of optimizations are regularly done, they have one essential shortcoming: they work by “pulling” rather than “pushing”. This means that, by definition, the shortest possible refresh interval is the minimum configurable cron interval (1 minute, unless sketchy code is added to schedule executions artificially), and most importantly, the interval can never be shorter than the execution time of the DAG. In practice, it took approximately 10 minutes for a sale to be reflected in our sales dashboards.

So ETLs are not real-time, is this an issue? For most of the use cases, no. But in the world of experiences there are key moments, like sales openings, when both ourselves and our partners need the most up-to-date value, and every second counts.

A naive approach to address this issue would be to directly query our core user-facing database, but it is fine-tuned to the transactional marketplace use cases and not optimized for read-heavy workloads. We need a way to make this data available fast without impact on the core database.

“Are we there yet?”

A scene from the Shrek 2 movie in which Donkey keeps asking “are we there yet?”
Polling is extremely annoying

Apart from being annoying, polling generates traffic even when there are no updates, increasing the load on the source application. It also creates a two-way dependency, since the source application must provide an API to be consumed by the derived system. However, it’s the simplest option to implement, so we had a few use cases for it:

  • Microservices data integration → Some of our microservices synchronize the data from others via REST requests to internal endpoints, using “last update” timestamps for incremental updates.
  • Transactional outbox domain events publication → We use this pattern to provide eventual consistency guarantees between our system of record and derived data systems that use domain events. At its core, it’s a piece of code that reads messages representing domain events from the outbox database table, which it then serializes and publishes into our domain events bus (RabbitMQ), finishing by marking the message in the outbox as processed. To do this, the publication worker is frequently polling the outbox for new messages to send.
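To make the outbox polling loop concrete, here is a minimal sketch of one polling pass. It is not our production worker: the `outbox` schema, the `poll_once` function and the `publish` callback are hypothetical, and an in-memory SQLite database stands in for the real one so the example runs on its own.

```python
import json
import sqlite3
from typing import Callable


def create_outbox(conn: sqlite3.Connection) -> None:
    # Hypothetical outbox schema: one row per domain event waiting to be published.
    conn.execute(
        "CREATE TABLE outbox ("
        " id INTEGER PRIMARY KEY AUTOINCREMENT,"
        " event_type TEXT NOT NULL,"
        " payload TEXT NOT NULL,"
        " processed INTEGER NOT NULL DEFAULT 0)"
    )


def poll_once(conn: sqlite3.Connection, publish: Callable[[str, dict], None],
              batch_size: int = 100) -> int:
    """Publish pending outbox messages in insertion order; return how many were sent."""
    rows = conn.execute(
        "SELECT id, event_type, payload FROM outbox"
        " WHERE processed = 0 ORDER BY id LIMIT ?",
        (batch_size,),
    ).fetchall()
    for message_id, event_type, payload in rows:
        publish(event_type, json.loads(payload))
        # Mark as processed only after publishing: a crash in between causes
        # a re-send (at-least-once delivery), never a lost event.
        conn.execute("UPDATE outbox SET processed = 1 WHERE id = ?", (message_id,))
        conn.commit()
    return len(rows)


conn = sqlite3.connect(":memory:")
create_outbox(conn)
conn.execute(
    "INSERT INTO outbox (event_type, payload) VALUES (?, ?)",
    ("OrderCreatedEvent", json.dumps({"order_id": 1})),
)
conn.commit()

published = []
first_poll = poll_once(conn, lambda event_type, body: published.append((event_type, body)))
second_poll = poll_once(conn, lambda event_type, body: published.append((event_type, body)))
```

The second call finds nothing to send but still queries the database, which is exactly the wasted traffic that polling produces when there are no updates.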

Why not use domain events?

When analyzing all of these issues, the first idea that came to mind was to use domain events to swap from a pull to an incremental push model by subscribing to the events from each derived data system. After all, we already use them extensively, have the infrastructure in place and even have published a set of tools like buz that provide us with an easy-to-use async event bus in Python.

Domain events represent things that have happened in one of our systems with business meaning that may be of interest to other systems. For example, we send a notification when a purchase takes place (OrderCreatedEvent), or update the average rating of an experience when a review is published (review AnswerCreatedEvent).

The first problem of using domain events here is the initial synchronization. If we introduce a new derived system, we need a way to get it up-to-date with the information. We could re-publish all the events, but that would destroy their meaning: if a system sees an OrderCreatedEvent, it’s reasonable for it to believe that the event happened a relatively short time ago. An alternative to republishing would be to introduce a resynchronization use case using incremental event identifiers and a mechanism such as REST, but this would require modifying the source system to expose this information on demand and in a scalable way to support future systems, increasing the complexity of the solution.

Fortunately, smart people in the software world already came up with a solution for this issue: Event Sourcing. The idea is to have the application’s source of truth be an append-only sequence of domain events so that the current state of an application is determined by examining all change events associated with it since its inception, rather than saving only the current state. This allows applications to rebuild the state at any point in time by replaying events.
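As an illustration of the replay idea, here is a minimal sketch in which the state (tickets available) is never stored, only derived from the log. The event names and the `replay` function are invented for the example and do not correspond to real Fever events.

```python
from dataclasses import dataclass
from typing import Iterable, Optional


@dataclass(frozen=True)
class Event:
    sequence: int  # position in the append-only log
    kind: str      # hypothetical event names, for illustration only
    amount: int


def replay(events: Iterable[Event], up_to: Optional[int] = None) -> int:
    """Rebuild the number of available tickets by folding over the event log."""
    available = 0
    for event in sorted(events, key=lambda e: e.sequence):
        if up_to is not None and event.sequence > up_to:
            break  # stop early to obtain the state at a past point in time
        if event.kind == "TicketsAdded":
            available += event.amount
        elif event.kind == "TicketsSold":
            available -= event.amount
    return available


log = [
    Event(1, "TicketsAdded", 100),
    Event(2, "TicketsSold", 3),
    Event(3, "TicketsSold", 2),
]
current_state = replay(log)                    # state after all events
state_after_second_event = replay(log, up_to=2)  # state at an earlier point
```

A new derived system would get up to date by running the same fold from the first event, which is what makes the approach attractive for the initial synchronization problem.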

Unfortunately, as promising as it looked, we discarded event sourcing as a valid solution for our issues, mainly for 2 reasons:

  1. Having the source of truth be the domain events requires a profound technical and mentality change that was not worth the cost for us. If Fever was built from scratch today, maybe building the application in an event-sourced fashion would make sense. But migrating our core application’s main design is too high a price to pay for the data integration problem we’re trying to solve.
  2. There are many legacy use cases without domain events integration. One of the most prevalent entities in the derived data systems is our Session, and a quick usage search for this entity in our main application yields 723 usages. Combine this with a variety of patterns (or lack thereof) used throughout legacy code, and the seemingly simple task of including domain events for each change in this entity becomes a nightmare.

The solution

Having discarded replication at the domain level, we started looking for replication solutions at the infrastructure level, and thus arrived at Change Data Capture (CDC). This is the process of observing all data changes written to a database and extracting them in a form in which they can be replicated to other systems (Kleppmann, 2017). The source of information to observe is the database replication log, a persistent log where the engine first writes the changes before actually changing the data in the corresponding disk pages. The replication log’s main purpose is to provide durability guarantees in crash recovery scenarios.

In our particular case, we’re using PostgreSQL, so this replication log is the WAL (Write-Ahead Log). Initially, the WAL format was considered to be an internal implementation detail, but in order to support built-in replication, PostgreSQL 9.0 introduced a Streaming Replication protocol standardizing a format for the WAL contents to be streamed. The first consumer of this protocol was PostgreSQL itself in the form of standby servers, but the fact that this is a public API means that other systems are able to consume it. Later, release 9.4 added the logical decoding feature, introducing the concept of replication slots and allowing the output format of each slot to be customized using output plugins. And this is exactly what Debezium, our CDC tool of choice, uses under the hood.

Debezium

Debezium is a piece of software for CDC that monitors a database replication log and converts that log of changes into Kafka records with a structured format. It runs on top of Apache Kafka as a set of Kafka Connect connectors, one for each of the supported database technologies: PostgreSQL, Oracle, MySQL, MongoDB, Cassandra, DB2…

For the PostgreSQL connector, a Data Change Event is generated for each row-level INSERT, UPDATE, DELETE or even TRUNCATE. There is also a READ event, more on that later. Each message carries the row identifier in the record key (the primary key by default), and a value whose payload holds the state of the row after the change occurred. Every event includes the current schema of the row to account for migrations.
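To give an idea of what a consumer sees, here is a trimmed-down sketch of a change event value as serialized by JsonConverter, together with a helper that classifies it. The `schema` and `source` sections are heavily abbreviated, the row columns and values are invented, and `describe` is a hypothetical helper; the `op` codes are the ones Debezium uses.

```python
import json

# Operation codes found in the `op` field of a Debezium change event.
OPERATIONS = {"c": "INSERT", "u": "UPDATE", "d": "DELETE", "t": "TRUNCATE", "r": "READ"}

# Abbreviated example value; real events carry a full schema and more source metadata.
raw_value = json.dumps({
    "schema": {"type": "struct", "name": "cdc.core.json.public.core_session.Envelope"},
    "payload": {
        # `before` may be null depending on the table's REPLICA IDENTITY setting.
        "before": None,
        "after": {"id": 42, "label": "Evening session", "ticket_price": 25},
        "source": {"connector": "postgresql", "table": "core_session"},
        "op": "u",
        "ts_ms": 1693820000000,
    },
})


def describe(value: str) -> tuple:
    """Return (operation name, row state after the change) for one event value."""
    payload = json.loads(value)["payload"]
    return OPERATIONS[payload["op"]], payload["after"]


operation, row = describe(raw_value)
```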

Once we have the set of database changes serialized as Kafka records, we need a way to transform them into the format required at the derived system. We can use any of the many sink connectors from the Confluent catalog, or we can directly connect our applications to the cluster as consumers and parse the information ourselves. Using connectors has the huge benefit that we can bundle a message transformation called New Record State Extraction that converts from the Debezium data format to a simple message composed of key-value pairs, which most of the sink connectors out there understand directly.
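Conceptually, the New Record State Extraction transformation behaves roughly like the sketch below. This is a simplification written for illustration, not Debezium’s code: `unwrap` is a hypothetical function, and it assumes the behaviour we understand the 1.x line to have with `drop.tombstones=false`, where delete events are dropped and the tombstone that follows them is passed through.

```python
from typing import Optional


def unwrap(value: Optional[dict]) -> list:
    """Return the records emitted for one incoming event payload (zero or one)."""
    if value is None:
        # Tombstone: forwarded untouched so the sink can turn it into a delete.
        return [None]
    if value["op"] == "d":
        # Delete event: dropped; the tombstone with the same key carries the deletion.
        return []
    # Create, update and read events: keep only the new state of the row.
    return [value["after"]]


created = unwrap({"op": "c", "before": None, "after": {"id": 1, "label": "Morning"}})
deleted = unwrap({"op": "d", "before": {"id": 1}, "after": None})
tombstone = unwrap(None)
```

The output of the first call is a flat dictionary of column names and values, which is the shape most sink connectors expect.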

Example: Replicating a single table between two databases

Let’s imagine we want to replicate a single table called sessions from our core database to the database of our catalog microservice.

Architecture of the Debezium CDC solution for replication of a table from “core” to “catalog”

Here is a summary of the configuration required for each component of the system. Notice that some configurations are specific to our stack and environment, but they may serve as a guide for other implementations with similar setups:

core database → Origin database, in our case running on AWS RDS. Changes there:

  • Set rds.logical_replication=1, wal_level=logical
  • Create a user for Debezium, granting it the rds_replication role and SELECT permission on the sessions table
  • Create a publication called dbz_publication including the table to replicate, since we won’t allow Debezium to auto-create it
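The last two steps boil down to a handful of SQL statements. The sketch below only builds them as strings so they can be reviewed before being run by an administrator. `source_setup_statements` is a hypothetical helper, the user is assumed to exist already, and the table name `public.core_session` is an assumption inferred from the sink topic name, so substitute your own.

```python
def source_setup_statements(table: str, user: str = "debezium",
                            publication: str = "dbz_publication") -> list:
    """Build the SQL for the manual source-side setup.

    Identifiers are interpolated directly, so they must come from trusted
    configuration, never from user input.
    """
    return [
        f"GRANT rds_replication TO {user};",
        f"GRANT SELECT ON {table} TO {user};",
        f"CREATE PUBLICATION {publication} FOR TABLE {table};",
    ]


statements = source_setup_statements("public.core_session")
```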

Debezium source connector, in our case running Debezium 1.9 with the following configuration:

connector.class=io.debezium.connector.postgresql.PostgresConnector
database.dbname=core
database.hostname=core.example.eu-west-1.rds.amazonaws.com
database.password=${secretManager:debezium-secrets:debezium_db_password}
database.port=5432
database.server.name=cdc.core.json
database.user=debezium
heartbeat.interval.ms=60000
key.converter=org.apache.kafka.connect.json.JsonConverter
plugin.name=pgoutput
publication.autocreate.mode=disabled
signal.data.collection=public.common_debeziumsignal
table.include.list=public.core_session
tasks.max=1
topic.creation.default.cleanup.policy=delete
topic.creation.default.compression.type=lz4
topic.creation.default.partitions=1
topic.creation.default.replication.factor=3
topic.creation.enable=true
value.converter=org.apache.kafka.connect.json.JsonConverter

A few notes on this configuration:

  1. It runs over MSK connect, and credentials are managed using IAM for connection to the brokers and the AWS Secrets Manager to retrieve the database password.
  2. We’ve enabled the connector to create its own topics, but tweaked IAM permissions to only allow it to create topics with the prefix cdc.core.json. This prefix is configured with database.server.name, which more recent versions renamed to the more sensible topic.prefix.
  3. We are using JsonConverter both for key and value, so we store the whole schema definition with each message. There are alternatives such as Avro to make it less verbose, at the cost of an extra infrastructure piece that serves as the source of truth for schemas, the schema registry.
  4. The list of tables we want to publish is in table.include.list

Kafka bus

  • Using Kafka 2.8.1 as an AWS MSK Cluster
  • Allowing the debezium user full control over any topics that start with cdc.fever.*
  • 7 days of log retention (log.retention.hours=168)

JDBC sink connector, version 10.6.4:
auto.create=false
auto.evolve=false
connection.url=jdbc:postgresql://catalog.example.eu-west-1.rds.amazonaws.com:5432/catalog_db?user=replication_user&password=${secretManager:debezium-secrets:catalog_replication_db_password}
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
delete.enabled=true
fields.whitelist=id,event_id,label,starts_at,ends_at,place_id,currency,ticket_price,session_type_id
insert.mode=upsert
key.converter=org.apache.kafka.connect.json.JsonConverter
pk.fields=id
pk.mode=record_key
table.name.format=core_session_replica
tasks.max=1
topics=cdc.core.json.public.core_session
transforms.unwrap.drop.tombstones=false
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms=unwrap
value.converter=org.apache.kafka.connect.json.JsonConverter

A few notes on the sink connector configuration:

  1. fields.whitelist specifies which fields from the topic we want. This allows us to create replicas with fewer columns than the original tables.
  2. We use the ExtractNewRecordState transformation, as explained before, to transform from the Debezium format to a plain key-value format before passing the record to the JDBC sink connector.
  3. pk.mode → Tells the connector where to find the primary key for the destination table. In this case, it is taken from the key of the record (after the transformation).
  4. insert.mode → After the transformation, we no longer know whether a record was an insert, an update, a read or even a delete. Simply inserting every time wouldn’t work since, by design, CDC generates multiple messages for the same key, so we use upsert.
  5. delete.enabled → Deletes records when receiving tombstone messages. More on this later.
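The combination of upsert and tombstone-driven deletes can be sketched as follows. This is not the JDBC sink connector’s code: `apply_record` is a hypothetical function, the replica has only three of the columns, and an in-memory SQLite database (which shares the `ON CONFLICT` upsert syntax with PostgreSQL) stands in for the catalog database.

```python
import sqlite3
from typing import Optional

replica = sqlite3.connect(":memory:")
replica.execute(
    "CREATE TABLE core_session_replica ("
    " id INTEGER PRIMARY KEY, label TEXT, ticket_price INTEGER)"
)


def apply_record(conn: sqlite3.Connection, key: int, value: Optional[dict]) -> None:
    """Apply one flattened record: None is a tombstone (delete), anything else is upserted."""
    if value is None:
        conn.execute("DELETE FROM core_session_replica WHERE id = ?", (key,))
    else:
        conn.execute(
            "INSERT INTO core_session_replica (id, label, ticket_price)"
            " VALUES (?, ?, ?)"
            " ON CONFLICT(id) DO UPDATE SET"
            " label = excluded.label, ticket_price = excluded.ticket_price",
            (key, value["label"], value["ticket_price"]),
        )
    conn.commit()


apply_record(replica, 42, {"label": "Evening session", "ticket_price": 25})  # first message for the key
apply_record(replica, 42, {"label": "Evening session", "ticket_price": 30})  # later message, same key
rows_after_update = replica.execute(
    "SELECT id, label, ticket_price FROM core_session_replica"
).fetchall()
apply_record(replica, 42, None)  # tombstone
rows_after_delete = replica.execute("SELECT id FROM core_session_replica").fetchall()
```

With a plain insert, the second message would fail on the primary key; with upsert, every message for a key simply overwrites the previous state.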

catalog database → Destination database, configured as follows:

  • A core_session_replica table with the fields defined above in fields.whitelist, using the same data types as the origin database
  • replication_user with write permissions over the core_session_replica table

So far, this is only marginally better than a hot standby read replica, at a much higher cost. The benefit of CDC becomes apparent once we start using it as a solution for the rest of the issues mentioned at the beginning of the article. Continuing with the example, if we now need the session data in the Data Warehouse (DWH), and also as a source of information for incrementally-calculated tables that replace the materialized views, we have:

Leveraging the same data stream for DWH and incremental materialized views

For the Data Warehouse use case, we’re using the Snowflake Kafka Connector. The cdc-worker is not a Kafka connector, but an in-house built Python application that subscribes to the topics, parses the messages using the schema and incrementally builds the table that serves as an availability aggregation for our calendar. With this setup, we have three different derived systems building tailor-made read views of the same information based on a unified stream of changes, and with no impact on the source database.
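The incremental approach can be illustrated with a toy aggregation: a count of sessions per day that is adjusted one change at a time instead of being recomputed from scratch. This is a simplified sketch, not the actual cdc-worker. The `AvailabilityCalendar` class is hypothetical, it keeps its state in memory, and it assumes `starts_at` arrives as an ISO-8601 string.

```python
from collections import Counter
from typing import Optional


class AvailabilityCalendar:
    """Keeps a per-day count of sessions up to date, one change event at a time."""

    def __init__(self) -> None:
        self.sessions_per_day: Counter = Counter()
        self._day_by_session: dict = {}  # last known day of each session id

    def apply(self, session_id: int, row: Optional[dict]) -> None:
        # Undo the contribution of the previous version of the row, if any.
        old_day = self._day_by_session.pop(session_id, None)
        if old_day is not None:
            self.sessions_per_day[old_day] -= 1
            if self.sessions_per_day[old_day] == 0:
                del self.sessions_per_day[old_day]
        # Add the contribution of the new version, unless the row was deleted.
        if row is not None:
            day = row["starts_at"][:10]  # assumes strings such as 2023-09-04T20:00:00
            self._day_by_session[session_id] = day
            self.sessions_per_day[day] += 1


calendar = AvailabilityCalendar()
calendar.apply(1, {"starts_at": "2023-09-04T18:00:00"})
calendar.apply(2, {"starts_at": "2023-09-04T20:00:00"})
calendar.apply(2, {"starts_at": "2023-09-05T20:00:00"})  # session 2 rescheduled
calendar.apply(1, None)                                  # session 1 deleted
```

Because the worker remembers the last state it saw for each session, it does not depend on the event carrying the previous row, and applying the same row twice leaves the counts unchanged.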

Adding new derived data systems

One of the ideas that took a while to sink in for me was that this is a streaming system, and only the present matters, where “present” is defined by the retention window. Since the retention is configured for 7 days, how do we get a new consumer up to date with all the historical information?

Debezium has a clever mechanism for this: the signalling table. This is a table in the source database which is configured for the Debezium connector as signal.data.collection, and which lets us ask the connector to perform certain actions at runtime. There are a few signals that can be sent, but the most important one tells Debezium to make an incremental snapshot of one or more tables. This is a way to perform a snapshot in chunks while streaming continues, as wonderfully explained in this Debezium blogpost. It’s worth noting that while this kind of snapshot is great for its purpose, it is very slow. For some of our biggest tables, it may take hours to complete.

Snapshots send events of type READ to the bus, which consumers can interpret not as a change in the data but as an opportunity to reconstruct the state. At Fever, we built a simple tool to send signals to Debezium from our backoffice, simplifying the process and allowing developers without direct access to the database to run incremental snapshots.
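Sending a signal amounts to inserting a row into the signalling table. The sketch below builds that insert for an incremental snapshot; it is not our backoffice tool. `incremental_snapshot_signal` is a hypothetical helper, the `%s` placeholders assume a driver such as psycopg2, and the table to snapshot is an assumed name. The `id`, `type` and `data` columns and the `execute-snapshot` signal type follow the structure Debezium documents for signalling tables.

```python
import json
import uuid


def incremental_snapshot_signal(tables: list,
                                signal_table: str = "public.common_debeziumsignal") -> tuple:
    """Return (sql, params) for the row that asks Debezium to snapshot `tables`."""
    data = json.dumps({"data-collections": tables, "type": "incremental"})
    # The table name comes from trusted configuration; the values are bound as parameters.
    sql = f"INSERT INTO {signal_table} (id, type, data) VALUES (%s, %s, %s)"
    # The id only needs to be unique; a random UUID is one simple choice.
    return sql, (str(uuid.uuid4()), "execute-snapshot", data)


sql, params = incremental_snapshot_signal(["public.core_session"])
```

The returned pair can be passed to a database cursor’s `execute` method; once the connector sees the row in its stream, it starts the snapshot in chunks.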

When running the connector for the first time or when adding new tables, an initial snapshot is performed before the streaming starts. This snapshot is a regular one, as it doesn’t need to reconcile the state with changes as they are streamed. Of course, this snapshot is much faster.

Conclusion

Up to this point, the reasons why we decided to implement a CDC system and how such a system works at a high level should be quite clear. However, there is more to it than meets the eye, so stay tuned for the next part of this series where we’ll deep dive into some of the real-world details of the implementation and validate how well the solution matched our initial needs.

References:
