A simple guide to processing guarantees in Apache Flink

Gyula Fóra
Cloudera
Published in
7 min readOct 26, 2021
Photo by Cytonn Photography on Unsplash

Apache Flink is a very powerful framework for running large scale stream processing applications that can satisfy almost any requirement that you throw at it. The caveat is that in some cases you need to understand what you want out of the system to access its full potential.

Every production stream processing use case comes (sometimes implicitly) with a required level of consistency, or in other words correctness, which should be satisfied even when things go wrong. By going wrong we mean a number of things, such as server failures in the cluster, network issues and other reasonable problems that typically arise during production use.

In this blogpost we look at processing guarantees from a practical perspective and how to configure your Flink applications to avoid bad surprises in production.

Different processing guarantees and what they are all about

On a very high level we can distinguish between two main types of processing guarantees in Flink.

No guarantees (Checkpointing Off)

When checkpointing is turned off Flink offers no inherent guarantees in case of failures. The in-memory state of your processing operators will be lost and when the job restarts it will continue from a point that depends solely on the settings of your source connectors.

This means that you can either have data loss or duplicate messages combined always with a loss of application state.

When using the Kafka source connector for example the failure behaviour will depend on your start offset setting:

  • group-offsets : Data duplication + loss of state
  • earliest-offset : Huge data duplication + loss of state
  • latest-offset : Data loss + loss of state

Is this even relevant for production use-cases?
For the vast majority of the users, running in production with data loss or duplication combined with sometimes strange results makes no sense.

Photo by Kelly Sikkema on Unsplash

However there are some specialised applications where a data loss is tolerable in return for blazing fast recovery times and always consuming the latest incoming data. These applications are often stateless or have transient states with a business logic where processing fresh data with very low latency is far more valuable than processing everything. Examples would be alerting applications where only low latency alerts are useful or applications that aim to enhance live user experience where the window of opportunity is very small.

Exactly-once processing (Checkpointing On)

When checkpointing is enabled, Flink provides exactly-once processing semantics for your stateful streaming applications.

Yeah this sounds pretty cool, but what does it really mean?

Exactly-once processing semantics means that any stateful data processing that happens inside Flink (and relies on Flink’s state abstractions) will have consistent and correct behaviour even if there are failures. It is very important to understand that this only applies to processing logic that is strictly dependent on the state of the application and it does not extend to external systems and other side effects.

Let’s look at a toy example to understand the limitations of this mechanism:

Our toy pipeline:

  1. Read data from Kafka using the Flink Kafka consumer connector
  2. Apply window aggregation
  3. Log output to file (using a standard logging framework like log4j)

In this particular case the exactly-once semantics will only apply to steps 1 & 2 as those are stateful operations. But step 3, the logging itself, is not covered as that is a simple side effect and does not integrate with the stateful mechanism. When a failure occurs, our Kafka reader position and window aggregation will be restored to a consistent state and we reprocess some events, which will be logged multiple times (before the failure and after recovery).

Most source connectors in Flink integrate with checkpointing to provide exactly once semantics and as long as you use Flink’s built in operators and stateful abstractions you are safe on the processing side as well. The default behaviour for most output (sink) connectors like Kafka however is that they will send duplicate messages if the job has to restart due to a failure (at-least-once output).

In the next section we will look at why providing exactly-once output is tricky and what to do if we need end-to-end exactly-once.

End-to-end exactly-once and why it is special

End-to-end in this case refers to that it not only covers input sources and processing but also the output of your Flink application. So no matter what happens your pipeline will actually produce correct results (and only once). This is actually what most people will think of when they first hear the concept of exactly-once.

As we have seen this is not the default behaviour for most pipelines. The most common behaviour is exactly-once processing + sometimes duplicate output messages. Let’s understand the reason behind this limitation.

There are two challenges with guaranteeing exactly-once output:

  1. It’s pretty complicated (sometimes nearly impossible)
  2. Incurs a significant latency penalty

