Consuming a Kafka Topic Is Easy, Isn’t It?

Claudio Gargiulo
Julius Baer Engineering
11 min readMar 25, 2024

Introduction

Kafka is one of the most reliable and scalable event streaming systems for building event-driven architectures. Originally developed at LinkedIn, it has been an open source project supported by the Apache Software Foundation since 2012.

At its core, Kafka is a client-server system that allows client applications to publish messages and consume streams.
The concept is simple: Application A publishes a message to a channel (a Topic) and consumer B reads it. After a certain retention period, the message gets deleted regardless of the read status.

Over the years, the dev community behind Kafka has put a lot of effort into abstracting all the technicalities of the underlying platform from the Kafka client libraries, and nowadays implementing an application capable of consuming and producing messages is a matter of hours.
However, understanding what lies under the hood is fundamental to building reliable applications and preventing various subtle issues that can arise once in production.

In this article, I will describe two of the challenges that developers of consumer applications may face: error handling and consumer lag.

The Confluent Kafka example

Our journey starts with the sample application presented in the Get Started tutorial of the Confluent website. This application produces ten messages on a topic named purchases and, on the consumer side, prints them to the console.
Let’s start by looking at the consumer code which reads the messages from the subscribed topic one by one and prints them to the console, until an exit condition arises (a Ctrl-C press):

  ...
using (var consumer = new ConsumerBuilder<string, string>(
configuration.AsEnumerable()).Build())
{
consumer.Subscribe(topic);

try {
while (true) {
// The consumer.Consume method is blocking,
// so the print to the console will only happen
// when a record is read.
var cr = consumer.Consume(cts.Token);
Console.WriteLine($"Consumed event from topic {topic}: key = {cr.Message.Key,-10} value = {cr.Message.Value}");
}
}
catch (OperationCanceledException) {
// Ctrl-C was pressed.
}
finally{
consumer.Close();
}
}
...

That’s easy, right? If we run both the producer and the consumer, we’ll get the following output:

The output from the ‘Getting Started‘ tutorial: producer on the left, consumer on the right

However, in a more concrete scenario we may want to do something more than printing to the console.

What happens if there is an error while processing the message?

Error handling in Kafka

Let’s modify the consumer code so that it triggers an error if a specific key is fetched:

try
{
while (true)
{
var cr = consumer.Consume(cts.Token);

if (cr.Message.Key == "jsmith")
{
throw new Exception("I cannot handle jsmith!");
}

Console.WriteLine($"Consumed event from topic {topic}: key = {cr.Message.Key,-10} value = {cr.Message.Value}");
}
}
catch (OperationCanceledException)
{
// Ctrl-C was pressed.
}
finally
{
consumer.Close();
}

If we run the code again, we can see that the consumer process crashes at the jsmith message:

Debug: [thrd:main]: GroupCoordinator/1: Fetch committed offsets for 2/2 partition(s)
Debug: [thrd:main]: Partition purchases [1] start fetching at offset 195 (leader epoch 0)
Debug: [thrd:main]: Partition purchases [0] start fetching at offset 95 (leader epoch 0)
Consumed event from topic purchases: key = eabara value = gift card
Consumed event from topic purchases: key = jbernard value = alarm clock
Consumed event from topic purchases: key = awalther value = t-shirts
Consumed event from topic purchases: key = htanaka value = alarm clock
Unhandled exception: I cannot handle jsmith!
Debug: [thrd:main]: GroupCoordinator/1: Committing offsets for 2 partition(s) with generation-id 93 in join-state steady: cgrp auto commit timer

If we re-execute the same code, the remaining messages will be fetched. But the jsmith message is skipped, and we are not able to re-fetch it (unless we reset the offsets, but let’s not talk about this now as this should be considered a last resort in any case).

Why?

Quick summary of the Kafka architecture

In Kafka, records are organized into channels known as Topics. Within each Topic, records are organized into Partitions, a construct used to enable parallel processing and scalability within a Topic. A Kafka record consists of a key and a message, but the terms record and message are often used interchangeably to refer to the whole record when talking about the Kafka architecture.

Source: https://medium.com/javarevisited/kafka-partitions-and-consumer-groups-in-6-mins-9e0e336c6c00

