Source https://www.pilz.com/en-INT/support/knowhow/law-standards-norms/functional-safety/fault-tolerance

Rack awareness in Kafka Streams

Levani Kokhreidze
Wise Engineering
Published in
10 min readJun 7, 2022

--

Kafka 3.2 comes with many exciting features. One of the new features in the Kafka Streams is rack awareness which is the 2nd KIP (Kafka Improvement Proposal) contribution from Wise to the Apache Kafka project. This blog post explains how we run stateful Kafka Streams applications, what challenges we faced, and why we are looking forward to the rack awareness support in the Kafka Streams. Also, we will cover the new rack awareness implementation and explain how it works in more detail.

Please note that the rack-aware standby task assignment may change in future releases, so it is always good to check out the KIPs page for any relevant KIPs. There is no guarantee that information in this post will be applicable in the future, as we will go over intimate details of the implementation that were not part of the KIP-708 documentation.

Stream Processing at Wise

At Wise, we are heavily invested in stream processing. We run hundreds of stateful stream processing jobs of various sizes. We’ve developed our abstraction on top of Kafka Streams and Apache Flink, enabling us to switch between different stream processing frameworks with just a configuration change without modifying the user code. In addition to supporting two frameworks, it also has integration with the collection of streaming algorithms, like Apache Datasketches, which helps users build highly performant and complex real-time computations.

Even though we support two stream processing frameworks, most stream processing jobs use Kafka Streams-based engines. Kafka Streams is a critical component of Wise’s money movement, and therefore it being highly available is very important for the business.

Problem

To explain the problem, we have to go over a few basic concepts of the Kafka Streams and describe how we run stateful Kafka Streams-based jobs within our Kubernetes infrastructure at Wise.

First, let us go over how active and standby tasks are managed in Kafka Streams. When a Kafka Streams application subscribes to a topic, it creates a task. A task can process one or more topic partitions. The Stream Threads execute tasks, and one Stream Thread can execute one or more tasks.

Source https://kafka.apache.org/32/documentation/streams/architecture

In the case of stateful computation, like aggregations, counts, joins, and so on, each task may have a persisted state associated with it, and those tasks are referred to as stateful tasks. The state can be pretty small, but it can also be substantial in some cases. We have many stream processing jobs at Wise based on Kafka Streams that handle more than 1TB worth of state.

In Kafka Streams, persisted state is managed by RocksDB and backed up by corresponding changelog topics for fault tolerance. Usually, in production, tasks are distributed across multiple nodes so that each one of them handles a smaller chunk of the processing.

Source https://kafka.apache.org/32/documentation/streams/architecture

As we mentioned before, Kafka Streams usually run with multiple nodes in production-like environments. When a single node fails, tasks executing on the failed node must be migrated to the healthy nodes. Before resuming processing of the stateful tasks, Kafka Streams also has to recover the state of those tasks from the changelog topics. As you might have guessed, the bigger the state the task has, the longer it takes to migrate to the other nodes. Until the task migration process is complete, real-time event processing is halted, and we have a downtime of some partitions the failed node was responsible for. Kafka Streams have a num.standby.replicas configuration option to speed up the state recovery process by creating a “hot” standby task on a different node than the active one. Standby tasks are constantly replicating the state from the corresponding changelog topic and trying to keep the local state up-to-date with the active one. Standby tasks reduce the recovery time since Kafka Streams migrates the active task to the node where the standby task is located in case of a node failure. This means that the migrated task does not have to entirely recover the state from the changelog topic, reducing task failover time.

Standby to active task promotion

As mentioned before, at Wise, we run hundreds of stateful stream processing jobs based on the Kafka Streams engines, and to better understand the problem at hand, let us go over how we run them in our infrastructure.

How we run Kafka Streams based stream processing jobs at Wise

The illustration above shows that we run stream processing jobs as Statefulsets in self-managed Kubernetes clusters (k8s-cluster-1, k8s-cluster-2). Each pod runs with Persistent Volume(s) backed by an EBS disk(s). Each Kubernetes cluster deploys pods across three availability zones(az-a, az-b, az-c). If we have three pods per Kubernetes cluster, that would mean that we have two pods in each availability zone. The main issue was that standby task allocation before Kafka 3.2 was solely based on “least loaded” clients. “Least loaded” clients are determined by how many tasks the Kafka Streams client has already assigned. The standby task distributor in Kafka Streams would then assign the standby task to a client with the least number of tasks. So in our example, since we have two pods in each availability zone, and if we configure num.standby.replicas=1, it would make it possible for an active and standby task to end up in the same availability zone.

Active and standby task distribution prior Apache Kafka 3.2

Notice in the example above, that both active and standby tasks for 2_1 are located in the AZ-A. This setup was creating a problem in the cloud environment. If an availability zone fails, we will lose active and standby tasks on those pods, making the recovery process much longer, especially for large stateful applications. Users can configure num.standby.replicas to be more than the distinct count of Kafka Streams instances in each availability zone to mitigate this issue. In our case, we can configure num.standby.replicas=2, which would effectively mean that the 2nd standby task is allocated to another availability zone compared to the active one. Of course, this solution is costly as the storage costs triple.

Solution

