Solving My Weird Kafka Rebalancing Problems & Explaining What Is Happening and Why?

Benjamin Feldmann
bakdata
Published in
11 min readSep 22, 2020

--

Photo by Tyler Milligan on Unsplash

Imagine working on your Kafka Streams application. You deploy it to Kubernetes, wait a few hours, and suddenly… Huh… Kafka’s not processing any data anymore… but the apps didn’t crash, did they?

When we take a look at the brokers, they are working at full capacity. And all other Kafka applications seem to be processing a little less than before. Kafka is rebalancing!

What is Kafka Rebalancing?

To understand what rebalancing is, we need to understand how Kafka works. First, a few words on Kafka vocabulary.

A Kafka cluster consists of one or more brokers. A producer publishes data to Kafka brokers, and a consumer is an application that reads messages from a broker. A Kafka Streams application is both a consumer and a producer simultaneously, but we call it consumer for simplicity in this blog post. All transferred data is stored in a topic. Each topic is divided into one or multiple partitions. Only one consumer can read from a single partition at a time, and thus, the number of partitions of a topic is the maximum possible degree of parallelization.

A consumer group is a set of consumers that jointly consume messages from one or multiple Kafka topics. The leader of a group is a consumer that is additionally responsible for the partition assignment in a consumer group.

Rebalancing is just that — the process to map each partition to precisely one consumer. While Kafka is rebalancing, all involved consumers' processing is blocked (Incremental rebalancing aims to revoke only partitions that need to be transferred to other consumers, and thus, does not block every consumer — more on that below).

Partition Assignment

In more detail, all consumers revoke their partitions and sends a JoinGroup request to the group coordinator, which waits for a message from the consumers before transferring consumer information to the group leader(JoinGroupResponse). Then, every consumer sends a SyncGroup request to the leader, which is a request for a new partition assignment. The leader waits until it receives this message to calculate the next partition assignment, and sends the result to the broker, which broadcasts it to all members (SyncGroupResponse). There is a synchronization barrier between these two handshakes to allow the coordinator to distribute the partitions freely. This protocol is called eager rebalancing protocol.

Eager Rebalancing Protocol

There are multiple partition assignment strategies (e.g., round-robin), and you can even create your own. The default Kafka Streams strategy uses a sticky partition strategy that aims to create an even distribution and tries to minimize partition movements between two rebalancings. Note that there is currently no official lag-aware strategy.

How Does a Consumer “Consume”?

A consumer reads a message from a topic, processes it, and usually writes one or many messages to an output topic. The following consumer configurations, among others, are taken into account.

max.poll.records (default=500) defines the maximum number of messages that a consumer can poll at once.

max.partition.fetch.bytes (default=1048576) defines the maximum number of bytes that the server returns in a poll for a single partition.

max.poll.interval.ms (default=300000) defines the time a consumer has to process all messages from a poll and fetch a new poll afterward. If this interval is exceeded, the consumer leaves the consumer group.

heartbeat.interval.ms (default=3000) defines the frequency with which a consumer sends heartbeats.

session.timeout.ms (default=10000) defines the time a consumer has to send a heartbeat. If no heartbeat was received in that timeout, the member is considered dead and leaves the group.

The heartbeats are using a separate thread (since Kafka 0.10.1). The duration of consumer operations does not influence them.

What Triggers a Rebalancing?

Kafka starts a rebalancing if a consumer joins or leaves a group. Below are various reasons why that can or will happen. Please keep in mind that we run our Kafka on Kubernetes, so some reasons might not apply to your setup.

A consumer joins a group:

  • Application Start/Restart — If we deploy an application (or restart it), a new consumer joins the group
  • Application scale-up — We are creating new pods/application

A consumer leaves a group:

  • max.poll.interval.ms exceeded — polled records not processed in time
  • session.timeout.ms exceeded — no heartbeats sent, likely because of an application crash or a network error
  • Consumer shuts down
  • Pod relocation — Kubernetes relocates pods sometimes, e.g. if nodes are removed via kubectl drain or the cluster is scaled down. This feature is awesome, but the consumer shuts down (leaves the group) and is restarted again on another node (joins the group).
  • Application scale-down

Rebalancing is necessary for Kafka to work. It should not affect the application, but there are cases where rebalances have a huge impact. Thus, we want to reduce the number of unnecessary rebalancing.

Consumers Fail Because They Take Too Long

