Fixing Kafka Streams Uneven Tasks Distribution at Logz.io

Matvey Mitnitsky
9 min read · Jun 5, 2024


Photo by Aleksey Kuprikov: https://www.pexels.com/photo/selective-focus-photography-of-balanced-rocks-3551254/

Intro — LogMetrics Feature

At Logz.io we provide an observability platform with the ability to ship logs, metrics, and traces and then interact with them using our app.

LogMetrics is an integral part of our observability offering, which bridges the gap between logs and metrics. It provides the seamless conversion of one type of signal to another. It empowers our customers to gain critical insights faster while also reducing their monitoring bill.

How does LogMetrics work?

Our users can create LogMetric definitions in a matter of seconds. The following is a typical flow for creating a LogMetric rule:

  1. Define which logs should be converted by adding relevant Lucene query filters
  2. Configure the desired metric that should be created from this rule:
     • Add groupBy fields from the filtered logs; these fields will be used as metric dimensions
     • Choose an aggregation type: sum, avg, histogram, etc.
  3. Finally, choose a target metrics account from the list of your sub-accounts

Once a new rule has been created, we add corresponding rules to our log ingestion pipeline that route the relevant logs to the log-metrics Kafka topic.

At the heart of the system is the l2m-aggregator service, a Kafka Streams application responsible for aggregating log data and producing the aggregated results to the output topic.

Kafka Streams

This article assumes a basic familiarity with Kafka Streams and will not go into explaining it.

To learn more about Kafka Streams basics, read this.

Kafka Streams Processing Topology at Logz.io

Our team’s stream processing topology consists of three main parts:

  1. Pre-Aggregation (initial calculation) Horizontally Scalable Step
  2. Reduce Aggregation for Counter Metrics
  3. Reduce Aggregation for Gauge Metrics

Counters vs Gauges

Counter metrics are monotonically increasing numbers that are used in combination with the increase and rate functions in PromQL (e.g., total_requests, total_transactions, etc.).

A gauge is a metric that represents a single numerical value that can arbitrarily go up and down (e.g., avg CPU, disk space usage).

In our context, gauges are aggregated across multiple logs over a limited time window of one minute (e.g., avg_request_time, max error rate, etc.).

Sub-topologies 0 and 2 are windowed aggregations: tumbling one-minute windows with a 20-minute grace period.

Sub-topology 1 (counters) is a plain aggregation that accumulates values over time and doesn't reset them.
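
To make the shape of that topology concrete, here is a minimal sketch of the gauge (windowed) branch using the Kafka Streams DSL (Kafka 3.x API). This is not our production code: the output topic name, the serdes, the store name, and the simple sum reducer are illustrative assumptions.

import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.state.WindowStore;

public class GaugeBranchSketch {

  public static Topology build() {
    StreamsBuilder builder = new StreamsBuilder();

    builder.stream("log-metrics",                              // input topic with routed logs (simplified here to numeric samples)
            Consumed.with(Serdes.String(), Serdes.Double()))   // key = metric name + dimensions, value = numeric sample
        .groupByKey(Grouped.with(Serdes.String(), Serdes.Double()))
        .windowedBy(TimeWindows.ofSizeAndGrace(
            Duration.ofMinutes(1),                             // tumbling one-minute windows
            Duration.ofMinutes(20)))                           // 20-minute grace period for late logs
        .reduce(Double::sum,                                   // stand-in for the real gauge reduce step
            Materialized.<String, Double, WindowStore<Bytes, byte[]>>as("gauge-window-store"))
        .toStream()
        .map((windowedKey, value) -> KeyValue.pair(windowedKey.key(), value))
        .to("aggregated-gauges",                               // illustrative output topic
            Produced.with(Serdes.String(), Serdes.Double()));

    return builder.build();
  }
}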

Operational Noise and Increased On-Call Alerts

Recently, our team started experiencing an elevated number of alerts for latency in the l2m-incoming topic.

Ingesting data points with old timestamps is currently limited to a small time window. Therefore, any latency higher than 10 minutes is considered risky and generates an alert.

The alerts for high L2M latency were triggered multiple times a week, both during regular hours and off-hours. Most of the time, the issue was resolved by our regular scaling policy. However, it became clear that something was wrong and this part of the system was not stable.

Symptoms

The symptoms of the degraded performance included:

  • Partition Lag (accumulating lag on one or multiple partitions)
  • Uneven CPU Load (some pods working harder than others)
  • Memory Pressure (increased GC rate and GC duration)

Below you can see examples of these cases, as seen on the dashboards:

Dashboard screenshots: l2m topic lag; lag by topic partition; pod CPU usage heavily skewed.

We also noticed that this issue occurred only in our busiest region, while other regions were quiet.

Investigation

After developing a better understanding of the contributing factors and observing system behavior during a couple of incidents, we compiled a list of suspects and started the investigation:

  • Data skew in the incoming topic
  • A spiking number of aggregation keys leading to increased memory usage
  • Uneven task distribution

The first two suspects on the list were quickly discarded using the observability we had already built: data was distributed evenly between partitions, and the memory spikes looked more like a symptom than the disease.

Therefore, we decided to focus on investigating task distribution.

Investigating Uneven Tasks Assignment in Kafka Streams

A task is the smallest unit of work within a Kafka Streams application instance. The number of tasks is driven by the number of input partitions. For example, if you have a Kafka Streams application that only subscribes to one topic, and that topic has six partitions, your Kafka Streams application would have six tasks.

Image source: https://kafka.apache.org/0110/documentation/streams/architecture

A task can only be assigned to a single pod and a single stream thread, but a pod or thread can be assigned multiple tasks. Thus, a Kafka Streams pod has a one-to-many relationship with tasks. Kafka Streams users cannot control task assignment since it's the responsibility of the library.

Before the incident we didn't have a way to visualize task assignment per pod because we had assumed that it was balanced. Although not all tasks in our topology have equal weight, our understanding was that the framework makes a best effort to spread tasks equally across the pods. It was time to enhance our observability into this area of our system.

So we started looking for metrics that would shed light on this aspect of our stream processing pipeline.

Augmenting our Observability into Kafka Streams

Unfortunately, Kafka Streams doesn't have a dedicated metric that tracks task assignment.
Luckily, most of the metrics have a task_id label that can be aggregated using a "count by" PromQL query.

For instance, the following query will return the process_rate metric for task 1 in sub-topology 0 (hence task_id 0_1):

process_rate{domainName="kafka.streams", task_id="0_1"}

And the following topk query will return the top 10 pods by number of tasks in sub-topology 0:

topk(10, count by (podName) (process_rate{appName="l2m-aggregator", task_id=~"0_.*"}))

It’s also useful to filter by processor name in your topology using the processor_node_id label.
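
Besides PromQL, task assignment can also be inspected from inside the application itself. The sketch below is not something from our codebase; it simply logs the active tasks owned by each stream thread of the local instance using the public KafkaStreams#metadataForLocalThreads() API (available since Kafka Streams 3.0). Exporting these counts as a custom gauge would be a natural extension.

import java.util.Set;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.ThreadMetadata;

public class TaskAssignmentLogger {

  // Logs how many active and standby tasks each stream thread of this instance owns.
  // Calling this periodically, or on a state-listener transition to RUNNING, gives a
  // quick per-pod view that complements the PromQL queries above.
  public static void logLocalTaskAssignment(KafkaStreams streams) {
    Set<ThreadMetadata> threads = streams.metadataForLocalThreads();
    int total = 0;
    for (ThreadMetadata thread : threads) {
      int active = thread.activeTasks().size();
      total += active;
      System.out.printf("thread=%s activeTasks=%d standbyTasks=%d%n",
          thread.threadName(), active, thread.standbyTasks().size());
    }
    System.out.printf("total active tasks on this instance: %d%n", total);
  }
}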

With these metrics available we’ve built two new visualizations:

  1. Total number of tasks per pod
  2. Subtopology 0 distribution

Subtopology 0 is the heaviest part of our stream processing, so it deserves its own panel.

The panels above gave us the observability we needed and provided clear evidence of uneven task distribution: some pods had more than 10 tasks while others had only one or two.

Additionally, we were able to correlate partition lag with changes in the task distribution (as shown in the screenshots below). We discovered that our initial assumption of even task distribution was wrong, and this took us to the next stage: figuring out why task assignment was not even.

partition lag starting at 11:50, at the same time as a CPU increase and a change in the number of tasks

Root Cause Analysis

So, we homed in on the “primary suspect” and proved that task distribution was to blame. Still, we were left with some unanswered questions:

  • Why doesn't Kafka Streams distribute tasks evenly?
  • Why does this problem happen only in our busiest region?
  • How do we fix it?

To answer these questions we had, in a sense, to go back to the drawing board and better understand how Kafka Streams behaves at scale.

This led us to read two important KIPs (Kafka Improvement Proposals) that focus on dealing with rebalance storms during deployments, scale-out, and scale-in:

  1. KIP-429: “Kafka Consumer Incremental Rebalance Protocol”. This KIP improved rebalancing time by making task assignment more sticky. It greatly reduced the time it took to rebalance, but it caused an “unfortunate and uneven distribution of tasks”.
  2. KIP-441: “Smooth Scaling Out for Kafka Streams” was introduced in Apache Kafka 2.6 to improve unbalanced task distribution by trying to reassign tasks to other pods when a warmup replica is up to date (meaning there's no lag and the task owner can be switched seamlessly).

The latter KIP introduced three new configuration knobs to play with:

  1. max.warmup.replicas (default 2)
  2. acceptable.recovery.lag (default 10,000)
  3. probing.rebalance.interval.ms (default 10 minutes)

The meaning of these configs is as follows:

  • max.warmup.replicas — the number of “warmup” tasks that will try to restore state on other instances after a deployment. This is similar to a standby task, but unlike a standby, the lifecycle of a warmup task is limited to the restoration time. Once restoration is over, the warmup task is promoted to a regular active task.
  • acceptable.recovery.lag — the maximum number of unprocessed records that can be tolerated on a warmup replica that is about to be promoted to task owner. If the lag is lower, we can switch the ownership and let Kafka Streams restore the remaining state. If the lag is higher, we cannot switch the ownership and should wait for the warmup replica to catch up.
  • probing.rebalance.interval.ms — Kafka Streams introduced an additional mechanism to reassign tasks in the absence of rebalances. If we restart the application and start out with an unbalanced distribution, who will reassign the tasks later when the warmups are ready? This mechanism ensures that task reassignment attempts continue after deployment at a stable interval.

Our theory was that we experienced an uneven task distribution because the default acceptable.recovery.lag config is too low for our load and the default probing.rebalance.interval.ms is too high.

The default configs were not good enough for our application because we had a background lag of a couple of million logs; therefore, warmup replicas would always chase the lag and would never be promoted to owners.

Applying the fix and fine-tuning the cluster configuration

With all the pieces finally put together, it was time to provide a fix. We quickly added configuration options to our code and deployed in prod.

We changed configurations and waited to see results in our beautiful dashboards:

acceptable.recovery.lag = 1000000
probing.rebalance.interval.ms = 300000 // 5 min
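
In a Kafka Streams application these settings are passed through the regular Streams properties. Here is a minimal sketch of how the new values could be wired in; the class name, application id, and bootstrap servers are placeholders rather than our real setup.

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class L2mAggregatorConfigSketch {

  public static Properties streamsProperties() {
    Properties props = new Properties();
    // Placeholders: substitute your real application id and brokers.
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "l2m-aggregator");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");

    // KIP-441 knobs, using the values from the text above.
    props.put(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, 1_000_000L);
    props.put(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, 300_000L); // 5 min
    return props;
  }
}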

During a deployment, task distribution skew is very high. The first time, it took approximately one hour to arrive at a balanced distribution.

Once rebalancing is done, the number of tasks and the total CPU usage equalize and become very stable until the next deployment or scale-out.

Notice how CPU usage correlates with the task distribution metric.

We tuned the smooth scale-out behavior further by increasing max.warmup.replicas to 10 and acceptable.recovery.lag to 10 million. This reduced the total time it takes to reach a balanced state from 1–2 hours to 10–20 minutes (a couple of probing rebalance intervals).

⚠️ One should be careful with increasing the max.warmup.replicas config. According to the documentation:

“Increasing this will allow Streams to warm up more tasks at once, speeding up the time for the reassigned warmups to restore sufficient state for them to be transitioned to active tasks.”

Setting this configuration too high might create additional pressure on brokers due to extra broker traffic and cluster state used during rebalances.

Fine-tuning only three configurations improved the situation drastically, and the results were amazing: balanced task distribution and CPU utilization, latency below one minute, and no more alerts!

Our team can now focus its energy on delivering features instead of spending time fixing prod.

There are more knobs to play with for speeding up data restoration:

  1. restore.consumer.max.poll.records — fetching more records during state restoration (see the small sketch below)
  2. introducing Kubernetes PVCs to our deployments to keep state store data between restarts

Though this is a topic for a separate blog.
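
For reference, the first of those knobs is applied through the restore-consumer prefix helper on StreamsConfig. A minimal sketch, with a purely illustrative value:

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class RestoreConsumerTuningSketch {

  public static Properties withRestoreTuning(Properties props) {
    // The "restore.consumer." prefix applies a consumer setting only to the restore
    // consumer; 2000 is an illustrative value, not a recommendation.
    props.put(StreamsConfig.restoreConsumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), 2000);
    return props;
  }
}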

Conclusions

I want to summarize a couple of valuable lessons that we learned while fixing this issue.

First and foremost, Kafka Streams behavior might be counterintuitive, especially at high scale. Don't assume that the task distribution provided by the framework is the best it can be.

Secondly, the defaults provided by the Kafka Streams library work well at smaller scales. In our case, the issue only happened in our busiest region, and tweaking the configuration helped solve it.

Finally, the out-of-the-box metrics provided by Kafka Streams are not always enough, and it's important to enrich and augment them. Good observability was the key to finding the root cause and will continue to play a crucial role in making sure this does not happen again.

Fine-tuning our stream processing pipeline improved our understanding of the framework and allowed us to drastically reduce operational noise. I hope this post helps you understand Kafka Streams internals better as well.


Matvey Mitnitsky

Backend Engineer, Ingestion Tech Lead @ Logz.io | Passionate about Software Engineering, Data, Stream Processing. I enjoy Coffee, Hiking, and watching Movies