Change Data Capture at Brex

Jun Zhao
Published in Brex Tech Blog
17 min read · Feb 1, 2023

In July 2021, we started to look into the concept of data mesh introduced by Zhamak Dehghani. Since then, we have taken several steps in service of data mesh [1][2]. In this blog post, we will share our journey in building and scaling the Change Data Capture platform and how it is being leveraged at Brex.

This post is divided into the following sections:

  1. Introduction: what CDC is and why we started to use it at Brex.
  2. Architecture: an overview of our CDC infrastructure, which consists of two components:
    - Data Plane: Debezium and the configurations that are important to highlight.
    - Control Plane: the work we did on top of Debezium, which we think makes the CDC infrastructure scalable enough to serve hundreds of databases at Brex.
  3. Use Cases at Brex: how data and product teams use CDC at Brex.
  4. Lessons Learned: pitfalls we encountered and how we overcame them.
  5. Open Source Contribution: the contribution we made to the Debezium open source project.

Introduction

Change Data Capture (CDC) is a pattern that turns real-time database mutations into a stream of events that can be processed by downstream consumer applications. In relational databases like PostgreSQL, log-based CDC reads the transaction log, such as the PostgreSQL write-ahead log (WAL), to identify and track data changes. This pattern addresses the challenges of traditional techniques like dual writes and periodic fetching in multi-datastore synchronization, and it has become increasingly popular in use cases such as search indexing, analytics processing, and caching [3][4][5].

At Brex, synchronizing multiple heterogeneous datastores has been a recurring need for various engineering teams. For example, our Search team had built an in-house WAL streaming service to perform real-time indexing with Elasticsearch. The Data team struggled to maintain complicated replication pipelines from PostgreSQL databases to Snowflake warehouses using Airflow jobs. As the complexity of our organization and systems grew, it became necessary to build a scalable, performant, and reliable CDC infrastructure to capture real-time data mutations between microservices with a decoupled architecture.

Brex microservices are backed by PostgreSQL databases, and we use Kafka as the asynchronous messaging infrastructure. The CDC platform we built focuses on streaming change events in data pipelines from PostgreSQL to Kafka, and it has become an integral part of Brex’s derived data processing infrastructure to support different services across search, business analytics, security, expense management, and more.

Architecture

CDC platform architecture

From a high-level perspective, we abstract the architecture into two parts: control plane and data plane. The control plane manages the data plane resources and keeps them in the desired state. We deploy and manage all components in EKS except the source databases and the Kafka cluster.

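Data plane consists of:

  • Source PostgreSQL databases;
  • Debezium PostgreSQL connectors running in the CDC Kafka Connect cluster;
  • Target Kafka topics, with change event schemas kept in the schema registry.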

Control plane consists of:

  • CDC operator
    - Orchestrates underlying data plane resources;
    - Manages the lifecycle of pipeline resources;
    - Monitors pipeline status, detects issues, and automatically recovers from failures.
  • Database Manager
    - In-house component responsible for configuring every database at Brex;
    - For the purposes of CDC, it creates objects such as replication user, publication, management tables, and stored procedures.

Data Plane

Data plane workflow

In the data plane, the Debezium PostgreSQL connector is the main component that we use to stream change events from the source database to the target Kafka cluster. A lot of literature can be found about using Debezium online, so let’s only talk about some important details:

  • We chose pgoutput as the decoding plugin on our platform because it is the standard logical decoding output plugin and is maintained by the PostgreSQL community.
  • Change events for each table are published to destination Kafka topics with hierarchical domain naming, e.g. <domain>.<database_name>.<table_name>.<version>. We intentionally add the version suffix to Kafka topics to identify the CDC data format and to prepare for major format changes.
  • We also integrated with the Confluent schema registry in our CDC Kafka cluster and all change events are serialized in Avro.
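To make the naming convention concrete, here is a minimal sketch of a helper that builds a topic name in the <domain>.<database_name>.<table_name>.<version> form. The function name, the allowed character set, and the example values are assumptions for illustration, not Brex's actual tooling.

```python
import re

# Dots separate the hierarchy levels, so a single segment may not contain one.
# The allowed character set is an assumption for this sketch.
_SEGMENT = re.compile(r"[A-Za-z0-9_-]+")


def cdc_topic_name(domain: str, database: str, table: str, version: str) -> str:
    """Build a topic name of the form <domain>.<database_name>.<table_name>.<version>."""
    parts = (domain, database, table, version)
    for part in parts:
        if not _SEGMENT.fullmatch(part):
            raise ValueError(f"invalid topic segment: {part!r}")
    return ".".join(parts)


# Hypothetical example values.
print(cdc_topic_name("payments", "ledger", "transactions", "v1"))
```

Keeping the version as the last segment means a format change can be rolled out on a new topic while existing consumers keep reading the old one.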

PostgreSQL Kafka Connector

We adopted the Strimzi Cluster Operator and its KafkaConnector custom resource to manage the Debezium connectors in the CDC Kafka Connect cluster. Until we built our control plane, each CDC pipeline required writing a configuration that looked like this:

PostgreSQL Kafka connector configuration example
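As a rough illustration, a KafkaConnector resource for one pipeline could be assembled as in the sketch below. The property keys are standard Debezium 1.x, Kafka Connect, and Strimzi settings. Every value is an assumption: the slot, publication, signal table, and converter class names, the interval, the registry URL, and the choice of `snapshot.mode` and `publication.autocreate.mode` are not taken from Brex's real configuration.

```python
def kafka_connector_manifest(name, connect_cluster, db_host, db_name, tables):
    """Return a Strimzi KafkaConnector resource (as a dict) for one CDC pipeline."""
    registry_url = "http://schema-registry:8081"  # hypothetical URL
    config = {
        "plugin.name": "pgoutput",
        "database.hostname": db_host,
        "database.port": "5432",
        "database.dbname": db_name,
        "database.user": "<replication-user>",          # placeholder
        "database.password": "<from-mounted-secret>",   # placeholder
        "database.server.name": db_name,                # logical name in Debezium 1.x
        "slot.name": "cdc_slot",                        # hypothetical name
        "publication.name": "cdc_publication",          # hypothetical name
        "publication.autocreate.mode": "disabled",      # assumed: publication managed elsewhere
        "table.include.list": ",".join(tables),
        "snapshot.mode": "never",                       # assumed: incremental snapshots only
        "signal.data.collection": "debezium.debezium_signal",  # hypothetical table
        "heartbeat.interval.ms": "30000",               # assumed interval
        "heartbeat.action.query": "<heartbeat statement>",
        "converters": "financialasset",
        "financialasset.type": "com.example.cdc.FinancialAssetConverter",  # hypothetical class
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": registry_url,
        "value.converter.schema.registry.url": registry_url,
    }
    return {
        "apiVersion": "kafka.strimzi.io/v1beta2",
        "kind": "KafkaConnector",
        "metadata": {"name": name, "labels": {"strimzi.io/cluster": connect_cluster}},
        "spec": {
            "class": "io.debezium.connector.postgresql.PostgresConnector",
            "tasksMax": 1,
            "config": config,
        },
    }
```

Even in this trimmed form, most of the keys are identical for every pipeline, which is the boilerplate the control plane later removes.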

Important things to note:

  • All of our pipelines explicitly specify table.include.list and publication.name to capture specific tables. This is an intentional choice covered in the Using Partial PostgreSQL Publication section.
  • We also configured heartbeat.action.query to trigger a separate process in the connector that periodically updates a management table in the source database. This advances the confirmed LSN of the logical replication slot for CDC and allows the database to reclaim WAL space even when write traffic is low (more details in the Configuring Heartbeat Process section).
  • We implemented a custom converter to convert a PostgreSQL user-defined data type (financial_asset) to a Kafka Connect schema type, which enables downstream consumers to process the related column values with schema registry support. More details can be found in the Custom Data Type Conversion section.

CDC Snapshotting

We chose to adopt incremental snapshotting instead of initial consistent snapshotting. This snapshotting pattern provides several advantages over the standard initial snapshot process:

  • Incremental snapshots can run in parallel with streaming changes. Real-time change events are captured continuously throughout the snapshot process without blocking.
  • The snapshot is executed in chunks, allowing us to track a snapshot’s progress as well as pause, resume, or abort a snapshot in the middle of its execution.
  • Incremental snapshots can be triggered on demand at any time, and this process can be repeated as needed to adapt to database updates.
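Debezium triggers an incremental snapshot when a row of type `execute-snapshot` is inserted into its signaling table. The sketch below builds such a statement together with its parameters. The signaling table name is an assumption, and the `%s` placeholders assume a psycopg-style driver.

```python
import json
import uuid


def incremental_snapshot_signal(tables, signal_table="debezium.debezium_signal"):
    """Return (sql, params) for an on-demand incremental snapshot of `tables`.

    `signal_table` is a hypothetical name; it must match the connector's
    signal.data.collection setting.
    """
    payload = json.dumps({"data-collections": list(tables), "type": "incremental"})
    sql = f"INSERT INTO {signal_table} (id, type, data) VALUES (%s, %s, %s)"
    return sql, (str(uuid.uuid4()), "execute-snapshot", payload)


sql, params = incremental_snapshot_signal(["public.accounts"])
```

Because the signal is an ordinary row, repeating a snapshot for a table is just another insert with a new id.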

We highly recommend reading DBLog: A Watermark Based Change-Data-Capture Framework, on which Debezium’s incremental snapshot process is based. The key idea is to write low and high watermarks into the transaction log so that log events can be interleaved with table rows selected in a series of configurable chunks, which together capture the full state. The process reads the existing data and the changes from the transaction log in parallel. When the high watermark of a chunk is reached in the transaction log, the rows of that chunk are emitted to the table-specific Kafka topics and the next chunk is defined.
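The core of the watermark approach can be sketched in a few lines. This is a simplified model of the idea in the DBLog paper, not Debezium's implementation. Rows selected for a chunk are dropped if the same primary key also appears in the log between the low and high watermarks, because the log event is newer. The tuple layout is an assumption of the sketch.

```python
def reconcile_chunk(chunk_rows, window_events):
    """Merge one snapshot chunk with the log events seen in its watermark window.

    chunk_rows:    dict of primary key -> row, selected between the watermarks
    window_events: list of (op, primary_key, row) read from the log in the window
    Returns the events to emit, in order.
    """
    touched = {pk for _, pk, _ in window_events}
    # A chunk row is stale if the log already carries a newer change for its key.
    surviving_reads = [
        ("read", pk, row) for pk, row in chunk_rows.items() if pk not in touched
    ]
    # Log events keep their order; surviving reads are emitted at the high watermark.
    return list(window_events) + surviving_reads


events = reconcile_chunk({1: "a", 2: "b", 3: "c"}, [("update", 2, "b2")])
```

In this example the read of row 2 is discarded, so consumers only ever see the newer update for that key.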

With incremental snapshotting, the Debezium connector emits read events interleaved with streaming events (insert, update, and delete events). Thus, the ordering of change events in Kafka topics has different semantics: a read event for a row can arrive after update or delete events that were captured before that row’s chunk was snapshotted. Our CDC consumers are required to handle these semantics properly.
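One way a consumer might cope with this ordering is to treat every event as an idempotent upsert or delete keyed by primary key. The sketch below assumes the standard Debezium envelope fields `op` and `after`, with `c`, `u`, `d`, and `r` as operation codes. The in-memory dict stands in for whatever store the consumer maintains.

```python
def apply_change_event(state: dict, key, event: dict) -> dict:
    """Apply one Debezium-style change event to a key-value view of a table."""
    op = event["op"]
    if op in ("c", "u", "r"):
        # Create, update, and snapshot read all carry the full row in `after`.
        state[key] = event["after"]
    elif op == "d":
        # Deleting a key that is already absent is a no-op.
        state.pop(key, None)
    else:
        raise ValueError(f"unsupported operation: {op!r}")
    return state


view = {}
apply_change_event(view, 1, {"op": "u", "after": {"id": 1, "status": "paid"}})
apply_change_event(view, 1, {"op": "r", "after": {"id": 1, "status": "paid"}})
```

Because a read event carries the row as it was when the chunk was selected, applying it as an upsert leaves the view unchanged when an earlier update already delivered the same state.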

Custom Data Type Conversion

After we had an initial working version of CDC for a couple of tables, we surveyed the source databases for a couple of things:

  1. Whether there are custom data types in our database;
  2. Whether there are rows that have very large amounts of data to cover problems highlighted in the CDC blog by Shopify.

For #2, we didn’t really have any database with very large rows. We simply set our Kafka maximum message size to 5 MB, and this was good enough for 99% of our databases.

We found one custom data type in our databases, namely the financial_asset type mentioned in the above example. By default, the Debezium connector converts unknown column types to bytes, which increases the deserialization overhead for downstream consumers. To solve this problem, we implemented the Debezium Service Provider Interface (SPI) CustomConverter and configured the Debezium connector to support the custom data type mapping and conversion.

With our implemented converter extension, the custom data type financial_asset is converted and registered with proper Avro schema. Downstream consumers will be able to leverage schema registry support and smoothly deserialize the related data fields with AvroConverter.

Control Plane

After we manually launched the first data plane pipelines, many teams at Brex were excited to adopt CDC. For the first 10–15 databases, we worked with teams to set them up with the right configuration. After doing this enough times, we realized that we needed a control plane to:

  1. Remove most of the boilerplate code from CDC configuration. As you can see above, you have to know a lot of Debezium internals to get a CDC pipeline working.
  2. Select good defaults automatically like heartbeat query, etc.
  3. Detect the most common infrastructure errors and self-heal.
  4. Extend the CDC infrastructure to do things like identify data loss and advanced data observability. This is a big topic that deserves its own blog post down the line.

While designing the control plane, we considered multiple options, such as Temporal workflows, custom cron jobs, and a Kubernetes operator. Given the advantages of orchestrating workloads in the Kubernetes ecosystem, we finally decided to use a Kubernetes custom resource (CDCPipeline) to represent CDC pipelines and to leverage the operator framework to manage the pipeline lifecycle with the related data plane resources. Let’s look at how this operator works:

Control plane workflow

As shown above, the CDC operator manages the lifecycle of CDC pipelines. It watches the CDCPipeline custom resources, and each resource is reconciled as follows:

  1. The CDC operator updates the corresponding database credential entry of an ExternalSecret custom resource to source the credentials. We will talk more about this in the Secrets Management section.
  2. The CDC operator creates a KafkaConnector custom resource with the proper configuration.
  3. The External Secret operator then fetches the database credentials into the Secret resource that is mounted to the CDC Kafka Connect cluster.
  4. Meanwhile, the Strimzi cluster operator continues to reconcile and creates the corresponding Debezium PostgreSQL connector on the data plane.
  5. When the connector comes up and becomes ready, the CDC operator enables the connector heartbeat process and performs other post-processing steps, such as snapshotting the tables of interest in the source database.
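The operator's share of these steps can be sketched as a pure planning function: given the desired pipeline and what is currently observed, decide which actions are still needed. All field and action names below are hypothetical, and the real operator presumably acts on Kubernetes resources rather than returning strings.

```python
def plan_reconcile(pipeline: dict, observed: dict) -> list:
    """Return the ordered actions the operator still has to take for one pipeline."""
    actions = []
    if pipeline["database"] not in observed["secret_entries"]:
        actions.append("upsert-external-secret-entry")
    if not observed["kafka_connector_exists"]:
        actions.append("create-kafka-connector")
        return actions  # wait for Strimzi to bring the connector up
    if not observed["connector_ready"]:
        return actions  # nothing more to do until the connector is ready
    if not observed["heartbeat_enabled"]:
        actions.append("enable-heartbeat")
    pending = [t for t in pipeline["tables"] if t not in observed["snapshotted_tables"]]
    if pending:
        actions.append("snapshot:" + ",".join(pending))
    return actions
```

Running the function again after each action is what makes the loop converge: once nothing is missing, the plan is empty and the pipeline is in its desired state.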

Custom Resource Definition

With this setup, let’s consider how little a user has to do to create an operational CDC pipeline. You just need to write a CDCPipeline custom resource that looks like this:

CDC pipeline custom resource example
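For illustration, the sketch below shows a hypothetical CDCPipeline resource as a Python dict, together with a function that expands it into a Strimzi KafkaConnector. Only the kind name CDCPipeline comes from this post. The API group, the field names, and the defaults are assumptions.

```python
cdc_pipeline = {
    "apiVersion": "cdc.example.com/v1alpha1",  # hypothetical group and version
    "kind": "CDCPipeline",
    "metadata": {"name": "ledger"},
    "spec": {
        "database": "ledger",
        "tables": ["public.accounts", "public.cards"],
    },
}


def to_kafka_connector(pipeline: dict, connect_cluster: str = "cdc-connect") -> dict:
    """Expand a CDCPipeline into a KafkaConnector, filling in platform defaults."""
    spec = pipeline["spec"]
    return {
        "apiVersion": "kafka.strimzi.io/v1beta2",
        "kind": "KafkaConnector",
        "metadata": {
            "name": pipeline["metadata"]["name"] + "-cdc",
            "labels": {"strimzi.io/cluster": connect_cluster},
        },
        "spec": {
            "class": "io.debezium.connector.postgresql.PostgresConnector",
            "tasksMax": 1,
            "config": {
                "plugin.name": "pgoutput",
                "database.dbname": spec["database"],
                "table.include.list": ",".join(spec["tables"]),
                "publication.name": "cdc_publication",  # hypothetical default
            },
        },
    }


connector = to_kafka_connector(cdc_pipeline)
```

The user-facing resource carries only a database name and a table list. Everything Debezium-specific is derived by the operator.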

With this configuration, users only specify the source database name and the tables whose change events they would like to capture, and all streaming resources are managed automatically by the CDC operator behind the scenes. From the above example, the operator generates the KafkaConnector custom resource spec described in the PostgreSQL Kafka Connector section.

Lifecycle Management and Failure Recovery

In addition to boilerplate, we very quickly realized that, even with tens of databases, there would be a whole range of common errors around database and network. These are infrastructure errors, and they can’t be federated to the teams who created the CDC pipelines. We needed a scalable way to address these errors, and our control plane was designed with the goal of identifying and auto recovering from common failures. To understand this, let’s look into the lifecycle management of a CDC pipeline:

CDC pipeline lifecycle management

A pipeline begins at the starting point and ends in the terminal state. Whenever a lifecycle event is triggered, intentionally or unexpectedly, the pipeline transitions to an unstable state. In general, the CDC operator identifies lifecycle events and performs management operations to move the pipeline from unstable states to a stable state or the terminal state. To do so, the CDC operator continuously collects the status of data plane resources, e.g., the source database, Kafka connector, and target Kafka topics.

For example, in case of network issues between the source database and the CDC connector, the Kafka connector task will transition into the failing state. The operator will automatically detect it and restart the failed connector/task to recover.

When it comes to planned maintenance such as source database upgrade, we mark the corresponding custom resource with “in maintenance” annotation, and the normal reconciliation process will be paused such that we can perform proper maintenance operations to the underlying pipeline resources. Once the maintenance is done, we remove the “in maintenance” annotation, and the operator will resume the regular reconciliation process.
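The recovery and maintenance behavior described above can be sketched as a function over the status document returned by the Kafka Connect REST API (`GET /connectors/{name}/status`). The annotation key is hypothetical, and restarting through the REST API is an assumption. With Strimzi, the operator could equally request restarts through annotations on the KafkaConnector resource.

```python
MAINTENANCE_ANNOTATION = "cdc.example.com/in-maintenance"  # hypothetical key


def recovery_actions(annotations: dict, connector_name: str, status: dict) -> list:
    """Return the REST calls needed to recover a connector, or [] if none apply."""
    if annotations.get(MAINTENANCE_ANNOTATION) == "true":
        return []  # reconciliation is paused during planned maintenance
    actions = []
    if status["connector"]["state"] == "FAILED":
        actions.append(f"POST /connectors/{connector_name}/restart")
    for task in status.get("tasks", []):
        if task["state"] == "FAILED":
            actions.append(
                f"POST /connectors/{connector_name}/tasks/{task['id']}/restart"
            )
    return actions
```

Checking the annotation first keeps the operator from fighting an engineer who has deliberately stopped a connector for a database upgrade.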

Secrets Management

In the control plane, we need to manage the source database secrets, which are used by the pipeline connectors. At Brex, we use the AWS Secrets Manager service to manage database credentials. For CDC, we adopted a dedicated ExternalSecret custom resource, which contains all pipeline secret references and is fully managed by the CDC operator. When the CDC operator reconciles the pipeline custom resource, it inserts or updates the corresponding entry in the managed ExternalSecret custom resource. The ExternalSecret operator then syncs the secrets from AWS Secrets Manager to the target Kubernetes Secret resource. Finally, the target secret is attached to the CDC Kafka Connect cluster and used by the corresponding Debezium Kafka connector.

For the example in the Custom Resource Definition section, the updated secret entry is shown below:
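A minimal sketch of the insert-or-update step follows, assuming the entry layout of the External Secrets Operator (`secretKey` plus `remoteRef.key` and `remoteRef.property`). Whether Brex uses that exact schema is not stated here, and the key names in the example are hypothetical.

```python
def upsert_secret_entry(external_secret: dict, secret_key: str,
                        remote_key: str, prop: str) -> dict:
    """Insert or replace one credential reference in an ExternalSecret's spec.data."""
    entries = external_secret.setdefault("spec", {}).setdefault("data", [])
    new_entry = {
        "secretKey": secret_key,
        "remoteRef": {"key": remote_key, "property": prop},
    }
    for i, entry in enumerate(entries):
        if entry.get("secretKey") == secret_key:
            entries[i] = new_entry  # keep reconciliation idempotent
            break
    else:
        entries.append(new_entry)
    return external_secret


# Hypothetical names for the "ledger" pipeline.
secret = {"kind": "ExternalSecret", "spec": {"data": []}}
upsert_secret_entry(secret, "ledger-password", "prod/ledger/cdc", "password")
```

Matching on `secretKey` means repeated reconciliations of the same pipeline never create duplicate entries.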

Use Cases at Brex

The CDC platform has supported numerous use cases within our infrastructure. Here we chose several typical use cases for discussion.

Snowflake Replication Pipeline

At Brex, we adopted Snowflake as the OLAP platform to perform offline data processing. To support this, we need to replicate the OLTP data from PostgreSQL databases to the Snowflake warehouses. Our legacy replication system loads data to Snowflake using Airflow by periodically fetching data from the source database tables. This approach has presented several limitations:

  • High latency: The most frequently updated tables are synced at 15-minute intervals, and most tables are synced hourly or less often.
  • High maintenance cost: Every database requires specific replication configuration and must be updated on schema changes. There are also a series of manual steps in the Airflow console to perform a proper backfill.
  • Lack of isolation: The replication job queries originating databases (generally a replica) incurring load that competes with other users, which would lead to resource contention.
  • Data consistency: For tables with deletions, we have to run dedicated operations that consume non-trivial resources of the underlying source database. This deletion handling becomes more problematic as tables grow in size.

CDC-based Snowflake replication pipeline

To solve this problem, we replaced the complicated legacy solution with a CDC-based replication pipeline. As shown above, change events are first captured in real time to the Kafka cluster. A custom-built Snowflake connector then transforms the CDC data and loads it into the target Snowflake warehouse in a decoupled manner. CDC not only dramatically reduced the replication latency and maintenance overhead, but also improved the overall data consistency and pipeline isolation within our infrastructure.

Search Indexing

On the Brex dashboard, you can search through your credit card transactions very easily. This is powered by a search service backed by Elasticsearch. The Elasticsearch backend contains searchable data that is synchronized from our online PostgreSQL databases. The search service has two parts:

  • Search streamer: CDC pipelines across the different databases that hold transaction data, user data, etc.
  • Search indexer: a service that is responsible for handling creation, deletion, and updates to the Elasticsearch indexes based on the change events in the Kafka cluster. This generally includes calling out to other services to get the latest data, and potentially cascading to update other related documents.

Previously, the search streamer was our in-house implementation of WAL streaming. Moving it to our new CDC infrastructure has significantly improved reliability and substantially reduced the end-to-end indexing latency and the maintenance cost for the Search team.

Transactional Event Publishing

At Brex, we operate over 100 microservices, which are backed by their own PostgreSQL databases. As we scale, it has become a very common use case for services to perform two primitive actions together: making database updates in PostgreSQL and publishing events to Kafka. To achieve the atomicity of these two operations, we built Transactional Event Publishing (TEP) on top of the CDC platform to provide a strong consistency guarantee between asynchronous event publishing and database updates.

Essentially, TEP extends CDC pipelines with custom change event transformation. In a nutshell, our TEP library intercepts the given database transaction and inserts the event to be published into a dedicated database table (the outbox table). Thanks to the ACID guarantees of the relational database, the original statements in the transaction and the additional insert for event publishing are committed or rolled back atomically. Only after the transaction has committed successfully does the pipeline connector capture the insert events of the outbox table and send them to Kafka, with a custom transformation, in real time.

With the use of CDC and the adoption of TEP across engineering teams, we have observed more reliable interservice communication and gained better data auditing and observability.

Large-Scale, Continuous Data Migrations

As our business grew, Brex decided to redesign our spend management systems and built an entirely new stack that became the foundation of Brex Empower. To enable existing customers to use Empower without data loss, we’d have to migrate all historical data from the old stack to the new one. This involved the migration of hundreds of millions of rows of data, with the requirement of no customer downtime.

Engineering teams ultimately chose to leverage CDC as the mechanism for triggering backfill across many of our systems, as it offered the following benefits:

  • Continuous backfill: the CDC infrastructure allowed us to capture all ongoing changes in the old stack, and continuously backfill to the new stack as needed. This helped ensure continuous data consistency and helped with the requirement of no customer downtime.
  • Data exhaustiveness: processing through all CDC events let us be confident that 100% of data was backfilled, without having to write custom jobs to iterate through all data in legacy databases.
  • Scalability: CDC was both a technically and operationally scalable solution for migration, as it has been load tested on high volumes of data and requires no manual intervention once set up.

Lessons Learned

While scaling the CDC platform, we came across several issues, and we will look into the lessons learned in resolving these problems.

Using Partial PostgreSQL Publication

When initially integrating CDC, we opted to create publications for all existing and future tables (CREATE PUBLICATION name FOR ALL TABLES), allowing us to receive WAL events from any table and filter unnecessary tables out in our Debezium connector configuration. However, a FOR ALL TABLES publication requires every enrolled table to have a primary key or replication index (replica identity), and it prevents updating or deleting rows in a table whose replica identity is missing [7]. It turns out that we have some database tables that do not have primary keys.

In response, we switched to a partial (non FOR ALL TABLES) publication and dynamically managed the enrolled tables in our control plane operator. This is also challenging because:

  1. A publication would need to be created and then altered by the CDC operator. To alter a publication, the database user must be an owner of the enrolled tables.
  2. We don’t want the CDC operator to be a super user that has ownership of the tables that it is streaming.

To solve this problem, we leveraged our database manager to create a user with limited privileges and give it a sudo-like operation to elevate the permissions temporarily to alter the publication.

  1. The Database Manager component initializes a partial publication owned by the CDC replication user with an empty set of tables in service databases.
  2. We define a set of publication management stored procedures with SECURITY DEFINER, which are only accessible to the CDC replication user. The procedure is executed with the privileges of the user that owns the procedure (the user that created it) instead of the user that calls it, which enables the CDC user to manage the publication without actually owning the enrolled tables. Here is an example for the cdc_set_tables function:
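A hedged sketch of what such a function might look like is given below, with the PostgreSQL DDL held in a Python string alongside a helper that builds the call. Only the function name cdc_set_tables and the use of SECURITY DEFINER come from this post. The publication name, role name, argument type, and function body are assumptions.

```python
# Assumed shape of the stored function. To be created by a role that is allowed
# to alter the publication for the enrolled tables.
CDC_SET_TABLES_DDL = """
CREATE OR REPLACE FUNCTION cdc_set_tables(table_names text[])
RETURNS void
LANGUAGE plpgsql
SECURITY DEFINER
SET search_path = pg_catalog, pg_temp
AS $$
DECLARE
    table_list text;
BEGIN
    -- Casting to regclass rejects unknown tables and yields safely quoted names.
    SELECT string_agg(t::regclass::text, ', ')
      INTO table_list
      FROM unnest(table_names) AS t;
    IF table_list IS NULL THEN
        RAISE EXCEPTION 'at least one table is required';
    END IF;
    EXECUTE format('ALTER PUBLICATION cdc_publication SET TABLE %s', table_list);
END;
$$;

REVOKE ALL ON FUNCTION cdc_set_tables(text[]) FROM PUBLIC;
GRANT EXECUTE ON FUNCTION cdc_set_tables(text[]) TO cdc_replication;
"""


def set_tables_call(tables):
    """Return (sql, params) for calling the function with a psycopg-style driver."""
    return "SELECT cdc_set_tables(%s)", (list(tables),)
```

Pinning `search_path` inside the function is a common precaution for SECURITY DEFINER routines, since the body runs with the owner's privileges regardless of who calls it.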

Configuring Heartbeat Process

To ensure low latency for streaming changes made to a database, we created alerts on replication slot lag. In the early days of CDC, we observed that when there was no write traffic to the source database, the CDC replication slot started lagging and the transaction log disk usage kept increasing. Interestingly, the corresponding replication slot remained active, and the Debezium Kafka connector was running fine without errors. After investigation, we found the root cause: the Debezium connector could not advance the LSN of its replication slot when the only traffic was AWS RDS internal system traffic [6]. More specifically, all PostgreSQL databases in an RDS instance share the WAL, while logical replication slots work per database. We learned that AWS RDS periodically writes to its own system tables in an admin database that is invisible to the client’s databases. This internal traffic generates new WAL and thus causes the replication slot lag in the client’s databases to increase continuously.
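Replication slot lag is the byte distance between the server's current WAL position and the slot's confirmed position. PostgreSQL prints an LSN as two hexadecimal numbers separated by a slash (the high and low 32 bits), so the lag can be computed as in the sketch below. The inputs are assumed to come from `pg_current_wal_lsn()` and the `confirmed_flush_lsn` column of `pg_replication_slots`, and the function names are illustrative.

```python
def parse_lsn(lsn: str) -> int:
    """Convert a textual LSN such as '0/16B3748' into a byte position."""
    high, low = lsn.split("/")
    return (int(high, 16) << 32) | int(low, 16)


def slot_lag_bytes(current_wal_lsn: str, confirmed_flush_lsn: str) -> int:
    """Bytes of WAL the slot has not yet confirmed."""
    return parse_lsn(current_wal_lsn) - parse_lsn(confirmed_flush_lsn)


lag = slot_lag_bytes("1/0", "0/FFFFFF00")  # 256 bytes behind
```

An alert on this number catches exactly the situation described above: the slot stays active, yet its confirmed position stops moving while the WAL keeps growing.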

To solve this issue, we enabled the heartbeat process in the Debezium connector with the action query (heartbeat.action.query) as below:

Heartbeat action query for CDC pipeline
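As an illustration only, the two relevant connector properties could be generated as follows. The property names `heartbeat.interval.ms` and `heartbeat.action.query` are Debezium settings and the table name comes from this post. The column names, the single-row upsert, and the interval are assumptions.

```python
def heartbeat_config(interval_seconds: int = 30) -> dict:
    """Connector properties that make Debezium write to the heartbeat table."""
    query = (
        "INSERT INTO debezium.debezium_heartbeat (id, last_heartbeat) "  # hypothetical columns
        "VALUES (1, now()) "
        "ON CONFLICT (id) DO UPDATE SET last_heartbeat = EXCLUDED.last_heartbeat"
    )
    return {
        "heartbeat.interval.ms": str(interval_seconds * 1000),
        "heartbeat.action.query": query,
    }
```

Writing to a single row keeps the heartbeat table from growing while still producing a WAL record in the monitored database on every interval.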

With this configuration, the connector launches a separate process that periodically updates the heartbeat table (debezium.debezium_heartbeat) in our database. This synthetic write traffic lets the Debezium connector confirm the latest LSN and advance its replication slot, which reduces replication lag and allows the WAL space to be reclaimed.

Multiple CDC Consumers for Phased Migration

During our data migration to Brex Empower, we decided to migrate our customers in phases, rather than all at once. While CDC offered the benefits listed in the Large-Scale, Continuous Data Migrations section, it lacked the ability to target data migrations for specific customers. We would therefore have had to wait until millions of rows were backfilled before migrating even the first customer, which would have taken too long.

To mitigate this, we ended up using two CDC consumers, one starting at the earliest offset and one starting at the latest offset. The former was responsible for exhaustively migrating all data, while the latter was responsible for ensuring new data was continuously backfilled for consistency. We then supplemented the exhaustive backfill with manual migration jobs that targeted data migrations for specific customers. As a result, we were able to complete data migrations for specific customers much earlier and onboard them to Brex Empower much sooner.
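In Kafka terms, the two consumers differ only in their consumer group and in where a new group starts reading. A minimal sketch of the two configurations follows, using standard Kafka consumer property names. The group ids and the broker address are hypothetical.

```python
def consumer_config(group_id: str, start_from: str) -> dict:
    """Kafka consumer settings for a brand-new group starting at a chosen end of the log."""
    if start_from not in ("earliest", "latest"):
        raise ValueError("start_from must be 'earliest' or 'latest'")
    return {
        "bootstrap.servers": "kafka:9092",  # hypothetical address
        "group.id": group_id,
        # Only applies when the group has no committed offsets yet.
        "auto.offset.reset": start_from,
        "enable.auto.commit": False,        # commit only after a record is migrated
    }


exhaustive = consumer_config("empower-backfill-full", "earliest")
continuous = consumer_config("empower-backfill-live", "latest")
```

Using separate group ids is what lets both consumers read the same topics independently, each keeping its own committed offsets.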

Open Source Contribution

While running our CDC pipelines in production, we experienced an issue reported in the Debezium open-source issue dashboard: the Debezium connector was not able to find the starting LSN after restart, and it skipped messages while committing offset for a long time. By digging into Debezium connector source code, we found the root cause:

  • The logic to decide whether the message with a given LSN should be skipped is located in WalPositionLocator. In the version that we used (v1.8.1.Final), if the last received LSN passes the startStreamingLsn for any reason after restart, the connector would never stop skipping messages.
  • In our incident, when the connector resumed and started processing messages again, its last received LSN jumped to an LSN which was higher than the startStreamingLsn stored in WalPositionLocator. Since the LSN of following event messages would increase monotonically, the connector mistakenly considered them to be already processed, which resulted in the above issue.
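The failure mode can be reproduced with a toy model. This is not the Debezium source code, and the actual fix may differ in detail. It only contrasts a locator that stops skipping when it sees exactly the start LSN with one that also stops once the stream has moved past it. LSNs are plain integers here.

```python
class ToyPositionLocator:
    """Toy model of 'skip messages until the start position is reached'."""

    def __init__(self, start_streaming_lsn: int, stop_when_passed: bool):
        self.start = start_streaming_lsn
        self.stop_when_passed = stop_when_passed
        self.skipping = True

    def skip_message(self, lsn: int) -> bool:
        if self.skipping:
            if self.stop_when_passed:
                reached = lsn >= self.start   # tolerant of a jump past the start
            else:
                reached = lsn == self.start   # never true if the start LSN is jumped over
            if reached:
                self.skipping = False
        return self.skipping


stream = [120, 130, 140]  # the first LSN after restart is already past the start (100)
strict = ToyPositionLocator(100, stop_when_passed=False)
tolerant = ToyPositionLocator(100, stop_when_passed=True)
print([strict.skip_message(lsn) for lsn in stream])
print([tolerant.skip_message(lsn) for lsn in stream])
```

In the strict variant every message in the stream is skipped, because LSNs only increase and the exact start position is never seen again.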

Knowing the root cause, we submitted a fix to the Debezium open-source project, and it was released in Debezium 1.9.4. These learnings also motivated us to continue investing in advanced end-to-end monitoring for CDC and TEP pipelines to improve overall reliability and data observability, which deserves a separate blog post.

To avoid the above issue, we recommend upgrading the Debezium connector to 1.9.4 or higher for production use cases.

This blog post was co-authored by Nikunj Yadav, Thomas Cesare-Herriau, and Daniel Lo.

CDC Infrastructure was built by the team: Jun Zhao, Adnan Abdulhussein, Jim Myers, Nikunj Yadav, Yingying Tang, Hussain Kader, Yujie Li, Selena Flannery-Logg.

Very special thanks to Thomas Cesare-Herriau, Minsu O, and Daniel Lo for contributing to the development of our CDC platform!

If you want to help build next-generation financial software and services, come join us at Brex!

References

[1] Transactional Events Publishing At Brex.

[2] Migration from Self-managed Kafka Cluster to Amazon MSK.

[3] Capturing Data Evolution in a Service-Oriented Architecture, Medium Airbnb Tech Blog, 2018.

[4] DBLog: A Generic Change-Data-Capture Framework, Medium Netflix Technology Blog, 2019.

[5] Capturing Every Change From Shopify’s Sharded Monolith, Shopify Engineering, 2021.

[6] Lessons Learned from Running Debezium with PostgreSQL on Amazon RDS, Debezium Blog, 2020.

[7] Replica identity for logical replication.

[8] Debezium connector for PostgreSQL Documentation.

[9] Kubernetes operator pattern.

[10] Data Mesh Principles and Logical Architecture.