A common problem is that some applications have widely differing (sometimes by orders of magnitude) processing times. Especially when there are unexpectedly large records, i.e., almost all records are processed in a very short amount of time, but these very few take longer. These records exceed the max.poll.interval.ms, and would trigger a rebalancing.

One relatively easy improvement is to increase max.poll.interval.ms(and/or decrease max.poll.records/max.partition.fetch.bytes). We did not experience any backlash from changing these values, and multiple hours can be a suitable interval.

Allowing large processing times introduces another problem, though. As explained before, the leader creates a partition assignment and revokes partitions accordingly. A partition only gets revoked when the consumer finishes processing the current poll. Thus, the leader will wait until every consumer processes their current record.
You can imagine what will happen if one consumer processes a record that will take hours. — Right, the rebalancing will take hours as well.

Waiting for the last consumer while rebalancing

This is a serious problem, and we did not find a good solution for this. What we can do, however, is to reduce the number of rebalancings to reduce the probability that rebalancings occur while we process a time-consuming record.

Kafka Static Membership

Kafka static membership is a new feature introduced with Kafka 2.3. It allows the coordinator to persist member identities and to recognize restarted members. This is possible via the consumer configuration group.instance.id. If a consumer restarts for any reason, the group coordinator can assign the same partitions to the consumer without rebalancing everything. This process must happen within the session.timeout.ms limit's bounds because the member does not leave the consumer group until the timeout is exceeded. Otherwise, the consumer is considered dead, and consequently, rebalancing occurs.

No need for rebalancing with static membership enabled

It is recommended to increase the session timeout when using static membership (and only when). The value should fit your use case, and you should configure it as low as possible and as high as needed for pods to restart successfully. In our example, the consumer has a downtime but can rejoin the group inside the configured time limit's bounds. Thus, no rebalancing is needed.

We use our helm charts to deploy Kafka applications on Kubernetes, which allows us to activate static membership. Inside, we map the group.instance.id to the metadata name of our Kubernetes pod (If the pod restarts, the name does not change).

We are parsing the environment variable directly in our consumer and set the group.instance.id to the pod name in the Kafka Streams configuration to have a constant and unique group member id across container restarts. Additionally, it is possible to use Kubernetes Statefulsets instead of Kubernetes deployments to enable consistent pod identities and persistent state across pod (re)schedulings.

Incremental Cooperative Rebalancing

Since Kafka 2.4, all stream applications use the incremental cooperative rebalancing protocol to speed up every rebalancing. The idea is that a consumer does not need to revoke a partition if the group coordinator reassigns the same partition to the consumer again. Oversimplified, the old long-running rebalancing is split up into two rebalances:

  1. First, every consumer sends a JoinGroup request to the coordinator while not revoking their partitions. The group leader reassigns all partitions and removes every partition from the assignment that is to be transferred to another consumer. Afterward, a consumer only needs to revoke the partitions that are not part of its assignment anymore. These consumers then rejoin the group and trigger the second rebalancing.
  2. All revoked partitions (currently unassigned) are assigned to a consumer.
Cooperative Rebalancing Protocol

Remember the rebalancing chart describing the eager rebalancing protocol. The synchronization barrier was moved in between these two rebalancings. First, partitions are revoked and not assigned. Then, and after the synchronization, the revoked partitions are re-assigned.

Kafka 2.5 improved on this by allowing consumers to process data while in a rebalancing, which further improves the performance. Confluent showed in an article that the new protocol is a lot faster than the old one.

Nevertheless, this amazing feature does not completely provide a solution for our rebalancing problem with records with long processing times. It helps a lot if only a few partitions get transferred, but cannot work when many members join, and almost every old member has to revoke partitions.

What Happens When We Deploy N Applications at the Same Time?

When deploying applications, Kubernetes does not start every application exactly at the same time. There are a few milliseconds or seconds in between. As we explained previously, Kafka starts to rebalance as soon as a consumer joins the group. After the rebalancing, the coordinator notices that more consumer joined and starts another rebalancing. This cycle repeats until all consumers joined the group successfully. Normally, this process is really fast, and multiple rebalancings are not that much of a problem. In some cases (e.g., stateful streams), rebalancing is more expensive. If one of the already joined consumers processes a record with long processing time, this behavior disrupts the complete consumer group for that amount of time. One way to bypass this problem is to configure the group.initial.rebalance.delay.ms (default 3000 = 3 seconds) in the broker configurations. This delays the first rebalancing for x milliseconds, and thus, delays the processing of messages. If a consumer joins an empty group, the coordinator waits until the delay is over before rebalancing. From our experience, 120000 ms (2 minutes) is a value that ensures a smooth start-up of the applications while keeping the waiting times relatively low.

