Distributed Outbox Event Publishing Pattern with Kafka and Sidecars

Alexander Morley
eMed Technology Blog
10 min read · Apr 6, 2023

Richard Noble & Alexander Morley

Back in 2019, we published a take on the outbox pattern for transactional events. This pattern solves a very real and important problem: keeping our event streams fully consistent with our primary (relational) databases. At Babylon, most microservices use Postgres as their primary data store, but they also need to send events that correspond to changes in their database to our event-streaming platform. “Audit events” are used to track all the changes in the system and “domain events” are used to build materialized views of our data in BigQuery (for analytics) and Elasticsearch (for near-real-time search).

The naive approach of simply “double dispatching” events after — or during — a database transaction is not robust. Any failures that occur while committing the transaction can result in the data in the event stream drifting from what is in the primary database. This is not OK!

A diagram depicting the Dual Writes pattern. Pros: simple, easy to implement. Cons: does not ensure consistency of data between stores.

Instead, we can use the outbox pattern. We ensure atomicity by first writing the event to a separate table in the primary database (dubbed the outbox table) in the same database transaction serving the request. From there, we can guarantee the event is eventually published to Kafka using an asynchronous helper process that polls the table. The simplicity in this pattern is that it relies on a feature that we already use: database transactions. This is in contrast to other methods of atomically writing to distributed systems, such as 2PC or XA, which either require implementing complex protocols or exclusively using systems that already support them.
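To make the "same transaction" idea concrete, here is a minimal sketch that uses an in-memory stand-in for the database. Everything in it (`Db`, `transaction`, `book_appointment`, the `AppointmentBooked` payload) is hypothetical and only illustrates the all-or-nothing behaviour. A real service would rely on its database's own transactions.

```rust
/// In-memory stand-in for a relational database (hypothetical, for illustration only).
#[derive(Clone, Debug, Default, PartialEq)]
pub struct Db {
    /// The "business" table.
    pub appointments: Vec<String>,
    /// The outbox table: (key, serialized event bytes).
    pub outbox: Vec<(String, Vec<u8>)>,
}

impl Db {
    /// Run `work` against a scratch copy and commit only if it succeeds,
    /// mimicking the all-or-nothing behaviour of a database transaction.
    pub fn transaction<F>(&mut self, work: F) -> Result<(), String>
    where
        F: FnOnce(&mut Db) -> Result<(), String>,
    {
        let mut scratch = self.clone();
        work(&mut scratch)?; // on Err, `self` is left untouched (rollback)
        *self = scratch; // commit: both writes become visible together
        Ok(())
    }
}

/// Book an appointment and record the matching event in the same transaction.
/// `fail` simulates an error that occurs before the commit.
pub fn book_appointment(db: &mut Db, member_id: &str, fail: bool) -> Result<(), String> {
    db.transaction(|tx| {
        tx.appointments.push(member_id.to_string());
        tx.outbox
            .push((member_id.to_string(), b"AppointmentBooked".to_vec()));
        if fail {
            return Err("simulated failure before commit".to_string());
        }
        Ok(())
    })
}
```

If the simulated failure fires, neither the appointment nor the outbox row is kept, so the publisher can never see an event for a change that did not happen.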

For more details on why we chose this pattern in the first place, check out these resources: “Transactional publishing of events in healthcare” and “Pattern: Transactional outbox”.

Diagram depicting the outbox pattern with a single replica

However, this approach comes at a cost. First, there’s a delay — and generally we want to get the events on the log as quickly as possible. Secondly, we have to implement this asynchronous outbox publisher for every application container — which, even with shared libraries — requires a new implementation for every language & framework we use. We might be able to live with these problems on their own — but they both get worse when we consider the fact that our microservices don’t usually run with a single replica — but rather with two or more pods.

Diagram depicting the outbox pattern with two replicas

Yikes! Now, we have two processes reading from the outbox table and writing to our event stream. That means we’re going to get out-of-order and duplicate messages! Guaranteeing that order is preserved in general requires one of two things: leader election or passing database connection details to a single system outside of the service pods. Allowing any single entity in our stack to have access to all of our databases is a non-starter for us due to the blast radius if a breach were to occur.

So, we use a leader election approach — and we don’t have to do this totally from scratch. For example, in our Java Spring microservices, we rely on Quartz to do the heavy lifting. But, it’s still a lot of overhead in terms of complexity. And, this approach throttles the process of publishing to the event stream to a single process — which can end up being a bottleneck under a write-heavy load. So, while we now have a system that will maintain consistency under all conditions, we’ve exacerbated our original problems with outbox — delays in publishing events & having to re-implement things in multiple applications — by using this pattern with > 1 pod.

This has been a low key irritation in the back of our minds. There must be a way to have it all. Yes, we want a world where we have our cake and eat it too, see? Once we’ve had our cake and we’re floating on the erratically swirling currents of the sugar high — we’d still have cake. There should be more cake!

Fortunately, we run Kubernetes all over the place and, in the interim, Babylon introduced the ability for us to deploy sidecars. This creates space for us to write an outbox consumer that will run in each pod of the sending application. Yay! Distribution! Now we only have to write the outbox logic once. Hurray!

Diagram depicting the outbox sidecar pattern with one replica

But…

We haven’t solved the other problem. In the use case where we have multiple pods, we still have a bunch of publishers with a single database table in the middle (see previous article) — and all of the publishers would be competing to send the same messages. We need a mechanism to orchestrate the flow that doesn’t involve restricting only a single sidecar to read from the table and send messages.

If we used a leader election mechanism, we’d end up with the same shape as we had above — and gain no benefit from the extra sidecar workers. Instead, we can take advantage of the fact that we don’t need to guarantee the order of all the events with respect to all the others. They just need to be ordered within each partition (which is OK, because our partitioning strategy makes sure that events related to the same member always end up on the same partition — also see this talk). If we can ensure that each sidecar exclusively writes to a given set of partitions, we can keep our ordering guarantees and parallelize publishing events.

Next question: how do we ensure that each partition is only ever assigned a single publisher? Kafka has a mechanism for this, the consumer group. A Kafka consumer group takes a group of consumers, identified by a shared group ID, and allocates partitions fairly to each member of the group. It also notifies the members of the group if the membership changes and keeps these groups balanced. Usually these groups are used for distributing consumption of events, but we can use them for publishing events too.
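As a rough illustration of what "allocates partitions fairly" means, the sketch below deals partitions out to group members round-robin. It is a simplified model, not Kafka's actual assignor, and `assign_partitions` is a hypothetical helper. The property that matters for us is that every partition has exactly one owner at any time.

```rust
use std::collections::BTreeMap;

/// Deal partitions out to group members round-robin, in sorted member order.
/// A simplified illustration, not Kafka's real assignment strategy.
pub fn assign_partitions(members: &[&str], partition_count: u32) -> BTreeMap<String, Vec<u32>> {
    let mut sorted: Vec<&str> = members.to_vec();
    sorted.sort();
    sorted.dedup();

    // Every member gets an entry, even if it ends up with no partitions.
    let mut assignment: BTreeMap<String, Vec<u32>> = sorted
        .iter()
        .map(|member| (member.to_string(), Vec::new()))
        .collect();
    if sorted.is_empty() {
        return assignment;
    }

    for partition in 0..partition_count {
        let owner = sorted[partition as usize % sorted.len()];
        assignment
            .get_mut(owner)
            .expect("owner was inserted above")
            .push(partition);
    }
    assignment
}
```

With one sidecar and four partitions, that sidecar owns all four. When a second sidecar joins, calling the function again splits them two and two, which is the rebalance our publishers react to.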

Diagram depicting the outbox sidecar pattern with two replicas

What do we need for our shiny new Sidecar Outbox client?

  • A Kafka Consumer in a group that we’ll use to manage which messages this publisher needs to publish.
  • A Kafka Publisher — you know — to publish the messages.
  • A database connection and event table — this is our outbox. We’ve found the best way is to have the application serialize the events into bytes before writing them to the outbox table. We also let the application handle partitioning by writing the desired partition of each event to a separate column. We’ll use this to only pick up the messages that we need to send for the given sidecar.

See appendix for a little extra detail on how we handle serialization and partitioning between the application and the sidecar.

The process, in brief, is as follows:

Each sidecar client has a paused consumer and a poll interval set. On each poll, obtain the currently assigned partitions and request all the messages for those partitions. Send those messages and mark them sent.

Wash, rinse, and repeat.

The consumer is “paused” because we aren’t using it to consume, even though it’s called a consumer. Confusing? Maybe.

Below is some simplified code that illustrates the commands.

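/* # partitioned_producer.rs */
impl KafkaPartitionedEventProducer {
    pub fn new(brokers: &str, group_id: &str, topic: &str) -> Self {
        // Join the consumer group purely to receive a partition assignment.
        let consumer: LoggingConsumer = ClientConfig::new()/*…*/;
        consumer
            .subscribe(&[topic])
            .expect("Can't subscribe to specified topics");
        // Pause so that no messages are fetched; we only want the assignment.
        consumer.pause(&consumer.assignment().unwrap()).unwrap();
        KafkaPartitionedEventProducer { /*…*/ }
    }
    // Partitions currently assigned to this sidecar by the consumer group.
    pub fn assigned_partitions(&self) -> Vec<String> { /*…*/ }
    // Publishes the events and returns the keys of those that were sent.
    pub async fn produce(&self, events: &Vec<Event>) -> Vec<String> { /*…*/ }
}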

/* # outbox_table.rs */
impl OutboxTable {
    // Marks every record on the given partitions as claimed by this sidecar and returns them.
    pub async fn claim_records_for_partitions(
        &self,
        partitions: Vec<String>,
        sidecar_id: Uuid,
    ) -> Result<Vec<Event>, Error> {
        // "UPDATE {} SET claimed_by = '{}' WHERE target_partition in ({}) RETURNING key, value, target_partition, claimed_by;"
    }
    // Deletes records that have been published, but only those this sidecar claimed.
    pub async fn purge_records(&self, record_keys: Vec<String>, sidecar_id: Uuid) {
        // "DELETE FROM {} WHERE key in ({}) AND claimed_by = '{}'"
    }
}

/* # main.rs */
async fn watch(
    producer: KafkaPartitionedEventProducer,
    datastore: OutboxTable,
    poll_interval: u64,
) -> u64 {
    // Unique id for this sidecar instance, used to claim rows in the outbox table.
    let sidecar_id = Uuid::new_v4();
    loop {
        // Wait one poll interval between passes so we never spin on the database.
        // Assumes a tokio runtime and an interval expressed in milliseconds.
        tokio::time::sleep(std::time::Duration::from_millis(poll_interval)).await;

        let assigned_partitions = producer.assigned_partitions();
        if assigned_partitions.is_empty() {
            info!("Not assigned any partitions. Not polling DB.");
            continue;
        }
        info!(
            "Assigned partitions: {:#?}. Polling DB for new records…",
            assigned_partitions
        );
        // Claim, publish, then delete only what was actually published.
        let claimed_events = datastore
            .claim_records_for_partitions(assigned_partitions, sidecar_id)
            .await
            .expect("Can't claim records from the outbox table");

        let produced_event_keys = producer.produce(&claimed_events).await;
        datastore
            .purge_records(produced_event_keys, sidecar_id)
            .await;
    }
}

It’s that simple. If the group membership changes, each sidecar picks up a fresh set of partitions on its next poll, which lets the system rebalance itself using Kafka. We can configure the poll interval to change our ‘batch interval’; it defaults to 100 ms.
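For readers who want to play with the claim, produce, purge cycle without a database or a broker, here is a small in-memory model of one polling pass. It is only a sketch: `Row`, `poll_once` and the `topic` map are hypothetical stand-ins, the sidecar id is a plain integer rather than a UUID, and the partition is assumed to be already resolved to a number.

```rust
use std::collections::BTreeMap;

/// One outbox row. Field names mirror the columns in the SQL comments above.
#[derive(Clone, Debug, PartialEq)]
pub struct Row {
    pub key: String,
    pub value: Vec<u8>,
    pub target_partition: u32,
    pub claimed_by: Option<u32>,
}

/// One polling pass for a single sidecar. Returns how many events were published.
/// `topic` stands in for Kafka: partition number -> keys in publish order.
pub fn poll_once(
    outbox: &mut Vec<Row>,
    topic: &mut BTreeMap<u32, Vec<String>>,
    sidecar_id: u32,
    assigned: &[u32],
) -> usize {
    // Claim: mark every row on our partitions (the UPDATE ... RETURNING step).
    let mut claimed = Vec::new();
    for row in outbox.iter_mut() {
        if assigned.contains(&row.target_partition) {
            row.claimed_by = Some(sidecar_id);
            claimed.push(row.clone());
        }
    }

    // Produce: append to each partition's log in table order.
    let mut produced_keys = Vec::new();
    for row in &claimed {
        topic
            .entry(row.target_partition)
            .or_default()
            .push(row.key.clone());
        produced_keys.push(row.key.clone());
    }

    // Purge: delete only rows we claimed and actually published (the DELETE step).
    outbox.retain(|row| {
        !(row.claimed_by == Some(sidecar_id) && produced_keys.contains(&row.key))
    });
    produced_keys.len()
}
```

Running two sidecars with disjoint partition sets against the same `outbox` drains it completely, and each partition's keys come out in the order they were written.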

“But wait!”, you say, “those are still batches! And 100 ms, that’s like, forever!” To this we say, we like the way you think.

For us, 100 ms is usually okay. But if you are more extreme than we are, then you still have options. For example, if you’re lucky enough to be using Postgres, and a reasonably recent version, then you could use LISTEN/NOTIFY to turn your event table into a more stream-like entity.

On each insert, simply send a notification containing the key of the new row and its partition information. The sidecar can then test if it ‘owns’ that partition and, if so, mark the row as handled, get the message, publish, and commit. If the sidecar gets a rebalance, it completes in-flight messages and fetches any unhandled messages in its new partitions.
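The ownership test could look something like the sketch below. The payload format (`"<partition>:<key>"`) and the names `Notification`, `parse_payload` and `should_handle` are assumptions made for illustration. We have not built this variant, and the real payload would depend on how the trigger is written.

```rust
use std::collections::HashSet;

/// A parsed notification: which outbox row was inserted and which partition it targets.
#[derive(Debug, PartialEq)]
pub struct Notification {
    pub key: String,
    pub partition: u32,
}

/// Parse a payload of the form "<partition>:<key>" (a hypothetical format).
/// The partition comes first so that keys containing ':' survive intact.
pub fn parse_payload(payload: &str) -> Option<Notification> {
    let (partition, key) = payload.split_once(':')?;
    Some(Notification {
        key: key.to_string(),
        partition: partition.parse().ok()?,
    })
}

/// Return the notification only if this sidecar currently owns its partition.
pub fn should_handle(owned: &HashSet<u32>, payload: &str) -> Option<Notification> {
    parse_payload(payload).filter(|n| owned.contains(&n.partition))
}
```

Notifications for partitions the sidecar does not own are simply ignored, since the sidecar that does own them receives the same notification.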

There is a cost here — a lot of updates and queries for each message. There are plenty of optimizations at this point depending on your setup. You could commit in batches — or if your messages are small — just add them to notify.

One way or another — we’ve moved from a bottleneck to something more distributed. From something pretty custom — to one standard image that can just be deployed. Furthermore, due to the nature of the sidecar, the actual application doesn’t need to maintain a connection to Kafka or worry about how this data is sent. They just need to write the message to their event table with a marker for the partition — and the rest is taken care of for them.

Now, it’s time for cake.

Appendix 1 — Serialization & Partitioning

Why do we do serialization in the application?

Keeps sidecar small — it doesn’t need to have all the schemas required to do serialization — or have the logic to fetch them from somewhere. It also prevents having to create complex (relational) schemas for the outbox table (instead we can always use the same schema for all of them).

Why do we do partitioning in the application?

Actually, we lied a bit. We don’t do all the partitioning here. What we actually do is ask the application to put the bytes that should be passed to a hash function to compute the partition into a column with the event. Why not do it all in the sidecar? Because then the sidecar has to de-serialize the event key. Why not do it all in the application? Because the application doesn’t actually know how many partitions the topic has (although that’s relatively static), and we don’t want to introduce an extra line of coordination between the two containers (currently communication is entirely one-way and via the outbox table in the database).
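On the sidecar side, turning those bytes into a partition number is a hash followed by a modulo. The sketch below uses 64-bit FNV-1a purely because it is short enough to write inline. That choice is an assumption for illustration, and a real sidecar would need to use the same hash function as every other producer writing to the topic. `partition_for` is a hypothetical helper name.

```rust
/// 64-bit FNV-1a hash. A stand-in chosen for brevity, not necessarily what a
/// production sidecar should use.
pub fn fnv1a_64(bytes: &[u8]) -> u64 {
    let mut hash: u64 = 0xcbf29ce484222325; // FNV offset basis
    for byte in bytes {
        hash ^= u64::from(*byte);
        hash = hash.wrapping_mul(0x100000001b3); // FNV prime
    }
    hash
}

/// Map the application-supplied partition-key bytes onto one of the topic's partitions.
/// Only the sidecar needs to know `partition_count`.
pub fn partition_for(partition_key: &[u8], partition_count: u32) -> u32 {
    assert!(partition_count > 0, "a topic has at least one partition");
    (fnv1a_64(partition_key) % u64::from(partition_count)) as u32
}
```

Because the function is deterministic, events that carry the same partition-key bytes always land on the same partition, which is what keeps per-member ordering intact.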

Appendix 2 — Cost

Isn’t running an extra container in all of your pods expensive?

Only if it consumes lots of resources. Because we’ve written the sidecar in Rust, it’s pretty efficient in terms of CPU and memory. We’ve also made the binary as small as possible by using the following Cargo options:

[profile.release]
strip = "symbols"
opt-level = "z"
lto = true

What about the base image?

We don’t use one! Well, technically we use the scratch base image provided by Docker, but the only layer in that image is our binary. In order to do this, we had to create a static binary, i.e. one that doesn’t link to any dynamic libraries. We avoided having to link to OpenSSL by using rustls for all our TLS needs, but couldn’t avoid having to build librdkafka (there are native Rust Kafka libraries but they aren’t as well supported). Luckily we could take advantage of an existing project, rust-musl-builder, and Docker multi-stage builds to make this less painful.

### BUILDER IMAGE ###
# N.B. we do *not* use any nightly features in rust language
# just to allow stripping symbols from the rust binary (an unstable cargo feature)
FROM ekidd/rust-musl-builder:nightly-2021-02-13 as builder

# RUN sudo apt-get update && sudo apt-get install -y libclang-3.9 clang-3.9 llvm-3.9 \
# curl musl-dev musl-tools make python g++ && sudo chown -R rust:rust /home/rust
WORKDIR /home/build/
COPY Cargo.toml Cargo.toml
# Re-build with latest source
COPY src src
RUN cargo build --release -p outbox-sidecar

### RELEASE IMAGE ###
FROM scratch
COPY --from=builder /home/build/target/x86_64-unknown-linux-musl/release/outbox-sidecar /outbox-sidecar
CMD ["/outbox-sidecar"]

With this we end up with a 4.5 MB image, which is plenty small enough for us for now!
