Kafka-Streams: a road to autoscaling via Kubernetes
Kafka-Streams is the stream processing library included in Apache Kafka, a streaming data platform. Because Kafka-Streams is a simple library, and not a framework, it’s used by applications that can be deployed and run in many ways. This article aims to present a few advantages that come with specific practices like containerization and orchestration, and especially autoscaling. Kafka-Streams is meant to be highly scalable. Let’s explore a way to automatically benefit from this elasticity with Kubernetes.
tl;dr
This article is based on the talk Scale in / Scale out with Kafka-Streams and Kubernetes from Xebicon’18. The repository xke-kingof-scaling contains all the YAML examples used in the experiment. A French version is available on blog.xebia.fr.
Consumer lag, a reason to scale
Why would we want to scale our streaming apps automatically? In this part we focus on Kafka-Streams applications and their properties to see how autoscaling can help.
Message consumption
Kafka consumers subscribe to topics and then regularly poll messages from them. The amount of data returned by each poll can be bounded with the fetch.max.bytes (maximum bytes per fetch) or max.poll.records (maximum records per poll) configuration. The consumer also commits the offsets of the messages to declare them as consumed and processed. With Kafka-Streams, polling intervals are generally short enough to give a real-time processing effect.
So far so good! How could this cause any problems?
A Kafka-Streams application can fall behind in its message consumption: there is then a gap between the latest offset in the topic and the current offset processed by the application. This gap is referred to as the record-lag, and it can become significant for several reasons:
- An application restart after a long down-time
- An intensive stateful transformation
- A peak period of usage for a given service
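To make the definition concrete, here is a minimal sketch of how the record-lag can be computed per partition. The offsets are hypothetical values chosen for illustration; in a real deployment they come from the broker and from the consumer group's committed offsets.

```python
from typing import Dict


def record_lag(log_end_offsets: Dict[int, int],
               current_offsets: Dict[int, int]) -> Dict[int, int]:
    """Per-partition lag: latest offset in the topic minus the offset reached by the application."""
    return {
        partition: max(end - current_offsets.get(partition, 0), 0)
        for partition, end in log_end_offsets.items()
    }


# Hypothetical offsets for a topic with 4 partitions (illustration only).
log_end = {0: 1200, 1: 1180, 2: 1500, 3: 900}
current = {0: 1200, 1: 1000, 2: 700, 3: 900}

lags = record_lag(log_end, current)
total_lag = sum(lags.values())
print(lags, total_lag)
```

Partitions 0 and 3 are fully caught up, while partitions 1 and 2 carry the whole lag.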
Like most distributed systems, Kafka makes good use of the maxim “divide and conquer”. Let’s see how it works in practice.
The consumer protocol
When a single instance of a streaming application is running, all the input topic partitions get assigned to it.
Reminder: by default, messages are distributed across partitions based on their key. Messages with the same key are stored in the same partition.
By starting a second instance of our application, we trigger a partition rebalance: all partitions get redistributed across the two instances, which splits the workload in two. The maximum level of parallelism therefore corresponds to the number of partitions. Given an input topic with N partitions, any instance beyond the Nth stays on standby, waiting for another instance to fail. Each instance of a consumer group consumes a distinct subset of the partitions, and the record-lag of the group is the sum of the lags of its instances.
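The effect of a rebalance can be illustrated with a small sketch. It uses a simple round-robin distribution, which is an assumption made for readability and not the assignor Kafka-Streams actually uses; the instance names and lag values are hypothetical as well.

```python
from typing import Dict, List


def assign_partitions(num_partitions: int, instances: List[str]) -> Dict[str, List[int]]:
    """Round-robin sketch of a partition assignment (not Kafka's real assignor)."""
    assignment: Dict[str, List[int]] = {name: [] for name in instances}
    for partition in range(num_partitions):
        owner = instances[partition % len(instances)]
        assignment[owner].append(partition)
    return assignment


def lag_per_instance(assignment: Dict[str, List[int]],
                     partition_lag: Dict[int, int]) -> Dict[str, int]:
    """Lag of an instance: sum of the lags of the partitions it owns."""
    return {name: sum(partition_lag[p] for p in parts)
            for name, parts in assignment.items()}


# Hypothetical per-partition lags for a topic with 4 partitions.
partition_lag = {0: 0, 1: 180, 2: 800, 3: 0}

one = assign_partitions(4, ["app-0"])
two = assign_partitions(4, ["app-0", "app-1"])
five = assign_partitions(4, [f"app-{i}" for i in range(5)])

print(one)    # a single instance owns every partition
print(two)    # the workload is split in two
print(five)   # the fifth instance owns nothing and stays on standby
print(sum(lag_per_instance(two, partition_lag).values()))  # lag of the whole group
```

Whatever the number of instances, the lag of the group stays the same at a given instant; adding instances only changes how many workers reduce it in parallel.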
Kubernetes and the custom metrics support
We have covered a lot so far, so let’s recap before moving on. We have several instances of the same application, and we’d like to add or remove them based on the evolution of a shared metric exposed by the application: the consumer record-lag.
This looks like a perfect job for Kubernetes!
Kubernetes maintains the number of replicas requested for a given application. It can also natively scale (in or out) based on CPU usage or memory consumption. Fortunately, since version 1.6, it can also scale applications on custom metrics. This feature requires extending the original Kubernetes APIs with additional adapters. Among the available adapter implementations, we chose the one based on Stackdriver to create a bridge between Kubernetes custom metrics and our Kafka-Streams JMX metrics:
We first expose the JMX metrics of the streaming application in Prometheus format. Each application instance has a prometheus-to-sd sidecar that scrapes the metrics and sends them to Stackdriver. Lags can now be plotted, but also queried by a metrics server. At this point, the metrics server custom-metrics-stackdriver-adapter feeds the Kubernetes master with the new custom metric values.
Now let’s put all the pieces together.
Expose the JMX metrics in a Prometheus format
Prometheus is an open-source monitoring and alerting toolkit and one of the first projects to join the CNCF. It defines a text format for exposing metrics. This format, which is becoming a reference, is used in the rest of the experiment. We use the jmx-exporter project to expose the metrics of our application in this format.
We add a few JVM parameters to the streaming app:
java -cp ... \
  -Djava.rmi.server.hostname=127.0.0.1 \
  -Djava.rmi.server.port=7071 \
  -javaagent:/<>/jmx_prometheus_<version>.jar=9001:/<>/config.yaml
This way, the JMX metrics are exposed on port 7071, and a Prometheus-formatted version of them is available over HTTP on port 9001. The config.yaml file describes which metrics are exposed.
With this block of configuration, we export the records-lag attribute of the kafka.consumer metrics of type consumer-fetch-manager-metrics (here for the input topic GAME-FRAME-RS). We duplicate this block for each partition of each input topic. Note that the block must declare the metric type as GAUGE. (complete file)
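Once the agent is running, the HTTP endpoint returns plain text in the Prometheus format, one sample per line. The sketch below parses such a payload to read the lag gauges. The sample text is hypothetical: the metric name follows the one used later in this article, but the partition label and the values are assumptions, since the exact output depends on config.yaml.

```python
import re
from typing import Dict, List, Tuple

Sample = Tuple[str, Dict[str, str], float]

# name, optional {labels}, then the value
SAMPLE_LINE = re.compile(
    r'^(?P<name>[a-zA-Z_:][a-zA-Z0-9_:]*)(?:\{(?P<labels>[^}]*)\})?\s+(?P<value>\S+)'
)
LABEL = re.compile(r'(\w+)="([^"]*)"')


def parse_samples(text: str) -> List[Sample]:
    """Parse the sample lines of a Prometheus text payload, skipping comments."""
    samples: List[Sample] = []
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # HELP/TYPE comments and blank lines carry no sample
        match = SAMPLE_LINE.match(line)
        if match is None:
            continue
        labels = dict(LABEL.findall(match.group("labels") or ""))
        samples.append((match.group("name"), labels, float(match.group("value"))))
    return samples


# Hypothetical payload (labels and values are assumptions for illustration).
PAYLOAD = """\
# HELP consumer_lag_game_frame_rs records-lag of the input topic
# TYPE consumer_lag_game_frame_rs gauge
consumer_lag_game_frame_rs{partition="0",} 180.0
consumer_lag_game_frame_rs{partition="1",} 800.0
"""

samples = parse_samples(PAYLOAD)
total_lag = sum(value for name, _, value in samples
                if name == "consumer_lag_game_frame_rs")
print(samples, total_lag)
```

In development mode, the payload could be fetched from the agent instead of being hard-coded, for example from http://localhost:9001/metrics, assuming the agent listens on port 9001 as configured above.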
Building the Docker image of the streaming app
Metrics can now be queried over HTTP on a single machine in development mode.
Finally, we need to package everything inside a Docker image to take advantage of this new feature inside a Kubernetes pod. The build tool used here is Gradle, and by adding the Docker plugin we can configure the project as follows:
In a few lines we declare:
- The Dockerfile to build
- The entry point of the streaming app (Main class)
- Name, version and repository where to upload the image
At the root of the container we add the following files:
$ tree -L 3
#/
#└── opt
#    └── kos-stream
#        ├── config.yaml
#        └── jmx_prometheus_javaagent-0.3.1.jar
These two files are referenced from the JVM parameters (see the previous section).
Metrics aggregation to Stackdriver
Now that metrics are exposed on the address and port of a Kubernetes Pod, the next step is to use prometheus-to-sd from the k8s-stackdriver project. To do so, we add its image to the streaming application pod as a sidecar. Its goal is to scrape the metrics and send them to Stackdriver. By doing so, metrics are both persisted and displayed in a dashboard.
Note that the --source flag uses port 9001, in accordance with the configuration of jmx-prometheus.
MetricServer Setup
At this point, metrics from our application are feeding nice dashboards that our Kubernetes master cannot read! What it can do instead is access the /apis/metrics endpoint, so our next step is to enrich this API.
$ kubectl get --raw "/apis/custom.metrics.k8s.io/v1beta1" | jq
# Error from server (NotFound): the server could not find the requested resource
Here the result does not contain any reference to the custom metrics.
The application custom-metrics-stackdriver-adapter from the k8s-stackdriver project is meant to enrich the Kubernetes API by exposing custom metrics stored in Stackdriver. The main goal is to enable autoscaling on custom metrics. To deploy it, all you need to do is run the following command:
kubectl create -f \
  https://raw.githubusercontent.com/GoogleCloudPlatform/k8s-stackdriver/master/custom-metrics-stackdriver-adapter/deploy/production/adapter.yaml
$ kubectl get --raw "/apis/custom.metrics.k8s.io/v1beta1" | jq
#{
# "name": "*/custom.googleapis.com|consumer_lag_game_frame_rs",
# ...
# "name": "*/custom.googleapis.com|consumer_lead_game_frame_rq",
# ...
#}
Now the Kubernetes API has a lot more endpoints exposed. Most of them are related to custom or external metrics. We are now really close to enabling autoscaling.
Horizontal Pod Autoscaler configuration
Let’s put all the things we’ve done so far into one picture:
In the blue pod, we have the streaming application and its sidecar. The sidecar scrapes an internal endpoint of the pod and sends all the custom metrics to Stackdriver. Stackdriver then acts as a backend for the metrics server (deployed in a pink Pod). Now the Kubernetes master has all the custom metrics it needs. The last thing to do is to set up a particular behavior for a given threshold and metric. It works similarly to a standard metric, and we configure an HPA (Horizontal Pod Autoscaler) as follows:
Note that we use version v2beta1 of the autoscaling API. The maximum number of replicas is set to 4 since we have 4 partitions for each input topic. The name of the targeted metric is custom.googleapis.com followed by the name given in the jmx-exporter config file.
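To get an intuition of what the autoscaler does with this configuration, here is a minimal sketch of the scaling rule described in the Kubernetes documentation: the desired number of replicas is the current number multiplied by the ratio between the observed metric value and the target, rounded up, then bounded by the minimum and maximum. The target of 10000 records and the observed values are hypothetical, and the sketch ignores details such as the tolerance around the target and stabilization delays.

```python
import math


def desired_replicas(current_replicas: int,
                     current_value: float,
                     target_value: float,
                     min_replicas: int = 1,
                     max_replicas: int = 4) -> int:
    """Simplified HPA rule: ceil(current * observed / target), clamped to [min, max]."""
    raw = math.ceil(current_replicas * current_value / target_value)
    return max(min_replicas, min(max_replicas, raw))


# Hypothetical target: 10000 records of lag.
TARGET = 10_000

print(desired_replicas(1, 25_000, TARGET))  # lag is 2.5x the target: scale out
print(desired_replicas(2, 60_000, TARGET))  # capped by the number of partitions
print(desired_replicas(3, 1_000, TARGET))   # lag is low again: scale in
```

Capping the result at 4 mirrors the choice made above: beyond one instance per partition, extra pods would only sit on standby.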
Conclusion
By generating enough messages in a short amount of time, we eventually reach the threshold specified by the HPA configuration, and Kubernetes adds pods. Each of them contains an instance of the streaming application and its prometheus-to-sd sidecar. We can see a first drop of the record-lag since it’s split over different pods. We can also see new lines appearing, which correspond to the new instances spawned by Kubernetes. Finally, each application instance works to decrease its own share of the record-lag.
We have seen how, with existing tools like Stackdriver and Kubernetes, it was simple to enrich the capabilities of our streaming application. We showed that applying autoscaling is possible, but we did not say whether you should do it systematically. Autoscaling has obvious advantages, such as the ones listed at the beginning of this post, but it’s not always efficient depending on the use case. Scaling comes with a cost, and for some application usages or workloads this cost may be significant. Stateful operations and state migrations are among the best-known issues. To address them we could consider using StatefulSets, an alternative to a plain Kubernetes Deployment that provides persistent storage, but we would still have to quantify that cost for different operations.
Going further
Finally, here is a list of useful readings on the same subject:
- Deploying Kafka Streams Applications with Docker and Kubernetes — by Gwen Shapira and Matthias J. Sax from Confluent
- Helm 101: Apache Kafka and Confluent Platform for Kubernetes Explained — by Viktor Gamov from Confluent
- Apache Kafka on Kubernetes — Could You? Should You? — by Gwen Shapira from Confluent
- Local Persistent Volumes for Kubernetes Goes Beta
- How to deploy Kafka Stream applications on Kubernetes?
- Horizontal Pod Autoscaler Reloaded — Scale on Custom Metrics — Maciej Pytel & Solly Ross from Google / RedHat