How we (almost :)) achieve End-to-End Exactly-Once processing with Flink

Devora Roth Goldshmidt
CodeX
Published in
8 min readAug 29, 2022

Distributed stateful stream processing is challenging, especially in regard to handling failures and recovery. In stream processing, one of the most frequently asked questions is “does my stream processing system guarantee that each record is processed once and only once, even if some failures are encountered in the middle of processing?”

By saying “exactly-once” semantic I mean that each incoming event affects the final results exactly once. Even in case of a machine or software failure, there’s no duplicate data and no data that goes unprocessed. We are using Apache Flink, a distributed stream processing engine that has long provided exactly-once semantics within the Flink application itself. Flink generates checkpoints on a regular, configurable interval and then writes them to a persistent storage system with the attached position in the input stream. During recovery from a failure, Flink resumes processing from the most recent successfully-completed checkpoint. Flink’s checkpoint algorithm is based on a technique introduced in 1985 by Chandy and Lamport to draw consistent snapshots of the current state of a distributed system without missing information and without recording duplicates

But by saying “end-to-end” I mean extending the exactly-once semantic to the external system which involved — which Flink sends data after processing. Imagine a very standard and simple process that consumes events from Kafka topic, performs tumbling windows of 1 minute and once the window is expired, writes all events to a DB. So that events are grouped into 1m buckets and written by a single bulk insert operation.

Stream processing with external state

Obviously, the desired situation is that all incoming events will eventually appear in the DB but only once.

Idempotent Writing

In case the application writes records with associated consistent unique identifiers, exactly-once can be achieved by an idempotent implementation of writing to DB. This means, instructing the DB to just ignore or override duplications so that even in case of re-processing of the same events there won’t be a real impact on the external table.

Returning back to the example application which writes the raw events to DB once a minute, every event has an associated ID that can be used as table PK. Assuming Flink checkpoint is triggered every 5 seconds, a failure may happen between two checkpoints. In this case, Flink will recover from the last checkpoint and replay from there. Actually, all messages from the last checkpoint will be re-processed and sent to the DB (all events within the dotted red line in the diagram below) but they’ll be ignored since this is an attempt to write the same event ID twice.

Re-processing of the dotted timeline events, but DB ignores them

One can argue that this pattern is actually stateless events processing which Flink might be an overkill solution for. The complicated Flink checkpoint mechanism is very useful when there is a state distributed across multiple nodes and should be persisted properly but this app could be simply implemented by native Kafka consumer/produce APIs, group the events, write them down and only then commit the Kafka offset. In case of any consumer timeout or re-try, the same message will be re-processed but w/o a real impact. I agree. If every application was deterministic and idempotent, our life would be much easier (or boring :)).

Kafka Transaction

In case the application is in a form of consume-process-produce from an inbound and into an outbound topic, producing messages through Kafka Transaction API may be a good choice of consuming, processing and producing a message by an atomic operation.

The key here is that any downstream Kafka consumer that polls the outbound topic will only ever receive those resulting messages once — it is guaranteed there will not be duplicates, even if the data sink needs to retry producing the message. Failure scenarios may mean that the original message is consumed and processed (or partially processed) multiple times, but this will never result in duplicate outbound events being published.

In order to leverage this technique, all those three operators (source, window and sink) should run on the same component so that the producing of any outbound message will be surrounded by the same transaction that commits the consumer offset. The following is the transactional flow:

  1. Service calls beginTransaction for initiating a new transaction
  2. Service publishes messages by the Producer
  3. The consumer offsets are also sent to the Producer for being included in the same transaction
  4. Service calls commitTransaction to complete the transaction

For supporting transactional producing, Kafka introduces new components and concepts like Consumer Coordinator, Transaction Coordinator, and Transaction Log which are detailly described in their > 60-page design document, but it’s important to realize that all other actions occurring as part of the processing can still happen multiple times, on the occasions where the original message is redelivered. If, for example, the application performs REST calls to other applications or performs writes to a database, these can still happen multiple times. The guarantee is that the resulting events from the processing will only be written once.

Two-Phase Commit Approach

