Autoscaling Kafka Consumers using Kubernetes Event Driven Autoscaling (KEDA) — Part 2

akshay agarwal
6 min read · Feb 28, 2024


In Part 1 of the series, we saw how KEDA works and how to deploy and monitor it in Kubernetes.

In this part of the series, we will look at the problem statement and the implementation details of KEDA with Kafka consumers.

Problem Statement

Kafka consumers are clients that consume messages from a specific Kafka topic and process them to perform certain business logic.

We often run consumers against topics with varying throughput, and sometimes no messages arrive in a topic for long durations. This leads us to the following problem statements:

  1. To scale Kafka consumers from 0 to 1 and from 1 to 0 based on whether there are messages in the Kafka topic.
  2. To scale consumers from 1 to n and n to 1 based on the lag metric of the Kafka topic.

Understanding Usage of KEDA

As we discussed in Part 1 of the series, KEDA consists of ScaledObjects, TriggerAuthentications and several scalers as custom resources, managed via the KEDA Operator, the Metrics Server and the Admission Controller.

For scaling Kafka consumers, the following scalers can be used:

Kafka Scaler: This scaler takes the Kafka brokers, consumer group ID, topic, authentication information, etc., connects to the Kafka cluster and fetches the lag metric for the consumer group specified in the trigger.

- type: kafka
  metadata:
    activationLagThreshold: "1"
    bootstrapServers: kafka.bootstrap.cluster:9092
    consumerGroup: kafka-test-consumer-group
    lagThreshold: "100000000"
    offsetResetPolicy: latest
    scaleToZeroOnInvalidOffset: "false"
    tls: enable
    topic: kafka-test-topic
  • activationLagThreshold: The lag threshold at which the KEDA operator activates/de-activates the ScaledObject. A value of 1 ensures that KEDA activates the scaler as soon as a single message enters the Kafka topic.
  • bootstrapServers: The Kafka cluster bootstrap servers to connect to for fetching the metric. Ensure there is connectivity to the Kafka cluster from the Kubernetes cluster where the scaler is deployed.
  • consumerGroup: The consumer group ID for which the scaler needs to compute the lag metric.
  • lagThreshold: The lag threshold at which the scaler scales the underlying target from 1 to n or n to 1. A high value of `lagThreshold` ensures the raw lag metric is not used for scaling beyond activation/de-activation. We will see the reason for this later in the series.
  • scaleToZeroOnInvalidOffset: Controls what the scaler does when it sees no valid offset committed for the consumer group configured in the scaled object. A value of "false" means the scaler will not scale the target to 0 when there is no committed offset, for example when no messages have entered the topic yet.
  • tls: Passed as enable if the Kafka cluster being connected to is TLS enabled. When this parameter is passed, we also need to pass the authentication information to the scaler trigger, which we will see below.
  • topic: The Kafka topic for which lag is to be computed. If not specified, all topics that the specified consumer group is subscribed to are considered.

Prometheus Scaler: This scaler takes a PromQL query and a Prometheus endpoint, and returns the resulting metric from Prometheus to the underlying HPA.

- type: prometheus
  metadata:
    activationThreshold: "1000000"
    customHeaders: x-scope-orgid=test
    metricName: lag_to_throughput_ratio
    namespace: <metrics_server_namespace>
    query: lag_to_throughput_ratio{group_id="kafka-test-consumer-group"}
    serverAddress: <prometheus_server_address>
    threshold: "10"
  • activationThreshold: A high activation threshold ensures that scaling from 0 to 1 and 1 to 0 is driven by the Kafka scaler trigger instead. Since there is no metric when the deployment is scaled down to 0, a high activation threshold keeps this trigger from participating in activation.
  • customHeaders: If Cortex is the source of the metric and metrics are stored per tenant, pass the `X-Scope-OrgID` header with the tenant name as the value.
  • metricName: The name of the metric being fetched from Prometheus. We can use a custom metric based on the lag in the Kafka topic and the topic throughput, which we will see below.
  • namespace: A namespace to be used for namespaced queries. These are required by some highly available Prometheus setups, such as Thanos or Cortex.
  • query: The PromQL query to be evaluated for fetching the metric from Prometheus.
  • serverAddress: Address of the Prometheus server, or of a highly available Prometheus setup such as Thanos or Cortex.
  • threshold: The threshold at which the underlying HPA scales the target from 1 to n and n to 1. This applies to the lag processing time metric fetched from Prometheus.

Combining the above two scaler triggers in a single KEDA ScaledObject gives us Kafka consumer activation/de-activation via the Kafka scaler, and scaling from 1 to n (or n to 1) via the underlying HPA using the consumer group lag metric in combination with the topic throughput.

Metrics

Lag processing time: the total consumer group lag divided by the rate at which the consumer group is consuming messages, i.e. an estimate of how long the consumer would take to catch up at the current throughput. Using kminion's consumer group lag metric:

sum(kminion_kafka_consumer_group_topic_lag{group_id=~".*-test.*"}) by (group_id)
/ on(group_id)
clamp_min(
  sum(
    label_replace(
      rate(global_messages_total_value{scope="CONSUMER", consumer_group_id!=""}[15m]),
      "group_id", "$1", "consumer_group_id", "(.*)"
    )
  ) by (group_id),
  1
)
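The Prometheus trigger shown earlier queries this expression as a pre-computed metric (lag_to_throughput_ratio). One way to expose it under that name is a Prometheus recording rule; this is a minimal sketch, assuming the rule file is loaded by the Prometheus or Cortex instance the scaler queries:

groups:
  - name: kafka-consumer-autoscaling
    rules:
      - record: lag_to_throughput_ratio
        expr: |
          sum(kminion_kafka_consumer_group_topic_lag{group_id=~".*-test.*"}) by (group_id)
          / on(group_id)
          clamp_min(
            sum(
              label_replace(
                rate(global_messages_total_value{scope="CONSUMER", consumer_group_id!=""}[15m]),
                "group_id", "$1", "consumer_group_id", "(.*)"
              )
            ) by (group_id),
            1
          )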

Kafka Consumer Group Lag:

Total lag = sum of the lag of each partition (log end offset − current committed offset) in the configured topic.

Trigger Authentication:

Often, different types of authentication information need to be passed to the scaler to connect to the monitoring source. In the case of Kafka, this can be a user certificate and key along with the CA certificate, truststore and keystore JKS files, or a username and password in case of a SASL-enabled Kafka cluster.

---
apiVersion: keda.sh/v1alpha1
kind: TriggerAuthentication
metadata:
  name: kafka-test-consumer
  namespace: test
spec:
  secretTargetRef:
    - parameter: username
      name: kafka-test-consumer-creds
      key: username
    - parameter: password
      name: kafka-test-consumer-creds
      key: password
    - parameter: ca
      name: kafka-test-consumer-creds
      key: ca
    - parameter: cert
      name: kafka-test-consumer-creds
      key: cert
    - parameter: key
      name: kafka-test-consumer-creds
      key: key
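For reference, the `secretTargetRef` entries above map to keys of a Kubernetes Secret named kafka-test-consumer-creds in the same namespace. A minimal sketch of that Secret, with placeholder values, could look like this:

apiVersion: v1
kind: Secret
metadata:
  name: kafka-test-consumer-creds
  namespace: test
type: Opaque
stringData:
  username: <kafka_username>
  password: <kafka_password>
  ca: <pem_encoded_ca_certificate>
  cert: <pem_encoded_client_certificate>
  key: <pem_encoded_client_key>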

The name of the above object then needs to be passed as a reference under `authenticationRef` in the trigger:

triggers:
  - authenticationRef:
      name: kafka-test-consumer
    metadata: {}
    type: kafka

Other Params:

cooldownPeriod: The period, in seconds, to wait after the last trigger reports active before scaling the target back down to 0.

fallback:

  • failureThreshold: Number of consecutive scaler failures after which the fallback is triggered.
  • replicas: Number of replicas the target is scaled to when the fallback is triggered.

maxReplicaCount: Maximum number of replicas the target can be scaled up to when the metrics result in a scale up.

minReplicaCount: Minimum number of replicas the target is always kept scaled to.

pollingInterval: Interval, in seconds, at which trigger metrics are fetched and evaluated for scale up/down and activation/de-activation.

scaleTargetRef:

  • name: Name of the target deployment.

HPA advanced behaviour

In addition to the above, we can also configure advanced HPA behaviour, supported from Kubernetes 1.18 onwards. These configurations are passed directly to the HPA by the KEDA ScaledObject.

It supports:

  1. Defining the stabilization window for the HPA (for scaling from 1 to n and n to 1).
  2. Defining scale up / scale down policies, which is especially useful for Kafka consumers to avoid lag caused by consumer group re-balancing and hence avoid thrashing for latency-critical consumers. The idea is to scale up fast and scale down slowly. Each policy defines the period over which it applies and the percentage or number of pods to add or remove; a sketch is shown after this list.
  3. Using the HPA policy in conjunction with the Cooperative Sticky Assignor of Kafka consumers to reduce spikes in the lag metric.
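As a rough sketch of the "scale up fast, scale down slowly" idea, the `advanced.horizontalPodAutoscalerConfig` section of the ScaledObject could carry a behaviour block along these lines (the percentages, periods and stabilization window are illustrative values, not recommendations):

advanced:
  horizontalPodAutoscalerConfig:
    behavior:
      scaleUp:
        stabilizationWindowSeconds: 0      # react to scale-up signals immediately
        policies:
          - type: Percent
            value: 100                     # allow doubling the replica count
            periodSeconds: 60              # at most once per minute
      scaleDown:
        stabilizationWindowSeconds: 300    # wait 5 minutes of low lag before scaling down
        policies:
          - type: Pods
            value: 1                       # remove at most one pod
            periodSeconds: 120             # every 2 minutes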

ScaledObject

The complete KEDA ScaledObject for autoscaling Kafka consumers:

apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  labels:
    scaledobject.keda.sh/name: kafka-test-consumer-scaled-object
  name: kafka-test-consumer-scaled-object
  namespace: test
spec:
  cooldownPeriod: 300
  fallback:
    failureThreshold: 3
    replicas: 1
  advanced:
    horizontalPodAutoscalerConfig:
      name: "keda-hpa-kafka-test-consumer-scaled-object"
      behavior:
        scaleUp:
          policies:
            - type: Percent
              value: <scaleup_percent>
              periodSeconds: <scaleup_period_secs>
        scaleDown:
          policies:
            - type: <Percent/Pods>
              value: <scaledown_percent>
              periodSeconds: <scaledown_period_seconds>
          stabilizationWindowSeconds: <scaledown_stabilization_window_secs>
  maxReplicaCount: 2
  minReplicaCount: 1
  pollingInterval: 30
  scaleTargetRef:
    name: kafka-test-consumer
  triggers:
    - type: kafka
      authenticationRef:
        name: kafka-test-consumer
      metadata:
        activationLagThreshold: "1"
        bootstrapServers: kafka.bootstrap.cluster:9092
        consumerGroup: kafka-test-consumer-group
        lagThreshold: "1000000"
        offsetResetPolicy: latest
        scaleToZeroOnInvalidOffset: "false"
        tls: enable
        topic: kafka-test-topic
    - type: prometheus
      metadata:
        activationThreshold: "1000000"
        customHeaders: x-scope-orgid=test
        metricName: lag_processing_time
        namespace: <metrics_server_namespace>
        query: lag_processing_time{group_id="kafka-test-consumer-group"}
        serverAddress: <prometheus_server_address>
        threshold: "10"

Conclusion

In this part of the series, we looked at the different scaler triggers available in KEDA and the implementation details for autoscaling Kafka consumers. We also looked at the metrics considered for scaling the target.

In the next part of the series, we will look at the challenges faced while implementing the above scaler and the possible actions we can take to address them. See you all in the next article :)

Stay tuned for more in the series.

You can follow me on LinkedIn and Medium for more such articles. Happy Learning!!!
