Optimizing Kafka Streams Apps on Kubernetes by Splitting Topologies
Kafka Streams is a library for building stream processing applications on top of data stored in Apache Kafka. These streaming applications can be deployed to Kubernetes (e.g., using Streams-Bootstrap and the related Helm chart). Kafka Streams applications can require a lot of resources in your Kubernetes cluster, so knowing their internals is crucial to optimize their Kubernetes deployments. Understanding and splitting an application’s processor topology can be essential to reduce costs and to make complex applications easier to manage.
Imagine the following scenario: you develop a Kafka Streams application that includes multiple consecutive processing steps. One of the downstream steps needs much more time to process records than the upstream steps. Depending on your processor topology, you might end up in a state where the upstream processor node is done, but the downstream processors still have lots of records to process. Because all processing steps run in one Kubernetes Deployment, the upstream processor keeps its resources allocated even though its consumer lag is already zero: it runs in the same application as the long-running sub-topology.
Ideally, at this point you could already free up the upstream processor’s resources completely. This is especially relevant when one of the upstream processing steps requires a state store and thus allocates lots of resources. Furthermore, you could use the just-released resources to scale up the long-running sub-topology and speed up processing.
Splitting sub-topologies into separate Kafka Streams applications, and thus separate Kubernetes Deployments, can help to use and release resources more economically, especially in scenarios like the one described above. Moreover, it gives you more fine-grained control over complex streaming applications running in Kubernetes.
This blog post shows how and when splitting a Kafka Streams application into multiple applications gives you more control over different processing steps, so that you can scale and allocate resources independently depending on the needs of each step. To do so, we briefly show how to inspect Kafka Streams topologies and identify their sub-topologies. Finally, we give you a glimpse of how splitting applications lets you scale an upstream application to zero (freeing all of its resources) once it has finished processing, even while downstream sub-topologies are still processing.
You can find the code for this blog post in our GitHub repository.
What are Sub-Topologies?
Every Kafka Streams application consists of connected processing nodes that together form a directed acyclic graph: the processor topology. You define the topology using the Processor API or the Kafka Streams DSL.
If you are new to the concept of the processor topology of the Kafka Streams API, the official docs can give you a good understanding.
A topology can consist of multiple sub-topologies. Sub-topologies are connected only via topics, so they can run independently of each other and even be processed in parallel. Kafka Streams therefore instantiates each sub-topology as its own stream tasks (one per input partition). We take advantage of this characteristic in the following.
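To make the link between sub-topologies and tasks concrete, here is a small, hypothetical model in plain Java (not the Kafka Streams API). It assumes the default behavior as we understand it: one task per sub-topology and input partition, named `<sub-topology>_<partition>`, where the number of tasks of a sub-topology equals the largest partition count among its source topics. The class and method names are our own.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical model, not the Kafka Streams API: derives task IDs from sub-topologies.
final class TaskIdModel {
    /**
     * @param sourcePartitionsBySubTopology for each sub-topology ID, the partition
     *                                      counts of its source topics
     * @return task IDs in the form "subTopologyId_partition"
     */
    static List<String> taskIds(Map<Integer, List<Integer>> sourcePartitionsBySubTopology) {
        List<String> ids = new ArrayList<>();
        // Iterate sub-topologies in ascending ID order for a deterministic result.
        for (Map.Entry<Integer, List<Integer>> e : new TreeMap<>(sourcePartitionsBySubTopology).entrySet()) {
            // One task per partition of the widest source topic of this sub-topology.
            int partitions = 0;
            for (int p : e.getValue()) {
                partitions = Math.max(partitions, p);
            }
            for (int partition = 0; partition < partitions; partition++) {
                ids.add(e.getKey() + "_" + partition);
            }
        }
        return ids;
    }
}
```

For example, a join sub-topology reading two co-partitioned topics with three partitions each, plus a second sub-topology reading a two-partition topic, yields the tasks 0_0, 0_1, 0_2, 1_0 and 1_1, which can be spread across threads and instances independently.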
How to identify Sub-Topologies?
Because sub-topologies are connected via topics, we have to understand how those topics between sub-topologies are created. There are two options:
- Topics defined by the developer of the streams application, e.g., via:
  - .repartition(),
  - the combination of .to(topicName) and StreamsBuilder#stream(topicName), or
  - .through(topicName) (deprecated since Kafka Streams 2.6).
- Internal topics that Kafka Streams creates automatically: e.g., if you set a new key for each input record (with .selectKey()) and then apply a key-based operator (e.g., an aggregation or a join), the data needs to be redistributed via an internal repartition topic.
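Why does a new key force an extra topic? A record’s partition is derived from its key, so after selectKey() records that share the new key may still sit on different partitions. The following plain-Java sketch models the redistribution step. It is hypothetical and uses String.hashCode() as a stand-in for the key hash (Kafka’s default partitioner actually hashes the serialized key with murmur2); all names are our own.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of repartitioning after a key change; not Kafka Streams code.
final class RepartitionModel {
    static final class Rec {
        final String key;
        final String value;

        Rec(String key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    // Stand-in partitioner: non-negative hash of the key modulo the partition count.
    static int partitionFor(String key, int numPartitions) {
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    // Writes every re-keyed record to the partition derived from its new key,
    // which is what the intermediate (repartition) topic achieves.
    static Map<Integer, List<Rec>> repartition(List<Rec> rekeyed, int numPartitions) {
        Map<Integer, List<Rec>> byPartition = new TreeMap<>();
        for (Rec r : rekeyed) {
            byPartition.computeIfAbsent(partitionFor(r.key, numPartitions), p -> new ArrayList<>()).add(r);
        }
        return byPartition;
    }
}
```

After this step, all records with the same new key (e.g., the same product ID) end up in exactly one partition, so a single task can aggregate or join them.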
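To quickly identify the sub-topologies, you can also analyze the final processor topology of your application by building and describing it, e.g., by printing builder.build().describe() (StreamsBuilder#build() returns the Topology, and Topology#describe() returns its TopologyDescription).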
The output is a string that describes the topology and even lists the different sub-topologies. To make it more human-readable, you can visualize the topology as a graph by pasting the string into the text form of the awesome Kafka Streams Topology Visualizer.
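If you want to check the sub-topologies programmatically (for example, in a unit test that guards against a refactoring that accidentally adds or merges sub-topologies), a few lines of plain Java can parse that description string. This is a sketch that assumes the text layout printed by recent Kafka Streams versions (`Sub-topology: <id>` headers and `Source: <name> (topics: [...])` lines). The format is meant for humans and may change between versions, and the class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: extracts the source topics of each sub-topology
// from the string form of a TopologyDescription.
final class TopologyDescriptionParser {
    private static final Pattern SUB_TOPOLOGY = Pattern.compile("^\\s*Sub-topology: (\\d+)");
    private static final Pattern SOURCE = Pattern.compile("^\\s*Source: \\S+ \\(topics: \\[(.*)\\]\\)");

    static Map<Integer, List<String>> sourceTopicsBySubTopology(String description) {
        Map<Integer, List<String>> result = new LinkedHashMap<>();
        Integer current = null;
        for (String line : description.split("\\R")) {
            Matcher sub = SUB_TOPOLOGY.matcher(line);
            if (sub.find()) {
                // A new sub-topology section starts.
                current = Integer.parseInt(sub.group(1));
                result.put(current, new ArrayList<>());
                continue;
            }
            Matcher src = SOURCE.matcher(line);
            if (src.find() && current != null) {
                // A source node may subscribe to several comma-separated topics.
                for (String topic : src.group(1).split(",")) {
                    result.get(current).add(topic.trim());
                }
            }
        }
        return result;
    }
}
```

Applied to the repartitioned example later in this post, we would expect two entries: the orders and customers topics for the first sub-topology and the intermediate topic for the second.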
Let’s illustrate using an example
How can we utilize this knowledge about our Kafka Streams application to break it into sub-topologies? Imagine the following example:
We have two topics:
- Orders Topic (fields: orderId, customerId, …)
- Customers Topic, containing some information about the customers that placed orders
(For the sake of simplicity, we assume that the order records are already keyed and partitioned by customer ID, so they are co-partitioned with the customers topic.)
We first want to look up the customer information for every incoming order. That means we perform a left join, which requires loading the customer records into a state store (a KTable in the Kafka Streams DSL). Depending on the number of customers, this state store can require lots of memory and disk space.
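To make the semantics of this stream-table left join explicit, here is a plain-Java model in which a HashMap plays the role of the customer state store. It is not the Kafka Streams implementation, and the class and method names are hypothetical. It only mirrors the DSL semantics: only orders trigger output, every order produces a result (with null customer info if no customer record is known yet), and a null customer value (a tombstone) deletes the entry.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of a KStream-KTable left join; a HashMap stands in for the state store.
final class OrderEnricher {
    private final Map<String, String> customerStore = new HashMap<>();

    // Table side: upsert customer info, or delete it on a tombstone (null value).
    // Table updates alone never produce join output.
    void onCustomer(String customerId, String customerInfo) {
        if (customerInfo == null) {
            customerStore.remove(customerId);
        } else {
            customerStore.put(customerId, customerInfo);
        }
    }

    // Stream side: every order yields a result; the customer part is null if unknown.
    String onOrder(String orderId, String customerId) {
        String customerInfo = customerStore.get(customerId);
        return orderId + " -> " + customerInfo;
    }

    // The store grows with the number of customers, which is what drives memory and disk usage.
    int storeSize() {
        return customerStore.size();
    }
}
```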
After that, a processing step follows that takes longer per record than the left join. This can be an aggregation (e.g., in our scenario, the number of customers that ordered a product), complex data parsing, or a call to an external API. To keep it simple, let’s treat this step as a black box that takes a long time to process each record.
The code for building the topology looks like this:
Figure 1 depicts the related processor topology for our example:
In this example, there is only one sub-topology. Because Kafka Streams passes each record through all processors of a sub-topology before it reads the next record, the long-running processor slows down the whole application, including the join. How can we optimize this?
One solution is to introduce an intermediate topic that breaks the topology into two sub-topologies, which end up as separate tasks that can run in parallel.
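A back-of-the-envelope model shows what this buys us. The sketch below is an idealized, hypothetical timing model for a single partition: it assumes fixed per-record costs for the join and the slow step, one thread per task, and no produce, consume or commit overhead. In the single sub-topology, both steps share one loop; after the split, the downstream step picks up each record as soon as it lands in the intermediate topic and it is free.

```java
// Hypothetical, idealized timing model for one partition; costs are in arbitrary time units.
final class PipelineTiming {
    // Single sub-topology: record i passes through the join and the slow step
    // before record i + 1 is read.
    // Returns {time when the join has processed all records, time when everything is done}.
    static long[] singleSubTopology(int n, long joinCost, long slowCost) {
        long t = 0;
        long joinDone = 0;
        for (int i = 0; i < n; i++) {
            t += joinCost;
            joinDone = t;
            t += slowCost;
        }
        return new long[] {joinDone, t};
    }

    // Split via an intermediate topic: each step runs in its own task.
    static long[] splitSubTopologies(int n, long joinCost, long slowCost) {
        long joinTime = 0;
        long slowTime = 0;
        for (int i = 0; i < n; i++) {
            joinTime += joinCost;                               // record i is now in the intermediate topic
            slowTime = Math.max(slowTime, joinTime) + slowCost; // downstream takes it once it is free
        }
        return new long[] {joinTime, slowTime};
    }
}
```

With 100 records, a join cost of 1 and a slow-step cost of 10, the join finishes after 1090 units in the single sub-topology but after 100 units when split, while overall completion only drops from 1100 to 1001 because the slow step stays the bottleneck. The main gain is that the upstream part becomes idle early, which the following sections exploit.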
We can force the creation of an intermediate topic by calling .repartition() between both processors:
In Figure 2, you can see the resulting processor topology. We can identify two sub-topologies. The first sub-topology loads the customer records into a state store and runs the look-up for the customer info for every incoming order. After that, the records are produced into an intermediate topic, which serves as the input for the long-running sub-topology.
Often, you do not have to force the creation of an intermediate topic: if you select a new key before an aggregation or a join, a repartitioning is needed anyway, and the Kafka Streams DSL automatically introduces an internal topic that splits the topology into multiple sub-topologies.
Be aware that introducing an intermediate topic increases the amount of data stored in the Kafka cluster. However, you can mitigate this with appropriate retention policies or log compaction.
After deploying this Kafka Streams application, we might see that the upstream sub-topology writes records into the intermediate topic faster than the second, long-running sub-topology can consume them (see Figure 3). Eventually, we end up with a high consumer lag for the second sub-topology. The upstream sub-topology has finished processing and waits for new records, but it keeps the resources for its state store allocated without using them. So how can we avoid this?
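The effect shown in Figure 3 can be reproduced with a tiny discrete-time simulation. This hypothetical sketch assumes a fixed backlog of orders, an upstream sub-topology that handles upRate records per tick, and a downstream sub-topology that handles downRate records per tick from the intermediate topic (only records that were already there at the start of the tick). It records both consumer lags after every tick.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical discrete-time model of two sub-topologies connected by an intermediate topic.
final class LagSimulation {
    // Returns one {upstreamLag, downstreamLag} pair per tick until both lags are zero.
    static List<long[]> run(long backlog, long upRate, long downRate) {
        if (upRate <= 0 || downRate <= 0) {
            throw new IllegalArgumentException("rates must be positive");
        }
        List<long[]> history = new ArrayList<>();
        long upstreamLag = backlog; // unread orders
        long downstreamLag = 0;     // unread records in the intermediate topic
        while (upstreamLag > 0 || downstreamLag > 0) {
            long produced = Math.min(upRate, upstreamLag);
            upstreamLag -= produced;
            long consumed = Math.min(downRate, downstreamLag);
            downstreamLag += produced - consumed;
            history.add(new long[] {upstreamLag, downstreamLag});
        }
        return history;
    }
}
```

With a backlog of 100 orders, upRate = 50 and downRate = 10, the upstream lag is zero after the second tick, while the downstream sub-topology needs nine more ticks to drain the intermediate topic. In a single deployment, the upstream part and its state store stay allocated during all of those ticks.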
Splitting Topologies into separate Streaming Apps
This is where converting sub-topologies into separate Kafka Streams applications comes into play. To do so, we convert the first sub-topology into a streams application with orders and customers as input topics and the intermediate topic as the output topic.
The application containing the long-running sub-topology consumes the intermediate topic and produces to the final output topic.
The final result is depicted in the following figure:
With Streams-Bootstrap, deploying both applications to Kubernetes is now simple. Just make sure that the intermediate topic exists before you deploy the application for the second sub-topology.
Optimizing resources and scaling independently
Now we have two separate Kafka Streams applications. Hence, we can set the resources for both applications based on their individual needs. Additionally, we can scale the applications independently. This enables us to optimize even more:
Kubernetes allows us to autoscale applications using Horizontal Pod Autoscalers. Further, we can easily autoscale based on the Kafka consumer lag using KEDA in combination with the Apache Kafka scaler. To make it even better, KEDA allows us to scale to zero easily.
In our scenario, we can scale the first sub-topology to zero as soon as its consumer lag has dropped to zero. This frees all resources of this sub-topology, while the second, long-running sub-topology keeps running as long as it takes to process the remaining records. With the just-released resources, we could even scale up the downstream sub-topology (manually) to increase its throughput, up to the number of partitions of the intermediate topic, since additional instances would not receive any tasks.
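To illustrate the scaling decision, here is a simplified, hypothetical model of lag-based autoscaling with scale-to-zero; it is not KEDA’s implementation. It assumes that the consumer lag of a group is the sum over partitions of end offset minus committed offset, and that the desired replica count is zero without lag and otherwise the total lag divided by a per-replica lag threshold (rounded up), capped at the partition count and at a maximum replica count.

```java
import java.util.Map;

// Simplified, hypothetical model of lag-based autoscaling with scale-to-zero (not KEDA's code).
final class LagScaler {
    // Consumer lag per partition = end offset - committed offset (never negative).
    // Assumption: a partition without a committed offset counts from offset 0.
    static long totalLag(Map<Integer, Long> endOffsets, Map<Integer, Long> committedOffsets) {
        long lag = 0;
        for (Map.Entry<Integer, Long> e : endOffsets.entrySet()) {
            long committed = committedOffsets.getOrDefault(e.getKey(), 0L);
            lag += Math.max(0, e.getValue() - committed);
        }
        return lag;
    }

    // Desired replicas: 0 without lag, otherwise ceil(totalLag / lagThreshold),
    // capped at the partition count (more instances would get no task) and at maxReplicas.
    static int desiredReplicas(long totalLag, long lagThreshold, int partitions, int maxReplicas) {
        if (lagThreshold <= 0) {
            throw new IllegalArgumentException("lagThreshold must be positive");
        }
        if (totalLag <= 0) {
            return 0; // nothing to process: free all resources
        }
        long wanted = (totalLag + lagThreshold - 1) / lagThreshold;
        return (int) Math.min(wanted, Math.min(partitions, maxReplicas));
    }
}
```

In this model, a jump from zero lag to a large lag goes directly from 0 to several replicas instead of to one, which mirrors the idea behind the 0-to-N scaling of stateful applications described next.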
As a side note: KEDA added an awesome new feature that helps to optimize the scaling of stateful applications. We can now configure the deployment to scale from 0 to N replicas instantly when the consumer lag goes from zero to X. This shortens the time needed to rebuild state stores when the consumer lag increases, because the rebuilding is distributed across multiple consumer group members right away.
Using our Helm chart for Streams-Bootstrap, we can easily set up the scaling of our application’s Kubernetes Deployment with KEDA. The autoscaling section of the Helm values for the first sub-topology’s deployment looks like this:
To see the full Helm values and the values for the other deployment, please refer to the GitHub repository.
Another nice side effect of running sub-topologies in separate Kafka Streams applications is that you can easily clean up and start reprocessing downstream sub-topologies independently of upstream sub-topologies. This is especially useful during development or whenever you have changed only the processing logic of downstream sub-topologies.
In this blog post, we illustrated how and when you can split the processor topology of a Kafka Streams application to run its parts as separate deployments in Kubernetes. This gives us more control over complex streaming applications, and we can scale and optimize the resources of each sub-topology deployment independently. Moreover, we can reduce operating costs by scaling down sub-topologies with zero or low consumer lag while downstream sub-topologies may still have to process records. KEDA makes it easy to scale to zero when one of the sub-topologies ends up with zero consumer lag.