Kafka ins and outs: Part 3

Consumers, Parallelism and Delivery Semantics

Andrews Azevedo dos Reis
WAES
10 min read · Jan 11, 2024


In the first part of our series, Kafka ins and outs: Part 1, we introduced you to Apache Kafka, delving into its core capabilities, topics, partitions, and replication. The second part, Kafka ins and outs: Part 2, focused on Kafka producers, shedding light on how data is published onto Kafka topics.

Now, let's focus on the other critical component of Kafka's ecosystem: the Consumers.

What is a Consumer?

A Kafka Consumer plays a pivotal role in the data pipeline. It reads data from the topics where producers have pushed messages. Think of it as the counterpart to a producer; while the producer sends data to Kafka, the consumer retrieves and processes it.

The Consumer representation in the traditional Pub-Sub architecture
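To make this concrete, here is a minimal sketch of a Java consumer, assuming a local broker on localhost:9092 and a hypothetical topic named awesome-players (the Awesome Players example from this series). It subscribes to the topic, polls in a loop, and prints each record.

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class BasicConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // assumption: local broker
        props.put("group.id", "player-stats-app");         // hypothetical group id
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Hypothetical topic name used as an example in this series
            consumer.subscribe(List.of("awesome-players"));
            while (true) {
                // poll() fetches a batch of records from the assigned partitions
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d offset=%d value=%s%n",
                            record.partition(), record.offset(), record.value());
                }
            }
        }
    }
}

The group.id identifies the Consumer Group discussed in the next section; every consumer created with the same group.id joins that group.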

Consumer Groups and Parallelism

Kafka's real power shines in its handling of parallelism through Consumer Groups. A Consumer Group is a set of consumers working together to process data from a shared set of topics. Each Consumer within the group reads from an exclusive set of partitions, ensuring efficient data processing and scalability.

A Consumer Group containing 3 Consumers, each one reading a specific Partition of a Topic

But what if you have just two consumers without a specified consumer group? In that case, each Consumer acts independently, potentially leading to data duplication or gaps in data processing, as there's no coordination in their consumption patterns.

Two Consumers reading the same Partitions of a Topic

This is also how we can distribute data from the same Topic to different applications. Imagine two different applications are interested in the Topic Awesome Players. It's simple: each application needs to create its own consumer group.

Two different Consumer Groups reading data from the same Topic

It's also important to understand the limitations of this parallelism. For instance, consider a topic with three partitions and six consumers in the same consumer group. In this scenario, three consumers will be idle because Kafka assigns only one Consumer per partition within a group.

Consumer Group with six Consumers reading a Topic with only three Partitions - three Consumers are idle

This means the maximum effective parallelism is limited to the number of partitions in the Topic. Thus, having more consumers than partitions in a consumer group doesn't increase consumption efficiency.
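Since useful parallelism is capped by the partition count, one hedged way to sanity-check the size of a consumer group is to ask the broker how many partitions the topic has, for example with partitionsFor(). The broker address, group id, and topic name below are illustrative.

import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.serialization.StringDeserializer;

public class PartitionCount {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // assumption: local broker
        props.put("group.id", "partition-inspector");      // hypothetical group id
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // partitionsFor() returns the metadata of every partition of the topic;
            // its size is the upper bound on useful consumers in one group
            List<PartitionInfo> partitions = consumer.partitionsFor("awesome-players");
            System.out.println("Useful consumers in one group: " + partitions.size());
        }
    }
}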

Delivery Semantics

In Kafka, the term delivery semantics refers to the guarantees provided by the system regarding message delivery in the presence of failures. It defines how reliably messages are delivered from producers to consumers and the trade-offs between reliability and performance.

There are some properties responsible for controlling this behaviour:

  1. max.poll.records: sets the maximum number of records a consumer fetches in a single poll. It gives precise control over how much data is processed in each iteration, which is especially important for managing memory usage and processing time in consumer applications. The default value is 500.
  2. enable.auto.commit: determines whether the Consumer automatically commits the offsets of messages it has processed. The default value is TRUE.
  3. auto.commit.interval.ms: specifies the frequency, in milliseconds, at which the Consumer automatically commits offsets when enable.auto.commit is set to true, balancing offset commit granularity against performance overhead. The default value is 5 seconds (5000 ms).

In summary, these properties define how many records are processed in one iteration (a consumer batch), whether the Consumer auto-commits offsets, and how often that auto-commit happens.
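As a reference, here is a minimal, hedged configuration sketch in Java showing the three properties at their default values; the broker address, group id, and class name are illustrative.

import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ConsumerConfigSketch {
    public static KafkaConsumer<String, String> buildConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // assumption: local broker
        props.put("group.id", "awesome-players-consumer");  // hypothetical group id
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        // The three properties discussed above, shown at their default values
        props.put("max.poll.records", "500");          // records returned per poll()
        props.put("enable.auto.commit", "true");       // commit offsets automatically
        props.put("auto.commit.interval.ms", "5000");  // auto-commit every 5 seconds
        return new KafkaConsumer<>(props);
    }
}

Lowering max.poll.records shrinks the batch; turning enable.auto.commit off hands offset management to the application, which is exactly what the at-least-once pattern below relies on.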

Now, let's dive deep into the three types of delivery semantics that Kafka offers.

At Least Once

This ensures that messages are never lost by delivering them at least once. However, it may lead to duplicate message delivery. It's ideal for scenarios where losing data is not an option, but processing the same data more than once is acceptable.

We can achieve this behaviour with two different combinations:

First, set enable.auto.commit to false and make the Consumer commit offsets manually. For instance, take a look at this error scenario:

Error scenario

If enable.auto.commit is FALSE:

1. Consumer fetches 500 messages (offset 0 … 499)
2. Consumer processes from offset 0 to 299
3. Consumer commits offsets using commitSync() or commitAsync()
4. Consumer starts processing offset 300
5. Consumer processes offset 300
6. Consumer tries to commit the offset using commitSync() and crashes
7. Consumer restarts
8. Consumer fetches 500 messages (offset 300 … 799)
9. Consumer reprocesses offsets from 300

Here, we have some duplication
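A minimal sketch of this first combination, assuming the same illustrative broker, group id, and topic as before: auto-commit is disabled and commitSync() is called only after the whole batch has been processed, so a crash mid-batch leads to reprocessing rather than loss.

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class AtLeastOnceConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // assumption: local broker
        props.put("group.id", "at-least-once-app");        // hypothetical group id
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        props.put("enable.auto.commit", "false");          // we commit manually

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("awesome-players"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    process(record);  // hypothetical, ideally idempotent, business logic
                }
                // Commit only after the whole batch is processed; a crash before this
                // line means the batch is re-fetched and reprocessed (duplicates possible)
                consumer.commitSync();
            }
        }
    }

    private static void process(ConsumerRecord<String, String> record) {
        System.out.println("Processing offset " + record.offset());
    }
}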

Second, set enable.auto.commit to true and set auto.commit.interval.ms to a high value. For instance, take a look at this error scenario:

Error scenario

If enable.auto.commit is TRUE (Kafka's default value) and the Consumer has a high auto.commit.interval.ms of 5 sec (also Kafka's default value):

1. Consumer fetches 500 messages (offset 0 … 499)
2. Consumer processes from offset 0 to 299 in 3 sec
3. Consumer starts processing offset 300 (after 3 sec, before 5 sec)
4. Consumer crashes while processing offset 300 (after 3 sec, before 5 sec)
5. Consumer did not COMMIT any offset
6. Consumer restarts
7. Consumer fetches 500 messages (offset 0 … 499) again
8. Consumer reprocesses offsets from 0 to 299

Here, we have some duplication

In summary, with at least once we prioritize data reliability over performance, ensuring we never lose a record even if some duplicates occur. To use this option safely, the Consumer should be idempotent so that reprocessing a message causes no harm.

At Most Once

Messages are delivered at most once, meaning they may be lost but never duplicated. This is suitable for cases where processing messages more than once is more problematic than missing a message, such as in systems where duplications could lead to repeated erroneous actions.

We can achieve this behaviour by setting enable.auto.commit to true and auto.commit.interval.ms to a low value. For instance, take a look at this error scenario:

Error scenario
If enable.auto.commit is TRUE (Kafka's default value) and the Consumer has a low auto.commit.interval.ms of 1 sec:

1. Consumer fetches 500 messages (offset 0 … 499)
2. Consumer processes from offset 0 to 100 in 0.9 sec
3. Consumer starts processing offset 101 (after 0.9 sec, before 1 sec)
4. After 1 sec, the Consumer starts to COMMIT offsets from 0 to 101
5. Consumer crashes while processing offset 101, before completing it
6. Consumer restarts
7. Consumer fetches 500 messages (offset 102 … 601)
8. Consumer does not process offset 101

Here, we lost a message
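Here is a hedged sketch of an at-most-once style consumer following the configuration above (auto-commit on, short interval); the commented-out commitSync() line shows an even more explicit variant that commits right after poll() and before processing. Broker, group id, and topic names are illustrative.

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class AtMostOnceConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // assumption: local broker
        props.put("group.id", "at-most-once-app");         // hypothetical group id
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        props.put("enable.auto.commit", "true");           // commit automatically...
        props.put("auto.commit.interval.ms", "1000");      // ...and very frequently

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("awesome-players"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                // An even stricter variant commits here, before any processing:
                // consumer.commitSync();
                for (ConsumerRecord<String, String> record : records) {
                    // If we crash here, the committed offset may already be past this
                    // record, so it will not be re-delivered: the message is lost
                    System.out.println("Processing offset " + record.offset());
                }
            }
        }
    }
}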

Exactly Once

This is the most stringent and complex, ensuring each message is delivered and processed exactly once, eliminating data loss or duplication risks. It's the most desirable for systems where both the loss and duplication of messages could lead to significant issues, such as financial transactions.

To achieve this, we need to use Kafka's Transactional API between Consumer and Producer. This can be quite complex and deserves an article of its own, so let's skip the details for now.
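As a small, hedged hint of the consumer side only: a consumer participating in an exactly-once pipeline typically disables auto-commit and sets isolation.level to read_committed, so it only sees records from committed transactions. The full flow, including the transactional producer that commits offsets, is out of scope here, as mentioned above; the broker address, group id, and class name are illustrative.

import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ReadCommittedConsumerSketch {
    public static KafkaConsumer<String, String> buildConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // assumption: local broker
        props.put("group.id", "exactly-once-app");         // hypothetical group id
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        props.put("enable.auto.commit", "false");          // offsets are committed as part of the transaction
        // Only read records from committed transactions; aborted or in-flight
        // transactional records are hidden from this consumer
        props.put("isolation.level", "read_committed");
        return new KafkaConsumer<>(props);
    }
}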

That's it! Each of these delivery semantics offers a balance between data consistency, system reliability, and processing overhead.

By default, Kafka uses at least once delivery, prioritizing data reliability while also allowing configurations for other semantics based on specific use case requirements.

The property auto.offset.reset

In Kafka, the auto.offset.reset property plays a crucial role in managing consumer behaviour, mainly when dealing with missing or undefined offsets - a scenario often encountered when a consumer starts consuming data or if an application crashes and restarts.

This property dictates the Consumer's action in such situations and has three settings:

Earliest

The Consumer starts reading from the earliest message available in the partition. This setting is vital for applications that process the complete data history, such as in a data synchronization tool that needs to rebuild its state after a crash.

For instance, imagine a data analytics application aggregating historical data for trend analysis. If this application crashes and restarts, setting auto.offset.reset to earliest ensures the Consumer begins processing from the earliest available record, thereby retaining all historical data.

To illustrate, imagine we have this Consumer down and this Topic:

Uh-oh, we don't have the offset 3 anymore. We don't even have the offsets 4, 5 and 6. This could cause us trouble! When we start our Consumer using the earliest option, this happens:

Kafka makes our Consumer point to the earliest offset available, in this case, the record with the offset 7. From this point, our Consumer will process all records available on the Topic.
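A minimal configuration sketch for this property; the factory method, broker address, and group id are illustrative, and the comments recap the three possible values.

import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class OffsetResetSketch {
    public static KafkaConsumer<String, String> buildConsumer(String resetPolicy) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // assumption: local broker
        props.put("group.id", "history-rebuilder");        // hypothetical group id
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        // "earliest": start from the oldest record still available (offset 7 in the example above)
        // "latest":   start from new records only (Kafka's default)
        // "none":     throw an exception when no committed offset exists
        props.put("auto.offset.reset", resetPolicy);
        return new KafkaConsumer<>(props);
    }
}

Calling buildConsumer("earliest") reproduces the behaviour illustrated above: the Consumer jumps to offset 7, the earliest record still available, and processes everything from there.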

Latest

The Consumer begins consuming data from the latest message onward. This is Kafka's default setting and is practical for applications focused on real-time data processing, where only the most recent messages are relevant.

For instance, consider a real-time stock trading dashboard that displays the latest stock prices. If it restarts after a crash, setting auto.offset.reset to latest ensures it skips historical data and starts displaying the latest stock market information immediately.

To illustrate, imagine we have this Consumer down and this Topic:

Same situation here! We don't have the offset 3 anymore. We don't even have the offsets 4, 5 and 6. This could cause us trouble! When we start our Consumer using the latest option, this happens:

Kafka makes our Consumer point to the latest offset available, in this case, the record with the offset 12. But note that our Consumer will only process records after the offset 12. What happens now is our Consumer will process all new records that arrive in the Topic.

None

If no previous offset is found for the Consumer's group, the Consumer throws an exception and stops. This setting is useful for scenarios where the Consumer should not start if there is any uncertainty about where to begin, ensuring strict control over data consumption.

For instance, in a financial transaction monitoring system, ensuring data integrity and the sequence of transactions is paramount. Using the none setting ensures that, in case of any offset ambiguity, the system halts, prompting an investigation rather than processing potentially incorrect data.

Again, to illustrate, imagine we have this Consumer down and this Topic:

And again, we don't have the offset 3 anymore. We don't even have the offsets 4, 5 and 6. When we start our Consumer using the none option, this happens:

After starting our Consumer, exceptions will be thrown. This can sound like a problem, but if we want strict control over where consumption starts, it can be useful.
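A hedged sketch of what handling this looks like in Java: with auto.offset.reset set to none and no committed offset for the group, poll() throws a NoOffsetForPartitionException that the application can catch and escalate. Broker, group id, and topic names are illustrative.

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.NoOffsetForPartitionException;
import org.apache.kafka.common.serialization.StringDeserializer;

public class NonePolicyConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // assumption: local broker
        props.put("group.id", "strict-offset-app");        // hypothetical group id
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        props.put("auto.offset.reset", "none");            // no fallback when offsets are missing

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("awesome-players"));
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
            System.out.println("Fetched " + records.count() + " records");
        } catch (NoOffsetForPartitionException e) {
            // No committed offset for this group and no reset policy to fall back on:
            // stop and investigate instead of silently skipping or reprocessing data
            System.err.println("No committed offset found: " + e.getMessage());
        }
    }
}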

Finally, each setting serves a specific purpose and can be crucial depending on the use case and the nature of the application.

By choosing the right auto.offset.reset behaviour, developers can better manage how their applications respond in cases of missing offsets, such as during initial startup or after an unexpected shutdown.

Distributed Coordination Service

In a distributed system like Kafka, components such as brokers, producers, and consumers are spread across different servers or nodes. A distributed coordination service is responsible for managing and orchestrating these components.

It ensures that they work together harmoniously, handling tasks like configuration management, synchronization, leader election, and group membership. This service is crucial for maintaining the stability and reliability of the distributed system.

Apache ZooKeeper was responsible for this in Kafka's early days. Since Kafka version 3.3, KRaft has been officially production-ready and has taken over this responsibility.

  • Apache Zookeeper: Traditionally, Kafka has used Zookeeper as its distributed coordination service. Zookeeper manages Kafka's cluster metadata and coordinates broker states. It plays a vital role in tasks like leader election for partitions and keeping track of which brokers are live and part of the cluster.
  • Kafka Raft (KRaft): Kafka is transitioning towards a Zookeeper-less setup with the introduction of KRaft. This shift aims to simplify Kafka's architecture and improve its scalability and performance. KRaft takes over the role of distributed coordination internally, eliminating the need for an external service like Zookeeper.

This transition marks a significant evolution in Kafka's architecture, enhancing its efficiency and ease of use in managing distributed data systems.

Conclusion

And that's a wrap on our Kafka journey! We've gone from the basics of Topics to the nuances of Producers and Consumers. We hope this three-article series helps you prepare to dive into this fantastic tool. As Kafka continues to evolve, so will the way we interact with real-time data. Happy data streaming!

Do you think you have what it takes to be one of us?

At WAES, we are always looking for the best developers and data engineers to help Dutch companies succeed. If you are interested in becoming a part of our team and moving to The Netherlands, look at our open positions here.

WAES publication

Our content creators constantly create new articles about software development, lifestyle, and WAES. So make sure to follow us on Medium to learn more.

Also, make sure to follow us on our social media:
LinkedIn, Instagram, Twitter, YouTube

