Ensuring data consistency across services with the Transactional Outbox pattern

Mateus Moury
Incognia Tech Blog
Published in
9 min readFeb 17, 2020

Thanks to Rafael Acevedo for contributing to this blog post.

Introduction

A microservices architecture consists of a collection of loosely coupled services that communicate with one another for composing the application. Generally, there’s one database per service, and a common problem developers have to face is: how to maintain data consistency when there are business transactions that span multiple services?

As an example, think of an application used by a library composed of two services: one of them is used by the owner to register new books and delete old ones, and the other is used by library clients to rent the books. Every time there’s an update in the book management service, it has to be propagated to the book rental service so that new books can be rented and old ones cannot. The book rental service has its database that persists, for each book id, whether it is available or not.

The Library app composed of two microservices

There are a handful of different ways to solve this issue. Usually, the solution consists in breaking the distributed transaction in a series of local atomic transactions, where each local transaction publishes a message that will trigger the next one.

Here at Incognia, we decided to use the Transactional Outbox pattern to coordinate the set of local transactions necessary to maintain data consistency. As explained in detail below, by using Debezium and Kafka as the broker, we’ve implemented this pattern in a way that services are entirely agnostic to producing messages into the messaging system.

The Transactional Outbox Pattern

As said before, our goal is to divide a distributed transaction into multiple local transactions. Each service has to atomically update its database and publish a message that triggers the next local transaction.

In the Transactional Outbox pattern, one extra table has to be created: the outbox table. Every time an entity is created or modified in any table in the database, we’ll also persist the updated entity into the outbox table atomically. The outbox table is a generic table that works as an API for the messaging system. This means that some other application monitors the table and publishes new entries to the messaging broker. We’ll call this application the Change Data Capture (CDC) application.

The extra outbox table along with the domain table in the database

The image below is an example of which queries the Book Management Service would have to perform in the database when a new book is created. One could think that the second INSERT below could be implemented as a trigger for the book’s table. That would work perfectly for this example specifically. Still, we learned that in more complex table updates (that involve tables with relationships, for example), it gets hard to control how many times and when the trigger would be fired.

Query to atomically update application domain and outbox tables

The outbox table is supposed to be generic so that it can be used for any other tables in the database. For example, imagine there’s another table that stores authors of the books, and another application needs this information. In that case, the same outbox table should be used. For this to work, the outbox table should contain the following columns:

Event ID

The primary key of the table, which is a unique identifier of the message.

Event Type

This is the type of event that was persisted in the database and can be used by the consumers to decide on how to process it. For instance, an event of type book_added may be used in the Book Rental Service, but the author_removed event would not.

Aggregate ID

That’s the identifier of the entity itself. Consumers can use this information to easily keep only the last state of a given entity when more than one state is consumed from the broker. It’s also useful for cleaning up old messages from Kafka, as we’ll see in detail below.

Aggregate

This is the payload of the entity in its new state as JSON. It will be up to the CDC application to parse this JSON into another well-structured data format if needed. The reason it’s as generic as a JSON text is that we want to use the same table for all outgoing updates.

Kafka as the Message Broker

Before diving into how to publish messages in Kafka, let’s talk about how to use Kafka for our use case. Most of the systems we’ve seen use Kafka with time-based retention for topic messages. That’s because messages are processed in real-time once (and aggregated in some way to be used for data analysis) and possibly never consumed again.

We want to keep the latest state for each of our entities in the broker, possibly forever (imagine in our example that a book was never removed), and previous states are not useful (if a book name was corrected, it doesn’t matter which is the old incorrect name for the Book Rental Service).

For this, we use Kafka Log Compaction as the topic cleanup policy. The idea behind it is to selectively remove records where we have a more recent update for the same message key. Using this policy, Kafka only persists the latest published message for each key, and we can choose when to remove entities from it by producing null-valued messages for a key (these are the so-called tombstones).

A single Kafka topic may have lots of partitions. Partitions are a way to save topic messages across different machines with replication policies more efficiently and also allowing parallel consumption. Each partition simply stores a fraction of the topic messages.

Kafka topic partitions. Source: https://sookocheff.com/post/kafka/kafka-in-a-nutshell/#&gid=1&pid=4

When using the Log Compaction policy, Kafka separates each partition into two parts: the tail and the head. The tail consists of already cleaned up messages, where there’s only one message for each key. The head, which wasn’t cleaned up yet, may contain messages for the same key more than once. The cleanup process runs from time to time under some editable conditions to clean the partition and make it store only the last state for the entities.

Kafka cleanup process. Source: https://medium.com/swlh/introduction-to-topic-log-compaction-in-apache-kafka-3e4d4afd2262

