Exactly-once Support in Apache Kafka

On Thursday we released a new version of Apache Kafka that dramatically strengthens the semantic guarantees it provides.

This release came at the tail end of several years of thinking through how to do reliable stream processing in a way that is fast, practical, and correct. The implementation effort itself was on the order of about a year, including an extended period in which about a hundred pages of detailed design documents were discussed and critiqued in the Kafka community, extensive performance tests were performed, and thousands of lines of distributed torture tests specifically targeting this functionality were added.

The reaction to the release was mostly along the lines of “wow, that’s awesome.” Unfortunately, I made that classic mistake: I read the comments. These were a mixture of excitement with some very confused people claiming we were lying charlatans.

Here is a sample of reactions:

“It cannot be exactly-once delivery…it is simply impossible due to a pretty simple mathematical theorem. It also raises the doubts that maybe the author is confused herself, making the reader mistrust the whole thing.”

“Entertaining article in how it goes against mathematically proven facts. Unless you change the assumptions, which are not carefully specified in this article, it cannot ever work in all cases, which is what a system is all about.”

I believe these quotes are incorrect. If you’re one of the people who think this, I’d ask you to take an actual look at what we actually know to be possible and impossible, and what has been built in Kafka, and hopefully come to a more informed opinion.

So let’s address this in two parts. First, is exactly-once a theoretical impossibility? Second, how does Kafka support it.

Is Exactly-Once Impossible?

There is this claim floating around, and everyone seems quite sure it is true without knowing exactly why, that Exactly Once Delivery/Semantics is mathematically impossible. Yet despite this being apparently common knowledge, you rarely see people linking to some kind of proof of this or even a precise definition of what is meant by exactly-once. They link to other things such as the FLP result or the Two Generals problem as evidence, but nothing about exactly once. In distributed systems you can’t talk about something being possible or impossible without describing precisely what the thing is, as well as describing a setting that controls what is possible (asynchronous, semi-synchronous, etc), and a fault-model that describes what bad things can happen.

So is there a way we could define formally define a property like what we want to achieve?

Yes, it turns out that there is just such a property. It is called “Atomic Broadcast” or “Total Order Broadcast”. Here is the definition from one of the more popular distributed systems text books:

Reliable and Secure Distributed Programming

Go ahead and read it. To my mind this is what people mean by exactly-once delivery in the context of pub/sub messaging: namely that you can publish messages and they will be delivered one time exactly by one or more receiving application.

So is it possible to solve Atomic Broadcast?

The short answer is yes, in addition to the book I took the picture from, you can read a whole taxonomy of approaches comparing dozens of algorithms.

But short of reading a distributed systems book how could you convince yourself that this is true?

Well it turns out that Atomic Broadcast is equivalent to consensus so perhaps we can reduce our problem to trying to understand whether consensus is possible. This is helpful since consensus is perhaps the most studied problem in distributed systems.

So is consensus possible? Likely you have a sense that it is, since this is the problem attacked by well-known algorithms such as Paxos and Raft, and widely relied on in modern distributed systems practice. But if you want a theoretical result you need to be concrete about the setting and failure modes you’re talking about. For example several people in comments cited the “FLP” paper which is titled “The Impossibility of Consensus with One Faulty Process”. That doesn’t sounds good! Then again you might just as easily run into a paper claiming in its first sentence that failure detectors “can be used to solve Consensus in asynchronous systems with crash failures.” What to make of this? Well this is where the detail really matter in theoretical distributed systems claims: you have to be concrete about the setting and fault-model. The FLP result is proving that consensus isn’t possible in a very limited setting. Once you allow even simple things like local timers or randomization it becomes possible. You’ll notice consensus algorithms depend on these things to implement a kind of noisy but eventually correct failure detection such as “a process that doesn’t heartbeat for some time is dead”. These are the settings people refer to when they say such-and-such an algorithm “solves consensus”.

(In addition to FLP a number of people also linked to the Two Generals problem as the “mathematical theorem” being violated. I’m actually not sure why since, although I see the analogy for a traditional messaging system, reading and processing data in Kafka is not very similar to that problem.)

This is a deep topic so if you’re interested I can’t recommend enough the chapter on Martin Kleppmann’s wonderful book as a first step in diving deeper. For the truly obsessed, the references there could easily keep you busy for a month.

