Auto Scaling Kafka Consumers with Kubernetes and KEDA

Sushil Kumar
Google Cloud - Community
9 min read · Jun 2, 2023

Apache Kafka is one of the most popular open source distributed event streaming platforms. Its ubiquity can be gauged by the fact that it’s used by the majority of top players in banking, insurance, telecom and manufacturing.

Kafka started as a message broker but has evolved into a full-blown event streaming platform with the addition of components like Kafka Connect, Kafka Streams and ksqlDB. Kafka provides highly distributed messaging that can scale up to millions of messages per second.

Kubernetes, on the other hand, is the most popular and most widely used container orchestration framework. It shares many of its roots with Borg, Google’s internal cluster orchestration system, and it provides constructs to scale your workloads on demand.

Naturally a cross between Kafka and Kubernetes would be a match made in heaven. In this article we will see how we can auto-scale a Kafka Consumer deployed on Kubernetes.

Those familiar with Kubernetes might already be aware of constructs such as the Horizontal Pod Autoscaler (HPA), the Vertical Pod Autoscaler (VPA) and, more recently, the Multi-Dimensional Pod Autoscaler. Out of the box, however, all of these constructs use CPU and/or memory to trigger scaling. That generally works for Kafka consumers as well, but a more appropriate metric for scaling Kafka consumers is consumer lag.

If you are not familiar with lag, let me try to explain it. If producers are producing at a much faster rate than consumers are consuming, a backlog of unconsumed messages builds up in Kafka. The size of that backlog, measured per partition as the gap between the latest offset and the consumer group’s committed offset, is called consumer lag. Out of the box Kafka only ships a command-line tool (kafka-consumer-groups) for inspecting lag on demand; for continuous monitoring there are many third-party solutions, both paid and open source.
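To make the definition concrete, here is a minimal sketch of the lag arithmetic in plain Java. It does not talk to Kafka; the class name, method name and offset values are made up for illustration, and treating a partition with no committed offset as starting from 0 is an assumption.

```java
import java.util.Map;

// Illustrative only: lag per partition = latest (end) offset - committed offset.
final class ConsumerLag {

    /** Sums per-partition lag; a partition without a committed offset is counted from 0 (assumption). */
    static long totalLag(Map<Integer, Long> endOffsets, Map<Integer, Long> committedOffsets) {
        long total = 0L;
        for (Map.Entry<Integer, Long> entry : endOffsets.entrySet()) {
            long committed = committedOffsets.getOrDefault(entry.getKey(), 0L);
            // Guard against a negative gap if the two snapshots were taken at different times.
            total += Math.max(0L, entry.getValue() - committed);
        }
        return total;
    }

    public static void main(String[] args) {
        // Hypothetical offsets for a 3-partition topic.
        Map<Integer, Long> end = Map.of(0, 120L, 1, 80L, 2, 50L);
        Map<Integer, Long> committed = Map.of(0, 100L, 1, 80L, 2, 20L);
        System.out.println("Total lag: " + totalLag(end, committed)); // 20 + 0 + 30 = 50
    }
}
```

A lag of 0 on every partition means the consumer group has caught up with the producers.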

Now if we wish to have a performant pipeline, that is, one where our consumers consume messages at almost the same speed at which producers produce them, we might want to use consumer lag as the signal to scale our consumers. This is where KEDA (Kubernetes Event-Driven Autoscaling) comes into the picture.

KEDA integrates with the HPA API in Kubernetes and allows you to create scaling triggers based on custom metrics (like consumer lag for Kafka, queue size for ActiveMQ and many more). KEDA has a concept of Scalers, which provide a way to define triggers for scaling. You can take a look at the complete list of scalers supported by KEDA in its documentation. KEDA can scale your Deployments, StatefulSets and even Jobs.

In this post, we will see how we can scale our Kafka consumer deployed on Google Kubernetes Engine (GKE) based on consumer lag using KEDA. We’ll see how to install KEDA on our cluster and then how to define custom KEDA resources to specify our scaling triggers. Another interesting thing we’ll do is use an Autopilot GKE cluster. This means that not just our Kafka consumers but also our Kubernetes cluster will scale as consumer Pods demand more resources.

Let’s get started.


Prerequisites

We’ll start by setting everything up before we begin our demo.

  1. Set up a Kubernetes cluster (I’m using GKE as an example; you can use any Kubernetes cluster, even Minikube on your local machine).
  2. Set up a Kafka cluster (I’m using Confluent to set up the Kafka cluster, but if you are using a local K8S cluster, you can also run Kafka locally).
  3. IntelliJ IDEA (or any other code editor). I’m using Spring Boot support for Apache Kafka to create both producers and consumers.

Once you have the prerequisites out of the way, let us start by setting up KEDA on our Kubernetes cluster.

Deploy KEDA on your K8S cluster

KEDA can be deployed on your cluster in multiple ways. You can use

  • Helm charts
  • Operator Marketplace
  • YAML resources and kubectl

to deploy it to your cluster. In this post, I’ll use the third option, YAML and kubectl, to deploy KEDA and its CRDs to my cluster. You can follow the official documentation to deploy via one of the other methods. The only thing to note is that KEDA v2.10, the release used here, only supports Kubernetes v1.24 and above. So if you are running an older version, you might have to upgrade before you can take advantage of all the scaling goodness of KEDA.

Run the following command against your K8S cluster:

kubectl apply -f https://github.com/kedacore/keda/releases/download/v2.10.1/keda-2.10.1.yaml

I’m using Cloud Shell to interact with my Autopilot GKE cluster. You’ll see output listing all the different resources being created.

namespace/keda created
customresourcedefinition.apiextensions.k8s.io/clustertriggerauthentications.keda.sh created
customresourcedefinition.apiextensions.k8s.io/scaledjobs.keda.sh created
customresourcedefinition.apiextensions.k8s.io/scaledobjects.keda.sh created
customresourcedefinition.apiextensions.k8s.io/triggerauthentications.keda.sh created
serviceaccount/keda-operator created
role.rbac.authorization.k8s.io/keda-operator created
clusterrole.rbac.authorization.k8s.io/keda-external-metrics-reader created
clusterrole.rbac.authorization.k8s.io/keda-operator created
rolebinding.rbac.authorization.k8s.io/keda-operator created
rolebinding.rbac.authorization.k8s.io/keda-auth-reader created
clusterrolebinding.rbac.authorization.k8s.io/keda-hpa-controller-external-metrics created
clusterrolebinding.rbac.authorization.k8s.io/keda-operator created
clusterrolebinding.rbac.authorization.k8s.io/keda-system-auth-delegator created
service/keda-admission-webhooks created
service/keda-metrics-apiserver created
service/keda-operator created
deployment.apps/keda-admission created
deployment.apps/keda-metrics-apiserver created
deployment.apps/keda-operator created
apiservice.apiregistration.k8s.io/v1beta1.external.metrics.k8s.io created
validatingwebhookconfiguration.admissionregistration.k8s.io/keda-admission created

You can also verify that all the components are running with the following command:

kubectl get all -n keda

You should see all the resources deployed.

KEDA resources deployed

Setup Kafka Producer and Consumer

I have set up a Kafka cluster via Confluent in GCP in the same region (us-central) as my Kubernetes cluster. I have also created a topic named keda-demo-topic with 10 partitions and left all other properties at their defaults.

Now let us fire up our IDEs and write the producer and consumer. We’ll deploy our producer on local machine and consumer on Kubernetes cluster and see how it goes.

You can write your own producer and consumer, or you can reuse or repurpose my code for your own setup. You can find the repos for both producer and consumer on my GitHub.

Below I’m listing the main configurations for both my consumer and producer.

Producer Configuration


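import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

// The enclosing configuration class is not shown in the original listing;
// the class name used here is illustrative.
@Configuration
public class KafkaProducerConfig {

    // bootstrapServers is a String field injected from configuration (declaration not shown).

    // Client settings: broker address plus String serializers for keys and values.
    @Bean
    public Map<String, Object> producerConfigs() {
        return Map.of(
                ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers,
                ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
                ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class
        );
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

// Separate source file: the task that publishes messages to the topic.
public class KedaProducer implements Runnable {

    private final KafkaTemplate<String, String> template;
    private final String topic;

    @Autowired
    public KedaProducer(KafkaTemplate<String, String> template, @Value("${kafka.topic}") String topic) {
        this.template = template;
        this.topic = topic;
    }

    @Override
    public void run() {
        // .... (message construction elided in the original)
        template.send(topic, message);
        // ....
    }
}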

Consumer Configuration

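import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.stereotype.Component;

// The opening of this configuration class is not shown in the original listing;
// the class name used here is illustrative.
@Configuration
public class KafkaConsumerConfig {

    // bootstrapServers is a String field injected from configuration (declaration not shown).

    // Consumers need deserializers (StringDeserializer), not serializers.
    @Bean
    public Map<String, Object> consumerConfigs() {
        return Map.of(
                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers,
                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class,
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class
        );
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainer() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

}

// Separate source file: the listener that handles each consumed message.
@Component
public class KafkaConsumer {

    @KafkaListener(topics = "${kafka.topic}", groupId = "${kafka.consumer-group-id}")
    public void handle(String message) {
        System.out.println("Consumed a message : " + message);
    }

}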

Then I’m using the Spring Boot Maven plugin’s build-image goal (which builds images with Cloud Native Buildpacks) to create the Docker images for both producer and consumer.

./mvnw spring-boot:build-image -Dspring-boot.build-image.imageName=kaysush/keda-kafka-producer:0.1
./mvnw spring-boot:build-image -Dspring-boot.build-image.imageName=kaysush/keda-kafka-consumer:0.1

These images are also available on Docker Hub if you wish to reuse them instead of building your own. I have made them configurable via environment variables and secrets, which you can define while deploying to Kubernetes.

Deploying Kafka Consumer and KEDA resources

Once we have docker images for both our consumer and producer ready, we’ll deploy our consumer to our K8S cluster.

We’ll deploy our consumer as a Deployment resource. You can find the definition for that below

apiVersion: apps/v1
kind: Deployment
metadata:
  name: consumer-deployment
  labels:
    app: consumer
spec:
  replicas: 1
  selector:
    matchLabels:
      app: consumer
  template:
    metadata:
      labels:
        app: consumer
    spec:
      containers:
        - name: consumer
          image: kaysush/keda-kafka-consumer:0.1
          resources:
            requests:
              memory: "1Gi"
              cpu: "500m"
            limits:
              memory: "2Gi"
              cpu: "1000m"
          env:
            - name: SR_API_KEY
              valueFrom:
                secretKeyRef:
                  name: api-secrets
                  key: api_key
            - name: SR_API_SECRET
              valueFrom:
                secretKeyRef:
                  name: api-secrets
                  key: api_secret
            - name: KAFKA_TOPIC
              value: "keda-demo-topic"
            - name: BOOTSTRAP_SERVER
              valueFrom:
                secretKeyRef:
                  name: api-secrets
                  key: bootstrap_server

You can see that it references a secret named api-secrets. This secret has the following 3 keys:

  • api_key: the Confluent API key
  • api_secret: the Confluent API secret
  • bootstrap_server: the bootstrap server of the Confluent Kafka cluster

You can create this secret with the following command:

kubectl create secret generic api-secrets --from-literal=api_key=<YOUR-API-KEY> --from-literal=api_secret=<YOUR-API-SECRET> --from-literal=bootstrap_server=<YOUR-BOOTSTRAP-SERVER>

If you have been following this demo as is, you’ll have all of these values available. If not, you’ll have to tweak your consumer according to your own setup.
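For readers adapting the consumer to their own setup, the sketch below shows one way the three environment variables from the Deployment could be turned into Kafka client properties. It assumes SASL/PLAIN over TLS, which is how Confluent Cloud API keys are normally used; the class and method names are hypothetical, and this is not necessarily how the published image does it.

```java
import java.util.Map;
import java.util.Properties;

// Hypothetical helper: maps the Deployment's env vars to Kafka client settings.
final class ConfluentClientProps {

    /** Builds client properties for SASL/PLAIN over TLS (assumed authentication mode). */
    static Properties fromEnv(Map<String, String> env) {
        Properties props = new Properties();
        props.put("bootstrap.servers", require(env, "BOOTSTRAP_SERVER"));
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config", String.format(
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username=\"%s\" password=\"%s\";",
                require(env, "SR_API_KEY"), require(env, "SR_API_SECRET")));
        return props;
    }

    // Fails fast with a readable message when a variable is missing.
    private static String require(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.isBlank()) {
            throw new IllegalStateException("Missing environment variable: " + name);
        }
        return value;
    }

    public static void main(String[] args) {
        // In a real pod you would pass System.getenv(); placeholder values are used here.
        Map<String, String> env = Map.of(
                "BOOTSTRAP_SERVER", "broker.example:9092",
                "SR_API_KEY", "my-key",
                "SR_API_SECRET", "my-secret");
        System.out.println(fromEnv(env).getProperty("security.protocol"));
    }
}
```

Failing fast on a missing variable makes a misconfigured secret show up in the pod logs at startup rather than as an opaque authentication error later.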

The next thing we’ll define is the KEDA ScaledObject, which is what is actually responsible for auto scaling our consumer.

apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: kafka-scaledobject
  namespace: default
  labels:
    deploymentName: consumer-deployment
spec:
  scaleTargetRef:
    name: consumer-deployment
  pollingInterval: 5
  minReplicaCount: 1
  maxReplicaCount: 10 # Max can go up to the number of partitions in the topic
  triggers:
    - type: kafka
      metadata:
        consumerGroup: my-kafka-consumer-group
        bootstrapServers: <YOUR-BROKER-HOST>
        topic: keda-demo-topic
        lagThreshold: "10"
        offsetResetPolicy: latest
        sasl: plaintext
        tls: enable
      authenticationRef:
        name: keda-kafka-credentials

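This ScaledObject scales the Deployment named consumer-deployment by polling the lag every pollingInterval seconds. The lagThreshold is the target lag per replica: when the average lag per replica rises above 10 messages, more replicas are added. We have set the scaling limits to 1 and 10 because our topic has 10 partitions, and a consumer group cannot have more active consumers than there are partitions.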
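The scaling arithmetic can be sketched roughly as below. This is a simplified illustration of how an average-value target translates lag into a replica count, not KEDA’s or the HPA’s actual code; the real HPA also applies tolerances and stabilization windows. The class and method names are made up for the example.

```java
// Simplified sketch: replicas needed so that lag per replica stays at or below lagThreshold.
final class ReplicaEstimate {

    static int desiredReplicas(long totalLag, long lagThreshold,
                               int minReplicas, int maxReplicas, int partitions) {
        if (lagThreshold <= 0) {
            throw new IllegalArgumentException("lagThreshold must be positive");
        }
        // Ceiling division: 35 messages of lag at a threshold of 10 needs 4 replicas.
        long byLag = (totalLag + lagThreshold - 1) / lagThreshold;
        // Never exceed maxReplicaCount or the partition count.
        long capped = Math.min(byLag, Math.min(maxReplicas, partitions));
        // Never drop below minReplicaCount.
        return (int) Math.max(minReplicas, capped);
    }

    public static void main(String[] args) {
        // Values mirror the ScaledObject in this post: threshold 10, 1..10 replicas, 10 partitions.
        System.out.println(desiredReplicas(0, 10, 1, 10, 10));   // 1
        System.out.println(desiredReplicas(35, 10, 1, 10, 10));  // 4
        System.out.println(desiredReplicas(500, 10, 1, 10, 10)); // 10
    }
}
```

Capping at the partition count reflects the point made above: extra consumers beyond the number of partitions would sit idle.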

KEDA also needs to connect to our Kafka cluster to figure out the lag, so it needs the credentials and bootstrap server information. We’ll create another KEDA resource called TriggerAuthentication, which the ScaledObject above refers to under authenticationRef.

apiVersion: keda.sh/v1alpha1
kind: TriggerAuthentication
metadata:
  name: keda-kafka-credentials
  namespace: default
spec:
  secretTargetRef:
    - parameter: username
      name: api-secrets
      key: api_key
    - parameter: password
      name: api-secrets
      key: api_secret

This resource supplies the keys from the secret to the KEDA ScaledObject.

Now let us deploy all 3 of these resources to our cluster using kubectl apply -f <your-file>. If you have KEDA configured correctly, you’ll see output confirming the creation of all 3 resources (Deployment, ScaledObject and TriggerAuthentication).

You can verify that your consumer has started correctly by looking at its logs using kubectl logs -f <your-pod-name>.

You can also verify that the KEDA ScaledObject has been instantiated correctly by checking whether an HPA resource has been created (for example with kubectl get hpa).

HPA resource created by ScaledObject

Seeing it all in action

Now that our consumer and ScaledObject (and consequently the HPA) are configured, we’ll start publishing messages to our topic and see if the consumer eventually scales. To simulate consumer lag, our consumer sleeps after consuming every message. The length of the sleep is carried in the message itself, which our producer generates; each delay value lies in the range [0, 100).
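The lag simulation described above can be sketched without any Kafka dependency as follows. The class and method names are hypothetical, and interpreting the delay as milliseconds is an assumption; the actual code lives in the linked repos.

```java
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical sketch of the lag simulation: the producer sends a delay, the consumer sleeps for it.
final class LagSimulation {

    /** Producer side: a random delay in [0, 100), sent as the message payload. */
    static String nextMessage() {
        return Integer.toString(ThreadLocalRandom.current().nextInt(0, 100));
    }

    /** Consumer side: parse the delay and sleep for it (unit assumed to be milliseconds). */
    static long handle(String message) throws InterruptedException {
        long delayMillis = Long.parseLong(message.trim());
        Thread.sleep(delayMillis);
        return delayMillis;
    }

    public static void main(String[] args) throws InterruptedException {
        // Simulate a few produce/consume round trips in-process.
        for (int i = 0; i < 3; i++) {
            String message = nextMessage();
            long slept = handle(message);
            System.out.println("Consumed a message : " + message + " (slept " + slept + " ms)");
        }
    }
}
```

Because the consumer blocks on every message while the producer does not, a single consumer falls behind quickly, which is exactly the condition that should make KEDA add replicas.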

Once both producer and consumer have been running for some time, you can check whether KEDA has scaled your consumer.

There are a couple of ways to check.

  1. You can check how many consumer pods are running.

kubectl get pods

Consumer pods scaled to 4

  2. Another way to check is to inspect the HPA object.

kubectl describe hpa keda-hpa-kafka-scaledobject

HPA logs showing deployment scaled to 4 instances.

Now let us check whether KEDA can scale our deployment back down. To simulate this, let us stop our producer and wait. Eventually the HPA should realise that there is no lag and scale our deployment back to 1. There is also an option to scale down to 0, but we haven’t enabled that for this demo.

HPA logs showing deployment scaled to 1 instance.

Rightfully our consumer was scaled back to 1 pod when producer stopped producing any new messages.

Exploring metrics calculated by KEDA

You can also check which metrics KEDA is generating for our HPA to base its scaling decisions on.

You can get the list of all metrics exposed by KEDA using the following command:

kubectl get --raw "/apis/external.metrics.k8s.io/v1beta1"

This will give you a response listing all external metrics.

{
"kind": "APIResourceList",
"apiVersion": "v1",
"groupVersion": "external.metrics.k8s.io/v1beta1",
"resources": [{
"name": "s0-kafka-keda-demo-topic",
"singularName": "",
"namespaced": true,
"kind": "ExternalMetricValueList",
"verbs": ["get"]
}]
}

To get the value of a metric calculated by KEDA, use the following command. The namespace, metric name and ScaledObject name below are the ones from this demo; substitute your own.

kubectl get --raw "/apis/external.metrics.k8s.io/v1beta1/namespaces/default/s0-kafka-keda-demo-topic?labelSelector=scaledobject.keda.sh%2Fname%3Dkafka-scaledobject"

This will return the current value of the metric.

{
"kind": "ExternalMetricValueList",
"apiVersion": "external.metrics.k8s.io/v1beta1",
"metadata": {},
"items": [{
"metricName": "s0-kafka-keda-demo-topic",
"metricLabels": null,
"timestamp": "2023-06-02T13:23:27Z",
"value": "0"
}]
}
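The request path for a single metric follows a fixed pattern: namespace, metric name, then a URL-encoded label selector naming the ScaledObject. As a small sketch (the helper class is hypothetical), it can be assembled like this for the resources used in this demo:

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// Hypothetical helper that assembles the external metrics API path for one KEDA metric.
final class ExternalMetricPath {

    static String build(String namespace, String metricName, String scaledObjectName) {
        // "/" and "=" in the selector must be percent-encoded (%2F and %3D).
        String selector = URLEncoder.encode(
                "scaledobject.keda.sh/name=" + scaledObjectName, StandardCharsets.UTF_8);
        return "/apis/external.metrics.k8s.io/v1beta1/namespaces/" + namespace
                + "/" + metricName + "?labelSelector=" + selector;
    }

    public static void main(String[] args) {
        // Namespace, metric name and ScaledObject name as used in this post.
        System.out.println(build("default", "s0-kafka-keda-demo-topic", "kafka-scaledobject"));
    }
}
```

The printed path can be passed to kubectl get --raw to fetch the current metric value.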

And that’s how KEDA can scale your consumers up and down based on consumer lag.

If you find any bugs in the code or have any questions in general, feel free to drop a comment below.

Also, don’t forget to delete all of your cloud resources, otherwise you might get the shock of a huge bill 😅

Till then happy coding :)

Sushil Kumar
Google Cloud - Community

A polyglot developer with a knack for Distributed systems, Cloud and automation.