For this reason, it is crucial to keep messages of the same entity (and thus the same aggregate_id) in the same Kafka partition when publishing them. Otherwise, it’s impossible for Kafka to keep only one message per key, causing unwanted behavior for our use case.

Now, imagine the library sold one of its books, and the owner sent a request to the Book Management Service for deleting the book from the database. The Book Rental Service also needs to delete the reference to the book in its database, but after this happens, there’s no reason to keep this information in Kafka. These are the so-called tombstones: they’ll be in Kafka for some while in order for consumers to use the information, and after that, they’ll disappear and also remove all the messages of the corresponding key. They’re simply null-valued messages.

Some useful topic configurations for log compaction are listed below:

  • min.cleanable.dirty.ratio: the cleanup process will only clean the partition if the dirty portion (head) corresponds to at least this ratio of the space used by the partition. If you choose to use 0, this configuration will be ignored.
  • delete.retention.ms: this is the minimum amount of time a tombstone will be in the topic before being erased. In our example of a book deletion, this configuration must guarantee the Book Rental Service had time to read the information before it's swiped.
  • There are also useful partition segments configuration that we chose not to explain here. We recommend the reader to study Kafka Internals for a deep understanding of how Log Compaction works.

Publishing Outbox Changes with Debezium

Now let’s talk about how to extract information from the database and publish it into Kafka! Debezium is a change data capture connector built on top of Kafka Connect, a framework that makes it easy to connect Kafka with external systems like databases. When creating a new Debezium connector, one can configure from which tables the updates should be read, which transformations should be applied to the messages, and to which topic the resulting payload should be produced to.

Debezium as the CDC application to publish database changes into Kafka

Since most of the time, the row of the database isn’t exactly what we want to produce to Kafka, Debezium already provides some known transforms for extracting the exact message we want to produce. In our case, besides using Debezium’s transforms, we had to implement some of our own. Below is an example of a Debezium task that can be deployed in Kafka Connect that reads changes in the books table and produces them to a Kafka topic.

Debezium deployment configuration

This connector reads changes from a Postgres database, only in the outbox_table from the books schema. It parses the Postgres write-ahead log into JSON using the specified plugin. It also uses 6 different message transformations before producing to Kafka:

  1. Unwrap: Debezium creates the message as a struct containing the entity before and after the change, and we use this transform to get only the state after the change since we’re only interested in the last state.
  2. ValueToKey: Kafka messages are composed of a key and a value, and this transformation allows us to set the aggregate_id as the key of the message. That’s important because the same entities should be mapped to Kafka messages with the same key so that Kafka keeps only the last state of each key.
  3. SetDebeziumRecordPartition: as aforementioned, we need messages of the same entity to be produced in the same partition. This Incognia transform guarantees that messages with the same aggregate_id will be persisted to the same partitions by hashing the aggregate_id and choosing a corresponding partition. Otherwise, Debezium would use its own partition selection logic.
  4. SetTombstoneRecord: database rows can’t be easily mapped into null-valued messages. This Incognia transform examines if the value of the aggregate column is null, and if that’s the case, it then generates a null-valued Kafka message for the entity specified by the aggregate_id.
  5. SetKeySchema/SetValueSchema: parses the message into Avro with specified schemas before producing to Kafka.

After all of this, we’re done producing our message to Kafka! Now, some important caveats regarding Debezium. Debezium guarantees to produce messages in the same order they are persisted, at least once. This means repeated messages can be produced into Kafka, and your applications must be idempotent in case this happens.

Also, as Debezium is also a Kafka producer, there are some producer configurations that may be useful to look for when configuring the task:

  • producer.acks: the number of Kafka instances that have to acknowledge the message was received. The bigger this number, possibly the slower will be the producing process. If the production latency is higher than the database’s replication timeout (like PostgreSQL’s wal_sender_timeout), it can cause the database to terminate the replication connection, causing Debezium’s Kafka connect task to be terminated.
  • producer.max.request.size: the maximum request size for the Kafka producer. By default, it is set to 1MB. In our case, some messages have a larger payload, so we needed to tune this to be a bit higher.

For a more detailed explanation about how to deploy a Debezium task, check out Debezium’s tutorial. Our implementation of custom transforms is in this GitHub repository.

Wrapping it up

It’s possible to ensure data consistency across services in a microservices architecture by using the Transactional Outbox Pattern, abstracting the logic of producing messages from each service.

Here at Incognia, we chose to use Debezium as the Change Data Capture application, and Kafka as the message broker. Debezium is a Kafka Connect open-source component to read logs from databases, transform them and produce them into Kafka.

Debezium and Kafka proved to be highly resilient and performatic for all of our use cases so far, but once in production, there were some useful configurations to tune that we mentioned in the text.

Are you interested?

If you are interested in building context-aware products using a powerful location technology that genuinely cares about user’s privacy, then check out our opportunities.

--

--