So how does this play out in practice? Well, as a practical matter, consensus is the mainstay of modern distributed systems development. If you’ve used pretty much any service in AWS or anything built on top of a service relying on AWS you are relying on systems built with consensus. This is true for many if not most of the systems being built now. Kafka is one of these, and its central abstraction is a distributed consistent log, virtually the purest analog to multi-round consensus you could imagine. So if you don’t believe that consensus is possible, then you also don’t believe Kafka is possible, in which case you needn’t worry too much about the possibility of exactly-once support from Kafka!

Okay, so how can we build an app that gets exactly once delivery in Kafka?

You may recall that Kafka has a log that looks like this:

A Log in Apache Kafka

Note that it is a strongly ordered sequence of records, each of which is assigned a sequential numeric offset that identifies that records position in the log.

The “producer” appends records to this log, and zero or more consumer apps read messages from a given offset that they control.

Let’s imagine an application that looks like this:

The sender wants to publish messages and the consumer wants to read them and put them (or some quantity derived from them) in a database. How can we do that and get the right answer?

You can see the two categories of problems that could arise:

  1. The first problem arises if the producer app writes to the log but fails to get the acknowledgement back over the network. This would put this producer in a bind: it could be that the write had actually succeed or it could be that it never got to Kafka at all. We don’t know! If we retry and it turns out the write had succeeded we could then have a duplicate; if we don’t retry and it turns out the write hadn’t succeed we’ll lose that write. This is effectively the same dilemma we’d have inserting into a database table with no primary key or an auto-incremented primary key.
  2. The second problem comes on the consumer side. The consumer application could read some messages from the log and write the results to its database, but fail before updating its offset marking its position. When that consumer was restarted (potentially automatically on a different machine using Kafka group management mechanism) that could result in duplicates. If the application updates its stored offset first and then updates the database, the failure could instead lead to missing that update upon restart.

Let’s talk about both problems. The first problem is solved by the idempotence support we announced in the post. This allows the producer client to always retry until it is successful without the possibility of duplicates (Kafka will transparently detect them and ignore them).

We didn’t talk about the second problem in much depth, but believe it or not, that wasn’t because we hadn’t thought of it! We didn’t dive into it in more depth because it was already a long blog post for people who know Kafka so we stuck with a short summary description.

Here is a more in depth discussion:

To ensure exactly once processing the consumer needs to ensure that the derived state it creates and the offsets pointing upstream stay in sync. A key fact here is that the consumer has control of its offset in the log and can store it wherever it wants. There are two common approaches for using this to get exactly once semantics on top of Kafka:

  1. Store the offsets in the same DB as the derived state and update both in a transaction. When restarting, read your current offset from the DB and start reading from there.
  2. Write both state updates and offsets together in a way that is idempotent. For example if your derived state was a key and counter tracking the number of occurrences, store the offset along with the count and ignore any update with an offset <= the current stored value.

Okay, but you might object that “that is hard!” I don’t actually think it is all that hard. I mean transactions aren’t trivial no matter what, but you will have transactionality problems, regardless, if you update multiple tables. Adding the offset table and including that in the update isn’t rocket science.

Another objection I’ve heard to this is that it isn’t really “exactly once” but actually “effectively once”. I don’t disagree that that phase is better (though less commonly understood) but I’d point out that we’re still debating the definitions of undefined terms! If we want a well-defined property defined around delivery I actually think Atomic Broadcast is a pretty good definition (though a terrible name — “atomic”?). As long as we’re speaking informally I think it’s fine to say “exactly once” since people have an intuitive idea of what that means (I’m guessing if we’d announced support for Atomic Broadcast the confusion would have been more not less). I think the bigger critique is that the real guarantee people want is neither “exactly” nor “effectively” nor does it have anything to do with “once” or “delivery”; the real guarantee people want is the end-to-end correct processing of messages in the presence of failure without having to think hard about the integration with their app.

After all, the solution I described isn’t all that complicated, but you still have to think about the semantics of the application. We tried to address this head on in the original blog in the section entitled “Is This Magical Pixie Dust I Can Sprinkle on My Application” (the answer was “no”).

So can we make this easier? We think we can. This is where the second part of the feature set comes in: transactions.

The example I gave above is actually mixing together two distinct concerns: the processing of data and the integration of the result with some storage system. Since these are intertwined the developer has to reason about them both together in a way that is hard to untangle.

The idea for improving this is to factor the application into two distinct parts: a “stream processing” portion that transforms one or more input streams (potentially aggregating across records or joining on side data) and a connector that transmits this data to a data store (these can run in the same app or process, but they are logically distinct).

The connector requires reasoning about the transactional or idempotent delivery of data to the particular data system from Kafka. This requires thinking and careful offset management, but it is completely reusable — -if you have a JDBC connector that handles exactly-once correctly it will work for any database that supports JDBC, the application developer doesn’t have to think about it. We have a bunch of these already, so rather than write one, you can just download them.