With Apache Kafka 3.2, Kafka Streams users can now configure their applications with rack awareness. The idea is to give users the ability to tag their Kafka Streams instances with arbitrary tags corresponding to information where the Kafka Streams instance is running, for example, an availability zone. Kafka Streams will use this information to decide the most optimal allocation of standby tasks. This can be achieved with two extra configuration options, client.tag.* and rack.aware.assignment.tags, which let users customize rack-aware standby task assignment behavior based on their specific setup. Rack awareness works on a best-effort basis. Rack aware standby task algorithm computes the most optimal standby task distribution based on available Kafka Streams instances and their tags configuration. Let’s look at Wise’s example we discussed in the previous section and how we can apply rack awareness to it to get a more resilient setup.

client.tag.* lets users define arbitrary key-value tags for their Kafka Streams applications. Based on the Wise’s example, we can set availability zone and Kubernetes cluster as tags like so:

client.tag.zone=az-a
client.tag.cluster=k8s-cluster-1

rack.aware.assignment.tags lets users specify which client tags must be considered when computing rack-aware standby task distribution. In our example, we’d like to consider both zone and cluster tags. If we specify num.standby.replicas=1, Kafka Streams will ensure that the standby replica is created on the Kafka Streams instance, which has different zone and cluster tags than the active one. This is what we can call the ideal rack awareness — when all the standby tasks are located on different tags compared to the active and other standby tasks.

The ideal rack awareness in Apache Kafka 3.2

Before rack aware standby task assignment:

  • The active task 2_1 is located on k8s-cluster-2, az-a and the standby task was located on a client with ID c1-a (k8s-cluster-1, az-a).
  • The active task 2_0 is located on k8s-cluster-2, az-b and the standby task was located on a client with ID c2-c (k8s-cluster-2, az-c).
  • The active task 1_1 is located on k8s-cluster-2, az-c and the standby task was located on a client with ID c2-b (k8s-cluster-2, az-b).
  • The active task 2_2 is located on k8s-cluster-1, az-a and the standby task was located on a client with ID c1-c (k8s-cluster-1, az-c).
  • The active task 1_2 is located on k8s-cluster-1, az-b and the standby task was located on a client with ID c2-a (k8s-cluster-2, az-a).
  • The active task 1_0 is located on k8s-cluster-1, az-c and the standby task was located on a client with ID c1-b (k8s-cluster-1, az-b).

After rack aware standby task assignment:

  • The active task 2_1 is located on k8s-cluster-2, az-a and the standby task is located on a client with ID c1-b (k8s-cluster-1, az-b).
  • The active task 2_0 is located on k8s-cluster-2, az-b and the standby task is located on a client with ID c1-c (k8s-cluster-1, az-c).
  • The active task 1_1 is located on k8s-cluster-2, az-c and the standby task is located on a client with ID c1-a (k8s-cluster-1, az-a).
  • The active task 2_2 is located on k8s-cluster-1, az-a and the standby task is located on a client with ID c2-b (k8s-cluster-2, az-b).
  • The active task 1_2 is located on k8s-cluster-1, az-b and the standby task is located on a client with ID c2-c (k8s-cluster-2, az-c).
  • The active task 1_0 is located on k8s-cluster-1, az-c and the standby task is located on a client with ID c2-a (k8s-cluster-2, az-a).

New standby task allocation enables faster recovery in case of AZ failure and single Kubernetes cluster failure.

But what happens if we specify num.standby.replicas=2? In that case, the 2nd standby task won’t consider the cluster tag anymore since there aren’t enough cluster tag values available to satisfy this distribution. Still, it will only consider the zone tag since we have three distinct values for that tag. So 2nd standby will be allocated to a different availability zone than the active and 1st standby tasks. We can think of this as partial rack awareness — when not all the standby tasks are located on different tags compared to the active and other standby tasks.

The partial rack awareness in Apache Kafka 3.2 where num.standby.replicas=2

Since we have only two unique cluster tag values, we can only achieve “ideal” distribution on the 1st standby task assignment. Ideal distribution for the 1st standby task can be achieved because we can assign a standby task to the client located on a different cluster and zone than an active task. We can’t consider the cluster tag for the 2nd standby task assignment because the 1st standby task would already be assigned on a different cluster compared to the active one, which means we have already used all the available cluster tag values. Taking the cluster tag into consideration for the 2nd standby task assignment would effectively mean excluding all the clients. Instead, for the 2nd standby task, we can only achieve partial rack awareness based on the zone tag. As we don’t consider the cluster tag for the 2nd standby task assignment, partial rack awareness can be satisfied by placing the 2nd standby client on a different zone tag than the active and corresponding standby tasks. The zone on either cluster tags are valid candidates for the partial rack awareness, as our goal is to distribute clients on the different zone tags. We could achieve the ideal rack awareness by introducing the 3rd Kubernetes cluster.

The new rack aware standby task assigner may default to the “least loaded” standby task allocation if even “partial rack awareness” can’t be achieved. For example, if we only have two availability zones and num.standby.replicas=2, 2nd standby will default to the least loaded client because there are not enough zone tag values.

Least loaded standby task distribution in Apache Kafka 3.2

Summary

Standby replicas is a unique feature of Kafka Streams that enables us to recover from failures much quicker than most other stream processing frameworks. New rack-aware standby task distribution functionality which shipped with Apache Kafka 3.2 gives a flexible way of managing standby task allocation, even when running in Wise’s non-trivial setup. This is achieved by iteratively computing standby task distribution based on available Kafka Streams instances, their tags and desired number of standby replicas. It allows us to tolerate failures in multiple dimensions and improve availability of our systems.

Resources

--

--