Each of the projects I’ve worked on in the last few years has involved a distributed message system such as AWS SQS, AWS Kinesis and more often than not Apache Kafka. In designing these systems, we invariably need to consider questions such as:
How do we guarantee all messages are processed?
How do we avoid or handle duplicate messages?
These simple questions are surprisingly hard to answer. To do so, we need to delve into how producers and consumers interact with distributed messaging systems. In this post I’ll be looking at message processing guarantees and the implications these have when designing and building systems around distributed messaging systems. I will concentrate specifically on the Apache Kafka platform as it’s such a popular choice and the one I am most familiar with.
To start with, let’s look at the basic architecture of a distributed message system.
A producer process reads from some data source which may or may not be local, then writes to the messaging system over the network. The messaging system persists the message, typically in multiple locations for redundancy. One or more consumers poll the messaging system over the network, receive batches of new messages and perform some action on these messages, often transforming the data and writing to some other remote data store, possibly back to the messaging system. This basic design applies to Apache Kafka, Apache Pulsar, AWS Kinesis, AWS SQS, Google Cloud Pub/Sub and Azure Event Hubs among others.
If you read the documentation for any of these systems, you will fairly quickly come across the concept of message processing guarantees. These fall under the following categories:
- No guarantee — No explicit guarantee is provided, so consumers may process messages once, multiple times or never at all.
- At most once — This is “best effort” delivery semantics. Consumers will receive and process messages exactly once or not at all.
- At least once — Consumers will receive and process every message, but they may process the same message more than once.
- Effectively once — Also contentiously known as exactly once, this promises consumers will process every message exactly once.
At this point, you may be asking why is this so complicated? Why isn’t it always effectively once? What’s causing messages to go missing or appear more than once? The short answer to this is system behaviour in the face of failure. The key word in describing these architectures is distributed. Here is a small subset of failure scenarios that you will need to consider:
- Producer failure
- Producer publish remote call failure
- Messaging system failure
- Consumer processing failure
Your consumer process could run out of memory and crash while writing to a downstream database; your broker could run out of disk space; a network partition may form between ZooKeeper instances; a timeout could occur publishing messages to Kafka. These types of failures are not just hypothetical — they can and will happen with any non-trivial system in any and all environments including production. How these failures are handled determines the processing guarantee of the system as a whole.
Before looking at the different types of guarantees in detail, we’ll have a quick look at the Kafka consumer API as the implementation is relevant to many of the examples given below.
Kafka Consumer API
Processes pull data from Kafka using the consumer API. When creating a consumer, a client may specify a consumer group. This identifies a collection of consumers that coordinate to read data from a set of topic partitions. The partitions of any topics subscribed to by consumers in a consumer group are guaranteed to be assigned to at most one individual consumer in that group at any time. The messages from each topic partition are delivered to the assigned consumer strictly in the order they are stored in the log. To save progress in reading data from Kafka, a consumer needs to save the offset of the next message it will read in each topic partition it is assigned to. Consumers are free to store their offsets wherever they want, but by default, and for all Kafka Streams applications, these are stored back in Kafka itself in an internal topic called __consumer_offsets. To use this mechanism, consumers either enable automatic periodic commitment of offsets back to Kafka by setting the configuration flag enable.auto.commit to true, or make an explicit call to commit the offsets. For example, a consumer might store offset 5 for topic-a partition-0 and offset 7 for topic-a partition-1.
Let’s now look at each of these guarantees in detail and see some real-world examples of systems that provide each guarantee.
No guarantee

A system that provides no guarantee means any given message could be processed once, multiple times or not at all. With Kafka, a simple scenario that produces these semantics is a consumer with enable.auto.commit set to true (this is the default) that asynchronously processes each batch of messages and saves the results to a database.

With auto commit enabled, the consumer saves offsets back to Kafka periodically, at the start of subsequent poll calls. The frequency of these commits is determined by the configuration parameter auto.commit.interval.ms. If you save the messages to the database and then the application crashes before the progress is saved, you will reprocess those messages on the next run and save them to the database twice. If progress is saved prior to the results being saved to the database and the program then crashes, those messages will not be reprocessed on the next run, meaning you have data loss.
At most once
At most once guarantee means the message will be processed exactly once, or not at all. This guarantee is often known as “best effort” semantics.
1. A common example that results in at most once semantics is where a producer performs a ‘fire-and-forget’ approach sending a message to Kafka with no retries and ignoring any response from the broker. This approach is useful where progress is a higher priority than completeness.
2. A producer saves its progress reading from a source system first, then writes data into Kafka. If the producer crashes before the second step, the data will never be delivered to Kafka.
3. A consumer receives a batch of messages from Kafka, transforms these and writes the results to a database. The consumer application has enable.auto.commit set to false and is programmed to commit its offsets back to Kafka prior to writing to the database. If the consumer fails after saving the offsets back to Kafka but before writing the data to the database, it will skip these records next time it runs and data will be lost.
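The commit-before-process failure mode in example 3 can be sketched as a toy simulation. This is plain Python, not real Kafka; the list-based log, "database" and crash flag are all illustrative stand-ins:

```python
def run_consumer(log, committed_offset, database, crash_after_commit=False):
    """Consume records from `log` starting at `committed_offset`.

    Commits the new offset BEFORE writing to `database` (at-most-once).
    Returns the new committed offset.
    """
    batch = log[committed_offset:]
    committed_offset = len(log)        # save progress first
    if crash_after_commit:
        return committed_offset        # crash: the batch never reaches the DB
    database.extend(batch)             # process: write to the "database"
    return committed_offset

log = ["m0", "m1", "m2", "m3"]
db = []

# First run crashes after committing offsets but before the DB write.
offset = run_consumer(log, 0, db, crash_after_commit=True)
# Second run resumes from the committed offset: m0-m3 are skipped forever.
offset = run_consumer(log, offset, db)

print(db)   # [] -> the first batch was lost
```

Every message is processed once or not at all, which is exactly the at most once contract.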
At least once
An at least once guarantee means you will definitely receive and process every message, but you may process some messages additional times in the face of a failure. Here are a few examples of failure scenarios that can lead to at-least-once semantics:
1. An application sends a batch of messages to Kafka. The application never receives a response so sends the batch again. In this case it may have been the first batch was successfully saved, but the acknowledgement was lost, so the messages end up being added twice.
2. An application processes a large file containing events. It starts processing the file sending a message to Kafka for each event. Half way through processing the file the process dies and is restarted. It then starts processing the file again from the start and only marks it as processed when the whole file has been read. In this case the events from the first half of the file will be in Kafka twice.
3. A consumer receives a batch of messages from Kafka, transforms these and writes the results to a database. The consumer application has enable.auto.commit set to false and is programmed to commit its offsets back to Kafka once the database write succeeds. If the consumer fails after writing the data to the database but before saving the offsets back to Kafka, it will reprocess the same records next time it runs and save them to the database once more.
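Scenario 3 is the mirror image of the at most once consumer: process first, commit after. A similar toy sketch (again plain Python, no real Kafka) shows every message surviving, some of them twice:

```python
def run_consumer(log, committed_offset, database, crash_before_commit=False):
    """Consume records from `log`, writing to `database` BEFORE committing
    the offset (at-least-once). Returns the new committed offset."""
    batch = log[committed_offset:]
    database.extend(batch)             # process first: write to the "database"
    if crash_before_commit:
        return committed_offset        # crash: progress was never saved
    return len(log)                    # save progress after the write

log = ["m0", "m1"]
db = []

# First run writes to the database but crashes before committing offsets.
offset = run_consumer(log, 0, db, crash_before_commit=True)
# Second run restarts from the old offset and reprocesses the same batch.
offset = run_consumer(log, offset, db)

print(db)   # ['m0', 'm1', 'm0', 'm1'] -> nothing lost, but duplicated
```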
Effectively once

Many distributed messaging systems such as Pulsar and Pravega, as well as data processing systems such as Kafka Streams, Spark, Flink, Delta Lake and Cloud Dataflow, claim exactly-once or effectively-once semantics in certain scenarios. In 2017 Confluent introduced exactly-once semantics to Apache Kafka 0.11. Achieving exactly-once, or as many prefer to call it, effectively-once was a multi-year effort involving a detailed public specification, extensive real-world testing, changes in the wire protocol and two new low-level features to make it all work: idempotent writes and transactions. We’ll start by looking at these two features to see what they are and why they’re necessary for effectively-once support.
One of the at least once scenarios given above covered the case of a producer that is unable to determine if a previous publish call succeeded, so pushes the batch of messages again. In previous versions of Kafka, the broker had no means of determining whether the second batch was a retry of the previous batch. From Kafka 0.11 onwards, producers can opt in to idempotent writes (disabled by default) by setting the configuration flag enable.idempotence to true. This causes the client to request a producer id (pid) from a broker. The pid helps the Kafka cluster identify the producer. With idempotence enabled, the producer sends the pid along with a sequence number with each batch of records. The sequence number logically increases by one for each record sent by the same producer. Given the sequence number of the first record in the batch along with the batch record count, the broker can figure out all the sequence numbers for the batch. When the broker receives a new batch of records, if the sequence numbers provided are ones it has already committed, the batch is treated as a retry and ignored (a ‘duplicate’ acknowledgement is sent back to the client).
When idempotent writes first came out in v0.11, brokers could only deal with one in-flight batch at a time per producer in order to guarantee ordering of messages from the same producer. From Kafka 1.0.0, idempotent writes with up to 5 concurrent requests (max.in.flight.requests.per.connection=5) from the same producer are supported. This means you can have up to 5 in-flight requests and still be sure they will be written to the log in the correct order. This works even in the face of batch retries or Kafka partition leader changes, since in these cases the cluster will reorder them for you.
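A rough sketch of the broker-side bookkeeping helps make this concrete. This is a simplified model that ignores partitions, producer epochs and the in-flight request window:

```python
class Broker:
    """Toy model of broker-side deduplication for idempotent writes."""

    def __init__(self):
        self.log = []
        self.last_seq = {}   # pid -> highest committed sequence number

    def append(self, pid, first_seq, records):
        # The broker derives every sequence number in the batch from the
        # first sequence number plus the record count.
        last = first_seq + len(records) - 1
        if last <= self.last_seq.get(pid, -1):
            return "duplicate"   # retry of an already-committed batch
        self.log.extend(records)
        self.last_seq[pid] = last
        return "ok"

broker = Broker()
assert broker.append(pid=7, first_seq=0, records=["a", "b"]) == "ok"
# The producer never sees the ack, times out, and retries the same batch:
assert broker.append(pid=7, first_seq=0, records=["a", "b"]) == "duplicate"
assert broker.log == ["a", "b"]   # the log contains no duplicate records
```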
Transactions give us the ability to atomically update data in multiple topic partitions. All the records included in a transaction will be successfully saved, or none of them will be.
Transactions are enabled through producer configuration. Clients need to first enable idempotent writes (enable.idempotence=true) and provide a transactional id (transactional.id=my-tx-id). The producer then needs to register itself with the Kafka cluster by calling initTransactions. The transactional id is used to identify the same producer across process restarts. When reconnecting with the same transactional id, a producer will be assigned the same pid and an epoch number associated with that pid will be incremented. Kafka will then guarantee that any pending transactions from previous sessions for that pid will be either committed or aborted before the producer can send any new data. Any attempt by an old zombie instance of the producer, with an older epoch number, to perform operations will now fail.
Once registered, a producer can send data as normal (outside a transaction) or initiate a new transaction by calling beginTransaction. Only one transaction can be active at a time per producer. From within the transaction, the standard send method can be called to add data to the transaction. Additionally, if a producer is sourcing data from Kafka itself, it can include the progress it is making reading from the source in the transaction by calling sendOffsetsToTransaction. In Kafka the default method for saving consumer progress is to save offsets back to an internal topic in Kafka, and hence this action can be included in a transaction.

Once all required messages and offsets are added to the transaction, the client calls commitTransaction to attempt to commit the changes atomically to the Kafka cluster. The client is also able to call abortTransaction if it no longer wishes to go ahead with the transaction, likely due to some error.
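The call sequence can be sketched in Python. Method names follow the confluent-kafka Python client (begin_transaction, send_offsets_to_transaction and so on); the FakeProducer below is a purely illustrative recorder, standing in for a real producer configured with a transactional.id that has already called init_transactions:

```python
def copy_with_transaction(producer, group_metadata, records, offsets):
    """Produce `records` and save the consumer `offsets` that generated
    them as one atomic Kafka transaction."""
    producer.begin_transaction()
    try:
        for topic, value in records:
            producer.produce(topic, value)
        # Saving consumer progress is itself a write to a Kafka topic, so
        # it can be included in the same transaction as the output records.
        producer.send_offsets_to_transaction(offsets, group_metadata)
        producer.commit_transaction()
    except Exception:
        producer.abort_transaction()   # all of it, or none of it
        raise

# Recording stand-in: shows the call sequence without needing a broker.
class FakeProducer:
    def __init__(self):
        self.calls = []
    def begin_transaction(self):
        self.calls.append("begin")
    def produce(self, topic, value):
        self.calls.append(f"produce:{topic}:{value}")
    def send_offsets_to_transaction(self, offsets, group_metadata):
        self.calls.append("offsets")
    def commit_transaction(self):
        self.calls.append("commit")
    def abort_transaction(self):
        self.calls.append("abort")

p = FakeProducer()
# In the real client, group_metadata comes from consumer.consumer_group_metadata().
copy_with_transaction(p, "group-meta", [("topic-b", "a")], {"topic-a-0": 5})
print(p.calls)   # ['begin', 'produce:topic-b:a', 'offsets', 'commit']
```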
The producer and brokers do not wait until the transaction is committed before writing the data to the data logs. Instead, the brokers write records to the logs as they arrive. Transactional messages are also bracketed with special control messages that indicate where a transaction has started and either committed or aborted. Consumers now have an additional configuration parameter called isolation.level that must be set to either read_uncommitted (the default) or read_committed. When set to read_uncommitted, all messages are consumed as they become available, in offset ordering. When set to read_committed, only messages not in a transaction, or messages from committed transactions, are read in offset ordering. If a consumer with isolation.level=read_committed reaches a control message for a transaction that has not completed, it will not deliver any more messages from this partition until the producer commits or aborts the transaction or a transaction timeout occurs. The transaction timeout is determined by the producer using the configuration transaction.timeout.ms (default 1 minute).
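A toy model of a partition log containing control markers shows the effect of the two isolation levels. Note the simplification: a real read_committed consumer blocks at the first open transaction rather than looking ahead for outcomes as this sketch does:

```python
# A partition log: data records tagged with their transaction (or None for
# non-transactional sends), interleaved with control markers.
log = [
    ("data", None, "m1"),           # non-transactional message
    ("data", "tx1", "m2"),          # belongs to transaction tx1
    ("data", "tx2", "m3"),          # belongs to transaction tx2
    ("control", "tx1", "commit"),   # tx1 committed
    ("control", "tx2", "abort"),    # tx2 aborted
]

def read(log, isolation_level):
    if isolation_level == "read_uncommitted":
        # Everything is delivered in offset order, aborted data included.
        return [v for kind, _, v in log if kind == "data"]
    # read_committed: keep non-transactional data and committed transactions.
    outcomes = {tx: v for kind, tx, v in log if kind == "control"}
    return [v for kind, tx, v in log
            if kind == "data" and (tx is None or outcomes.get(tx) == "commit")]

print(read(log, "read_uncommitted"))  # ['m1', 'm2', 'm3']
print(read(log, "read_committed"))    # ['m1', 'm2'] -> aborted tx2 filtered out
```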
That was a quick tour of the key features of transactions. If you want to know more, this Confluent blog or the original design are both well worth a read. Now that we’ve established the basics of idempotent writes and transactions, let’s see how they help us achieve effectively once guarantees.
Effectively once production of data to Kafka
When producing data to Kafka you need to save progress and publish the data to Kafka itself. Depending on the order you do these two operations you get different delivery semantics.
Whichever order you perform these two operations in, you can’t get effectively once semantics. The issue with all these scenarios is that, in the face of failure, we can be left with one of the two operations having succeeded, leaving us in an inconsistent state. We need a mechanism to either save both the progress and the data or, in the face of failure, save neither. In other words, we need to atomically save both the progress and the data.
Kafka transactions can help us out here, but only if we’re willing and able to store the progress of reading the source in Kafka itself. Transactions only work on data stored in Kafka and cannot be extended to cover data stored elsewhere.
Storing both the data and the reading progress in a single Kafka transaction guarantees your producer is always in a consistent state, with your data in sync with your source system. This holds even after a hard crash of your producer: after restarting your producer with the same transactional id, Kafka will guarantee all existing transactions are resolved (aborted or committed) before it starts adding any new data.
Effectively once consuming of data from Kafka
The scenarios for consuming and processing data from Kafka should look familiar if you were paying attention in the previous section.
We have the same issues as with publishing data to Kafka where regardless of the order we do our data processing and storage of consumer offsets, we don’t get effectively once semantics. If our destination system is Kafka we can solve this just as we did in the previous section, by atomically updating the destination and storing the offsets as part of a transaction. Similarly if our destination is not Kafka but does have transactional support (such as PostgreSQL or MongoDB) we can use the same approach here by storing both the offsets and data in the destination system and updating both atomically.
For Kafka clients reading from Kafka there is another option to achieve effectively-once semantics. If your stream processing or destination system is able to behave idempotently, you can achieve the same effect by performing the idempotent operation and then saving the progress.
An idempotent operation is one where if you perform it multiple times it has the same effect as if it were performed once. If the application fails after performing the operation but before saving progress, it won’t matter that the next time the application is restarted the operation is run again.
An example of such an operation is upserting a record into a database. If the record is already present in the database, any duplicates will simply update the record with the same values. Another example is the Kafka Connect S3 Sink Connector. In this case the connector saves the data to an S3 object, then commits the offsets to Kafka. Performing a multi-part upload to an S3 object multiple times with the same data is an idempotent, atomic operation. So if the connector fails after writing a file but prior to saving the offsets, it will simply repeat the process next time, with the destination file atomically replaced by a new file of exactly the same data.
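The upsert idea can be shown in a few lines, with a plain dict standing in for the database:

```python
def upsert(db, key, row):
    """Insert the row, or overwrite an existing row with the same values."""
    db[key] = row

db = {}
batch = [("user-1", {"name": "Ada"}), ("user-2", {"name": "Grace"})]

for key, row in batch:
    upsert(db, key, row)
# Crash before the offsets are committed: the whole batch is replayed.
for key, row in batch:
    upsert(db, key, row)

print(len(db))   # 2 -> replaying the batch left the database unchanged
```

Because replaying the batch has no observable effect, process-then-commit (at least once) becomes effectively once.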
Effectively once in Kafka Streams
Kafka Streams is a JVM based library for performing streaming transformations on data sourced from Kafka. It is a very good default choice for data processing backed by Kafka due to its simple deployment model, horizontal scalability, resilience to failures and straightforward well documented public API.
The API lets you read data from Kafka then perform a number of transformations on the data including filtering, mapping, aggregation and joining data from multiple topics. The API supports both stateless and stateful transformations. Stateful transformations back up their state into changelog topics in Kafka and cache values local to the processing nodes, typically using RocksDB.
Exactly once semantic support was added to Kafka Streams in the 0.11.0 Kafka release. Enabling exactly once is a simple configuration change: set the configuration parameter processing.guarantee to exactly_once (the default is at_least_once).
A typical workload for a Kafka Streams application is to read data from one or more partitions, perform some transformations on the data, update a state store (such as a count), then write the result to an output topic. When exactly once semantics is enabled, Kafka Streams atomically updates consumer offsets, local state stores, state store changelog topics and production to output topics all together. If any one of these steps fail, all of the changes are rolled back.
The Kafka Streams API lets you perform any action when processing input data, for instance you can write the data directly to a database, or fire off an email. These “side effect” operations are explicitly not covered by the exactly once guarantee. If a stream job fails after processing data, but just prior to writing it back to Kafka, it will be reprocessed the next time the Kafka Streams worker is restarted re-running any side-effect operations such as emailing a customer.
It is best to keep your Kafka Streams transform “pure” with no side effects beyond updating state stores and writing back to Kafka. This way your application will be more resilient (it won’t fail if it can’t contact the email server) and will shorten the length of transactions.
Side effects can then be performed via Kafka Connect Sink connectors or custom consumers.
The additional work required for transactions is independent of the number of messages included in the transaction, so throughput is improved by increasing the size of transactions. For Kafka Streams, transactions are cycled each time a stream worker commits its changes. This is tunable via the commit.interval.ms configuration. The higher the commit interval, the higher the throughput, but at the cost of increased latency for downstream read_committed consumers.
If your Kafka Streams application has multiple sub-topologies, where intermediate data is written back to Kafka then read in a later step, each of these steps needs to wait for the transaction from the previous step to complete before it can read any data. The worst-case end-to-end latency of such a pipeline is therefore bounded below by the number of intermediate steps multiplied by the commit interval. Expect this scenario to improve in future Kafka Streams releases, as there’s talk of allowing internal consumers to reduce latency by reading “unstable data”, at the cost of cascading rollbacks in failure scenarios.
Finally, in the case of an unclean shutdown of a stream job, local state stores could be left in an unknown state. In this case Kafka Streams rebuilds the state stores from changelog topics in Kafka. This process can take some time for large state stores. You can alleviate this situation by shutting down your topology cleanly whenever possible. In future Kafka releases there is talk of adding checkpoints to the local state stores, so that a full restore is not required each time.
Effectively once for the win?
Now that effectively once is possible with Kafka, why not use it all the time? As clearly stated in the Confluent articles, enabling exactly-once isn’t magical pixie dust. As should now be clear from the examples above, duplicates and lost messages are due not only to features of the messaging systems, but also to the design of producer and consumer applications.
For any new applications, whether using the lower-level consumer/producer APIs or Kafka Streams, you now have the knowledge to set up effectively once processing. Very often, though, you will wish to source data from or deliver data to other systems, many of which do not (yet) have effectively once support.
Probably the biggest gap right now is Kafka Connect Source connectors. The Connect Source API was designed well before transactions were available and there’s some work required to update it to support effectively once. You can track progress via the issue KAFKA-6080. Also, change data capture (CDC) systems such as Debezium only support at least once semantics in the event of system failure.
Another reason you may choose to forgo effectively once guarantees is where you value progress over completeness. With at least once and effectively once guarantees, a producer will keep retrying forever when attempting to push data into Kafka. For some setups, such as real-time monitoring, ML model scoring or an RFQ system, older messages may no longer be useful, so dropping old data under failure conditions may be an acceptable trade-off.
Often data flows involve chains of services reading from the output topics of a previous service and writing to the input topics of the next service. As you can see from the table below, delivery guarantees don’t get better when you combine them. If you have anything other than effectively once anywhere in your pipeline, the whole pipeline will no longer be effectively once.
Using effectively once semantics everywhere can be a real challenge. You may be dealing with data sources that already have duplicates (be sure to check the semantics of all your source feeds), or it simply may be too much effort to always store your offsets atomically with your data. At least once guarantees are much easier to achieve. The offsets do not need to be stored together with the data. As long as you always store the offsets after saving the data, you can save offsets much less frequently than saving the data depending on how much duplicate data you can tolerate on a failure.
Given you have a stream of at least once data, is it possible to turn this into an effectively once stream? One way is to have all consumers of your data be idempotent, effectively ignoring the duplicate records. For this to work, consumers need to be idempotent at the message level, not the batch level. For instance, you can configure the JDBC and Elasticsearch Kafka Connect Sinks to be idempotent at the message level, but not the S3 Sink. Given a stream with duplicates, the S3 Sink will faithfully copy to S3 exactly what’s in the topic, duplicates and all.
For consumers that cannot act idempotently, you also have the option of adding a step to remove duplicates from your streaming pipeline, converting your at least once guarantee into an effectively once one.
Deduplicating a stream of records is an inherently stateful operation. For each record that arrives we need to determine if we’ve seen the record contents before. If so, discard the record, if not pass it on and update our state to remember the new record. What state should we store to identify duplicates?
Using a key to identify duplicates
If your data has an intrinsic business key that uniquely identifies a record, this can be a good candidate for deduplication. If no such key exists, a hash of the contents can also be used.
If using Kafka Streams as your deduplication mechanism, you can create a state store that adds an entry for each unique key it receives. When a new record is received, check the store to see if the key has been seen before.
Kafka Streams state stores are specific to source topic partitions, so make sure the source topic either has a single partition or that the Kafka messages are keyed by the same key you use for deduplication purposes. Kafka producers use the message key to determine which partition to push records into. Records with the same key will end up in the same partition.
This approach only works as described if you will never see legitimate update messages for a given key. If you do expect updates, you can store an update timestamp or version number in the state store value entry for each message, and only pass through updates where the timestamp or version is greater than the previous value. If the number of unique keys is unbounded, strongly consider windowing your state store. This is reasonable since, in at least once failure scenarios where a failed batch is replayed, the duplicate messages are generally added within a few minutes or hours of the original messages, depending on the length of the outage.
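A minimal sketch of this versioned, key-based check, using a plain dict as a stand-in for the Streams state store:

```python
def dedupe(stream, store):
    """Pass through only records not seen before, or carrying a newer
    version than the one recorded for their key."""
    out = []
    for key, version, value in stream:
        if store.get(key) is None or version > store[key]:
            store[key] = version     # remember the newest version per key
            out.append((key, value))
        # otherwise: same or older version -> duplicate, drop it
    return out

store = {}
stream = [("a", 1, "v1"), ("b", 1, "v1"),
          ("a", 1, "v1"),            # replayed duplicate: dropped
          ("a", 2, "v2")]            # genuine update: passed through
print(dedupe(stream, store))
# [('a', 'v1'), ('b', 'v1'), ('a', 'v2')]
```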
Using a sequence number for deduplication
Using a key for deduplication does work well, but comes at the cost of additional latency and the ongoing cost of managing the state store. In some cases a lighter weight approach works if the source stream has some form of sequence number attached to each message whose value increases with each message. One such system is Change Data Capture (CDC) where the location in the database transaction log, often called the LSN, is included with each message along with a separate sequence number that identifies an individual CDC update within a transaction. Combining these two values you have a key that you expect to always go up. You can then have a state store with a single entry per partition storing the highest key seen to date (the high water mark for the partition). For any new message, if the key is less than or equal to the previously stored value, it is considered a duplicate and dropped. Otherwise the value is forwarded on and the state store is updated. The advantage to this approach is the lookups are fast (always in memory), the state stores are very small and never grow, and it can handle duplicates after any length of time since we don’t require windowing.
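A sketch of the high-water-mark approach. The (lsn, change_index) pairs are illustrative sequence keys; Python compares tuples lexicographically, so they always increase:

```python
def dedupe_by_sequence(stream, high_water):
    """Drop any record whose sequence key is not strictly greater than the
    highest key seen so far for its partition."""
    out = []
    for partition, seq, value in stream:
        last = high_water.get(partition)
        if last is not None and seq <= last:
            continue                 # at or below the high water mark
        high_water[partition] = seq  # one small entry per partition
        out.append(value)
    return out

hw = {}
stream = [(0, (100, 0), "a"), (0, (100, 1), "b"),
          (0, (100, 0), "a"), (0, (100, 1), "b"),   # replayed batch
          (0, (101, 0), "c")]
print(dedupe_by_sequence(stream, hw))   # ['a', 'b', 'c']
```

The state never grows beyond one entry per partition, which is what makes this approach so much lighter than key-based deduplication.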
Using a datastore for deduplication
This example is a variation on the upsert-based idempotent consumer example given above. In this case, we want to be sure we don’t have duplicates in the destination datastore, but don’t have a mechanism for updating. To identify duplicate messages we first query the destination system for each message and drop any that are already there, inserting the remaining messages. If doing this as a batch process using Spark, you’d first remove the duplicates within a batch using the DataFrame method dropDuplicates on the field(s) you use as the key, then perform a left-anti join against the destination datastore. This removes all messages that are already in the destination. The resulting DataFrame can then be safely inserted into the destination datastore.
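The same logic can be shown in plain Python, with lists and sets standing in for the DataFrame operations (the field names are illustrative):

```python
def insert_new(batch, destination, key):
    """Emulates dropDuplicates within a batch followed by a left-anti join
    against the destination, then inserts only the genuinely new rows."""
    seen, deduped = set(), []
    for row in batch:                  # dropDuplicates within the batch
        k = key(row)
        if k not in seen:
            seen.add(k)
            deduped.append(row)
    existing = {key(row) for row in destination}
    # left-anti join: keep only rows whose key is absent from the destination
    new_rows = [r for r in deduped if key(r) not in existing]
    destination.extend(new_rows)
    return new_rows

dest = [{"id": 1, "v": "x"}]
batch = [{"id": 1, "v": "x"},          # already in the destination
         {"id": 2, "v": "y"}, {"id": 2, "v": "y"}]   # in-batch duplicate
print(insert_new(batch, dest, key=lambda r: r["id"]))
# [{'id': 2, 'v': 'y'}] -> only the one genuinely new row is inserted
```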
For datastores where querying existing records is slow, you can add an optimisation using a probabilistic data structure such as a Bloom or cuckoo filter. These filters take up a predictable amount of space and can tell you, for each record in a stream, whether you have definitely not seen it before or “may have” seen it. By running records through the filter, you only have to query the actual datastore for those records that “may have” been seen before.
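A minimal Bloom filter illustrating the “definitely not seen” / “may have seen” contract (the bit-array size and hash count here are arbitrary toy values, not tuned for any real workload):

```python
import hashlib

class BloomFilter:
    """Minimal Bloom filter: no false negatives, occasional false positives."""

    def __init__(self, size=1024, hashes=3):
        self.size, self.hashes, self.bits = size, hashes, 0

    def _positions(self, item):
        # Derive `hashes` bit positions by salting a SHA-256 digest.
        for i in range(self.hashes):
            digest = hashlib.sha256(f"{i}:{item}".encode()).digest()
            yield int.from_bytes(digest[:8], "big") % self.size

    def add(self, item):
        for pos in self._positions(item):
            self.bits |= 1 << pos

    def may_contain(self, item):
        return all(self.bits >> pos & 1 for pos in self._positions(item))

bf = BloomFilter()
bf.add("record-1")
assert bf.may_contain("record-1")   # anything added always reports "may have"
# Only records that "may have" been seen need a real datastore lookup:
candidates = [r for r in ["record-1", "record-2"] if bf.may_contain(r)]
```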
Effectively once message processing is possible today with Apache Kafka. This is a big achievement for the platform, and something its competitors such as Apache Pulsar, AWS Kinesis, Azure Event Hubs or Google Cloud Pub/Sub do not currently support as robustly. Nevertheless, achieving an end-to-end effectively once data pipeline is a challenging prospect requiring strict discipline. The basic tooling is available, but every streaming process involved needs to be designed with effectively once support in mind. I expect the situation to gradually improve as components like Kafka Connect add effectively once support to sources, and other sources such as CDC systems start atomically storing their progress in Kafka.
At least once guarantees are far easier to implement and almost universally supported by existing tools. If your destination systems are tolerant of duplicates, an at least once pipeline is a good choice. Encourage your downstream consumers to be idempotent and you save yourself the burden of implementing effectively once entirely.
Finally in cases where your consumers cannot be made idempotent and you have at least once semantics you can consider adding a deduplication step to convert your stream to effectively once semantics. Although deduplication is a useful tool to have available to you, be aware that it comes at a cost in pipeline complexity, additional state storage and increased latency.