(Airflow-Kubernetes) We attempt to simulate real-time data ingestion using high frequency batch ingestion. Here’s what happened…

Andy Yan
Published in 99.co
7 min read · Jan 5, 2023

In this article, we share how we attempted to schedule our pipelines more frequently to achieve ‘pseudo real-time’ data ingestion, and the issues that came along with it. For context, our Airflow instance is deployed on a Kubernetes cluster, which we discussed in detail here.

The Problem

As more of our stakeholders build their workflows around reports powered by our data, one of the common requests is to have the data ingested more frequently, so that it is as up-to-date as possible. This is especially important for workflows that are time-sensitive in nature. However, our data engineering system is built for batch ingestion, which is not the ideal technology here. As a workaround to meet this requirement, some of our pipelines were scheduled to ingest data at a 15-minute interval.

While we were searching for an appropriate data streaming solution, the requests for these high frequency pipelines piled up. Eventually, the performance of these pipelines degraded, and they could no longer finish within the 15-minute interval. Most of the time was spent by tasks waiting for their turn to run. This had a cascading effect: downstream tasks were delayed and subsequent runs failed. We dug into the system and observed the following:

Number of task instances running (i.e. pods running) over time. This graph includes task instances from both high and low frequency pipelines. At 06:30, most pipelines will attempt to run.

As of then, some of our key configurations and observations are as follows:

  • Airflow’s parallelism is set to 64, while the graph shows that at any time, there are fewer than 30 task instances running. The number of concurrent task instances allowed to run is not at its limit yet.
  • The maximum number of nodes in the node pool of the Kubernetes cluster is set to 6, while at any time during this period, there were at most 4 active nodes. This means that there were resources available to run the enqueued tasks, but they were not being utilised.
  • Most of the time was spent by the tasks waiting to run. Once they started running, they took the same time to complete as before. This means that the resources provided for the tasks were also not an issue.
  • The Airflow Scheduler instance was also not maxing out the resources allocated to it.

From these observations, we can safely conclude that resources allocated to the system as a whole are not an issue. The Scheduler is likely the bottleneck here.

The Configuration

Upon further investigation, we realized that one of our key KubernetesExecutor configurations, worker_pods_creation_batch_size, was set to the default value of 1. According to the documentation, worker_pods_creation_batch_size is the number of Kubernetes worker pod creation calls per scheduler loop. This means that during any scheduler loop, only one worker pod is created to run an enqueued task instance. The throughput of the Scheduler passing tasks to the KubernetesExecutor was too low, and raising it could be the key to resolving the bottleneck.
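To see why a batch size of 1 throttles throughput, here is a minimal Python sketch of the arithmetic. It is a deliberately simplified model, not Airflow's actual scheduling code: it assumes every scheduler loop takes the same amount of time and issues at most worker_pods_creation_batch_size pod creation calls. The function names, the burst size (60 queued tasks) and the loop duration (5 seconds) are made-up values for illustration, not measurements from the cluster.

```python
import math


def loops_to_launch(queued_tasks: int, batch_size: int) -> int:
    """Scheduler loops needed to issue a pod creation call for every queued task.

    Simplifying assumption: each loop issues at most `batch_size` calls.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    return math.ceil(queued_tasks / batch_size)


def seconds_to_launch(queued_tasks: int, batch_size: int, loop_seconds: float) -> float:
    """Time until the last queued task has had its pod creation call issued."""
    return loops_to_launch(queued_tasks, batch_size) * loop_seconds


if __name__ == "__main__":
    QUEUED_TASKS = 60    # hypothetical burst size, not a measured figure
    LOOP_SECONDS = 5.0   # hypothetical loop duration, not a measured figure

    for batch_size in (1, 2, 4):
        loops = loops_to_launch(QUEUED_TASKS, batch_size)
        wait = seconds_to_launch(QUEUED_TASKS, batch_size, LOOP_SECONDS)
        print(f"batch_size={batch_size}: {loops} loops, about {wait:.0f}s to launch all pods")
```

Under these assumed numbers, the last task in the burst waits 300 seconds at a batch size of 1, 150 seconds at 2 and 75 seconds at 4, regardless of how much spare capacity the cluster has.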

We adjusted the worker_pods_creation_batch_size to 2, and then 4 and observed the following:

`worker_pods_creation_batch_size` is set to 2. Number of pods running concurrently goes up to 30+.
`worker_pods_creation_batch_size` is set to 4. Number of pods running concurrently goes up to 70+.

As we can see, at 06:30, when Airflow attempts to run most of the tasks, the number of task instances running at any given time was able to go as high as 70. More nodes were spawned to meet the resource requirements, and the high frequency pipelines were able to complete in time. Everything seemed to be hunky-dory… and then there was this other issue.

The Other Issue

Right after the changes we made, the main Airflow deployment, which consists of both the Webserver and the Scheduler instances, started to restart itself every now and then (as frequently as every 15 to 30 minutes, especially during periods of high workload). While it didn’t break our system completely, it did disrupt the team, since there is always a lead time before the Webserver and Scheduler have completely spun up. With our monitoring tools in place, we observed the following:

  • There seemed to be a smooth transition during the restarting of the Webserver and Scheduler instances. The old instances always waited for the new instances to be ready before shutting down.
  • Scheduler logs didn’t state anything except “exiting gracefully upon receiving signal 15”. The Scheduler was running fine and was likely terminated by an external agent.
  • The Webserver and Scheduler instances were restarted only when they were the only pods running on a node. The new instances were deployed onto another node.

We suspected that the Cluster Autoscaler was moving our Airflow instance onto other existing nodes in order to utilize those nodes more efficiently.

The Cluster Autoscaler

So this raised the question: why did the Cluster Autoscaler so aggressively restart our Airflow instances onto other nodes? This hadn’t happened before. And why would increasing the number of worker pods created per loop result in this behaviour?

According to this FAQ, the conditions for the Cluster Autoscaler to scale down a particular node are as follows:

  • The sum of CPU requests and the sum of memory requests of all pods running on the node are each smaller than 50% of the node’s allocatable resources. The utilization threshold can be configured.
  • All pods running on the node can be moved to other nodes.
  • The node doesn’t have the scale-down disabled annotation.
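The three conditions can be read as a single predicate over a node. The following is a rough Python sketch of that reading, not the Cluster Autoscaler's real implementation: the Pod and Node classes, their field names and the is_scale_down_candidate function are all hypothetical, and the real autoscaler takes into account details that are ignored here.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Pod:
    cpu_request: float       # CPU cores requested
    memory_request: float    # memory requested, in GiB
    movable: bool = True     # hypothetical flag: can this pod be moved elsewhere?


@dataclass
class Node:
    cpu_allocatable: float
    memory_allocatable: float
    pods: List[Pod] = field(default_factory=list)
    scale_down_disabled: bool = False  # hypothetical stand-in for the annotation


def is_scale_down_candidate(node: Node, threshold: float = 0.5) -> bool:
    """Return True when all three scale-down conditions hold for the node."""
    cpu_util = sum(p.cpu_request for p in node.pods) / node.cpu_allocatable
    mem_util = sum(p.memory_request for p in node.pods) / node.memory_allocatable

    under_utilized = cpu_util < threshold and mem_util < threshold  # condition 1
    all_movable = all(p.movable for p in node.pods)                 # condition 2
    not_disabled = not node.scale_down_disabled                     # condition 3
    return under_utilized and all_movable and not_disabled


if __name__ == "__main__":
    quiet_node = Node(cpu_allocatable=4.0, memory_allocatable=16.0,
                      pods=[Pod(cpu_request=1.5, memory_request=4.0)])
    print(is_scale_down_candidate(quiet_node))
```

Breaking any one of the three conditions is enough to keep a node from being scaled down, which is what makes the list useful as a set of levers.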

Furthermore, our Kubernetes cluster is deployed on GKE with the following autoscaling profile: optimize-utilization. With this profile enabled, the Cluster Autoscaler will ‘prioritize optimizing utilization over keeping spare resources in the cluster. When selected, the cluster autoscaler scales down the cluster more aggressively: it can remove more nodes, and remove nodes faster.’ This means that the Cluster Autoscaler constantly looks for opportunities to move pods from underutilized nodes to nodes that are already sufficiently utilised, so that the total number of active nodes required to run the workload (considering the resources requested) is kept to a minimum.

Given the understanding above, our hypothesis of what had transpired can be summarized as follows:

When worker_pods_creation_batch_size was set to 1, the rate at which worker pods were created on the cluster was low (low throughput). As a result, new nodes were spun up more slowly. Worker pods also completed at a slower rate, so resources on the nodes were freed up more slowly. There was little to no opportunity for the Cluster Autoscaler to consider re-deploying pods to other nodes to maximize node utilization.

When worker_pods_creation_batch_size was set to 4, the rate at which worker pods were created on the cluster was high (high throughput). As a result, new nodes were spun up more quickly. When a batch of worker pods on a particular node completed at the same time, that node was left heavily under-utilized. If the Airflow instances were on that same node, the Cluster Autoscaler would re-deploy them to another, sufficiently utilised node. This caused the frequent redeployments, and therefore the restarts.

To further illustrate, let’s assume the following in this example:

  • Only CPU resource requests are considered for scaling up and scaling down
  • Airflow pods request 1.5 CPU in total and worker pods request 1.0 CPU each
  • A node has a total allocatable CPU of 4.0
  • All worker pods have the same completion time

[Diagram: worker_pods_creation_batch_size set to 1]

[Diagram: worker_pods_creation_batch_size set to 4]
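The arithmetic behind the four assumptions listed above can be sketched in a few lines of Python. This is only a back-of-the-envelope calculation for the node that hosts the Airflow pods. The constant and function names are illustrative, and the 50% threshold is the default taken from the scale-down conditions quoted earlier.

```python
NODE_ALLOCATABLE_CPU = 4.0   # from the assumptions above
AIRFLOW_CPU = 1.5            # Airflow pods, in total
WORKER_CPU = 1.0             # each worker pod
SCALE_DOWN_THRESHOLD = 0.5   # default utilization threshold


def node_utilization(worker_count: int) -> float:
    """CPU utilization of the Airflow node with `worker_count` workers on it."""
    return (AIRFLOW_CPU + worker_count * WORKER_CPU) / NODE_ALLOCATABLE_CPU


def max_workers_alongside_airflow() -> int:
    """How many worker pods fit on the node next to the Airflow pods."""
    return int((NODE_ALLOCATABLE_CPU - AIRFLOW_CPU) // WORKER_CPU)


if __name__ == "__main__":
    for workers in range(max_workers_alongside_airflow() + 1):
        utilization = node_utilization(workers)
        verdict = "below" if utilization < SCALE_DOWN_THRESHOLD else "above"
        print(f"{workers} worker(s): {utilization:.1%} ({verdict} the threshold)")
```

With these numbers, the node sits at 87.5% with two workers and 62.5% with one, but drops to 37.5% once no workers remain. As long as at least one worker is still running next to the Airflow pods, the node stays above the threshold; when every co-located worker finishes at once, it immediately becomes a scale-down candidate.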

The Anchor Tenant

To resolve this constant redeployment of our Airflow instances, we had to violate one of the scale-down conditions. As suggested in the same FAQ, we annotated the pods of the Airflow instances with the following:

"cluster-autoscaler.kubernetes.io/safe-to-evict": "false"

This violates the second of the aforementioned scale-down conditions. Whichever node the pods of the Airflow instances end up on, they are not allowed to be re-deployed to other nodes by the Cluster Autoscaler. They act as an ‘anchor tenant’ that keeps the aggressive Cluster Autoscaler from scaling down that node.
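How the annotation gets applied depends on how Airflow is deployed, which is not covered here. As one possible illustration, assuming the Airflow pods are managed by a Kubernetes Deployment, the annotation belongs on the pod template rather than on the Deployment's own metadata. The Python sketch below only builds and prints such a patch body as JSON. The safe_to_evict_patch helper is hypothetical, and applying the patch (for example with kubectl patch) is left out.

```python
import json

SAFE_TO_EVICT_ANNOTATION = "cluster-autoscaler.kubernetes.io/safe-to-evict"


def safe_to_evict_patch() -> dict:
    """Build a patch body that annotates a Deployment's pod template.

    Annotation values must be strings, hence "false" rather than False.
    """
    return {
        "spec": {
            "template": {
                "metadata": {
                    "annotations": {SAFE_TO_EVICT_ANNOTATION: "false"}
                }
            }
        }
    }


if __name__ == "__main__":
    # Print the patch so it can be reviewed before being applied by hand.
    print(json.dumps(safe_to_evict_patch(), indent=2))
```

Because the annotation sits on the pod template, changing it triggers a rollout, so the newly created Airflow pods carry it from the start.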

While this might mean a slightly less efficient utilization of our allocated nodes, having a stable state of our Airflow instances is well worth it.

The Lessons Learnt

In summary, here’s what we have learnt:

  • The default value of 1 for worker_pods_creation_batch_size is insufficient to take full advantage of the resources allocated. The value should be adjusted upwards gradually to find the point where it makes use of all available resources without overwhelming the Kubernetes cluster.
  • Cluster Autoscaler is aggressive when it comes to optimizing the number of active nodes needed to handle all the workloads. It will attempt to keep the number of active nodes to a minimum.
  • A high rate of worker pods deploying and completing at the same time leaves some nodes under-utilized, easily creating the right conditions for the Cluster Autoscaler to scale down those nodes. This results in frequent re-deployment of pods that are supposed to run indefinitely (i.e. Airflow instances).

All the aforementioned issues came about when we tried to simulate real-time data ingestion using technologies built for batch ingestion. While the changes we made helped stabilize our system, we are still pursuing a proper real-time data streaming solution.

This was initially written by Ridho Fadillah as a post-mortem for internal discussion. The post-mortem was subsequently reviewed and edited into this blog post.