The first issue relates to producing “transactional” output from our streaming pipeline. If we fail and restore to a checkpoint, we need to be able to roll back our output. To support this Flink implements a two-phase commit protocol that sinks can use to integrate with the checkpoint mechanism and provide exactly-once output. Some Flink connectors (Kafka and File sink for instance) come with built-in exactly-once capabilities while others only support at-least -once guarantees. We can also implement our custom transactional sink logic using the ​​TwoPhaseCommitSinkFunction. This transactional protocol leads us directly to the second problem.

Transactional output means that we can only trust (or see) our results once it has been committed. In our case we commit output transactions on successful checkpoints. Flink takes checkpoints periodically, in most cases every few seconds or minutes depending on the state size and SLAs. For exactly-once sinks, this will be the minimum latency that we can expect for some output.

Higher checkpointing frequency will have a larger performance overhead especially with large state so depending on our throughput requirements there is always a limit on how often we can take a checkpoint. In most cases checkpointing every few seconds will be the most we can do.

Not every sink connector in Flink supports exactly-once mode. We will focus on Kafka in this example but you can find exhaustive information in the official Flink docs.

Configuring Kafka Sink exactly-once

In order to get exactly-once output from your Kafka sink without bad surprises you have to set a few different things:

Make sure exactly-once mode and transactional id prefixes are set

For both DataStream and SQL API we need to set 2 things:

  1. Set the delivery guarantee to exactly-once
  2. Set a globally (or at least for the Kafka cluster) unique transactional id prefix

SQL:

CREATE TABLE KafkaTable (
item_id BIGINT,
price BIGINT
) WITH (
'connector' = 'kafka',
'topic' = 'mytopic',
'properties.bootstrap.servers' = 'localhost:9092',
'scan.startup.mode' = 'group-offset',
'format' = 'json',
'sink.delivery-guarantee' = 'exactly-once',
'sink.transactional-id-prefix' = 'my_unique_prefix_1'

);

DataStream:

new KafkaSinkBuilder<String>()
.setBootstrapServers("localhost:9092")
.setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
.setTransactionalIdPrefix("my-my_unique_prefix_1")
...

Enable checkpointing

Now that our sinks are configured we need to make sure that checkpointing is enabled and the checkpoint interval is low enough to satisfy our output delivery latency SLA. We can also enable concurrent checkpoints if necessary.

Example config:

execution.checkpointing.interval: 10 s
execution.checkpointing.max-concurrent-checkpoints: 2
execution.checkpointing.mode: EXACTLY_ONCE

Configure max transaction timeout in Kafka

Last but not least as Flink relies on Kafka transactions to manage the output data we should configure the transaction timeout of the Kafka system so that it does not close pending transactions when the Flink job is down. This would lead to unrecoverable data loss in most cases.

We need to set 2 configurations one on the Flink producer and one on the Kafka broker side:

Kafka Producer: transaction.timeout.ms
Kafka Broker: transaction.max.timeout.ms

Both of these transaction timeouts should be set higher than the checkpointing interval plus the max expected Flink downtime

Consuming exactly-once output

When consuming topics written by a transactional producer we have the option to set our consumer isolation level. By configuring isolation.level we have access to two consumer modes:

  1. read_uncommitted : read every record as soon as it was written, irrespective of the current transaction status. This basically gives low latency at-least-once access.
  2. read_committed :read only committed records, guaranteeing that you get stable exactly-once input (at the cost of higher latency)

We can see that turning on exactly-once for the Kafka producer does not automatically force the latency penalty for all downstream consumers but only the ones that actually use the read_committed isolation level.

Other alternatives

We saw that using proper transactional sinks for true end-to-end exactly-once carries a considerable overhead. Before jumping to solutions we always need to thoroughly understand our consistency requirements so we can select the best configuration. For the majority of use-cases the at-least-once sink semantics works equally well with greatly reduced checkpointing and latency overhead.

Many storage systems (key value stores for instance) also provide support for idempotent writes which we can leverage to get consistent results even with retries. For these systems going transactional would be an unnecessary complication that would only make our lives more difficult without providing any benefit for our applications.

For the cases where we truly need end-to-end consistency, Flink provides the necessary connector settings and abstractions to live up to the highest standards. If you feel that this is the route you are going to take make sure to dive deeper into the Flink documentation and other resources on this topic.

--

--