Transactional Events Publishing At Brex

Yingying Tang
Published in Brex Tech Blog
10 min read · May 16, 2022

TL;DR

In H2 2021, we launched Transactional Event Publishing (TEP) to product and engineering teams. TEP aims to provide a strong consistency guarantee between asynchronous event publishing and a database transaction. As engineering teams gradually adopted TEP, we observed more reliable interservice communication, along with better data audits and observability.

This effort is one milestone toward achieving our goal of building a distributed Data Mesh architecture within the Brex platform/backend, which ensures data capture, transport and transformation, governance, lineage, and discovery.

Background

As mentioned in our previous article, at Brex, microservice applications are deployed and managed in Kubernetes. Services communicate with each other through synchronous gRPC calls with Protobuf, or through asynchronous event processing over an Event Infrastructure. The Event Infrastructure is backed by an AWS MSK cluster, providing libraries for services to easily publish and consume events to/from Kafka brokers, with retries and deadlettering supported on top. “Events” in our Event Infrastructure are domain events defined via Protobuf, representing part of the service business domain.

The Event Infrastructure provides several benefits compared to synchronous communications:

  • It decouples publisher and consumer services by providing a strong contract via Protobuf but more relaxed requirements over consumption, unlike gRPC, where the failure modes of two services are coupled because of synchronous communication.
  • The Event Infrastructure helps broadcast a series of events representing the past and current state of the application, so downstream services can be notified of state changes.
  • The Event Infrastructure helps with failure recovery and improves reliability. Downstream services can retry and store events in dead letter locations, without the need for upstream services to retry publishing.

On the other hand, the Event Infrastructure exhibits some disadvantages:

  • The Event Infrastructure does not guarantee strong consistency between event publishing and local database operations, since a single transaction cannot span both a database and an event stream.
  • Having just the domain events in the Event Infrastructure is not enough. We have several use cases that are interested in the raw changes from databases, such as data migration, offline analytical processing, and search index building.

To resolve these issues, we built Transactional Event Publishing, which uses the Outbox Pattern to capture, process, and publish changes from the databases.

TEP: Why and How

The Need for Outbox Pattern

At Brex, we operate over 100 services, and most of them have their own dedicated PostgreSQL databases on AWS RDS. Frequently, a service needs to update records in its database while propagating events to other services. This raises the question of how we can atomically update the database and publish events.

The pseudocode below illustrates three potential approaches, none of which works consistently:

  • Approach 1: If we publish the event before a database transaction, we could possibly succeed in publishing the event while failing the database transaction. In this case, we are publishing an event that hasn’t even occurred and the consumer might end up in an inconsistent state.
  • Approach 2: If we publish the event within a database transaction, it is vulnerable to publishing the same events repeatedly during failures. An event could be published successfully even when the database transaction fails and rolls back, and if the database transaction is within a gRPC call, a failure of database transaction could result in gRPC retries and repeated event publishing.
  • Approach 3: If we publish the event after a database transaction, we could potentially succeed with the database transaction, but fail to publish the event, which results in inconsistencies between the services.
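
The three failure modes above can be reproduced with a small sketch. It is only an illustration under stated assumptions: an in-memory SQLite database stands in for the service's PostgreSQL database, `FakeBroker` stands in for a Kafka producer, and all function names are hypothetical rather than Brex's code.

```python
import sqlite3


class FakeBroker:
    """Stand-in for a Kafka producer (hypothetical); can be told to fail."""

    def __init__(self, fail=False):
        self.fail = fail
        self.events = []

    def publish(self, event):
        if self.fail:
            raise ConnectionError("broker unavailable")
        self.events.append(event)


def new_db():
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE expenses (id INTEGER PRIMARY KEY, amount INTEGER)")
    conn.commit()
    return conn


def write_expense(conn, amount, fail_db):
    conn.execute("INSERT INTO expenses (amount) VALUES (?)", (amount,))
    if fail_db:
        # Simulate a failure before commit; `with conn:` rolls back.
        raise sqlite3.OperationalError("simulated lock timeout")


def publish_before_tx(conn, broker, amount, fail_db=False):
    # Approach 1: the event is already out if the transaction then fails.
    broker.publish(("expense_created", amount))
    with conn:
        write_expense(conn, amount, fail_db)


def publish_within_tx(conn, broker, amount, fail_db=False):
    # Approach 2: a rollback cannot "unpublish" the event.
    with conn:
        broker.publish(("expense_created", amount))
        write_expense(conn, amount, fail_db)


def publish_after_tx(conn, broker, amount, fail_db=False):
    # Approach 3: the row is committed even if publishing then fails.
    with conn:
        write_expense(conn, amount, fail_db)
    broker.publish(("expense_created", amount))


def outcome(approach, fail_db=False, fail_broker=False):
    """Return (committed rows, published events) after one attempt."""
    conn, broker = new_db(), FakeBroker(fail=fail_broker)
    try:
        approach(conn, broker, 100, fail_db)
    except (sqlite3.OperationalError, ConnectionError):
        pass
    rows = conn.execute("SELECT COUNT(*) FROM expenses").fetchone()[0]
    return rows, len(broker.events)
```

In each case `outcome` ends with the database and the broker disagreeing: an event without a row for approaches 1 and 2 when the database fails, and a row without an event for approach 3 when the broker fails.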

This is a typical issue that can be solved by the Outbox Pattern. Instead of publishing an event directly, we insert a record that represents the event into an Outbox table within the same database transaction. Meanwhile, an asynchronous process monitors the Outbox table, extracts the new records, and publishes them to the Kafka brokers.
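
A minimal sketch of the pattern, assuming an in-memory SQLite database in place of PostgreSQL and a simple polling relay in place of the asynchronous process. The table layout, topic name, and function names are hypothetical.

```python
import json
import sqlite3


def new_db():
    conn = sqlite3.connect(":memory:")
    conn.executescript(
        """
        CREATE TABLE expenses (id INTEGER PRIMARY KEY, amount INTEGER NOT NULL);
        CREATE TABLE outbox (
            id      INTEGER PRIMARY KEY AUTOINCREMENT,
            topic   TEXT NOT NULL,
            payload TEXT NOT NULL
        );
        """
    )
    return conn


def create_expense(conn, amount, fail=False):
    """Write the business row and the outbox row in ONE transaction."""
    with conn:  # commits on success, rolls back both inserts on error
        cur = conn.execute("INSERT INTO expenses (amount) VALUES (?)", (amount,))
        event = {"type": "expense_created", "expense_id": cur.lastrowid,
                 "amount": amount}
        conn.execute(
            "INSERT INTO outbox (topic, payload) VALUES (?, ?)",
            ("expense-events", json.dumps(event)),
        )
        if fail:
            raise RuntimeError("simulated failure before commit")


def relay_once(conn, publish, last_seen_id=0):
    """Publish outbox rows newer than last_seen_id; return the new position."""
    rows = conn.execute(
        "SELECT id, topic, payload FROM outbox WHERE id > ? ORDER BY id",
        (last_seen_id,),
    ).fetchall()
    for row_id, topic, payload in rows:
        publish(topic, json.loads(payload))
        last_seen_id = row_id
    return last_seen_id
```

Because the event row commits or rolls back together with the business row, the relay only ever sees events for changes that actually happened. If the relay crashes after publishing but before recording its position, it will publish the same row again, so delivery is at least once and consumers should tolerate duplicates.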

Some product teams had already been following this pattern, using Kubernetes CronJobs to periodically scan their Outbox tables and publish the events. However, this approach is not ideal: it cannot guarantee real-time event publishing, and it does not scale as the number of services following the pattern increases. In addition, querying the database from separate CronJob processes adds load on the database.

The Implementation of TEP

To standardize how product teams use the Outbox Pattern, and to capture database changes in near real time without putting extra load on the databases, we implemented Transactional Event Publishing, as seen in the TEP architecture diagram.

TEP is mainly composed of 3 parts:

  • TEP Table
  • TEP Kafka Connector (Debezium)
  • TEP Publisher library

TEP Table

To onboard TEP for a service, a TEP table (named transactional_events) is created in the service’s database, with the table schema shown in the diagram. In addition, a publication, a TEP user, and a role are created as shown in the queries below; these are used by the TEP Kafka Connectors. We also need to create a heartbeat table (named debezium_heartbeat) for periodic heartbeat record generation, which is discussed in the Lessons Learned section.
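
As a rough sketch of what such a setup could look like, the helper below returns the SQL statements for one service database. Only the table names transactional_events and debezium_heartbeat come from this article. The column layout is an assumption that follows the Debezium Outbox Event Router's default column names, and the role, user, and publication names are hypothetical. `GRANT rds_replication` is the AWS RDS mechanism for allowing logical replication.

```python
def tep_setup_statements(role="tep_role", user="tep_user",
                         publication="tep_publication"):
    """Return SQL statements, in order, for one service database.

    All object names except the two table names are hypothetical.
    """
    return [
        # ASSUMED schema: Debezium Outbox Event Router default columns.
        """CREATE TABLE transactional_events (
    id            uuid PRIMARY KEY,
    aggregatetype varchar(255) NOT NULL,
    aggregateid   varchar(255) NOT NULL,
    payload       bytea NOT NULL
)""",
        # ASSUMED schema: a single row that the heartbeat query overwrites.
        """CREATE TABLE debezium_heartbeat (
    id             integer PRIMARY KEY,
    last_heartbeat timestamptz NOT NULL
)""",
        f"CREATE ROLE {role}",
        f"GRANT SELECT ON transactional_events TO {role}",
        f"GRANT SELECT, INSERT, UPDATE ON debezium_heartbeat TO {role}",
        # Credentials for the user are assumed to be set separately.
        f"CREATE USER {user} IN ROLE {role}",
        f"GRANT rds_replication TO {user}",
        # Including the heartbeat table in the publication is an assumption.
        f"CREATE PUBLICATION {publication} "
        "FOR TABLE transactional_events, debezium_heartbeat",
    ]
```

Printing the statements joined with ";\n" gives a script that a migration tool could run against a database that does not yet have these objects.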

TEP Kafka Connector

We adopted the Debezium connector to capture the database changes. The Debezium connector takes advantage of PostgreSQL’s Write-Ahead Logging (WAL) stream, which records all modifications to the database before they’re applied and is primarily used to keep data in sync between PostgreSQL servers. The connector creates and maintains a replication slot in the database to track its position in the WAL stream, decodes the WAL changes using the pgoutput logical decoding plugin, filters the change data events down to specific tables, and emits the events to Kafka topics. The data capture happens in near real time and does not put extra query load on the databases.

Debezium connectors are deployed using Kafka Connect, a framework and platform that enables scalable and reliable streaming of data into (source) and out of (sink) Kafka. At Brex, we use Kafka Connect clusters and Kafka Connectors for different use cases (TEP using the Debezium connector as introduced in this article, a data replication pipeline using the Snowflake connector, etc.). To manage the Kafka Connect clusters and connectors more efficiently, we adopted the Strimzi Cluster Operator, which uses the Kubernetes Operator pattern to define these Kafka components as Custom Resources (e.g. KafkaConnect, KafkaConnector), and provides custom APIs for deploying, running, and managing the Kafka components.

When onboarding TEP, a new KafkaConnector Custom Resource is created, with the following YAML file as an example. Note that we use a single Debezium connector per PostgreSQL database.
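
The sketch below builds such a resource as a Python dictionary and prints it as JSON, which Kubernetes tooling accepts in place of YAML. It is not Brex's actual configuration: the resource name, slot name, user, publication name, heartbeat interval, and heartbeat query are hypothetical, and `password_ref` is a caller-supplied secret reference. The property keys are standard Debezium PostgreSQL connector options; `database.server.name` applies to Debezium 1.x and was renamed `topic.prefix` in Debezium 2.0.

```python
import json


def tep_connector_resource(db_name, db_host, connect_cluster, password_ref):
    """Build a Strimzi KafkaConnector resource for one PostgreSQL database."""
    config = {
        "database.hostname": db_host,
        "database.port": 5432,
        "database.dbname": db_name,
        "database.user": "tep_user",            # hypothetical user name
        "database.password": password_ref,      # e.g. a config-provider reference
        "database.server.name": db_name,        # `topic.prefix` in Debezium 2.x
        "plugin.name": "pgoutput",
        "slot.name": f"tep_{db_name}",          # hypothetical naming scheme
        "publication.name": "tep_publication",  # hypothetical publication name
        "publication.autocreate.mode": "disabled",
        "table.include.list": "public.transactional_events",
        # Heartbeat settings; see Lessons Learned. Values are assumptions.
        "heartbeat.interval.ms": 60000,
        "heartbeat.action.query": (
            "INSERT INTO debezium_heartbeat (id, last_heartbeat) "
            "VALUES (1, now()) ON CONFLICT (id) "
            "DO UPDATE SET last_heartbeat = EXCLUDED.last_heartbeat"
        ),
    }
    return {
        "apiVersion": "kafka.strimzi.io/v1beta2",
        "kind": "KafkaConnector",
        "metadata": {
            "name": f"tep-{db_name}",
            # Tells Strimzi which KafkaConnect cluster runs this connector.
            "labels": {"strimzi.io/cluster": connect_cluster},
        },
        "spec": {
            "class": "io.debezium.connector.postgresql.PostgresConnector",
            "tasksMax": 1,
            "config": config,
        },
    }


if __name__ == "__main__":
    resource = tep_connector_resource(
        "expenses", "expenses-db.internal", "tep-connect", "<secret-ref>")
    print(json.dumps(resource, indent=2))
```

Since only the database-specific fields vary between connectors, generating the resource from a function like this keeps the shared settings in one place.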

We also configured the Debezium connector to apply the Outbox Event Router Single Message Transformation (SMT), which provides several benefits:

  • It extracts the payload from the raw change data captured by Debezium. In our use case, we put the event message encoded in Protobuf as the payload into the TEP table. The payload can be directly published by Debezium to the Event Infrastructure without the need to add an extra conversion layer.
  • It can be easily configured to extract information with the given table structure.
  • It filters out DELETE operations on the TEP table, as deletion of TEP records is not relevant to downstream consumers.
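
To make the router's behavior concrete, the sketch below pairs the connector properties that enable the SMT with a simplified Python model of what it does to a change event. The property keys are Debezium's, shown with their default values. `route_outbox_event` is a hypothetical stand-in written for illustration and is not Debezium code; the column names are the router's defaults and are assumed here.

```python
# Connector properties that enable the Outbox Event Router (default values).
OUTBOX_SMT_CONFIG = {
    "transforms": "outbox",
    "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
    "transforms.outbox.route.by.field": "aggregatetype",
    "transforms.outbox.table.field.event.key": "aggregateid",
    "transforms.outbox.table.field.event.payload": "payload",
}


def route_outbox_event(change, topic_prefix="outbox.event."):
    """Simplified model of the router: return (topic, key, payload) or None.

    `change` mimics a Debezium change event envelope with an "op" field
    ("c" = create, "d" = delete) and the row image under "after".
    """
    if change.get("op") != "c":
        # Deletions of outbox rows carry no information for consumers.
        return None
    row = change["after"]
    topic = topic_prefix + row[OUTBOX_SMT_CONFIG["transforms.outbox.route.by.field"]]
    key = row[OUTBOX_SMT_CONFIG["transforms.outbox.table.field.event.key"]]
    payload = row[OUTBOX_SMT_CONFIG["transforms.outbox.table.field.event.payload"]]
    return topic, key, payload
```

An insert into the TEP table therefore becomes a Kafka record whose value is just the encoded event, while the matching delete produces nothing.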

TEP Publisher library

To integrate TEP when developing applications, a TEP library is provided and its API can be called within database transactions:

When publish is called, the API executes the following steps:

  1. Encode the given inputEvent Protobuf message.
  2. Within the database transaction, create a new record in the TEP table, with the encoded event data as payload.
  3. Within the same database transaction, delete the new record in the TEP table.

Note: The TEP record is deleted to prevent the TEP table from growing rapidly. The insert is still written to the WAL when the transaction commits, so Debezium captures it even though the row never remains in the table. Once the records have been streamed, the Kafka brokers are responsible for storing the event data (by configuring topic retention, if needed), and keeping the records in the database would serve no purpose. Thanks to the Outbox Event Router, the deletions of TEP records are not streamed.
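
The steps above can be sketched as follows. This is not the TEP library itself: SQLite stands in for PostgreSQL, JSON encoding stands in for Protobuf serialization, and the function names and column layout are hypothetical.

```python
import json
import sqlite3
import uuid


def new_db():
    conn = sqlite3.connect(":memory:")
    conn.executescript(
        """
        CREATE TABLE expenses (id INTEGER PRIMARY KEY, amount INTEGER NOT NULL);
        CREATE TABLE transactional_events (
            id            TEXT PRIMARY KEY,
            aggregatetype TEXT NOT NULL,
            aggregateid   TEXT NOT NULL,
            payload       BLOB NOT NULL
        );
        """
    )
    return conn


def publish(conn, aggregate_type, aggregate_id, event):
    """Record an event inside the caller's open transaction."""
    # Step 1: encode the event (JSON here; the real library uses Protobuf).
    payload = json.dumps(event).encode("utf-8")
    event_id = str(uuid.uuid4())
    # Step 2: insert the outbox record.
    conn.execute(
        "INSERT INTO transactional_events (id, aggregatetype, aggregateid, payload) "
        "VALUES (?, ?, ?, ?)",
        (event_id, aggregate_type, aggregate_id, payload),
    )
    # Step 3: delete it again in the same transaction. In PostgreSQL both
    # changes reach the WAL on commit, so the insert can still be captured.
    conn.execute("DELETE FROM transactional_events WHERE id = ?", (event_id,))
    return event_id


def create_expense(conn, amount):
    """Example caller: the business write and the event share a transaction."""
    with conn:
        cur = conn.execute("INSERT INTO expenses (amount) VALUES (?)", (amount,))
        publish(conn, "expense", str(cur.lastrowid),
                {"type": "expense_created", "amount": amount})
    return cur.lastrowid
```

After `create_expense` commits, the expenses table holds the new row while transactional_events is empty, which is the property that keeps the TEP table from growing.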

Lessons Learned

With the TEP implemented and rolled out to product teams, we came across some issues that could be avoided or improved:

  1. For heartbeat configurations, we initially defined only heartbeat.interval.ms. We thought this was enough, since it lets the Debezium connector commit offsets to Kafka periodically, so that the connector can send its latest retrieved LSN to the database and allow the database to reclaim disk space. However, we overlooked the fact that an AWS RDS instance contains multiple databases (e.g., rdsadmin) and that the WAL is shared by all of them. AWS RDS writes to its own system tables frequently, but the connector only receives changes from the database it is attached to. When that database has little traffic, the connector has no new WAL position to acknowledge, while the WAL keeps growing from writes to the other databases. We had to create a debezium_heartbeat table and define heartbeat.action.query to insert heartbeat records into it periodically.
  2. To monitor the health and operations of the Debezium connectors, we use the Java Management Extensions (JMX) metrics provided by Kafka Connect and Debezium. We also monitor the AWS RDS Oldest Replication Slot Lag metric in case a slot falls far behind and the retained WAL fills up the disk in PostgreSQL. However, we did not use the Strimzi metrics to monitor Strimzi Cluster Operator reconciliation. We once incorrectly updated the configuration of an existing connector, which resulted in a Strimzi reconciliation failure. Since the connector was still running with its original configuration, we did not get an alert from the Kafka Connect or Debezium metrics.
  3. Strimzi operators currently do not automatically restart connectors and tasks that are in the FAILED state (Strimzi has a proposal for automated restarting). Sometimes a transient database connection issue leads to a failed connector, and we have to make HTTP POST requests to the Kafka Connect REST API to restart it. We are working on a Kubernetes operator (as mentioned in the Future Work section below) to help restart the connectors.
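
For the manual restarts described in the last item, a small helper along these lines could inspect a connector's status and issue the restart requests. It is a sketch, not Brex's tooling: the function names and the example base URL are hypothetical, while the status and restart endpoints are part of the standard Kafka Connect REST API.

```python
import json
import urllib.request


def restart_paths(status):
    """Given a /connectors/{name}/status response, list the restart endpoints."""
    name = status["name"]
    paths = []
    if status["connector"]["state"] == "FAILED":
        paths.append(f"/connectors/{name}/restart")
    for task in status.get("tasks", []):
        if task["state"] == "FAILED":
            paths.append(f"/connectors/{name}/tasks/{task['id']}/restart")
    return paths


def restart_failed(base_url, name):
    """Restart whatever is FAILED for one connector; return the paths called.

    base_url is the Kafka Connect REST endpoint, for example
    "http://localhost:8083" (hypothetical host; 8083 is the default port).
    """
    with urllib.request.urlopen(f"{base_url}/connectors/{name}/status") as resp:
        status = json.load(resp)
    paths = restart_paths(status)
    for path in paths:
        request = urllib.request.Request(base_url + path, data=b"", method="POST")
        urllib.request.urlopen(request).close()
    return paths
```

Keeping `restart_paths` free of network calls makes the decision logic easy to test against recorded status responses.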

TEP Use Cases

Here are two TEP use cases at Brex that illustrate how TEP improves data consistency and observability.

Communication Service TEP Usage

At Brex, we have a Communications Team that manages all messaging (SMS, email, push notifications) from Brex services to Brex customers. The diagram illustrates a simplified version of the communication architecture:

  1. Comms Service: To send a notification to a customer, another service calls the Comms Service, which publishes the Notification Event to the Event Infrastructure and creates a Channel_Notification record for idempotency and status checks of this notification.
  2. Notification Sender Service: The service consumes the Notification Event from the Event Infrastructure, and calls the External Providers (e.g. Twilio, Sendgrid) to send notifications.

Inconsistency could occur because the Comms Service needs to publish a Notification Event while creating the Channel_Notification record. Before adopting TEP, the team chose to publish the event before committing the database transaction that creates the record. The reason for not publishing the event after the database transaction was to avoid the situation where the Channel_Notification record is created but the event fails to publish, and the presence of the record then blocks further retries because of the notification idempotency checks.

However, publishing the event before the database transaction still leads to inconsistency when the event gets published without the Channel_Notification record being created. In this case, the result would be a Notification_Status_Update event without a corresponding Channel_Notification record. This issue became severe once, when a long-running database migration temporarily locked database rows, causing the database transactions that create the Channel_Notification records to fail.

With TEP, the issue is resolved because event publishing is tied to the success or failure of the transaction. If the database transaction fails, sending the notification can be retried without worrying about duplicate events or records.

Expense Service Audit Trail

In the Brex backend, an Expense Service is used to manage the money movements. To support a full audit trail for an expense, TEP is adopted so that every database transaction for expense mutations (create, update, delete) will trigger the Expense Update Event, which can then be consumed and stored by an Expense Audit Service. With the audit logs, the other services can easily call the Expense Audit Service to query and obtain the audit trail for a given expense.

The TEP adoption eases the data audit and observability here, which can be extended to other services in the future.

Future Work

While TEP has been implemented and gradually adopted by our Product and Data Platform teams, we still see room for improvement:

  • TEP database management: Currently, to onboard TEP, service owners have to write their own migration scripts to create the TEP table in their database. As the owner of the CDC platform and TEP, we determine the TEP schema, which is the same across all databases. However, since each team owns the migration scripts that create its TEP table, we have to rely on the onboarded service owners whenever we need to update the TEP schema. With increasing adoption of TEP, we are working on automating the process of creating the TEP table, updating the TEP schema, and managing the creation and updates of the TEP publication, role, and user.
  • “One-Click setup” for TEP provisioning: Since we use a single Debezium connector per PostgreSQL database, to onboard TEP for a database, the database owner has to write the KafkaConnector YAML file that attaches the connector to the database. The KafkaConnector configurations are almost identical, except for several fields related to the database information. To simplify this process, we are working on a Kubernetes operator to manage the provisioning workflows, so that given a database name, we can retrieve the database information and create and manage the KafkaConnectors automatically for the product teams.
  • Data democratization: We have built a Change Data Capture (CDC) platform for all the use cases (including the TEP use case) that require Kafka Connect and Kafka Connectors. This platform can be further extended for data discovery, governance, and data lineage.
  • We will expand the audit logs for both Domain events and Change Events across all services.

If you want to help build financial software and services for every growing business, come join us at Brex!

Special thanks to Adnan Abdulhussein and Jun Zhao for co-authoring this blog post, Mairbek Khadikov for proposing TEP implementation, Nikunj Yadav for proof of concept implementation, and contributors including Taylor Whitfield and Emily Goodman.
