Kafka Streams — Internals Guide
The Kafka Streams library needs to complete a few steps before a stream application is up and running. These steps aren't independent of each other; rather, each depends on the output of the one before it.
- Arranging Nodes into groups.
- Identifying Stream Tasks and assigning topics and partitions to them.
- Allocating the previously created Stream Tasks to Stream Threads.
This article discusses each of these steps in detail, with references to the Kafka code base where relevant.
Node Grouping
Nodes
A Node is the primary abstraction of execution logic in a Kafka Streams application. There are three types of Nodes: Source, Processor, and Sink.
- Source Nodes handle consuming from source topics.
- Processor Nodes abstract out logic execution (State Stores are also attached to Processor Nodes).
- Sink Nodes handle publishing to output topics.
So, whether indirectly through the DSL or directly through the Processor API, writing a stream application is all about defining the data flow using these Nodes.
When we add Nodes to a topology, Kafka internally maintains a DAG based on node names [1]. This structure is later used when creating Node Groups [2].
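As a rough, self-contained sketch (not Kafka's actual classes; the class and method names below are invented for illustration), that name-based DAG can be kept as an adjacency map from each node name to its children:

```java
import java.util.*;

// Illustrative sketch of a name-based topology DAG; not Kafka's
// actual InternalTopologyBuilder.
public class TopologyDag {
    private final Map<String, List<String>> children = new LinkedHashMap<>();

    // Register a node and wire it under its parents, the way
    // addProcessor/addSink connect nodes by name.
    public void addNode(String name, String... parents) {
        children.putIfAbsent(name, new ArrayList<>());
        for (String parent : parents) {
            children.get(parent).add(name);
        }
    }

    public List<String> childrenOf(String name) {
        return children.getOrDefault(name, List.of());
    }
}
```

Because nodes are referenced purely by name, a node can be wired under parents declared earlier, which is exactly what makes the later grouping step a pure graph problem.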
A manual version of the Node grouping logic would look like the following. Consider Figure-1 as a reference.
- Start with a source topic and find the relevant Source Node.
  - Start with source-node-1.
  - Group 1 — {source-node-1}
- Trace back its descendants until you reach a Sink Node or a Processor Node with no children.
  - Descendants: processor-1, processor-4, sink-1
- Every descendant of that Source Node belongs to the same Node Group.
  - Group 1 — {source-node-1, processor-1, processor-4, sink-1}
- Do this for all source topics. If the intersection of multiple Node Groups is not empty, merge them into a single Node Group.
- Group 1 — {source-node-1, processor-1, processor-4, sink-1}
- Group 2 — {source-node-2, processor-2, processor-4, sink-1}
- Group 3 — {source-node-3, processor-3, sink-2}

- Group 1 ∩ Group 2 ≠ ∅ → merge into
  {source-node-1, source-node-2, processor-1, processor-2, processor-4, sink-1}
- Group 1 ∩ Group 3 = ∅
- Group 2 ∩ Group 3 = ∅

The final Node Groups from the example are:
- Node Group 1 →
  {source-node-1, source-node-2, processor-1, processor-2, processor-4, sink-1}
- Node Group 2 →
  {source-node-3, processor-3, sink-2}
The sub-topologies we get from topology.describe() are, in fact, Node Groups, and they are quite useful when creating and initializing Stream Tasks.
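The grouping-and-merging procedure above can be sketched with a small union-find structure, the same family of data structure as Kafka's QuickUnion. The node names below come from the Figure-1 example; the class itself is an illustrative reimplementation, not Kafka's code:

```java
import java.util.*;

// Union-find over node names: nodes connected by parent->child edges
// end up in the same Node Group.
public class NodeGrouper {
    private final Map<String, String> root = new HashMap<>();

    public void add(String node) { root.putIfAbsent(node, node); }

    // Follow parent pointers until reaching the representative of the group.
    public String find(String node) {
        String r = node;
        while (!root.get(r).equals(r)) r = root.get(r);
        return r;
    }

    // Called for every parent->child edge in the topology DAG.
    public void unite(String a, String b) { root.put(find(a), find(b)); }

    // Collect the final groups by their union-find representative.
    public Map<String, Set<String>> groups() {
        Map<String, Set<String>> g = new TreeMap<>();
        for (String n : root.keySet())
            g.computeIfAbsent(find(n), k -> new TreeSet<>()).add(n);
        return g;
    }
}
```

Wiring the Figure-1 edges (source-node-1 → processor-1 → processor-4 → sink-1, source-node-2 → processor-2 → processor-4, source-node-3 → processor-3 → sink-2) yields exactly two groups, because processor-4 ties the first two chains together while the third chain shares no node with them.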
Effect Of State Stores In Node Grouping
State Stores are always attached to one or more Processor Nodes, and all Processor Nodes attached to the same State Store become part of the same Node Group.
From our example, if we do the following.
topology.addStateStore(storeBuilder, "processor-4", "processor-3");
This merges Node Group 1 and Node Group 2 into a single Node Group.
[1] Kafka code reference → Take the method addProcessor in InternalTopologyBuilder as an example. A field nodeGrouper of type QuickUnion is used to add the node's name to a map and then connect it to its parents so it fits into the DAG.
[2] Kafka code reference → The method makeNodeGroups in InternalTopologyBuilder implements the Node Group extraction logic.
Identifying Stream Tasks And Assigning Topics And Partitions
Stream Thread
A Stream Thread is the primary execution unit of a stream application. It periodically consumes records and forwards them to the corresponding Stream Tasks. If a Stream Thread is a CPU, then Stream Tasks are the processes scheduled on it. Figure-4 shows the basic components and data flow of a Stream Thread.
Stream Task
A Stream Task is always associated with a Node Group and with a particular partition of that Node Group's source topics. This is the key point: a Stream Task may process records from multiple source topics, but always from the same partition number, which is part of its Task Id.
Its main responsibility is therefore to send consumed records through the nodes of its sub-topology (Node Group). Figure-5 shows the main components.
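Conceptually, a Stream Task's handling of one record is just a walk through its sub-topology: each node applies its logic and forwards the result to its children. A toy sketch, with all class names invented for illustration:

```java
import java.util.*;
import java.util.function.UnaryOperator;

// Toy processor node: applies its logic, then forwards downstream.
// Purely illustrative; not Kafka's ProcessorNode.
public class ToyNode {
    private final UnaryOperator<String> logic;
    private final List<ToyNode> children = new ArrayList<>();
    private final List<String> sinkOutput; // non-null only for sink nodes

    public ToyNode(UnaryOperator<String> logic, List<String> sinkOutput) {
        this.logic = logic;
        this.sinkOutput = sinkOutput;
    }

    public ToyNode addChild(ToyNode child) { children.add(child); return this; }

    public void process(String record) {
        String out = logic.apply(record);
        if (sinkOutput != null) sinkOutput.add(out); // sink node "publishes"
        for (ToyNode c : children) c.process(out);   // forward downstream
    }
}
```

A source → processor → sink chain then processes a record end to end with a single process() call on the source node, which is essentially what a Stream Task does for each consumed record.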
Identifying Tasks And Assigning Topics And Partitions [3]
This is done during consumer group re-balancing. Kafka Streams has a custom partition assignor, StreamPartitionAssignor. The logic is as follows; refer to Figure-6 as well, which shows three topics with their respective partitions.
- Get all the source topics by Node Group.
  - Group-1 → {Topic-A, Topic-B}
  - Group-2 → {Topic-C}
- For each group, find the maximum number of partitions among the group's topics.
  - In Group-1, Topic-A has 4 partitions and Topic-B has 5, so the max number of partitions is 5.
  - In Group-2 there's only one topic, Topic-C, with 4 partitions, so the max number of partitions is 4.
- In each group, for every partition up to the max-partition count, create a Task Id. The Task Id is a combination of <node-group-id, partition-number>.
  - Group-1 will have 5 Task Ids.
    - Id of Task 1 → <0,0>, Id of Task 2 → <0,1>, Id of Task 3 → <0,2>, Id of Task 4 → <0,3>, Id of Task 5 → <0,4>
  - Group-2 will have 4 Task Ids, since its max partition count is 4.
    - Id of Task 1 → <1,0>, Id of Task 2 → <1,1>, Id of Task 3 → <1,2>, Id of Task 4 → <1,3>
- Each Task handles records from its assigned partition across all relevant source topics of the Node Group.
Group-1
- Task-1 <0,0> → {Partition 0 of topic-A, Partition 0 of topic-B}
- Task-2 <0,1> → {Partition 1 of topic-A, Partition 1 of topic-B}
- Task-3 <0,2> → {Partition 2 of topic-A, Partition 2 of topic-B}
- Task-4 <0,3> → {Partition 3 of topic-A, Partition 3 of topic-B}
- Task-5 <0,4> → {Partition 4 of topic-B}

Task-5 only handles partition 4 of topic-B, since topic-B is the only topic with 5 partitions (partitions are zero-indexed). At the end of the process, we'll have Task Ids defined for each Node Group along with their responsible topic and partition ids.
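The steps above can be sketched as follows. This mirrors the idea behind DefaultPartitionGrouper.partitionGroups but is an illustrative reimplementation; the topic names and partition counts come from the example:

```java
import java.util.*;

// For each Node Group, create one task per partition up to the group's
// maximum partition count; each task gets that partition of every topic
// in the group that actually has it. Illustrative sketch, not Kafka's code.
public class TaskPartitioner {
    public static Map<String, List<String>> partitionGroups(
            int groupId, Map<String, Integer> topicPartitionCounts) {
        int maxPartitions = Collections.max(topicPartitionCounts.values());
        Map<String, List<String>> tasks = new LinkedHashMap<>();
        for (int p = 0; p < maxPartitions; p++) {
            List<String> assigned = new ArrayList<>();
            for (Map.Entry<String, Integer> e : topicPartitionCounts.entrySet()) {
                if (p < e.getValue()) assigned.add(e.getKey() + "-" + p);
            }
            // Task Id is <node-group-id, partition-number>.
            tasks.put("<" + groupId + "," + p + ">", assigned);
        }
        return tasks;
    }
}
```

Running this for Group-1 ({topic-A: 4, topic-B: 5}) yields five tasks, with <0,4> assigned only topic-B's partition 4, matching the table above; Group-2 ({topic-C: 4}) yields four tasks.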
[3] Kafka code reference → The method partitionGroups in DefaultPartitionGrouper encapsulates the logic of identifying Tasks and assigning topics and partitions. It is invoked as part of the StreamPartitionAssignor assign method implementation.
Allocating Stream Tasks To Stream Threads
Stream Task to Stream Thread allocation is done in multiple steps. [4]
- First, it allocates Tasks to Clients (stream app instances). The Task assignor tries to reassign existing Tasks to the same Clients that ran them before.
- The remaining Tasks are distributed as evenly as possible, considering the number of available Stream Threads and the number of already-assigned Tasks on each Client.
- Lastly, the Tasks assigned to each Client are distributed among its Stream Threads.
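Leaving stickiness aside, the even-distribution steps can be sketched as a round-robin over all Stream Threads across Clients. This is a deliberate simplification for illustration, not the actual StickyTaskAssignor logic, and all names are invented:

```java
import java.util.*;

// Distribute tasks over clients' threads as evenly as possible.
// Real Kafka Streams first tries to keep tasks "sticky" to the client
// that previously ran them; this sketch skips that step.
public class TaskAllocator {
    public static Map<String, List<String>> allocate(
            List<String> taskIds, List<String> clients, int threadsPerClient) {
        Map<String, List<String>> byThread = new LinkedHashMap<>();
        List<String> threads = new ArrayList<>();
        for (String client : clients) {
            for (int t = 0; t < threadsPerClient; t++) {
                String thread = client + "/thread-" + t;
                threads.add(thread);
                byThread.put(thread, new ArrayList<>());
            }
        }
        // Round-robin the tasks across every thread of every client.
        for (int i = 0; i < taskIds.size(); i++) {
            byThread.get(threads.get(i % threads.size())).add(taskIds.get(i));
        }
        return byThread;
    }
}
```

With the nine Task Ids from our example and two clients of two threads each, every thread ends up with two or three tasks.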
Applying this to our example, think of two instances of the stream app, each with two Stream Threads. The Task allocation upon a fresh deployment would probably look like the following.
[4] Kafka code reference → Assigning Tasks to Clients is handled by the assign method of StickyTaskAssignor. Once that's done, the StreamPartitionAssignor assign method calls computeNewAssignment to distribute the Tasks assigned to each Client among its Stream Threads.
Example Showing The Effect Of A Common Descendant In Node Grouping
To finish it off, let’s check an example and see how Node Grouping works.
The following code creates a simple stream app with two Transformers and two attached State Stores. Let's check the topology. (topic-1 and topic-2 both have the same number of partitions.)
Code-Snippet-1 — Simple stream application without common descendants.
The topology view shows two Node Groups (two sub-topologies). Therefore, for example, partition 1 of topic-1 and partition 1 of topic-2 would be handled by two different Stream Tasks; there is no connection whatsoever between the two partitions.
Just to see the effect of having a common descendant, let’s add a fake Processor Node as a child of both transformer-1 and transformer-2.
Code-Snippet-2 — The same stream application with a dummy processor added as a descendant of both Transformers.
Adding the fake Processor Node created a common descendant between the previous two Node Groups, which merged them into a single Node Group.
Now partition 1 of both topic-1 and topic-2 is handled by the same Stream Task.
This example is only meant to demonstrate Node Grouping; there's no practical use for fake Processor Nodes.