Enhancing Inter-Service Communication: The Transactional Outbox Pattern Solution — Part 2

Gerardo Parra
Published in Fever Engineering
13 min read · Mar 19, 2024

Photo by Luana Norões on Unsplash

The previous article in this series introduced the Transactional Outbox pattern, how we implemented it at Fever, and the scalability challenges we faced. In this second part, we revisit those issues and explain how we solved them by bringing a Change Data Capture tool into play.

What’s a Change Data Capture?

Before we discuss how we incorporated a Change Data Capture (CDC) tool, Debezium, it’s crucial to take a step back and understand the concept of CDC.

(To learn about CDC and its practical implementation, refer to the articles on implementing CDC in practice: Miguel Gamallo, “Part 1” and Miguel Gamallo, “Part 2”.)

Why use a Change Data Capture?

The two primary scalability problems we faced when introducing the Transactional Outbox pattern were the constant polling of the database and the inability to parallelize event processing in the outbox table. Introducing Debezium directly addresses both challenges. The following figure illustrates the proposed new implementation:

Figure 1: Transactional Outbox design using CDC

From left to right in the figure, the new elements are the Debezium Source Connector (a Kafka Connector responsible for capturing new rows added to the Outbox table), the Kafka topic that contains the captured events, and an implementation of the Message Relay as a Kafka Consumer.

Using a Kafka consumer to send the records to another message broker may look strange. The reason for this is that the solution needs to take into account the existing Fever system. Historically, we have used RabbitMQ as a single message broker, so the Transactional Outbox needs to support that part of the solution until we evolve.

Solving the scalability issues

A straightforward solution to address the scalability issues mentioned above is integrating Debezium and Kafka consumers into the design.

Database performance degradation will hurt the Message Relay latency:

As explained earlier, a weak point was the Message Relay’s dependence on the database for event delivery. Debezium addresses this by reading changes from PostgreSQL’s Write-Ahead Log (WAL), which is significantly more efficient than constantly querying the table. However, using Debezium to retrieve events is not without its challenges.

As explained in the section ‘Adding new derived data systems’ of this article, it’s essential to ensure that the incremental snapshot mode is configured when incorporating the Outbox table into the source connector. Setting the option ‘snapshot.mode = never’ prevents the connector from automatically attempting an initial snapshot. Consequently, the connector captures changes from that moment onwards and processes them incrementally. This incremental approach means that the table records in the WAL are distributed in manageable chunks; once a chunk is fully consumed, the corresponding disk space can be freed. Monitoring disk usage in the Postgres instance is crucial, particularly when the connector is handling multiple tables with varying sizes and loads.
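As a rough illustration of the configuration described above, the following sketch builds a connector registration payload as a Python dictionary. The property names follow the Debezium 2.x PostgreSQL connector documentation, but every value (host, credentials, slot name, table names, signaling table) is a placeholder and not Fever’s actual setup; check the property names against the Debezium version you deploy.

```python
import json

# Hypothetical registration payload for Kafka Connect.
# All values are placeholders chosen for illustration only.
outbox_connector = {
    "name": "outbox-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "plugin.name": "pgoutput",
        "database.hostname": "postgres.internal",
        "database.port": "5432",
        "database.user": "debezium",
        "database.password": "change-me",
        "database.dbname": "fever",
        "topic.prefix": "connector_name",
        "slot.name": "outbox_slot",
        # Capture the Outbox table and the signaling table.
        "table.include.list": "public.outbox,public.debezium_signal",
        # Do not take an initial snapshot; stream from the WAL only.
        "snapshot.mode": "never",
        # Table through which incremental snapshots can be requested.
        "signal.data.collection": "public.debezium_signal",
    },
}

if __name__ == "__main__":
    # The JSON document is what would be sent to the Kafka Connect REST API.
    print(json.dumps(outbox_connector, indent=2))
```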

Even after configuring the incremental snapshot mode, some problems will need to be dealt with:

  • Debezium’s Source Connector is a well-designed software component; however, it still processes the list of changes made to the database sequentially. If a large volume of changes arrives at the Outbox table in a short time, latency can increase even if throughput remains unaffected. Addressing this issue requires a two-sided approach: optimizing the publication side to reduce the load as much as possible, and tuning the Kafka Connector’s configuration parameters from an infrastructure perspective to increase throughput. (These techniques will not be discussed in this article.)
  • The dependency on the DB is still there. The DB needs far fewer resources to serve Debezium than to serve a query, but two metrics will still have a direct impact on Debezium’s performance:
    - The commit time of transactions is crucial: by default, Postgres will respond to the client once the WAL record of the transaction is flushed to disk, ensuring changes are either committed or rolled back. However, this consistency model comes with a trade-off: clients are blocked, tying up connections until the commit is completed, which can only happen once preceding transactions are committed. In high-volume change scenarios, this leads to increased commit wait times, causing Debezium’s latency to rise while limiting Postgres’ available connections for new transactions.
    - WAL disk usage is critical: if Postgres can handle the volume in high-volume change scenarios, more WAL records will be flushed to disk. Without adjusted throughput in Debezium’s source connector, WAL records will accumulate until Postgres exhausts the available space. At that point, the replication slot will be removed, resulting in lost changes for Debezium. Creating a new replication slot to catch up can take hours to days.
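To make the WAL disk usage concern concrete, here is a minimal sketch of how the WAL retained by a replication slot could be estimated. The query uses the standard pg_replication_slots view and pg_current_wal_lsn() function (PostgreSQL 10+); the helper names and the alerting threshold are assumptions made for this example, not part of the original setup.

```python
# Query one might run to read the current WAL position and the oldest
# position each replication slot still needs.
SLOT_POSITIONS_QUERY = """
SELECT slot_name,
       pg_current_wal_lsn() AS current_lsn,
       restart_lsn
FROM pg_replication_slots;
"""


def lsn_to_int(lsn: str) -> int:
    """Convert a textual LSN such as '0/42B8A5D8' into a byte position."""
    high, low = lsn.split("/")
    return (int(high, 16) << 32) | int(low, 16)


def retained_wal_bytes(current_lsn: str, restart_lsn: str) -> int:
    """Bytes of WAL the server must keep on disk for the slot."""
    return lsn_to_int(current_lsn) - lsn_to_int(restart_lsn)


def slot_needs_attention(current_lsn: str, restart_lsn: str, max_bytes: int) -> bool:
    """True when the slot retains more WAL than the (hypothetical) budget."""
    return retained_wal_bytes(current_lsn, restart_lsn) > max_bytes
```

Feeding the rows returned by the query into slot_needs_attention gives a simple signal that could drive an alert well before the disk fills up.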

Outbox table congestion problems affect individual message delivery time.

The problem of parallelizing events is resolved straightforwardly. Since Debezium retrieves events from the Outbox table and loads them into a Kafka topic, a partitioning strategy is supported. However, it’s essential to note a potential issue with this strategy. If several high-frequency events have message keys that map to the same partition, that partition becomes a hotspot (a partition with high load), which can increase event delivery latency, as each partition has its own consumer.

To gain a better understanding of the Kafka Producer partitioning strategy, you can experiment with different key sets using this code: poc-kafka-partition-assignment

In any case, simply applying partitioning to the topic will significantly decrease delivery time. An essential consideration here is determining the key for partitioning. Within the topic, records do not follow a global order; instead, each partition maintains its local order. By distributing records based on a key, events with dependencies on order must share the key to ensure Kafka assigns them to the same partition.
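The following sketch illustrates the idea of key-based partition assignment and how a skewed key distribution produces a hotspot. It is only an approximation: Kafka’s default partitioner hashes the key bytes with murmur2, whereas this example uses CRC32 as a stand-in, so the concrete partition numbers will differ from what Kafka would assign. The FQNs in the demo are made up.

```python
import zlib
from collections import Counter


def assign_partition(key: str, num_partitions: int) -> int:
    """Map a message key to a partition.

    Stand-in hash: Kafka's default partitioner uses murmur2, not CRC32.
    """
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def partition_load(keys, num_partitions: int) -> Counter:
    """Count how many messages land on each partition."""
    return Counter(assign_partition(key, num_partitions) for key in keys)


if __name__ == "__main__":
    # Hypothetical traffic: one very frequent event and two rare ones.
    traffic = (
        ["event.service_1.core.entity.entity_created"] * 1000
        + ["event.service_2.core.order.order_paid"] * 10
        + ["event.service_3.core.user.user_updated"] * 10
    )
    for partition, count in sorted(partition_load(traffic, 20).items()):
        print(partition, count)
```

Because the same key always hashes to the same partition, events that depend on each other’s order stay together, and any event that happens to share a partition with the frequent one will wait behind it.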

Fever’s implementation, deploying in 3 Phases

We addressed several challenges in three phases to transition from the initial implementation to the new one based on Debezium. These challenges include:

  1. Preventing double publication for events from the former Message Relay and the new ones based on Kafka consumers.
  2. Redesigning delivery retries with the Kafka consumers.
  3. Implementing a mechanism to alleviate pressure on hotspot partitions.

Phase 1. Introduce the Kafka consumers avoiding double publication with the existing Message Relay

The first phase has one clear objective: introducing the Message Relay with Kafka consumers and progressively migrating event processing to them.

It is essential to migrate event processing in small steps to verify the proper functioning of the Consumers progressively without compromising the entire operation. Additionally, in this phase, two challenges must be addressed.

The following figure illustrates the design for this phase:

Figure 2: Phase 1 for the Transactional Outbox over CDC

A temporary configuration variable is introduced to control which events are processed by the Kafka consumers and which ones are still handled by the former Message Relay worker. This variable consists of a list of event Fully Qualified Names (FQNs). If an event is included in this list, the Kafka consumer will attempt to deliver it to RabbitMQ; otherwise, it will be processed by the old worker.
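The routing rule described above can be sketched as follows. The variable name, the way it is loaded, and the function names are hypothetical; the only thing taken from the article is that membership of the FQN in the list decides which worker delivers the event.

```python
# Hypothetical configuration value: FQNs already migrated to the Kafka flow.
KAFKA_MIGRATED_FQNS = frozenset({
    "event.service_1.core.entity.entity_created",
})


def handled_by_kafka_consumer(event_fqn: str, migrated=KAFKA_MIGRATED_FQNS) -> bool:
    """The Kafka consumer delivers only events listed in the variable."""
    return event_fqn in migrated


def handled_by_old_worker(event_fqn: str, migrated=KAFKA_MIGRATED_FQNS) -> bool:
    """The old Message Relay worker delivers everything else."""
    return event_fqn not in migrated
```

For any FQN and any single value of the variable, exactly one of the two functions returns True. Double publication can only happen when the two workers see different values of the variable, which is why the rollout of a change to it matters.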

The topic

The first aspect to comprehend is the design of the Transactional Outbox topic. As explained in the first article, the events within the Outbox have a property known as the Fully Qualified Name (FQN), which serves as a category identifier for event types. Consequently, this property is ideally suited for use as a message key, facilitating the grouping of messages into partitions.

When partitioning a topic, it’s essential to note that the number of partitions can be increased but not decreased without risking information loss. Hence, the initial number of partitions is set to 20 for approximately 100 different FQNs. If the keys are distributed evenly, this configuration yields an average of 5 event FQNs per partition.

The Message Relay as Kafka Consumer

This Consumer comprises two distinct parts: understanding how to process a Debezium record from the topic and executing the logic of the Message Relay. Fever has already addressed the first part to leverage Debezium in other processes. The outcome is an internal component that utilizes a Kafka Consumer to extract an object containing information from the record. Although a Debezium record contains a wealth of information, for constructing the object with event information, only the “payload” is relevant:


"payload": {
    "before": null,
    "after": {
        "event_id": "9dbeead1-ef48-4599-8cb3-56f218a88f60",
        "event_fqn": "event.service_1.core.entity.entity_created",
        "event_payload": "{\"entity_id\": 5, \"extra_info\": null}",
        "created_at": "2024-02-05T09:35:15.192812Z",
        "delivered_at": null,
        "delivery_errors": 0,
        "delivery_paused_at": null
    },
    "source": {
        "version": "2.2.1.Final",
        "connector": "postgresql",
        "name": "connector_name",
        "ts_ms": 1707238071587,
        "snapshot": "false",
        "db": "fever",
        "sequence": "[\"1119370896\",\"1119371032\"]",
        "schema": "public",
        "table": "db_table_name",
        "txId": 12322,
        "lsn": 1119371032,
        "xmin": null
    },
    "op": "c",
    "ts_ms": 1707238072056,
    "transaction": null
}

The payload of the Debezium record contains several fields:

  • The before and after fields hold the DB row information before and after the change, respectively.
  • The source field holds details such as the connector’s name, whether the change comes from a snapshot, and the DB table.
  • The op field corresponds to the operation performed on the captured table. In this case, the value ‘c’ indicates a creation operation, so there is no before information.

The object returned by this Consumer will contain all the Outbox Record’s information as a Python object.
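As an illustration of that extraction step, the sketch below turns the “payload” of a Debezium record into a Python object. The class and function names are hypothetical and do not reflect Fever’s internal component; the field names come from the example record above, and the message is assumed to be already deserialized from JSON into a dictionary.

```python
import json
from dataclasses import dataclass
from datetime import datetime
from typing import Optional


@dataclass(frozen=True)
class OutboxRecord:
    event_id: str
    event_fqn: str
    event_payload: dict
    created_at: datetime
    delivered_at: Optional[datetime]
    delivery_errors: int


def _parse_ts(value: Optional[str]) -> Optional[datetime]:
    """Parse an ISO-8601 timestamp with a trailing 'Z', or pass None through."""
    if value is None:
        return None
    return datetime.fromisoformat(value.replace("Z", "+00:00"))


def parse_outbox_record(message: dict) -> Optional[OutboxRecord]:
    """Build an OutboxRecord from a Debezium message, or None if not an insert."""
    payload = message["payload"]
    # Only creations ('c') carry a new Outbox row to relay.
    if payload.get("op") != "c" or payload.get("after") is None:
        return None
    row = payload["after"]
    return OutboxRecord(
        event_id=row["event_id"],
        event_fqn=row["event_fqn"],
        # The event payload is stored as a JSON string inside the row.
        event_payload=json.loads(row["event_payload"]),
        created_at=_parse_ts(row["created_at"]),
        delivered_at=_parse_ts(row["delivered_at"]),
        delivery_errors=row["delivery_errors"],
    )
```

Ignoring every operation other than ‘c’ is a choice made for this sketch: updates to the row, such as setting delivered_at, also produce Debezium records, and relaying them again would not make sense.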

At this point, it’s crucial to recognize the main flaw in this design. Two distinct parts are involved: the Kafka Consumer that processes the Debezium record and the Message Relay. This distinction is significant because if the Message Relay fails to deliver the event to RabbitMQ, the Consumer has already acknowledged the consumption. Thus, the offset is already committed to the next record in the topic. This design decision prioritizes performance; by operating in this manner, the Consumer can process records in batches without having to commit offsets individually.

This underscores the necessity for Phase 2, which will be discussed in the next section.

The logic of the Message Relay closely resembles that of the initial implementation. The primary difference is that, in Phase 1, the old Message Relay continues to operate for events not listed in the configuration variable mentioned at the beginning of this section. This highlights the critical nature of migrating an event to the Kafka workflow.

At the Kubernetes layer, two separate Pods run the old and the Kafka Message Relays. When the configuration variable is updated with a new event, a desynchronization could occur if the Kafka Consumer begins processing the event while the old worker has not yet picked up the updated configuration variable. In such cases, both workers might publish the event. To prevent this scenario, we need to carefully sequence the deployment steps:

  1. Remove the old worker’s Pod.
  2. Update the configuration variable.
  3. Wait until the Kafka Consumer is synchronized with the updated configuration variable.
  4. Reintroduce the old worker to resume its operations.

This sequence is represented in the figures below:

The deployment can cause a temporary desynchronization, leading to both workers processing the event.

Figure 3.1: Old worker de-synchronized during a deployment

To avoid this, the old worker is manually removed while the Kafka worker synchronizes with the configuration variable.

Figure 3.2: Old worker manually removed before deployment to avoid de-synchronization

Once the Kafka consumer is synchronized, it’s safe to reactivate the old worker because it will also be synchronized with the updated configuration variable.

Figure 3.3: Old worker synchronized after deployment

Phase 2. Redesign delivery retries with the Kafka consumers.

As mentioned earlier, the design of the Kafka consumer presents the issue of acknowledging record reception to Kafka before publication. Phase 2 aims to address this by introducing a complementary worker to handle these records.

Figure 4: Phase 2 for the Transactional Outbox over CDC

The complementary worker will be relatively simple, functioning much like the old one. The key difference lies in the criteria used to fetch the Outbox records. This new worker will only retrieve records that have been in the table for more than an arbitrary amount of time without being delivered. Under normal circumstances, Kafka workers will process a record first, and the complementary worker will handle it if it has yet to be delivered. There’s no fixed duration for how long a record must remain in the table before being considered by this worker; the value must be adjusted based on the latency of the Kafka consumer.

Figure 5: Complementary worker collects and delivers events missed by Kafka Worker
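The selection criterion of the complementary worker can be sketched as a simple predicate. The function name and the five-minute threshold are assumptions for this example; as noted above, the real threshold has to be tuned to the latency of the Kafka consumer.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

# Hypothetical threshold; tune it to the observed Kafka consumer latency.
MIN_AGE = timedelta(minutes=5)


def due_for_complementary_worker(
    created_at: datetime,
    delivered_at: Optional[datetime],
    now: datetime,
    min_age: timedelta = MIN_AGE,
) -> bool:
    """True for records still undelivered after min_age in the table."""
    if delivered_at is not None:
        return False  # Already delivered by a Kafka consumer.
    return now - created_at > min_age
```

In practice the same condition would be expressed in the query that fetches the Outbox records, so that only stale, undelivered rows ever reach the worker.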

With the introduction of this new worker, there’s no need to maintain either the old worker or the configuration variable to mark which events must be processed by the Kafka consumers. All events should be processed first by a Kafka consumer and later, if necessary, by this complementary worker.

It is important to note that there is a risk of a Kafka Consumer and the complementary worker handling the same event simultaneously. That would result in a double publication scenario.

To avoid it, both components should set the delivered_at field as early as possible. For instance, with PostgreSQL, an UPDATE like this one solves the problem:

UPDATE <outbox_table_name>
SET delivered_at = NOW()
WHERE delivered_at IS NULL AND …

Note that PostgreSQL will acquire an exclusive row-level lock for the update. If a different persistence layer is used, another solution should be found.

The row-level lock serializes the UPDATE statements issued by the Consumer and the Worker: the second one to execute finds delivered_at already set, so its WHERE clause no longer matches and it cannot update the row again. If the UPDATE statement does not write a row, then the event is not published to RabbitMQ, avoiding the double publication scenario.
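The “claim before publishing” idea can be tried out with the sketch below. It uses SQLite from the Python standard library purely as a stand-in so the example is runnable; it does not reproduce PostgreSQL’s row-level locking. The table layout, the event_id condition in the WHERE clause, and the function names are assumptions for illustration.

```python
import sqlite3


def make_demo_db() -> sqlite3.Connection:
    """Create an in-memory stand-in Outbox table with one pending event."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE outbox (event_id TEXT PRIMARY KEY, delivered_at TEXT)")
    conn.execute("INSERT INTO outbox (event_id, delivered_at) VALUES ('e1', NULL)")
    conn.commit()
    return conn


def try_claim(conn: sqlite3.Connection, event_id: str) -> bool:
    """Mark the event as delivered; True only for the first caller."""
    cursor = conn.execute(
        "UPDATE outbox SET delivered_at = CURRENT_TIMESTAMP "
        "WHERE delivered_at IS NULL AND event_id = ?",
        (event_id,),
    )
    conn.commit()
    # rowcount is 0 when another component already claimed the event.
    return cursor.rowcount == 1


if __name__ == "__main__":
    db = make_demo_db()
    for component in ("kafka consumer", "complementary worker"):
        if try_claim(db, "e1"):
            print(component, "publishes the event")
        else:
            print(component, "skips the event")
```

Only the component whose UPDATE actually changed the row goes on to publish, which is the property the paragraph above relies on.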

Phase 3. Implement a mechanism to relieve pressure on hotspot partitions.

The two preceding phases are sufficient to achieve the complete functionality. However, when addressing the solution to the second scalability problem (Outbox table congestion problems affect individual message delivery time), a negative side effect of the partition strategy was also discussed: the potential for events with high traffic volumes to be assigned to the same partition. Phase 3 aims to provide a mechanism to mitigate these situations, although it does not eliminate them entirely.

In the following Figure, a scenario is presented to illustrate this situation:

Figure 6: Scenario representing an overloaded partition

In this scenario, one partition has received two different events: A and B. Event A is far more critical than B from a business perspective, yet B has a higher volume. This poses a challenge because the consumer must process the 6 B events before reaching the second A event, significantly delaying event A’s processing and adding substantial latency.

To mitigate this situation, Phase 3 introduces a new configuration variable.

Figure 7: Phase 3 for the Transactional Outbox over CDC

If an event is listed in the variable, the Kafka consumer will consume it but refrain from processing it. This frees the partition for the remaining events, at the cost of significantly increasing the latency of the skipped event. To process it, one option is to wait for the complementary worker to pick it up, but this may take longer than desired. Therefore, a script will be developed for manual activation. This script will fetch and process all records of the events listed in the configuration variable.

This process requires instrumentation with Observability tools. Because it’s manual, monitoring and alerting must be integrated to trigger responses. A key metric to monitor is the offset lag of a consumer group. Some partitions are likely overloaded if a topic experiences a significant increase in offset lag. Additionally, logs can be implemented for the Kafka Consumer to provide insight into which records are being consumed.
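As a small illustration of the offset-lag metric, the sketch below computes the lag per partition from two offset maps and flags the partitions above a threshold. How the offsets are obtained (Kafka admin client, metrics exporter, and so on) is left out, and the function names and threshold are assumptions for this example.

```python
def partition_lag(end_offsets: dict, committed_offsets: dict) -> dict:
    """Lag per partition: latest offset minus the group's committed offset."""
    return {
        partition: end_offsets[partition] - committed_offsets.get(partition, 0)
        for partition in end_offsets
    }


def overloaded_partitions(lag: dict, threshold: int) -> list:
    """Partitions whose lag exceeds the (hypothetical) alerting threshold."""
    return sorted(partition for partition, value in lag.items() if value > threshold)
```

A sudden jump in the lag of a single partition, while the others stay flat, is the signature of the hotspot scenario described in this phase.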

In any case, as with all manual processes, additional effort should be invested in automating responses to such scenarios.

Wrapping Up: Key Takeaway and Some Challenges For the Future

At this point, we’ve thoroughly explained Fever’s implementation of the Transactional Outbox and how we’ve evolved it to address various challenges. As mentioned in previous articles, the Transactional Outbox is a vital pattern for our system, and thanks to the introduction of CDC, scalability challenges have been solved for almost all scenarios. This solution enables reliable communication between our services with acceptable performance. However, there is still room for improvement in this implementation.

In the short term, our first focus will be on refining the Transactional Outbox’s topic partitioning. As highlighted in the second scalability issue, congestion problems in the Outbox table affect individual message delivery times. With the current solution, we lack control over partition routing and over how Consumers are assigned to those partitions. This results in critical messages sharing Consumers with less important messages, leading to increased latency. We want to be able to decide which partition specific messages are assigned to and to dedicate a Consumer to that partition, while the other partitions are managed by auto-scalable Consumers. That way, we can ensure that critical messages, such as purchases, are consistently consumed with minimal latency, while the consumption of other messages is dynamically adjusted based on factors such as total message volume.

Additionally, we need to use our observability over the Transactional Outbox to detect bottlenecks in the flow and fine-tune the Kafka Connectors’ configuration to increase throughput and reduce latency.
