Auto Scaling Apache Flink Pipelines using Kubernetes, Flink Operator, and HPA

Krishna Arava
Aug 16, 2021


Kubernetes is an open-source container orchestration system for automating application deployment, scaling, and management. It was originally designed by Google and is now maintained by the Cloud Native Computing Foundation. Kubernetes adoption has grown tremendously, and it is steadily becoming the default deployment platform for the cloud.

This blog post presents a use case for scaling Apache Flink applications using Kubernetes, the Lyft Flink operator, and the Horizontal Pod Autoscaler (HPA).

Before we begin, I will briefly talk about the components and concepts used. If you are not familiar with Kubernetes, I recommend starting here.

Introduction to the Lyft Flink Operator

Flinkk8sOperator is an open-source Kubernetes operator from Lyft (https://github.com/lyft/flinkk8soperator) that manages Apache Flink applications on Kubernetes. The operator acts as a control plane to manage the complete deployment lifecycle of a Flink application.

The Flink operator aims to abstract away the complexity of hosting, configuring, managing, and operating Flink clusters from application developers. It achieves this by extending any Kubernetes cluster using custom resources. Below is an example of a wordcount FlinkApplication custom resource.

apiVersion: flink.k8s.io/v1beta1
kind: FlinkApplication
metadata:
  name: wordcount-operator-example
  namespace: flink-operator
  annotations:
  labels:
    environment: development
spec:
  image: docker.io/lyft/wordcount-operator-example:{sha}
  deleteMode: None
  flinkConfig:
    taskmanager.heap.size: 200
    taskmanager.network.memory.fraction: 0.1
    taskmanager.network.memory.min: 10m
    state.backend.fs.checkpointdir: file:///checkpoints/flink/checkpoints
    state.checkpoints.dir: file:///checkpoints/flink/externalized-checkpoints
    state.savepoints.dir: file:///checkpoints/flink/savepoints
    web.upload.dir: /opt/flink
  jobManagerConfig:
    resources:
      requests:
        memory: "200Mi"
        cpu: "0.1"
    replicas: 1
  taskManagerConfig:
    taskSlots: 2
    resources:
      requests:
        memory: "200Mi"
        cpu: "0.1"
  flinkVersion: "1.8"
  jarName: "wordcount-operator-example-1.0.0-SNAPSHOT.jar"
  parallelism: 3
  entryClass: "org.apache.flink.WordCount"

When the above FlinkApplication custom resource is installed to a Kubernetes cluster, the operator sets it in motion and takes the following steps, updating the current state on the custom resource (CR) at every step (a rough sketch of the resulting status is shown after the list).

  1. The operator bootstraps the Flink cluster by creating Job Manager and Task Manager pods.
  2. When the Flink cluster is ready, the operator submits the job using the Job Manager REST API. If the cluster fails to start, the operator updates the application status to deploy-failed.
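As an illustration, here is a minimal sketch of what the status section of the CR could look like once the job is running. Only the numberOfTaskManagers path is taken from the scale subresource configuration later in this post; the phase names and the exact set of status fields depend on the operator version, so treat this as an assumption rather than the operator's exact output.

status:
  phase: Running                    # the CR moves through intermediate phases (e.g. cluster starting, job submission) before reaching a running state
  clusterStatus:
    numberOfTaskManagers: 1         # reported by the operator; referenced later by the scale subresource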

Auto Scaling in Kubernetes using HPA

The Horizontal Pod Autoscaler (HPA) automatically scales the number of pods in a replication controller, deployment, replica set, or stateful set based on observed CPU/memory utilization (or, with custom metrics support, on application-provided metrics such as those from Prometheus).

The Horizontal Pod Autoscaler is implemented as a controller and Kubernetes API resource. The controller periodically queries the metrics server and adjusts the number of replicas in a replication controller or deployment to match the observed average CPU/memory utilization to the target specified by the user.

  1. Understanding how HPA works
  • During each period, the controller queries the per-pod resource metrics (like CPU) from the resource metrics API (served by the metrics server) for the pods targeted by each HPA resource.
  • The controller then takes the mean of the utilization across all targeted pods and produces a ratio used to scale the number of desired replicas.

From the most basic perspective, the Horizontal Pod Autoscaler controller operates on the ratio between desired metric value and current metric value:

desiredReplicas = ceil[currentReplicas * ( currentMetricValue / desiredMetricValue )]

2. HPA Example

Now let's look at an example of an HPA definition:

apiVersion: autoscaling/v1
kind: HorizontalPodAutoscaler
metadata:
  labels:
    app: foo
    version: 1.0.0
  name: foo
  namespace: default
spec:
  maxReplicas: 4
  minReplicas: 1
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: foo
  targetCPUUtilizationPercentage: 50

The foo application is deployed to a Kubernetes cluster and initially has 2 replicas (pods). It receives additional traffic at 9 AM and its average CPU usage across all pods increases to 80%. The Kubernetes HPA controller now calculates the desired number of replicas as below.

  • ceil(80/50 * 2) = 4 (Desired Replicas)
  • Additional replicas to be provisioned -> 4 - 2 = 2 (Desired Replicas - Current Replicas)

The HPA controller changes the desired replica count of the foo application to 4, and another 2 pods are provisioned on the cluster to process the additional traffic.

At midnight, the utilization of the foo application drops to 20%. The Kubernetes HPA controller now calculates the desired number of replicas as below.

  • ceil(20/50 * 4) = 2 (Desired Replicas)
  • Replicas to be terminated -> 2 - 4 = -2 (Desired Replicas - Current Replicas)

The HPA controller now changes the desired replica count to 2, and 2 pods are terminated.

If you want to understand the complete autoscaling algorithm in detail, please read here.

Auto Scaling Flink pipelines using HPA

As explained in the previous section, HPA only works with a Kubernetes Deployment, ReplicaSet, or StatefulSet, because these resources have a subresource called "scale".

The scale subresource is the API endpoint through which the HPA controller modifies the number of pods in a Deployment, ReplicaSet, or StatefulSet.

For the FlinkApplication custom resource to work with HPA, we will add a scale subresource that operates on the TaskManager pods of the Flink cluster.

subresources:
  status: {}
  scale:
    labelSelectorPath: .status.selector
    specReplicasPath: .spec.taskManagerConfig.replicas
    statusReplicasPath: .status.clusterStatus.numberOfTaskManagers

Also note that labelSelectorPath needs to point to a selector that matches the labels of the TaskManager pods. This selector can be set during the "clusterstarting" phase of the Flink cluster.
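For context, here is a hedged sketch of where this block lives in the CustomResourceDefinition for FlinkApplication, using the apiextensions.k8s.io/v1 layout in which subresources are declared per version. The group, kind, and field paths follow the examples above; the schema of the real flinkk8soperator CRD may differ.

apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  name: flinkapplications.flink.k8s.io
spec:
  group: flink.k8s.io
  scope: Namespaced
  names:
    kind: FlinkApplication
    plural: flinkapplications
    singular: flinkapplication
  versions:
    - name: v1beta1
      served: true
      storage: true
      schema:
        openAPIV3Schema:
          type: object
          x-kubernetes-preserve-unknown-fields: true
      subresources:
        status: {}
        scale:
          labelSelectorPath: .status.selector
          specReplicasPath: .spec.taskManagerConfig.replicas
          statusReplicasPath: .status.clusterStatus.numberOfTaskManagers

Once the scale subresource is declared, the HPA controller (and even kubectl scale) can read and write the TaskManager replica count through the standard scale API without knowing anything else about the FlinkApplication resource.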

Putting it all together

When a Flink application with an HPA definition is installed on the cluster, the HPA controller calculates the desired number of TaskManager pods using the formula below.

ceil((Current Mean Metric/Threshold Metric) * current TM Pods) = Desired TM Pods

Let's understand this using an example. A pipeline has its initial parallelism set to 4, task slots set to 2, and the CPU threshold set to 40% in HPA. When the pipeline is first deployed, it comes up with two TM pods (parallelism / task slots, i.e. 4 / 2 = 2).

apiVersion: autoscaling/v1
kind: HorizontalPodAutoscaler
metadata:
  labels:
    app: foo
    version: 1.0.0
  name: foo
  namespace: default
spec:
  maxReplicas: 4
  minReplicas: 1
  scaleTargetRef:
    apiVersion: flink.k8s.io/v1beta1
    kind: FlinkApplication
    name: foo
  targetCPUUtilizationPercentage: 40

There is now a spike in traffic on the Kafka topic the pipeline reads from, and CPU usage across all TM pods increases to 80%. The Kubernetes HPA controller, which reconciles periodically, now calculates the desired TM pods as illustrated below.

ceil(80/40 * 2) = 4 (Desired TM Pods)

The HPA controller updates the FlinkApplication custom resource to request 4 TM pods. When the Flink operator sees that the replicas have been modified, it creates a new Flink cluster with 4 TaskManager pods. The Flink operator also updates the parallelism of the pipeline to 8 (4 TM pods * 2 task slots) during the "clusterstarting" phase.
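For illustration, here is a rough sketch of the relevant fields of the foo FlinkApplication after the scale-up. It is an assumption based on the field paths used above, not a dump of the full resource:

spec:
  parallelism: 8                    # updated by the operator: 4 TM pods * 2 task slots
  taskManagerConfig:
    taskSlots: 2
    replicas: 4                     # written by the HPA controller through the scale subresource
status:
  clusterStatus:
    numberOfTaskManagers: 4         # reported by the operator once the new cluster is up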

Conclusion

In this post, I showed how to put together powerful Kubernetes patterns (HPA, operators, and custom resources) to scale a distributed Apache Flink application. For all the criticism of Kubernetes' complexity, use cases like this showcase where it truly shines.

Kubernetes provides all the nuts and bolts needed to build powerful services.

This solution can also be integrated with custom metrics from Prometheus or with the KEDA operator (https://github.com/kedacore/keda), as sketched below.
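As a hedged sketch of the KEDA route, the ScaledObject below targets the same FlinkApplication through its scale subresource and scales on a Prometheus query. The server address, metric name, and query are hypothetical placeholders and would need to match your own Prometheus setup and Flink metric names.

apiVersion: keda.sh/v1alpha1
kind: ScaledObject
metadata:
  name: foo-scaler
  namespace: default
spec:
  scaleTargetRef:
    apiVersion: flink.k8s.io/v1beta1
    kind: FlinkApplication
    name: foo
  minReplicaCount: 1
  maxReplicaCount: 4
  triggers:
    - type: prometheus
      metadata:
        serverAddress: http://prometheus.monitoring.svc:9090   # hypothetical Prometheus endpoint
        metricName: records_lag                                # hypothetical metric name
        threshold: "1000"
        query: sum(records_lag{app="foo"})                     # hypothetical consumer-lag query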

While this solution can be deployed to any managed Kubernetes cluster, we did encounter an odd issue with HPA on an AWS EKS cluster, where it refused to increase the number of pods. It turned out that the EKS cluster refuses to scale up unless the AWS account being used is allowed to provision compute resources beyond a certain threshold.

In the next blog post, I will present another interesting use case for using Kubernetes to build a generic SaaS service. Stay tuned.

P.S.: We are hiring. If you are interested in working on distributed systems and the scaling challenges of an analytics platform, please share your resume at karava AT cisco dot com. You can follow me at https://twitter.com/krisharava
