Kafka Streams work allocation

Kafka Streams provides an extremely simple deployment model for scaling out your streaming applications. You deploy your application many times and the instances will automatically share the work amongst themselves. The Kafka Streams documentation provides a high level description of how this process works, but leaves many unanswered questions. From the Confluent Kafka Streams docs:

An application’s processor topology is scaled by breaking it into multiple stream tasks. More specifically, Kafka Streams creates a fixed number of stream tasks based on the input stream partitions for the application, with each task being assigned a list of partitions from the input streams (i.e., Kafka topics). The assignment of stream partitions to stream tasks never changes, hence the stream task is a fixed unit of parallelism of the application. Tasks can then instantiate their own processor topology based on the assigned partitions; they also maintain a buffer for each of its assigned partitions and process input data one-record-at-a-time from these record buffers. As a result stream tasks can be processed independently and in parallel without manual intervention.
Slightly simplified, the maximum parallelism at which your application may run is bounded by the maximum number of stream tasks, which itself is determined by maximum number of partitions of the input topic(s) the application is reading from. For example, if your input topic has 5 partitions, then you can run up to 5 applications instances. These instances will collaboratively process the topic’s data. If you run a larger number of app instances than partitions of the input topic, the “excess” app instances will launch but remain idle; however, if one of the busy instances goes down, one of the idle instances will resume the former’s work. We provide a more detailed explanation and example in the FAQ.

Breaking your topology down into sub-topologies

For the simplest of applications, like the one below, we read from one topic, apply some transformations and write to another topic. In this case,
Kafka Streams will create exactly as many tasks are there are partitions on the source topic, as explained in the quote above.

However for more complex Kafka Stream applications, the topology can be further broken down into sub-topologies. The idea of a sub-topology is that it can be run entirely independently from any other sub-topology. Sub-topologies cannot share any source topics since in Kafka Streams each source message is processed only once, ignoring failure scenarios.

In the example topology above we use the through combinator to write the results of the flatMap operation back to a Kafka topic. We then read from this topic, filter out some of the records and write it to a separate topic. This topology can be broken up into two separate sub-topologies. One to read from the orders topic, flatMap then write to the order-items topic. The other to read from the order-items topic filter then write back to the active-order-items topic. Each operation or action in a sub-topology is represented internally as a ProcessorNode. For instance there are separate ProcessorNode implementations for reading from a topic, the flatMap operation, writing to a topic etc. A sub-topology can be represented as a directed acyclic graph (DAG) of processor nodes.

Consider the following abstract example of a more complex topology.

In this example we do a number of operations like joining and branching streams, we write to a number of intermediate topics and two destination topics. The Kafka Streams API examines this topology and determines it can break it up into the follow 3 sub-topologies:

Read from both input topics, perform a join and a branch and write to the four intermediate topics.
Read from two of the intermediate topics, filter on one, join them together and write to an output topic.
Read from two of the intermediate topics, join them together and write to an output topic.

Each of the three sub-topologies above is responsible for its own source topics. Records from those source topics will be processed only by that sub-topology and will not be seen by any other sub-topology.

Describing your Kafka Streams application topology

Kafka Streams makes it very simple to see how your Kafka Stream application has been broken down into sub-topologies. You simply need to describe your topology

This will dump out a description of your topology to your log that will look something like this…

Sub-topology: 0
Source: KSTREAM-SOURCE-0000000242 (topics: [orders-repartition])
--> KSTREAM-AGGREGATE-0000000239
Processor: KSTREAM-AGGREGATE-0000000239 (stores: [ordersByAggregateId])
--> KTABLE-TOSTREAM-0000000243
<-- KSTREAM-SOURCE-0000000242
Processor: KTABLE-TOSTREAM-0000000243 (stores: [])
--> KSTREAM-SINK-0000000244
<-- KSTREAM-AGGREGATE-0000000239
Sink: KSTREAM-SINK-0000000244 (topic: ordersByOrderId)
<-- KTABLE-TOSTREAM-0000000243

Sub-topology: 1
Source: KSTREAM-SOURCE-0000000254 (topics: [attributes-repartition])
--> KSTREAM-REDUCE-0000000251
...

Joshua Koo has built a fantastic web tool that takes this topology description as input and makes these wonderful hand-drawn-like diagrams visually representing each of the sub-topologies. This can be extremely useful when you’ve identified an enormous sub-topology that needs breaking up.

Generating tasks from sub-topologies

Generating sub-topologies from your overall topology is a static process that is performed at the time your topology is built and prior to connecting with Kafka. When your Kafka Streams application is started, the streams API will fetch metadata on each of the source topics from Kafka to determine how to break each sub-topology down into tasks. A task is the smallest unit of work that can be performed in parallel in a Kafka Streams application.

If more than one source topic is allocated to a sub-topology, it means those topics are joined together at some point in that topology. For any stream-to-stream, stream-to-table or table-to-table join the source topics must be co-partitioned. Co-partitioned topics share the same key, and have the same number of partitions. This is so when joining, matching records will always be in the same partition number in both topics and will be owned by the same task. If the number of partitions is different, Kafka Streams will fail at this point with a TopologyBuilderException.

GlobalKTable topics are treated as a special case. Joins with a GlobalKTable do not require co-partitioning. This is because the full state of that topic is made available on every application instance, so the matching data will always be available locally. GlobalKTables are managed by a separate GlobalState thread on each application instance and are not considered to be owned by any of the sub-topologies.


Once the partition counts are verified to be the same, the API is now ready to generate tasks for this sub-topology — one per partition.

