Horizontal Pod Autoscaling (HPA) triggered by Kafka event

Ran Rubin
8 min readAug 7, 2019

--

Autoscale your Kafka (consumer) pods using metrics from Prometheus

This is a quick guide for autoscaling Kafka pods. These pods (consumer pods) will scale upon a Kafka event, specifically consumer group lag. The consumer group lag metric will be exported to Prometheus by a sidecar in the pod (we will be using kafka_exporter image for the sidecar).
The Prometheus Adapter will transform Prometheus’ metrics into k8s custom metrics API, allowing an hpa pod to be triggered by these metrics and scale a deployment.

This tutorial was done with a great help from Elhay Efrat.

Prerequisites

  1. K8s cluster :) I was using v1.14. You need v1.11+
  2. Prometheus — You must have Prometheus installed on your cluster.
  3. Kafka — You should have Kafka deployed on your k8s cluster or make the needed, mostly networking, changes in the following code for proper communication between objects.
    Throughout this guide we will assume that this “cluster” is under ‘kafka’ namespace and “my-kafka-brokeris a service that wraps the brokers’ endpoints.
  4. Api-versions— autoscaling/v2beta, extensions/v1beta1
  5. Helm — the package manager for k8s

Intro

As mentioned above, this is a quick guide to autoscale your Kafka pods using metrics from Prometheus.

I decided to use kafka_exporter as my metric exporter. Mainly because the metrics I seek, specifically consumer group lag, are not exported by the jmx. You can write your own application that exposes whatever specific metric you desire.

I decided to use Prometheus Adapter to create the custom metrics for k8s, mainly because it’s easy to scrape and pass metric data to Prometheus using annotation.

I decided to focus on “kafka_consumergroup_lag” metric but there are others that kafka_exporter export. Read more about what is Kafka consumer lag.

I will not deep dive into explaining each item we deploy, but links to very good documentation sources were added.

Basic Use Case

We want to have an application (on a pod in our k8s cluster) that will consume messages from Kafka. It should read “my-topic” topic and be a part of ”we-consume“ consumer group. In case that the Kafka consumer lag for this topic is more than 5, we want that the consumer pod will automatically scale out.
We will achieve the autoscaling by connecting 4 objects.

On the application side:

  1. kafka-exporter: This container runs an image of kafka_exporter. It will interact with the Kafka server and will expose metrics on the pod. It’s a side car in the consumer application pod.
  2. Kafka consumer application: The application that consumes Kafka’s messages. Resides in the same pod as the kafka-exporter

Monitoring side:

  1. Prometheus Adapter pod : Will be deployed using helm chart. It implements Custom Metrics API and creates k8s custom metrics from Prometheus metrics.

Connects the two:

  1. HPA : The hpa will use the custom metrics that were created by the adapter (in the monitoring side) to scale the consumer application.

Deploy our application with kafka_exporter

So we have an application that consumes Kafka messages and we want to scale it out when our consumer_lag threshold is breached using hpa.
The hpa scales the pod that it gets the metrics from. For that reason we need the kafka-exporter container to serve as a side car in our consumer pod.

In a sense this is not optimal because upon pod replication (of the consumer along with the exporter) the kafka-exporter containers will have the same metrics (as long as the number of the topic’s partitions is bigger than the number of consumers, aka pods), thus their replication is redundant. But again, the hpa will scale the pod with the metrics so we must connect the exporter with the application.

Before we begin, make sure to create a “my-topic” topic with 5 partitions (or as many as you want your hpa to scale out to). I will not cover how it’s done.

Let’s deploy an example kafka-consumer-application deployment. It consists of a kafka_exporter container and our consumer application.
Our application is created from a Kafka image and constantly reads from “my-topic” topic as a part of the consumer group ”we-consume” every second for 2 seconds.

# kafka-consumer-application.yamlkind: Deployment
apiVersion: extensions/v1beta1
metadata:
name: kafka-consumer-application
labels:
app_name: kafka-consumer-application
spec:
revisionHistoryLimit: 30
strategy:
type: RollingUpdate
rollingUpdate:
maxUnavailable: 0
maxSurge: 10
replicas: 1
selector:
matchLabels:
app_name: kafka-consumer-application
template:
metadata:
labels:
app_name: kafka-consumer-application
annotations:
prometheus.io/scrape: 'true'
prometheus.io/port: '9308'

spec:
containers:
- name: kafka-exporter
image: danielqsj/kafka-exporter:v1.2.0
imagePullPolicy: IfNotPresent
command:
- kafka_exporter
- "--kafka.server=my-kafka-broker.kafka:9092"
ports:
- name: http-metrics
containerPort: 9308

readinessProbe:
httpGet:
path: kafka-application
port: 9308
initialDelaySeconds: 5
timeoutSeconds: 5
- name: kafka-application
image: confluentinc/cp-kafka:5.0.1
imagePullPolicy: IfNotPresent
command:
- sh
- -c
- |
while true
do
kafka-console-consumer --bootstrap-server my-kafka-broker-headless.kafka:9092 --topic my-topic --from-beginning --group we-consume --timeout-ms 2000
sleep 1
done

Notice that “my-kafka-broker-headless” is the name of the k8s service that exposes your Kafka brokers (on port 9092) and “kafka” is its namespace.

The metrics coming from kafka-exporter will be shown under “kafka-consumer-application:9308/metrics” . This is why we give these values in the annotation. Prometheus will scrape the data from our pod according to the values we give in the annotation. Learn more about annotations here.