When produced, a record gets stored in a a partition and added to its commit log at a given offset. When a consumer subscribes to a topic it gets a set of partitions assigned (all of them in the case of only one consumer) and, in the default scenario where the offsets persist on the Kafka cluster, it starts reading records from the last committed offsets. Consumers are organized into Consumer Groups, which allow multiple consumers to work together, sharing the load of processing messages.

Source: https://newrelic.com/blog/best-practices/kafka-consumer-config-auto-commit-data-loss

During the second execution of the consumer process presented above, the jsmith record is not fetched again because the offsets persist on the cluster. In fact, in its default configuration, the consumer regularly pushes the offsets to the broker, even if the actual message processing is not yet complete.
More specifically in the case above, the offsets are committed during the consumer.Close() method call that is performed during the finally block invocation, right before the process exits.

The key settings driving offset management are the following:

enable.auto.commit — If true (default), periodically commit offset of the last message handed to the application. The committed offset will be used when the process restarts to pick up where it left off.

auto.commit.interval.ms — The frequency in milliseconds that the consumer offsets are committed (written) to offset storage.

enable.auto.offset.store — If true (default), the client will automatically store the offset+1 of the message just prior to passing the message to the application. The offset is stored in the memory and will be used by the next call to commit() (without explicit offsets specified) or the next auto commit. If false and enable.auto.commit=true, the application will manually have to call rd_kafka_offset_store() to store the offset to auto commit. (optional).

Depending on the nature of our application and the type of messages that we have to deal with, we can choose from different error handling strategies when the offset handling is automatic:

  • log the error and discard the message: It is the same as the setup presented above, with the difference that the try/catch is within the consumer loop. This strategy is very simple to implement, but the clear drawback is that the discarded message is ‘lost’.
  • dead letter topic: In this approach we can deal with non-transient errors, meaning messages that consistently trigger errors (like in our case coded above). These ‘faulty’ messages are sent to a dead letter topic and made available for subsequent processing, including manual if necessary.
    This strategy has the benefit of not losing the messages that trigger errors and allows the implementation of a recover/cleanup logic built around the dead letter topic. The main drawback is that the consumer process itself becomes a producer, and the implementation of a corresponding dead letters process, automatic or manual, should be designed in a way to prevent an ‘explosion’ of the number of dead letters.
  • retry topic and retry consumer: In addition to the dead letter topic, we can add the capability to also handle transient errors. In this case we may want to retry the processing of faulty messages, instead of immediately giving up. To do that, we send the record that triggered the error to a retry topic, which is then processed by a different consumer process (a retry consumer). The retry consumer fetches the message and, based on criteria like the number of past retries or the satisfaction of external preconditions, decides if the message should be republished to the target topic, if it should be further delayed, or if it should be discarded.
    The advantage of this approach is that the application can handle transient and non-transient errors; however, this solution cannot be used if the order of the messages is critical, and the complexity of such a solution is substantially higher than the previous two approaches. For a strategy that can preserve the order, see this Confluent article.

What if we take control of the offsets?

Having offset management handled internally by the client library is really convenient and guarantees a ‘at least once’ delivery, but, in case of errors, it may cause messages to either be missed or to be processed multiple times. This is entirely dependent on when the auto offset commit happens. In case of a consumer crash (or a disconnection — but we’ll talk about this later), the offsets persisting on the partitions are as old as the auto.commit.interval.ms time. If a new consumer process subscribes to the topic, it may potentially fetch messages that were already processed by the crashed consumer that failed to commit the latest offsets.

What we could do to prevent this is to disable the auto offset commit logic and explicitly decide when to commit the offsets to the broker. We do this via the configuration enable.auto.commit=false and explicitly call the consumer.Commit method after a message is processed.