There is another type of application that consumes from Kafka, performs some aggregation and then writes results to external DB. It’s generally pretty hard to implement idempotency since aggregation values may be changed, but on the other hand, they can not really benefit from Kafka transaction since it’s anyway not bounded to the DB transaction.

Let’s change our original example a little bit so that instead of writing the raw events, it counts the number of events that happened during the 1-minute window and writes this aggregated value (just the number) to a DB. Obviously, our topic has several partitions and several instances of the source, windowing and data sink operators running in parallel. There isn’t any keyBy or logical partitioning required by the business, we simply want to count events in a minute granularity. Trying to achieve idempotent writing: what should be our record identifier in the aggregation table? Defining it like <Sink Instance ID, minute> is problematic because nothing guarantees that the same events will be sent to the same operator instances after the recovery. The below diagram presents a potential situation where a specific sink operator instance originally processes e45, e47, and e52 events but after the recovery from the last checkpoint, it gets e48, e49, and e50 (just because it now gets windows different upstream windows operator).

Flink feature of TwoPhasedCommitSink feature can be really useful. For achieving exactly-once in this scenario, Flink enables coordination of writing to an external system with its internal checkpoint mechanism. The external systems must provide a means to commit or roll back writes so that those events can be triggered and coordinated with Flink’s checkpoint management. One common approach for coordinating commits and rollbacks in a distributed system is the two-phase commit protocol. The two-phased-commit sink should implement four different methods that Flink will call during the various phases of the checkpointing process:

  1. beginTransaction a transaction bundles all writes between two checkpoints, so writes are always within a scope of a transaction. This function is called at the beginning of a new checkpoint. So here you can open a DB transaction if your DB supports it, or create a temporary file in the file system. All subsequent event processing will use it until the next checkpoint.
  2. preCommit the pre-commit is called by the sink once it gets the checkpoint barrier after successfully persists its internal state. This will be called by every sink so that Flink JobManager (the coordinator) can commit the checkpoint only after all sinks perform the pre-commit successfully. Here, you can flush the file, close it and never write to it again. Or alternatively, start a new DB transaction for any subsequent writes that belong to the next checkpoint.
  3. commit the commit will be called by every sink only once the JobManager notifies them that the checkpoint is completed. In this phase, you can atomically move the pre-committed file to the actual destination directory or alternatively commit the DB transaction.
  4. abort abort function will be called as distributed checkpoint has been aborted or when aborting a transaction that was rejected by a coordinator after a failure. Here, for example, you can delete the temporary file or abort the DB transaction.

Notice that this sink implementation is working natively with DB that does support transactions. But even in our case where the sink writes down the aggregations to AWS AppStream (TimeSeries DB), a custom implementation of the TwiPhasedCommit function is possible and the actual DB writing should be postponed to the commit phase.

The important point to notice is that after a successful pre-commit, the commit must be guaranteed to eventually succeed — both our operators and our external system need to make this guarantee. If a commit fails (for example, due to an intermittent network issue), the entire Flink application fails, restarts according to the user’s restart strategy, and there is another commit attempt. This process is critical because if the commit does not eventually succeed, data loss occurs.

Therefore, we can be sure that all operators agree on the final outcome of the checkpoint: all operators agree that the data is either committed or that the commit is aborted and rolled back.

DB Writing is attached to the Checkpoint mechanism

So what we eventually achieved is that the actual DB writing is performed in the same “transaction” of the persisted checkpoint and the consumer offset.

Summary

Dealing with stateful processing in a distributed system, especially when different external data sources or sinks (Kafka and DB) are involved, is challenging. Duplicate messages are an inevitable aspect of the fact that every component may fail temporarily or forever. Flink’s checkpointing system serves as Flink’s basis for supporting a two-phase commit protocol and aims to provide end-to-end exactly-once semantics. As stated above, there may be rare cases where the transaction won’t be completed, Flink will re-start and re-try forever (or until AWS will solve their AppStream outage…) but this is supposed to be a really rare case that probably this famous joke is considering:

--

--

Devora Roth Goldshmidt
CodeX
Writer for

Senior Software Architect (SAAS), passionate about solving complex system problems and continuously considering trade-offs!