In the example above, for sub-topology 2, each source topic has 5 partitions. Here, partition 1 for each of these topics is assigned to task 2_1. This naming rule is generalisable and very useful when debugging stream applications. Task names are of the form

<sub-topology-number>_<partition_number>

So for this sub-topology, we will end up generating five tasks. For the full topology, given the source topics had four partitions and the other two intermediate topics had two partitions, we end up with the following tasks:

Assigning tasks to running application instances

The Kafka Streams configuration entry StreamsConfig.NUM_STREAM_THREADS_CONFIG indicates how many stream threads to generate for this application instance. A StreamThread is an actual Java thread with its own unique Kafka consumer and producer instances. Each StreamThread is assigned tasks to perform. A Kafka Streams application's capacity is defined by the total number of StreamThread instances available over all running instances of the application. At startup, and whenever the set of available application instances changes, tasks are assigned out as evenly and fairly as possible to all the available StreamThreads in the cluster.

Kafka Streams uses the consumer group API to manage storing consumer offsets for all source topics. Kafka Streams also piggybacks off the same mechanism to assign tasks to all StreamThreads on those application instances.

At startup, this is roughly the order of events the Kafka Stream application performs

  1. The Topology is built up using the StreamsBuilder DSL
  2. A KafkaStreams instance is created using the Topology and stream configuration
  3. KafkaStreams creates StreamsConfig.NUM_STREAM_THREADS_CONFIG instances of StreamThread each with its own KafkaConsumer configured with the application id as the consumer group id.
  4. The KafkaStreamsinstance is started triggering all StreamThreads to start.
  5. Each StreamThread then uses its own KafkaConsumer to subscribe to all source topics.
  6. When using consumer groups. Kafka uses a PartitionAssignor implementation to determine what topic partitions are assigned to individual consumers within a consumer group. The Kafka Streams implementation of this interface, StreamPartitionAssignor is aware of stream tasks. The consumer group protocol uses a Kafka broker as the coordinator, gathering metadata from each consumer and picking a consumer at random to do the assignment. The broker passes all metadata to that consumer instance. For Kafka Streams, this metadata includes whether each instance has local state stores for any of the tasks. The consumer attempts to allocate the tasks evenly amongst all available stream threads in the cluster preferring to have tasks run on application instances that already have state stores for these tasks. Each task has a unique set of topic partitions it reads from. So for each stream thread, that corresponds to a consumer, it is allocated all the topic partitions used by the tasks it is assigned.

Given the example topology above, here’s an example task allocation given two application instances, one configured with three StreamThreads, the other with two.

When assignment occurs, Kafka Streams logs the currently allocated tasks for each local StreamThread.

2018-05-10 07:09:38,358 INFO o.a.k.s.p.i.StreamThread - stream-thread [mySampleApp-7b9fcf1e-5f6c-44fc-b3fa-f5b1950300fc-StreamThread-3] partition assignment took 134 ms."}
current active tasks: [0_2, 2_1]
current standby tasks: []
previous active tasks: []

Task assignment stays the same for as long as the application instances are stable. If an instance is stopped or started, the partition assignment process starts again. You can use the kafka-consumer-groups command line tool to determine what partitions are assigned to what threads at any time.

docker run --rm confluentinc/cp-kafka:latest kafka-consumer-groups \
--bootstrap-server broker:9092 \

--describe --group mySampleApp

Imbalanced work allocation

The work allocation mechanism generally works well but has a couple of flaws. Not all tasks are created equal. A task can do anything from reading from a topic and counting messages right through to a full topology of 50 processor nodes and multiple state stores. No concept of task workload is currently taken into account in the assignment. Additionally slow tasks may end up lagging fast tasks. Consumer offset lag is currently not taken into account during allocation. Finally task allocation is only done on the rare occasion of an application instance restart. While running, if a StreamThread has caught up on all its source topics, it will sit there idle. Whereas another StreamThread that has been assigned two slow tasks may be at 100% on a separate CPU, struggling to keep up. Currently, if you see an imbalanced workload you have a couple of options:

  • Restart the application instances — Restarting will force a reallocation of tasks and hopefully will distribute out your slow tasks to separate stream threads so they can be processed concurrently. If you delete the local state stores while the instances are down, you will get a better redistribution of tasks, since it won’t favour sending tasks to the same instances they were running before, but you will need to wait for the state stores to be repopulated from the changelog topics in Kafka which may take some time depending on their size.
  • Increase the number of Stream Threads — The more stream threads you have, the fewer tasks assigned to each stream thread and therefore the less contention for CPU. You could even go for more Stream Threads than available CPUs and let the OS allocate work to the threads intelligently. There are several downsides to this approach though:
  • A StreamThread is not a lightweight entity. Each StreamThread results in at least three Java threads (main Stream Thread, producer network thread and coordinator heartbeat thread) and potentially several more if you use monitoring interceptors. Every Java Thread requires memory for the stack and management overhead for the JVM.
  • Each StreamThread is allocated a producer buffer which for high throughput configuration can be in the order of 256MB. So for 32 stream threads, you’ll need 8GB of heap dedicated to producer buffers alone.
  • The state store local cache is evenly shared between all stream threads, so if you have some threads with no state stores, you may have wasted cache space.
  • Under heavy load with too many stream threads you could start hitting various consumer or producer timeouts for starved threads causing wasteful task reallocation cycles.

One option in future would be if Kafka Streams provided some form of work stealing. Instead of fixing tasks to individual threads, have whichever thread is available process the next batch of messages for a given task committing just those offsets when complete.

This would be a complex change requiring a significant rewrite to the low-level Kafka Streams implementation, but would be worth considering as the API matures and is used for ever larger, more complex transformations.