While this configuration fixes our problem, changing this setting should be avoided if not needed since this is a broker setting that affects the entire Kafka cluster.

Demo Application

We want to showcase the impact of unnecessary rebalancings in a small demo application. The idea is to try to force Kafka rebalancings by application crashes and restarts. Afterward, we want to show that these crashes do not necessarily invoke a rebalancing, given suitable configurations. We wrote an article about our deployments of NLP-pipelines in Kafka. While the demo is not NLP specific, this demo uses similar configurations.

Our Kafka demo setup is the following:

  • A Kafka cluster deployed with the confluent helm charts
  • A kafka-console-producer and a kafka-console-consumer
  • Three consumers that are processing text messages

Every consumer has the same behavior when processing messages:

  • When reading the message wait, the consumer waits for 15 minutes before writing the input message into the output topic
  • When reading the message crash, the consumer throws a RuntimeException and the Kubernetes pod restarts
  • Otherwise, the consumer forwards the input message into the output topic without doing anything else
Demo Setup

We are using our bakdata helm charts to deploy the three consumers.

First Experiment

Setup:

  • Create topics: input-topic and output-topic
  • Start non-static deployment
  • Start kafka-console-producer and kafka-console-consumer
  • Write random strings into the kafka-console-producer and see if the messages arrive at the kafka-console-consumer. (You will also see them in the logs of the Kubernetes pods)

Test:

  • Write wait into the kafka-console-producer. The message blocks one consumer for 15 minutes.
  • Spam some random messages to the kafka-console-producer. You should see that some of them do not arrive in the output topic because they are stuck in the blocked consumer.
  • Write crash into the kafka-console-producer. If no pod restarts, please wait a few seconds. It could be that the crash message was sent to the blocked consumer.
  • Spam some random messages to the kafka-console-producer.

At this point, no messages should arrive at the output because the consumer group is stuck in a rebalance until the blocked consumer finishes processing the wait message.

Second Experiment — Static Membership

Setup:

  • Delete old helm deployment
  • Delete topics: input-topic and output-topic
  • Create topics: input-topic and output-topic
  • Start non-static deployment
  • Start kafka-console-producer and kafka-console-consumer
  • Write random strings into the kafka-console-producer and see if the messages arrive at the kafka-console-consumer. (This may take a minute now - After this minute all messages will arrive immediately)

Test:

  • Write wait into the kafka-console-producer. One consumer is now blocked for 15 minutes.
  • Spam some random messages to the kafka-console-producer. You should see that some of them do not arrive in the output topic because they are stuck in the blocked consumer.
  • Write crash into the kafka-console-producer. If no pod restarts, please wait a few seconds. It could be that the crash message was sent to the blocked consumer.
  • Spam some random messages to the kafka-console-producer.

Since the consumer group is not rebalancing, the crashing consumer reads the crash message repeatedly and restarts multiple times. At this point, roughly a third of all messages should arrive in the output topic. One third arrives at the blocked consumer, and the other third arrives at the crash-looping consumer.

What We’ve Learned

Rebalancing is necessary for Kafka to work correctly. Still, we should avoid unnecessary rebalancing.

In our experience, it is best not to play too much with session.timeout.ms and heartbeat.interval.ms if not necessary. However, it is perfectly fine to increase max.poll.interval.ms or decrease the number of records via max.poll.records (or bytes via max.partition.fetch.bytes) in a poll.

Updating Kafka regularly is good practice. New updates may introduce improvement on the rebalancing protocol (we mentioned versions 2.3, 2.4, and 2.5 already in this article).

Use the Static Membership Protocol if you suffer from frequent rebalancing. In fact, we think this should be the default..

If you get stuck in rebalances while deploying multiple applications into an empty group (e.g., first time deploying, destroy everything and deploy again), you may consider configuring group.initial.rebalance.delay.ms.

We hope you enjoyed reading and now have a better understanding of Kafka rebalancing and how to solve problems with it.

Helpful Articles

In the end, I want to give my thanks to talks that elevated my research on this topic.

--

--