Apache Kafka Guide #38 Consumer Internal Threads

Paul Ravvich
Apache Kafka At the Gates of Mastery
6 min read · Apr 9, 2024

Hi, this is Paul, and welcome to part #38 of my Apache Kafka guide. Today we will discuss the Consumer’s internal threads: the Consumer Heartbeat and Consumer Poll mechanisms.

Consumer Internal Threads

Consider a consumer group application with three consumers inside a Consumer Group. These consumers communicate with the Consumer Group Coordinator, a broker that acts as the coordinator for the group. The coordinator’s primary job is to monitor the consumers and determine whether they are still operational. Two mechanisms are integral to this: heartbeats and polling. With the heartbeat mechanism, consumers periodically send signals to the broker to confirm they are active; with the poll mechanism, the broker treats consumers as alive because they keep requesting data from Apache Kafka. Together, these mechanisms are how Kafka detects and handles problems with consumers.

Consumer Internal Threads like Consumer Heartbeat and Consumer Poll

Below we look at both mechanisms in detail; the key point is to process data quickly and poll often rather than infrequently.

  • Consumers in the Group communicate with the Consumer Group Coordinator
  • To detect that Consumers are “down”, Kafka uses the “heartbeat” and “poll” mechanisms
  • Consumers are encouraged to process data fast and poll often

Consumer Heartbeat

Let’s delve into the consumer heartbeat thread. This thread periodically sends signals, known as heartbeats, to Kafka to indicate that the consumer is operational. The default frequency is every three seconds, and the interval is adjustable via heartbeat.interval.ms. Common practice is to set it to one-third of the session.timeout.ms value. In Kafka 3.0 the default session.timeout.ms is 45 seconds, a significant increase from the previous default of 10 seconds. The point of sending heartbeats at regular intervals is that if no heartbeat arrives within the session timeout, the consumer is considered inactive, or “dead”. To get faster consumer rebalancing, particularly when consumers exit the group unexpectedly and stop sending heartbeats, you can choose a lower session.timeout.ms. This heartbeat mechanism plays a crucial role in monitoring the status of consumer applications; for rapid group rebalancing after a consumer terminates, setting heartbeat.interval.ms to one second and session.timeout.ms to, say, three seconds can prove effective.

heartbeat.interval.ms (default 3 sec)

  • Frequency of heartbeats
  • Traditionally set to 1/3 of session.timeout.ms

session.timeout.ms (default 45 sec for Kafka 3.0+, 10 sec in earlier versions)

  • Heartbeats are sent periodically from the Consumer to the Broker
  • When no heartbeat arrives within this period, the Consumer is considered “down”
  • For a faster Consumer rebalance, set a lower value.

This mechanism can be used to detect whether the Consumer app is inactive.
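To make this concrete, here is a minimal sketch of a Java consumer configuration that applies the “fast rebalance” values from the example above. The broker address and group id are placeholders I chose for illustration, not values from any real deployment.

```java
import java.util.Properties;

public class HeartbeatConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Placeholder connection settings.
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "demo-group");

        // Send a heartbeat to the Group Coordinator every second (default: 3000 ms).
        props.put("heartbeat.interval.ms", "1000");
        // Consider the consumer "dead" after 3 seconds without a heartbeat
        // (default: 45000 ms in Kafka 3.0+), keeping the usual 1/3 ratio.
        props.put("session.timeout.ms", "3000");
    }
}
```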

Consumer Poll

There are two settings to consider for the poll thread. The first is max.poll.interval.ms, which defaults to five minutes. It is the maximum allowed time between two .poll() calls before the system assumes the consumer is non-responsive. This setting is particularly important for big data frameworks such as Spark, where processing a batch can take a long time: if processing exceeds five minutes, Kafka considers the consumer inactive, which usually points to a processing bottleneck or a consumer stuck mid-processing. Adjust it to your application’s speed: for fast applications a shorter interval, such as 30 seconds, may be appropriate, while slower applications might need to extend it to 15 minutes.

The second setting, max.poll.records, determines how many records are fetched in a single poll (500 by default). The right number depends on the size of your messages and your processing capacity: for small messages, increasing it can be beneficial, while for larger messages, reducing it may be necessary to avoid processing delays. Assess the size of the records being pulled, how many arrive per request, and how long they take to process, then tune accordingly (a configuration sketch follows the lists below).

max.poll.interval.ms (default 5 min)

  • Maximum time allowed between two .poll() calls before the Consumer is considered “down”.
  • Handy for Big Data frameworks like Spark, where data processing takes time.
  • Useful for detecting data processing problems, such as a Consumer being “stuck”.

max.poll.records (default 500)

  • How many records can be returned per .poll() request
  • It makes sense to increase this value when your messages are small and you have plenty of RAM
  • Useful for monitoring how many records are polled per request
  • Decrease it when record processing is too slow
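Here is a hedged sketch of how these two poll settings might look in a plain Java consumer. The topic name, broker address, group id, and the chosen values (15 minutes and 100 records, as for a slow processing step) are illustrative assumptions, not recommendations.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class PollConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Placeholder connection and deserialization settings.
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "demo-group");
        props.put("key.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");

        // Allow at most 15 minutes between two poll() calls (default: 5 min),
        // e.g. for a slow, Spark-like processing step.
        props.put("max.poll.interval.ms", "900000");
        // Fetch at most 100 records per poll() (default: 500) so each batch
        // can be processed well within max.poll.interval.ms.
        props.put("max.poll.records", "100");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("demo-topic"));
            while (true) {
                // Each poll() call also signals the broker that the consumer is alive.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
                }
            }
        }
    }
}
```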

Consumer Poll Behaviour

The fetch.min.bytes setting, which defaults to one byte, specifies the minimum amount of data the broker should return for each fetch request. Increasing this value can enhance throughput by reducing the number of requests, at the expense of increased latency: if it is set to one megabyte, for example, the broker waits until at least a megabyte of data is available before returning anything to the consumer. The companion parameter, fetch.max.wait.ms, defaults to half a second and is the longest the Kafka broker will delay responding to a fetch request when the available data does not yet meet the fetch.min.bytes requirement. So with fetch.min.bytes configured to one megabyte, if that much data is unavailable, there will be at most 500 milliseconds of extra latency before the fetch request is fulfilled and returned to the consumer.

fetch.min.bytes (default 1 byte)

  • The minimum amount of data the broker should return per fetch request
  • Helps decrease the number of requests and improve throughput

fetch.max.wait.ms (default 500 ms)

  • The maximum time the Kafka broker will hold off on fulfilling a fetch request when the available data does not yet meet the fetch.min.bytes requirement. This timeout ensures that requests are not delayed indefinitely when data is insufficient.
  • Until the fetch.min.bytes condition is fulfilled, there can be up to 500 milliseconds of latency before the consumer receives the fetched data, a potential delay traded for fewer, more efficient requests.
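As an illustration of this throughput-versus-latency trade-off, the sketch below raises fetch.min.bytes to one megabyte while leaving fetch.max.wait.ms at its default; the values are assumptions for the sake of the example, not recommendations.

```java
import java.util.Properties;

public class FetchLatencyConfigSketch {
    public static void main(String[] args) {
        Properties props = new Properties();

        // Wait until at least 1 MB is available before answering a fetch
        // (default: 1 byte). Fewer, larger responses mean higher throughput.
        props.put("fetch.min.bytes", "1048576");
        // ...but never wait longer than 500 ms (the default), so a quiet
        // partition still gets a response within half a second.
        props.put("fetch.max.wait.ms", "500");
    }
}
```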

These settings let you adjust consumer behavior significantly. The default configurations suffice for most situations, but knowing these knobs becomes useful as you delve deeper into the subject. One of them is max.partition.fetch.bytes, set to one megabyte by default, which dictates the maximum volume of data the server returns per partition. If you are reading from 100 partitions, up to 100 megabytes of RAM may be needed just for fetch buffers, so adjust this setting to your specific requirements. Another important setting is fetch.max.bytes, which limits the total data returned per fetch request. If sufficient memory is available, increasing this limit lets your consumer receive more data per request.

max.partition.fetch.bytes (default 1 MB)

  • The maximum amount of data the server returns per Partition
  • When you read from many Partitions (for example 100), you need a correspondingly large amount of RAM

fetch.max.bytes (default 50 MB)

  • The maximum amount of data returned per fetch request
  • Increase fetch.max.bytes when you have memory available and want more data per request
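Below is a small sketch of the resulting memory math: with the default 1 MB per partition and an assumed assignment of 100 partitions, a worst-case fetch can require on the order of 100 MB of buffer space. The partition count is hypothetical and only serves the arithmetic.

```java
import java.util.Properties;

public class FetchSizeConfigSketch {
    public static void main(String[] args) {
        // Assumed assignment for the example; not a real deployment.
        int assignedPartitions = 100;
        int maxPartitionFetchBytes = 1024 * 1024;   // 1 MB per partition (default)
        long fetchMaxBytes = 50L * 1024 * 1024;     // 50 MB per fetch request (default)

        Properties props = new Properties();
        props.put("max.partition.fetch.bytes", String.valueOf(maxPartitionFetchBytes));
        props.put("fetch.max.bytes", String.valueOf(fetchMaxBytes));

        // Rough worst-case buffer estimate: every assigned partition returns
        // a full max.partition.fetch.bytes batch at the same time.
        long worstCaseBytes = (long) assignedPartitions * maxPartitionFetchBytes;
        System.out.printf("Worst-case fetch memory: ~%d MB%n", worstCaseBytes / (1024 * 1024));
    }
}
```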

These are considered advanced settings and should only be altered to enhance throughput if you’re reaching the limits of your consumer’s current capacity.

Thank you for reading until the end.

Paul Ravvich