Apache Airflow on Kubernetes at scale — a peek under the hood

Piyush Dewnani
7 min read · Nov 14, 2022

This article aims to provide insights into running Apache Airflow on Kubernetes and shares the revamped deployment steps carried out over the past few years to make Airflow cluster operations more robust while keeping them cost-efficient (read: FinOps). It builds further optimizations on top of the previously shared Adobe Experience Platform’s Setup for Apache Airflow on Kubernetes.

Apache Airflow in Kubernetes executor mode offers an interesting way to horizontally scale out workflow/task execution. However, the team here observed and overcame quite a few challenges to ensure a hardened deployment on Kubernetes.

Custom storage class based on FUSE

One of the key challenges in deploying Apache Airflow in Kubernetes executor mode is ensuring that each individual task instance, run as a separate Kubernetes pod, has an identical workflow snapshot available locally.

To achieve this, the team initially leveraged the default implementation of making Airflow’s dag-store available as a ‘read-many’ K8s persistent volume mounted into all of the worker pods. However, this added overhead on the underlying network-attached-storage layer: the cloud provider started throttling requests to the storage unit due to the increased IO operations. This essentially manifested as a K8s runtime issue with the Pod Lifecycle Event Generator (PLEG) as the number of worker pods mounting/unmounting the same persistent volume increased within a short duration.
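For context, the default approach can be sketched as a single read-many persistent volume claim shared by every worker pod (names and sizes here are illustrative, not our actual manifests):

```yaml
# Sketch of the original shared dag-store approach: one PVC with
# ReadOnlyMany access, mounted by every Airflow worker pod. Under
# load, the mount/unmount churn on this shared NAS-backed volume is
# what triggered storage throttling and PLEG issues.
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: airflow-dags        # hypothetical name
spec:
  accessModes:
    - ReadOnlyMany          # many pods mount the same volume read-only
  resources:
    requests:
      storage: 10Gi
```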

We explored leveraging Filesystem in Userspace (FUSE) for the centralized dag-store, and opted for a specialized Kubernetes CSI driver to expose the FUSE mount as a persistent volume. This ensured that for each worker pod only the specific dag/sub-dag file that needs to be executed gets pulled into the container via a user-space HTTPS (Blob) access call, avoiding mounting all dags as a shared volume. The FUSE containers were also deployed as a node controller (DaemonSet) on each node to aid efficient caching of dag files and further reduce back-pressure on the storage layer caused by frequent mount/unmount operations. This exercise also led to significant cost savings by moving away from the cloud provider’s expensive NAS files offering to the Blob offering, where only specific files are pulled in on demand.

The above diagram depicts a general overview of the CSI controller. The actual implementation of a CSI driver is internal to the storage provider. The community driver guidelines and persistent volume lifecycle are available here.

This exercise also involved introducing a custom storage class to integrate seamlessly with the cloud provider’s (Azure) offering.

apiVersion: storage.k8s.io/v1
kind: StorageClass
metadata:
  name: blob
provisioner: blob.csi.azure.com
parameters:
  skuName: Standard_LRS
  # available values: Standard_LRS/GRS/RAGRS or Premium_LRS
reclaimPolicy: Retain
# if set as "Delete", the container would be removed after PVC deletion
volumeBindingMode: Immediate
mountOptions:
  - -o allow_other
  - --file-cache-timeout-in-seconds=120
  - --use-attr-cache=true
  - -o attr_timeout=120
  - -o entry_timeout=120
  - -o negative_timeout=120

Azure Blob also powers the data-lake (ADLS Gen2) offering as the underlying storage layer, so it qualifies as the best fit for our use-case in terms of cost (~0.046 USD per GB), offering ~20k IOPS in the basic pricing plan and scalable up to 100k IOPS if required. In comparison, Azure Files offers ~1k IOPS by default, with each transaction and network IO being charged separately (this amounted to ~250 USD on one of our moderately loaded clusters).

Furthermore, in our deployment using Azure Blob we did leverage the soft-delete functionality along with point-in-time snapshots for accidental deletions and disaster recovery scenarios at minimal cost for the centralised Airflow dag-file store.
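Worker pods then consume the custom storage class through a claim instead of the shared NAS volume; a minimal sketch, assuming the `blob` storage class above (the claim name and size are illustrative):

```yaml
# Hypothetical PVC bound against the custom "blob" storage class.
# The blobfuse-backed volume pulls dag files on demand over HTTPS
# rather than serving the whole dag-store from network-attached storage.
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: airflow-dags-blob   # illustrative name
spec:
  accessModes:
    - ReadOnlyMany
  storageClassName: blob    # the custom CSI-backed class defined earlier
  resources:
    requests:
      storage: 5Gi
```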

Optimising pod creation

Another caveat acknowledged by the community with the Kubernetes executor is the additional overhead of creating a pod for every task execution, which is a downside especially in the case of short-lived tasks. The following tweaks were rolled out to minimise the overhead of ad-hoc worker-pod creation.

Optimising docker image layers with multi-stage builds

The docker image used by the worker pods was further optimised: a multi-stage build (reference) was used to minimize the number of layers in the docker image. This resulted in the container runtime re-using the cached docker image layers, and pod spin-up time improved from ~1 minute to less than 5 seconds.
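The idea can be sketched with a minimal multi-stage Dockerfile (base image and Airflow version are illustrative, not our actual build):

```dockerfile
# Builder stage: heavy build tooling and pip work stay here and are
# discarded from the final image.
FROM python:3.9-slim AS builder
RUN pip install --no-cache-dir --prefix=/install "apache-airflow==2.3.4"

# Final stage: a single COPY layer instead of many RUN layers keeps
# the image small and cache-friendly, so nodes reuse layers across
# short-lived worker pods and spin-up stays fast.
FROM python:3.9-slim
COPY --from=builder /install /usr/local
ENTRYPOINT ["airflow"]
```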

Ephemeral disk for Kubernetes nodes

Most cloud providers by default attach a remote OS disk to each K8s node. As the number of lightweight, short-lived pods running on the same node increases, the disk IOPS during container start-up and tear-down also increase sharply. Switching to a local disk attached to the K8s worker node provided increased IOPS while lowering costs and avoiding disk throttling — reference. Throttling of the VM due to mismatched IaaS resources (VM/disk vs workload) directly impacts the runtime stability and performance of K8s clusters.

For comparison, a 64 GB local SSD on a K8s node offers 5000 IOPS — getting similar IOPS from a network-attached-storage (NAS) disk would require associating a P30 premium SSD disk (1024 GB) on the Azure platform.
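On AKS this switch is a node-pool setting; a sketch using the Azure CLI (resource-group, cluster, and pool names are placeholders):

```shell
# Create a node pool whose OS disk lives on the VM's local storage.
# --node-osdisk-type Ephemeral requires a VM size whose local cache
# disk is large enough to hold the requested OS disk size.
az aks nodepool add \
  --resource-group my-rg \
  --cluster-name my-aks-cluster \
  --name workers \
  --node-vm-size Standard_DS3_v2 \
  --node-osdisk-type Ephemeral \
  --node-osdisk-size 64
```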

Using custom networking in Azure to optimise IP address allocation

In Kubernetes, each node/pod by default gets a unique IP address. The team opted for hosting the Kubernetes node(s) in a dedicated private virtual network, which was peered with the enterprise multi-cloud network.

Additionally, each K8s node by default has a limited set of SNAT ports allocated; we had to double the number of SNAT ports allocated per virtual machine (K8s node) on Azure to minimise errors for tasks that were polling specific HTTP(S) endpoints frequently.
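On AKS the per-node allocation is controlled via the cluster's standard load balancer; a hedged sketch (names and the port count are illustrative, and the exact flag may vary by CLI version):

```shell
# Raise the number of SNAT ports allocated per node on the cluster's
# standard load balancer outbound configuration.
az aks update \
  --resource-group my-rg \
  --name my-aks-cluster \
  --load-balancer-outbound-ports 2048
```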

Leveraging Kubernetes cluster auto-scaler

In Kubernetes, the deployed application can leverage multiple options related to scaling. For example:

  • Horizontal pod autoscaler: Policy/rules to increase and decrease the number of pod replicas based on CPU/memory utilization and/or custom metrics
  • Vertical pod autoscaler: Policy/rules to increase and decrease container resources based on CPU/memory utilization and/or custom metrics
  • Scheduled scaler: K8s entity/object enabling horizontal pod autoscaler functionality based on a specific calendar event
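As an illustration of the first option, a horizontal pod autoscaler manifest might look like the following (target names and thresholds are hypothetical):

```yaml
# Example HPA: scale a deployment between 2 and 10 replicas,
# targeting 70% average CPU utilization across pods.
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: webserver-hpa        # illustrative name
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: airflow-webserver  # illustrative target
  minReplicas: 2
  maxReplicas: 10
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
```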

The above solutions fit the general use-case of a stateless web-service, where the application can have multiple replicas and a combination of vertical/horizontal scaling can be leveraged.

However, for the Airflow scheduler in K8s executor mode the application use-case is a bit different, and the team zeroed in on leveraging the Kubernetes cluster autoscaler based on Kubernetes primitive constructs. This provided us with flexibility while avoiding a third-party integration like KEDA.

With the cluster autoscaler, Kubernetes nodes can be scheduled to capacity, so pods can by default consume all the available capacity on a node. We encountered a caveat here: Kubernetes nodes typically run quite a few system daemons that power the OS and Kubernetes itself. Unless resources are set aside for these system daemons, pods and system daemons compete for resources, leading to resource-starvation issues on the node.

The kubelet exposes a feature named Node Allocatable that helps reserve compute resources for system daemons. Kubernetes recommends that cluster administrators configure Node Allocatable based on the workload density on each node.

The allocatable limit is enforced by the K8s control plane as:

[Allocatable] = [Node Capacity] - [Kube-Reserved] - [System-Reserved] - [Hard-Eviction-Threshold]
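These reservations are set through the kubelet configuration; a minimal sketch of the terms in the formula above (the values are illustrative, not the team's actual settings):

```yaml
apiVersion: kubelet.config.k8s.io/v1beta1
kind: KubeletConfiguration
kubeReserved:             # [Kube-Reserved]: kubelet, container runtime, etc.
  cpu: 100m
  memory: 1Gi
systemReserved:           # [System-Reserved]: OS daemons (sshd, udev, ...)
  cpu: 100m
  memory: 500Mi
evictionHard:             # [Hard-Eviction-Threshold]
  memory.available: 500Mi
# Allocatable = Capacity - kubeReserved - systemReserved - evictionHard
```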

We opted for fine-tuning the compute resources allocated to each of the pods, as well as the number of pods that can run on a single node. Along with these primitives, the cluster autoscaler policy from the cloud provider (Azure) was also fine-tuned to align with our deployment.

Guaranteed Kubernetes QoS

The K8s parameters for Airflow worker pods were updated to move them from the burstable to the guaranteed QoS class — reference

spec:
  containers:
    ...
    resources:
      limits:
        cpu: 50m
        memory: 80Mi
      requests:
        cpu: 50m
        memory: 80Mi
    ...
status:
  qosClass: Guaranteed

The compute parameters were derived from the worker nodes' capacity along with factors such as the optimal number of pods that can run on a single K8s node, the disk IOPS offered by the VM type, and the number of allocatable ports per node.

A few other optimizations were rolled out to ensure low-latency responses for workflow management API calls:

  1. Updating network topology to align with private cluster setup on Azure
  2. Redirecting read-only queries to MySQL replica to minimize load on primary instance
  3. Switching to the containerd K8s runtime to leverage upstream optimisations; this required updates/tweaking in the Splunk-connect module we were using earlier.
  4. Deploying a separate container to serve the Airflow UI, isolating REST API requests to be served by a separate Airflow server instance.

Overall, the revamped deployment of Airflow in Kubernetes executor mode has been very promising. The exercise led to a lot of learning, and the team fortified the deployment while adapting to the Kubernetes world in terms of storage drivers, container runtime updates, network-port translation issues, and optimised load-balancing across Airflow’s administration panel and REST API endpoints. The journey of deploying Airflow on Kubernetes with varied workloads and newer paradigms continues.

References

1. Adobe Experience Platform — https://www.adobe.com/experience-platform.html

2. Apache Airflow — https://airflow.apache.org/

3. Azure — https://azure.microsoft.com/en-us/

4. Azure Kubernetes Service — https://azure.microsoft.com/en-us/services/kubernetes-service/

5. Blob Storage on Azure — https://azure.microsoft.com/en-us/services/storage/blobs/

6. Kubernetes DaemonSet — https://kubernetes.io/docs/concepts/workloads/controllers/daemonset/

7. Kubernetes Quality of Service — https://kubernetes.io/docs/tasks/configure-pod-container/quality-service-pod/

8. Kubernetes CSI driver — https://kubernetes-csi.github.io/docs/drivers.html

9. Azure Blobfuse driver — https://github.com/kubernetes-sigs/blob-csi-driver

10. Kubernetes cluster Autoscaler — https://github.com/kubernetes/autoscaler/tree/master/cluster-autoscaler

11. ContainerD runtime — https://kubernetes.io/docs/setup/production-environment/container-runtimes/#containerd

12. FinOPS — https://www.finops.org/what-is-finops/