Let’s deploy this pod on consumer-deployment namespace

$ kubectl apply -f kafka-consumer-application.yaml -n consumer-test
deployment.extensions/kafka-consumer-application created
$ kubectl get pods -n consumer-test
kafka-consumer-application-dcc7df96-rwdrb 2/2 Running 0 107s
Our deployment so far

Verify metrics are getting to Prometheus

Make sure that Prometheus is getting the data from the exporter. You can do so using Grafana if you have it on your cluster — just play with the queries (import this dashboard — if data is shown, you are fine). If you don’t see anything, try to edit one of the graph queries to be plain “kafka_consumergroup_lag”, you should see the metrics.

You can also verify connection by using Prometheus’s REST API. A possible command is

curl 'http://localhost:9090/api/vi/series?match[]=kafka_consumergroup_lag'

(Here I assume you execute the command from the Prometheus server which has 9090 port exposed. That might not be the case, make sure you know the right endpoint for interacting with the Prometheus server.)

Now we know that Prometheus is getting the metrics from kafka_exporter which means we we are third of the way through!

Figure out your query

We must figure out the right query for us. I recommend doing so using Grafana, that way you can look at your queries and test them.

As mentioned before, we want to check the consumer lag of “my-topic” topic and “we-consume” consumer group.
For this use case the following query is enough:

avg_over_time(kafka_consumergroup_lag{topic="my-topic",consumergroup="we-consume"}[1m])

Deploy Prometheus Adapter using Helm chart

Now we need to install Prometheus Adapter pod. We will do so using its helm chart and passing our customized values.

Prometheus Adapter helm chart repository has very good documentation and so is the actual resource metric API. I strongly recommend reading it. By following their instructions I came up with this file

# values.yamlprometheus:
port: 80
url: http://prometheus-server
rules:
default: false
resource: {}
custom:
- seriesQuery: 'kafka_consumergroup_lag'
resources:
overrides:
kubernetes_namespace: {resource: "namespace"}
kubernetes_pod_name: {resource: "pod"}
name:
matches: "kafka_consumergroup_lag"
as: "kafka_consumergroup_lag"
metricsQuery: 'avg_over_time(kafka_consumergroup_lag{topic="my-topic",consumergroup="we-consume"}[1m])'

To use the chart ensure that prometheus.url and prometheus.port are configured with the correct Prometheus service endpoint.
The easiest place for us to install the chart is in the same namespace as our Prometheus server pod (lets call this namespace prom-ns)
Now, install the chart by running

helm upgrade -i prometheus-adapter stable/prometheus-adapter --version 1.2.0 -f values.yaml --namespace prom-ns

In order to test that this went well, wait a minute or two and run the following command from one of your master nodes:

kubectl get --raw /apis/custom.metrics.k8s.io/v1beta1/namespaces/consumer-test/pod/*/kafka_consumergroup_lag | jq

You should get

{
"kind": "MetricValueList",
"apiVersion": "custom.metrics.k8s.io/v1beta1",
"metadata": {
"selfLink": "/apis/custom.metrics.k8s.io/v1beta1/namespaces/consumer-test/pod/%2A/kafka_consumergroup_lag"
},
"items": [
{
"describedObject": {
"kind": "Pod",
"namespace": "consumer-test",
"name": "kafka-consumer-application-7bcb4fb9db-kg2xz",
"apiVersion": "/v1"
},
"metricName": "kafka_consumergroup_lag",
"timestamp": "2019-07-31T13:02:49Z",
"value": "0"
}
]
}

You can see in the response that we now have a metric named “kafka_consumergroup_lag” with a current value of 0.
If the “items” section is empty — there is something wring with your exporter. If you got Error from server.. that means something went wrong when you deployed the helm chart.

If you reached this far then you are two thirds of the way done :)

our deployment so far

Creating the HPA

As mentioned above, the hpa is the object that actually monitors if the metric is too high and scale the application accordingly. We will have to create the hpa in the “consumer-test” namespace in order to be able to scale our application.

In this example we will scale “kafka-consumer-application” deployment up to a maximum of 10 pods if the average kafka_consumergroup_lag goes above 5.

# consumer-hpa.yamlkind: HorizontalPodAutoscaler
apiVersion: autoscaling/v2beta1
metadata:
name: consumer-hpa
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: kafka-consumer-application

minReplicas: 1
maxReplicas: 10
metrics:
- type: Pods
pods:

metricName: "kafka_consumergroup_lag"
targetAverageValue: 5

Notice that we have to give it the exact metric name as we got when querying the Custom Metrics API.

Let’s deploy

$ kubectl apply -f consumer-hpa.yaml -n consumer-test
horizontalpodautoscaler.autoscaling/consumer-hpa created

It will take a minute or two until the hpa presents data. Then you should see

$ kubectl get hpa consumer-hpa -n consumer-test
consumer-hpa Deployment/kafka-consumer-application 0/5 1 10 1 63s

That’s it! Everything is connected.

The following scheme demonstrates the interactions between the items we deployed

Our deployment end state

Now feel free to raise the consumer group lag by loading “my-topic” with messages and see the magic happens. When the hpa threshold will be breached the application would scale

The hpa scales the application

--

--

Ran Rubin

An animated DevOps-MLOps engineer. Fascinated with bringing the operation and machine learning worlds together.