The hard part is making general purpose transformations on streams of data correct. This is where the transactional support comes in, in conjunction with the streams API in Kafka.

For those not familiar, the Kafka Streams API is a layer on top of the producer and consumer which gives a very general API for defining transformations on top of input and output streams: virtually anything you can do in your app you can do using this API. If you’re used to classical messaging system APIs this is more not less powerful.

A complete example streams app looks like this:

A complete Kafka Streams application.

This application happens to be computing a distributed “word count”, the classic big data example. This word count though is completely real-time and continuous (the counts increase each time a new document is created).

Note that this is just a normal, plain-vanilla Java application main method. This app is started and deployed the same as any other. It works just like a Kafka consumer does, with all the running instances co-ordinating to process the incoming streams of data.

How can we ensure the correctness of this application? After all this has all the complicated things you could imagine: input, output, aggregation across incoming messages, and distributed processing.

Well, if you think about it all stream processing with Kafka is doing three things:

  1. Reading an input message
  2. Possibly producing an update to its state (which needs to be fault-tolerant, in case the app instance fails and recovers elsewhere)
  3. Possibly producing output message

The key requirement is to ensure these three things always happen together or don’t happen at all. We can’t allow any failure case where the state is updated but no output produced or vice versa.

How can we accomplish this?

We thought really hard about this over a number of years and have been building towards it for some time now. The ground work was a set of changes made over the last few years that you may not have noticed:

  1. In the 0.8.1 release Kafka added log compaction which allows it to be used as a journal and snapshot for state changes. This means you can model a set of updates to any arbitrary local (on disk or in-memory) data structure as a series of writes to Kafka. This allows us to make the state of the local computation fault-tolerant.
  2. Reading data from Kafka amounts to advancing your offsets. In 0.8.2 we moved the offset storage mechanism to use Kafka itself to store the offsets. “Committing” an offset amounts to a write to Kafka under the covers (though the consumer client does this for you so you might not know).
  3. Output to Kafka…well that has always been a write to Kafka.

This implementation set us up for the feature we just added: the ability to transparently wrap these three operations into a single transaction. This guarantees that the read, processing, state updates, and output all either happen together or don’t happen at all.

Won’t this be really slow? Many people assume distributed transactions are inherently very slow. In this case, though, we don’t need to do a transaction for every single input, we can batch them together. The bigger the batch the lower the effective overhead of the transaction (the transactions have a constant cost irrespective of the number of messages in the transaction). The blog post gave performance results for this which were quite promising.

The result is that if I factor my application into something that uses the streams API and I download and use an exactly-once connector for the integration with the output system, I can now get end-to-end correctness with nothing more than a config change.

The really cool thing is that this capability isn’t at all tied to the Java API: the Java API is just a wrapper around a general purpose network protocol for modeling continuous, stateful, correct processing of streams of data. Any language can make use of this protocol. We think of this ability to correctly chain together input and output topics via arbitrary processes that do transformation and implement the protocol as adding almost a kind of “closure” property that’s very powerful.

Back To Arguing About Exactly Once

I wrote this to urge people who are skeptical to dive into what we’ve done and understand it. I get the skepticism: there is plenty of bullshit that comes from vendors. But I think this feature set is really exciting and your understanding will be increased a lot more by understanding what it does and doesn’t do and what the actual limits are rather than by just yelling LIAR in all caps.

I think there has been a lot of assumptions around stream processing that are in the process of being rolled back in our industry — that it can’t produce correct results, that it is fundamentally inefficient, that it is incomplete without batch processing, etc. I think the broad and fuzzy claims around the impossibility of exactly once processing fall into this bucket. They remind me of a kind of distributed systems analogue to broscience (“I heard from my bro, who heard from his bro, who works at Google, that this exactly-once stuff violates the CAP theorem!”). To me, progress is usually made by understanding in more depth what is actually not possible and then trying to redefine the problem to build practical abstractions that move us forward.

One of the best example of this happening right now in my mind, is the work done on systems like Spanner and CockroachDB, which really do a lot to provide usable features to application developers while staying within the bounds of what is possible. I spent a lot of time in the NoSQL space, and I see what those systems are doing as something most of us thought was some imprecisely considered combination of impossible and impractical. I think that should be a lesson to us: Rather than giving up and punting all the hard problems onto the poor person implementing the application we should strive to understand how we redefine the problem space to build correct, fast, and most of all usable system primitives they can build on.