Kafka Streams — Internals Guide

prabath weerasinghe
7 min read · Apr 25, 2020

--

The Kafka Streams library needs to complete a couple of steps before a stream application is up and running. These steps aren’t independent; each depends on the output of the one before it.

  1. Arranging Nodes into groups.
  2. Identifying Stream Tasks and assigning topics and partitions to them.
  3. Allocating the previously created Stream Tasks to Stream Threads.

The article discusses each of these steps in detail. I’ve put references to the Kafka code base as needed.

Node Grouping

Nodes

A Node is the primary abstraction of execution logic in a Kafka Streams application. There are three types of Nodes: Source, Processor, and Sink.

  • Source Nodes handle consuming from source topics.
  • Processor Nodes abstract out logic execution (state stores are also attached to Processor Nodes).
  • Sink Nodes handle publishing to output topics.

So, whether indirectly through the DSL or directly through the Processor API, writing a stream application is all about defining the data flow using these Nodes.
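As a concrete illustration, here’s a minimal Processor API sketch that wires one Node of each type (the topic and node names here are made up for the example):

```java
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.Record;

public class ThreeNodeTypes {
    public static Topology build() {
        Topology topology = new Topology();
        // Source Node: consumes from a source topic.
        topology.addSource("source-node-1", "input-topic");
        // Processor Node: executes logic per record (a pass-through here).
        topology.addProcessor("processor-1",
                () -> new ContextualProcessor<Object, Object, Object, Object>() {
                    @Override
                    public void process(Record<Object, Object> record) {
                        context().forward(record);
                    }
                },
                "source-node-1");
        // Sink Node: publishes to an output topic.
        topology.addSink("sink-1", "output-topic", "processor-1");
        return topology;
    }

    public static void main(String[] args) {
        System.out.println(build().describe());
    }
}
```

Note how each `add*` call names the node and its parent(s); those names are exactly what the grouping logic below operates on.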

When we add Nodes to a topology, Kafka internally maintains a DAG based on node names [1]. This structure is later referred to when creating Node Groups [2].

Figure-1 — Example of a Node topology in a Kafka Stream App.

The manual logic for Node grouping would look like the following; consider Figure-1 as a reference.

  1. Start with a source topic and find the relevant Source Node.
    - Start with source-node-1.
    - Group-1 — {source-node-1}
  2. Trace its descendants until you reach a Sink Node or a Processor Node with no children.
    - Descendants: processor-1, processor-4, sink-1
  3. Every descendant of that Source Node belongs to the same Node Group.
    - Group-1 — {source-node-1, processor-1, processor-4, sink-1}
  4. Do this for all source topics. If the intersection of two Node Groups is not empty, merge them into a single Node Group.
    — — — — — — — — — — — — — — — — — — — — — — — — — — — —
    - Group 1 — {source-node-1, processor-1, processor-4, sink-1}
    - Group 2 — {source-node-2, processor-2, processor-4, sink-1}
    - Group 3 — {source-node-3, processor-3, sink-2}
    — — — — — — — — — — — — — — — — — — — — — — — — — — — —
    - Group 1 ∩ Group 2 = {processor-4, sink-1}, so they merge into
    {source-node-1, source-node-2, processor-1, processor-2, processor-4, sink-1}
    - Group 1 ∩ Group 3 → Nil
    - Group 2 ∩ Group 3 → Nil
  5. The final Node Groups from the example are:
    - Node Group 1 →
    {source-node-1, source-node-2, processor-1, processor-2, processor-4, sink-1}
    - Node Group 2 →
    {source-node-3, processor-3, sink-2}
Figure-2 — Final Node Groups

The sub-topologies we get from topology.describe() are, in fact, Node Groups, and they come in handy when creating and initializing Stream Tasks.
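The manual procedure above can be sketched with a simple union-find over node names. This is plain Java mirroring the idea, not Kafka’s actual QuickUnion implementation; the node names and edges come from Figure-1:

```java
import java.util.*;

public class NodeGrouping {
    // Parent pointers for a simple union-find over node names.
    private final Map<String, String> parent = new HashMap<>();

    public void addNode(String name) { parent.put(name, name); }

    public String root(String name) {
        while (!parent.get(name).equals(name)) name = parent.get(name);
        return name;
    }

    // Connecting a child to a parent merges their groups.
    public void connect(String child, String parentNode) {
        parent.put(root(child), root(parentNode));
    }

    // Collect final groups: nodes sharing a root belong together.
    public Map<String, Set<String>> groups() {
        Map<String, Set<String>> groups = new HashMap<>();
        for (String node : parent.keySet())
            groups.computeIfAbsent(root(node), r -> new TreeSet<>()).add(node);
        return groups;
    }

    public static void main(String[] args) {
        NodeGrouping g = new NodeGrouping();
        for (String n : List.of("source-node-1", "source-node-2", "source-node-3",
                "processor-1", "processor-2", "processor-3",
                "processor-4", "sink-1", "sink-2"))
            g.addNode(n);
        // Edges from Figure-1: processor-4 is a common descendant of
        // processor-1 and processor-2, so their groups merge.
        g.connect("processor-1", "source-node-1");
        g.connect("processor-2", "source-node-2");
        g.connect("processor-3", "source-node-3");
        g.connect("processor-4", "processor-1");
        g.connect("processor-4", "processor-2");
        g.connect("sink-1", "processor-4");
        g.connect("sink-2", "processor-3");
        System.out.println(g.groups()); // two final Node Groups
    }
}
```

Connecting a child to any parent merges their groups, which is exactly why the shared descendant processor-4 pulls Group 1 and Group 2 together.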

Effect Of State Stores In Node Grouping

State Stores are always attached to one or more Processor Nodes, so all the Processors attached to a given State Store become part of the same Node Group.
From our example, if we do the following:

topology.addStateStore(storeBuilder, "processor-4", "processor-3");

This merges Node Group 1 and Node Group 2 into one.

Figure-3 — Effect of adding a State Store in Node Grouping.

[1] Kafka code reference →
Take the method addProcessor in InternalTopologyBuilder as an example. An attribute nodeGrouper of type QuickUnion is used to add the name of the node to a map and then union it with its parent nodes in the DAG.

[2] Kafka code reference →
The method makeNodeGroups in InternalTopologyBuilder implements the Node Group extraction logic.

Identifying Stream Tasks And Assigning Topics And Partitions

Stream Thread

A Stream Thread is the primary execution unit of a stream application. It periodically consumes records and forwards them to the corresponding Stream Tasks. If a Stream Thread is a CPU, then Stream Tasks are the scheduled processes. Figure-4 shows the basic components and data flow of a Stream Thread.

Figure-4 — Main components and data flow of a Stream Thread.
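Conceptually, the poll-and-dispatch cycle looks like the following plain-Java sketch. This illustrates the idea only, not Kafka’s actual StreamThread code, and the types are made up:

```java
import java.util.*;

public class StreamThreadLoop {
    // A record tagged with the topic-partition it was consumed from.
    record ConsumedRecord(String topic, int partition, String value) {}

    // Tasks are looked up by (topic, partition): each partition of a
    // Node Group's source topics belongs to exactly one Stream Task.
    public static Map<String, List<String>> dispatch(
            List<ConsumedRecord> polled,
            Map<String, String> taskByTopicPartition) {
        Map<String, List<String>> buffered = new LinkedHashMap<>();
        for (ConsumedRecord r : polled) {
            String taskId = taskByTopicPartition.get(r.topic() + "-" + r.partition());
            buffered.computeIfAbsent(taskId, t -> new ArrayList<>()).add(r.value());
        }
        return buffered; // each Task then processes its own buffer in order
    }

    public static void main(String[] args) {
        // Both topic-A/0 and topic-B/0 belong to the same Task <0,0>.
        System.out.println(dispatch(
                List.of(new ConsumedRecord("topic-A", 0, "x"),
                        new ConsumedRecord("topic-B", 0, "y")),
                Map.of("topic-A-0", "<0,0>", "topic-B-0", "<0,0>")));
    }
}
```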

Stream Task

A Stream Task is always associated with a Node Group and with a particular partition of that Node Group’s source topics. This is the key point: a Stream Task may process records from multiple source topics, but always from the same partition id, which is part of its Task Id.
Its main responsibility is thus to send consumed records through the nodes of its sub-topology (Node Group). Figure-5 shows the main components.

Figure-5 — Main components of a Stream Task.

Identifying Tasks And Assigning Topics And Partitions [3]

Figure-6 — Topics with partitions consumed by each Source Node. Topic-A has 4 partitions, Topic-B has 5 partitions, and Topic-C has 4 partitions.

This is done during a consumer group re-balance. Kafka Streams has a custom partition assignor, StreamPartitionAssignor . The logic is as follows; refer to Figure-6, which shows three topics with their respective partitions.

  1. Get all the source topics by Node Group.
    - Group-1 →{Topic-A, Topic-B}
    - Group-2 →{Topic-C}
  2. For each group, find the maximum number of partitions out of group topics.
    - In Group-1, Topic-A got 4, and Topic-B got 5 partitions. So the max number of partitions is 5.
    - In Group-2 there’s only one topic, Topic-C, and got 4 partitions. So the max number of partitions is 4.
  3. In each group, for every partition up to the max-partition count, create a Task Id. The Task Id is a combination of <node-group-id, partition-number>.
    - Group-1 will have 5 Task Ids.
    - Id of Task 1→<0,0>, Id of Task 2→<0,1>, Id of Task 3→<0,2>,
    Id of Task 4→<0,3>, Id of Task 5→<0,4>
    — — — — — — — — — — — — — — — — — — — — — — — — — — — — — —
    - Group-2 will have 4 Task Ids, since its max partition count is 4.
    - Id of Task 1→<1,0>, Id of Task 2→<1,1>, Id of Task 3→<1,2>,
    Id of Task 4→<1,3>
  4. Each Task would handle records from the assigned partition across all relevant source topics of the Node Group.
    Group-1
    - Task-1 <0,0> →{Partition 0 of topic-A, Partition 0 of topic-B}
    - Task-2 <0,1> →{Partition 1 of topic-A, Partition 1 of topic-B}
    - Task-3 <0,2> →{Partition 2 of topic-A, Partition 2 of topic-B}
    - Task-4 <0,3> →{Partition 3 of topic-A, Partition 3 of topic-B}
    - Task-5 <0,4> →{Partition 4 of topic-B}
    Task-5 only handles Partition-id 4 of topic-B since it’s the only topic with 5 partitions (partitions are zero-indexed).
  5. At the end of the process, we’ll have Task Ids defined for each Node Group, along with the topic and partition ids they’re responsible for.
Figure-7 — Final topic and partition assignment among Tasks.
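The five steps above can be sketched as follows. This is plain Java mirroring the idea behind DefaultPartitionGrouper’s partitionGroups; the map-based types are simplifications of Kafka’s own:

```java
import java.util.*;

public class PartitionGrouper {
    // For each Node Group, create one Task Id per partition up to the
    // maximum partition count among the group's source topics, and
    // assign that partition of every topic that actually has it.
    public static Map<String, List<String>> partitionGroups(
            Map<Integer, List<String>> topicsByGroup,
            Map<String, Integer> partitionCounts) {
        Map<String, List<String>> assignment = new LinkedHashMap<>();
        for (Map.Entry<Integer, List<String>> e : topicsByGroup.entrySet()) {
            int groupId = e.getKey();
            int maxPartitions = e.getValue().stream()
                    .mapToInt(partitionCounts::get).max().orElse(0);
            for (int p = 0; p < maxPartitions; p++) {
                List<String> topicPartitions = new ArrayList<>();
                for (String topic : e.getValue())
                    if (p < partitionCounts.get(topic))
                        topicPartitions.add(topic + "-" + p);
                assignment.put("<" + groupId + "," + p + ">", topicPartitions);
            }
        }
        return assignment;
    }

    public static void main(String[] args) {
        // The Figure-6 setup: Group-1 reads Topic-A and Topic-B,
        // Group-2 reads Topic-C.
        Map<Integer, List<String>> groups = Map.of(
                0, List.of("Topic-A", "Topic-B"),
                1, List.of("Topic-C"));
        Map<String, Integer> partitions =
                Map.of("Topic-A", 4, "Topic-B", 5, "Topic-C", 4);
        partitionGroups(groups, partitions)
                .forEach((task, tps) -> System.out.println(task + " -> " + tps));
    }
}
```

Running it against the Figure-6 topics yields five Tasks for Group-1 and four for Group-2, with Task <0,4> covering only Topic-B.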

[3] Kafka code reference →
The method partitionGroups in DefaultPartitionGrouper encapsulates the logic of identifying Tasks and assigning topics and partitions. It is invoked as part of the StreamPartitionAssignor assign method implementation.

Allocating Stream Tasks To Stream Threads

Stream Task to Stream Thread allocation is done in multiple steps. [4]

Figure-8 — Two main steps of allocating Stream Tasks.
  • First, it allocates Tasks to Clients (Stream App Instances). The Task assignor tries to reassign existing Tasks to the same Client.
  • The remaining Tasks are distributed as evenly as possible, considering the number of available Stream Threads and the number of already-assigned Tasks on each Client.
  • Lastly, each Client’s assigned Tasks are distributed among its Stream Threads.

Applying this to our example, consider two instances of the stream app, each with two Stream Threads. The Task allocation upon a fresh deployment would probably look like the following.

Figure-9 — Allocation of Stream Tasks from our example.
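A much-simplified sketch of the distribution step, in plain Java: it ignores stickiness and standby replicas and simply round-robins Tasks across all threads, which matches a fresh deployment where no prior assignment exists to be “sticky” about:

```java
import java.util.*;

public class TaskAllocator {
    // Round-robin Tasks across all Stream Threads of all Clients,
    // yielding an even spread on a fresh deployment.
    public static Map<String, List<String>> allocate(
            List<String> taskIds, Map<String, Integer> threadsByClient) {
        List<String> threads = new ArrayList<>();
        threadsByClient.forEach((client, count) -> {
            for (int i = 0; i < count; i++) threads.add(client + "-thread-" + i);
        });
        Map<String, List<String>> assignment = new LinkedHashMap<>();
        threads.forEach(t -> assignment.put(t, new ArrayList<>()));
        for (int i = 0; i < taskIds.size(); i++)
            assignment.get(threads.get(i % threads.size())).add(taskIds.get(i));
        return assignment;
    }

    public static void main(String[] args) {
        // Nine Tasks from our example; two app instances, two threads each.
        List<String> tasks = List.of("<0,0>", "<0,1>", "<0,2>", "<0,3>", "<0,4>",
                "<1,0>", "<1,1>", "<1,2>", "<1,3>");
        allocate(tasks, new TreeMap<>(Map.of("client-1", 2, "client-2", 2)))
                .forEach((thread, t) -> System.out.println(thread + " -> " + t));
    }
}
```

With nine Tasks over four threads, each thread ends up with two or three Tasks.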

[4] Kafka code reference →
Assigning Tasks to Clients is handled by the assign method of StickyTaskAssignor . Once that’s done, the StreamPartitionAssignor assign method calls computeNewAssignment to distribute the Tasks assigned to each Client among its Stream Threads.

Example Showing The Effect Of Common Descendant In Node Grouping

To finish off, let’s check an example and see how Node Grouping works.
The following code creates a simple stream app with two Transformers and two attached State Stores. Let’s check the topology. (topic-1 and topic-2 have the same number of partitions.)

Code-Snippet-1 — Simple stream application without common descendants.
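A plausible equivalent of that snippet can be sketched with the Processor API; the node, store, and topic names are assumed from Figure-10, and pass-through processors stand in for the Transformers’ actual logic:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.Stores;

public class TwoGroupApp {
    static ProcessorSupplier<Object, Object, Object, Object> passThrough() {
        return () -> new ContextualProcessor<Object, Object, Object, Object>() {
            @Override
            public void process(Record<Object, Object> record) {
                context().forward(record);
            }
        };
    }

    public static Topology build() {
        Topology topology = new Topology();
        topology.addSource("source-1", "topic-1");
        topology.addProcessor("transformer-1", passThrough(), "source-1");
        topology.addSink("sink-1", "output-1", "transformer-1");
        topology.addSource("source-2", "topic-2");
        topology.addProcessor("transformer-2", passThrough(), "source-2");
        topology.addSink("sink-2", "output-2", "transformer-2");
        // Each transformer gets its own state store, so no common
        // descendant exists and the two branches stay separate groups.
        topology.addStateStore(Stores.keyValueStoreBuilder(
                Stores.inMemoryKeyValueStore("store-1"),
                Serdes.String(), Serdes.String()), "transformer-1");
        topology.addStateStore(Stores.keyValueStoreBuilder(
                Stores.inMemoryKeyValueStore("store-2"),
                Serdes.String(), Serdes.String()), "transformer-2");
        return topology;
    }

    public static void main(String[] args) {
        System.out.println(build().describe());
    }
}
```

Since nothing is shared between the two branches, topology.describe() reports two sub-topologies.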

Figure-10 — Topology of the Code Snippet-1.

Based on the topology view, there are two Node Groups (two sub-topologies). Therefore, for example, partition 1 of topic-1 and partition 1 of topic-2 would be handled by two different Stream Tasks; there’s no connection whatsoever between the two partitions.
Just to see the effect of having a common descendant, let’s add a fake Processor Node as a child of both transformer-1 and transformer-2.

Code-Snippet-2 — The same stream application with a dummy processor added as a descendant of both Transformers.
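A sketch of the modified topology, again with assumed names and pass-through processors (the state stores are omitted for brevity, since the merge is caused by the shared child alone):

```java
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;

public class OneGroupApp {
    static ProcessorSupplier<Object, Object, Object, Object> passThrough() {
        return () -> new ContextualProcessor<Object, Object, Object, Object>() {
            @Override
            public void process(Record<Object, Object> record) {
                context().forward(record);
            }
        };
    }

    public static Topology build() {
        Topology topology = new Topology();
        topology.addSource("source-1", "topic-1");
        topology.addProcessor("transformer-1", passThrough(), "source-1");
        topology.addSink("sink-1", "output-1", "transformer-1");
        topology.addSource("source-2", "topic-2");
        topology.addProcessor("transformer-2", passThrough(), "source-2");
        topology.addSink("sink-2", "output-2", "transformer-2");
        // The dummy node is a child of both transformers, creating a
        // common descendant that merges the two Node Groups into one.
        topology.addProcessor("dummy-processor", passThrough(),
                "transformer-1", "transformer-2");
        return topology;
    }

    public static void main(String[] args) {
        System.out.println(build().describe());
    }
}
```

topology.describe() now reports a single sub-topology.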

Figure-11 — Topology view of the Code-Snippet-2.

Adding the fake Processor Node creates a common descendant between the previous two Node Groups, and they collapse into a single Node Group.
Now partition 1 of both topic-1 and topic-2 is handled by the same Stream Task.
The example is just to demonstrate Node Grouping; there’s no practical use for fake Processor Nodes.