Debug: [thrd:main]: GroupCoordinator/1: Fetch committed offsets for 2/2 partition(s)
Debug: [thrd:main]: Partition purchases [1] start fetching at offset 203 (leader epoch 0)
Debug: [thrd:main]: Partition purchases [0] start fetching at offset 97 (leader epoch 0)
Consumed event from topic purchases: key = eabara value = batteries
Debug: [thrd:main]: GroupCoordinator/1: Committing offsets for 1 partition(s) with generation-id 95 in join-state steady: manual
Consumed event from topic purchases: key = jbernard value = alarm clock
Debug: [thrd:main]: GroupCoordinator/1: Committing offsets for 1 partition(s) with generation-id 95 in join-state steady: manual
Consumed event from topic purchases: key = jsmith value = gift card
Debug: [thrd:main]: GroupCoordinator/1: Committing offsets for 1 partition(s) with generation-id 95 in join-state steady: manual
Consumed event from topic purchases: key = htanaka value = alarm clock
Debug: [thrd:main]: GroupCoordinator/1: Committing offsets for 1 partition(s) with generation-id 95 in join-state steady: manual
Consumed event from topic purchases: key = sgarcia value = book
Debug: [thrd:main]: GroupCoordinator/1: Committing offsets for 1 partition(s) with generation-id 95 in join-state steady: manual
Consumed event from topic purchases: key = htanaka value = gift card
Debug: [thrd:main]: GroupCoordinator/1: Committing offsets for 1 partition(s) with generation-id 95 in join-state steady: manual
Consumed event from topic purchases: key = sgarcia value = t-shirts
Debug: [thrd:main]: GroupCoordinator/1: Committing offsets for 1 partition(s) with generation-id 95 in join-state steady: manual
Consumed event from topic purchases: key = awalther value = alarm clock
Debug: [thrd:main]: GroupCoordinator/1: Committing offsets for 1 partition(s) with generation-id 95 in join-state steady: manual
Consumed event from topic purchases: key = sgarcia value = alarm clock
Debug: [thrd:main]: GroupCoordinator/1: Committing offsets for 1 partition(s) with generation-id 95 in join-state steady: manual
Consumed event from topic purchases: key = awalther value = batteries
Debug: [thrd:main]: GroupCoordinator/1: Committing offsets for 1 partition(s) with generation-id 95 in join-state steady: manual

The benefit of this approach is that we are committing the offsets only when a message is successfully processed (or, in case of errors, when the error is recognized and handled by our code) and we can ensure an exactly once policy. However, this verbosity can meaningfully impact the application’s performance.

Alternatively, if processing the same message more than once is not a problem, we can retain the auto offset commit and adjust the auto.commit.interval.ms to the best tradeoff between verbosity and the risk of processing messages twice.

Long-processing time issues

Reading a message from a Kafka topic is often only the first step of a long data processing algorithm. When it comes to consumer processes, one of the Kafka best practices recommends keepingthe ‘consumer loop’ fast, i.e. the time between the calls to consume.Consume should be short.

Why?

An obvious reason

The most obvious reason for minimizing a consumer loop is that the consumers would otherwise become a bottleneck, causing a potential cascade effect on the performances of the cluster and of the message producers (see backpressure).

When is a consumer considered disconnected?

Kafka consumers using client libraries based on librdkafka like the Confluent .Net Kafka one regularly send heartbeats to the broker to signal that they are still alive. The frequency of these heartbeats is configurable via the parameter heartbeat.interval.ms. If no heartbeat is received within a session.timeout.ms, then the consumer is considered disconnected and a rebalance is triggered.

BUT: Sending heartbeats is NOT enough to prevent a rebalancing! In fact, the consumer process also has another, less known parameter: max.poll.interval.ms.

Is the maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before expiration of this timeout, then the consumer is considered failed and the group will rebalance in order to reassign the partitions to another member.

A less obvious reason

Let’s now consider the following scenario with two active consumers from the same consumer group. Initially, each consumer gets one assigned partition:

If one of the two consumers is detected as disconnected, a rebalancing operation is triggered and all the partitions get assigned to the other, still active consumer.

What if the process is alive, but busy on consuming a message?

In this case, despite the regular heartbeat sent by the consumer, if the max.poll.interval.ms timeout elapses then the consumer is considered disconnected and the rebalancing assigns the partitions to the other active consumers. Because the offset of the message being processed by the supposedly disconnected consumer might not be committed yet (due to the auto offset commit timeout not having elapsed; or due to our manual offset commit strategy), the same message would be processed again.

In order to mitigate this issue, developers should finetune the different consumer timeouts and reduce the duration of the consumer loop to the minimum. Other parameters that can be adjusted to increase the poll frequency are max-partition-fetch-bytes and fetch-max-bytes, which can put a limit on the number of records fetched by each consumer.Consume call (the lower the value, the higher the frequency).

Summary

We have seen how the default configuration of a Kafka consumer could lead to duplicate message processing. We have seen some best practices in terms of error handling, and presented the pros and cons of managing the offset commits manually. We have also analyzed some issues that could arise when the processing time for the fetched messages exceeds specific Kafka client timeouts.

References and credits

--

--