Enabling Autoscaling in Google Cloud Composer

Supercharge your Cloud Composer deployment while saving cost during idle periods

Joshua Hendinata
Traveloka Engineering Blog
7 min read · Oct 8, 2019


Image by Arek Socha from Pixabay

Disclaimer: This method has been tested and implemented in composer-1.7.2-airflow-1.10.2 and composer-1.7.5-airflow-1.10.2. YMMV

What is Google Cloud Composer?

A fully managed workflow orchestration service built on Apache Airflow

That is a fairly accurate description (you can read more about it here). The important points are that Composer uses CeleryExecutor as its task executor, deploys it on Google Kubernetes Engine (GKE), and offers a quick and easy setup for your Airflow deployment.

Motivation for Enabling Autoscaling in Composer

Unfortunately, at the time of writing, Composer only supports a static node count (with a minimum of 3 nodes), which is not ideal for spiky or long-interval workloads such as daily batch jobs, for several reasons:

  1. Idle nodes keep running and incur unnecessary cost.
  2. There is no automatic scale-up when a DAG is highly parallelizable or during a backfill.
  3. Although Composer offers a way to adjust the node count manually, it takes up to 5 minutes to finish the process!

To alleviate these issues, we will introduce a way to enable autoscaling in Composer, both at the worker level and at the node level.

Additional Notes:

  1. If you are not using Composer for your Airflow deployment, check out KubernetesExecutor, which spawns a new pod for every task. That way, you only need to turn on autoscaling at the node level.
  2. Some of you might point out that Celery has its own autoscaler. However, the Celery autoscaler can only increase or decrease worker_concurrency within a worker and is therefore still limited by that pod's resources.
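
For reference, Celery's own autoscaler is enabled with the worker's --autoscale flag (max,min worker processes). A minimal illustration, assuming a generic Celery app named tasks (not part of the Composer setup):

# Celery-level autoscaling: grows and shrinks the worker *processes* inside
# a single Celery worker (between 3 and 10 here). It never adds pods or
# nodes, so it is still capped by that one pod's CPU and memory.
celery -A tasks worker --autoscale=10,3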

Let’s Dive Into the Details!

First, Enable Autoscaling on Node Level

In GKE, enabling autoscaling at the node level is relatively straightforward. You only need to know a few details about the GKE cluster used by Composer (e.g. its location and zone) to finish this step.

You can turn on autoscaling from:

  • Web UI console

Go to Composer → Your Composer environment → View Cluster Details.

Click on default-pool → edit → check “Enable Autoscaling” and specify the minimum and maximum nodes (more details here)

  • gcloud CLI
PROJECT=[provide your GCP project id]
COMPOSER_NAME=[provide your Composer environment name]
COMPOSER_LOCATION=[provide the selected Composer's location, e.g. us-central1]
CLUSTER_ZONE=[provide the selected Composer's zone, e.g. us-central1-a]
GKE_CLUSTER=$(gcloud composer environments describe \
  ${COMPOSER_NAME} \
  --location ${COMPOSER_LOCATION} \
  --format="value(config.gkeCluster)" \
  --project ${PROJECT} | \
  grep -o '[^\/]*$')
gcloud container clusters update ${GKE_CLUSTER} --enable-autoscaling \
  --min-nodes 1 \
  --max-nodes 10 \
  --zone ${CLUSTER_ZONE} \
  --node-pool=default-pool \
  --project ${PROJECT}

In this case, we set the minimum node count to 1 to minimize cost during idle periods. For the maximum node count, however, there is some calculation involving the desired concurrency, the machine type, and the pod resource requests; the details are explained below.

Second, Enable Autoscaling on Worker Level

At the worker level, we are going to apply Kubernetes' Horizontal Pod Autoscaler (HPA) to the airflow-worker Deployment in Composer.

Before we apply the HPA, note that by default the airflow-worker Deployment does not specify any resource request (i.e. it has the BestEffort pod QoS class). Hence, if we apply the HPA and the Deployment keeps scaling up, then once node resource usage reaches its limit, some of the pods will be evicted and any task running in those pods will be terminated.

To prevent the pods from being evicted, we need to change their QoS class to Guaranteed by specifying resource requests and limits (with requests equal to limits).
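
If you want to verify the change later, Kubernetes reports the QoS class it assigned in each pod's status. A quick check (the pod name below is a placeholder; use any airflow-worker pod in your Composer namespace):

# Prints the QoS class of the pod; expect "Guaranteed" once the resource
# patch described below has been applied.
kubectl get pod airflow-worker-xxxxxxxxxx-yyyyy \
  -n [your composer namespace] \
  -o jsonpath='{.status.qosClass}'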

Resource Request Calculation

Currently, there are two containers in the airflow-worker Deployment: airflow-worker and gcs-syncd.

Initial numbers for the calculation:

  1. Composer’s default celery worker_concurrency: 6 → this takes ~400 MB of memory
  2. A simple BashOperator or KubernetesPodOperator takes 150–200 MB of memory

Calculation for airflow-worker resource request:

  1. Total memory usage if the pod runs 6 concurrent tasks → 6 * (150–200 MB) + 400 MB (Celery worker) = 1.3–1.6 GB.
  2. For safety, let's use a 2 GB memory request and limit.

You may need to adjust the memory or CPU request based on your task workload, but the goal is to specify an upper bound on the resource request so that airflow-worker pods are never evicted.

Calculation for gcs-syncd resource request:

gcs-syncd uses less than 100 MB of memory on average, but it can burst when there are many file changes (e.g. during deployments). Based on our experience, a safe memory request is 512 MB.

Applying Resource Request Modification to Kubernetes

You can apply both of these modifications using the kubectl CLI by creating the composer_airflow_worker_patch.yaml file below:
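
A minimal sketch of such a patch, assuming the two container names above and the memory figures from the calculation; the CPU values are illustrative placeholders (the Guaranteed QoS class requires both CPU and memory requests to equal their limits), so adjust them to your workload:

# composer_airflow_worker_patch.yaml (sketch)
# Sets requests equal to limits on both containers so the pods receive the
# Guaranteed QoS class and are not evicted under node memory pressure.
spec:
  template:
    spec:
      containers:
      - name: airflow-worker
        resources:
          requests:
            memory: 2Gi
            cpu: 500m    # illustrative value, tune to your workload
          limits:
            memory: 2Gi
            cpu: 500m
      - name: gcs-syncd
        resources:
          requests:
            memory: 512Mi
            cpu: 50m     # illustrative value, tune to your workload
          limits:
            memory: 512Mi
            cpu: 50m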

and run:

gcloud container clusters get-credentials ${GKE_CLUSTER} \
  --zone ${CLUSTER_ZONE} --project ${PROJECT}
AIRFLOW_WORKER_NS=$(kubectl get namespaces \
  | grep composer | cut -d ' ' -f1)
kubectl patch deployment airflow-worker -n ${AIRFLOW_WORKER_NS} \
  --patch "$(cat composer_airflow_worker_patch.yaml)"

After this, you can apply the HPA to the airflow-worker Deployment by creating the composer_airflow_worker_hpa.yaml file below:
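
A sketch of what this HPA could look like, using the Kubernetes 1.12-era autoscaling/v2beta1 API. The maxReplicas of 50 and the 1.5 GB memory target are explained below; the namespace placeholder must be replaced with your Composer namespace (the AIRFLOW_WORKER_NS value above), and minReplicas: 1 is an assumption chosen to match the single idle node:

# composer_airflow_worker_hpa.yaml (sketch)
apiVersion: autoscaling/v2beta1
kind: HorizontalPodAutoscaler
metadata:
  name: airflow-worker
  namespace: [your composer namespace]   # same value as AIRFLOW_WORKER_NS
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: airflow-worker
  minReplicas: 1       # assumption: scale down to one worker pod when idle
  maxReplicas: 50
  metrics:
  - type: Resource
    resource:
      name: memory
      targetAverageValue: 1500Mi   # scale up before pods reach full capacity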

and run:

kubectl apply -f composer_airflow_worker_hpa.yaml

More Explanation on the HPA Configuration

You can adjust the maxReplicas number based on your use case and on your project quotas if your tasks use any GCP services. Theoretically, you can run maxReplicas * worker_concurrency concurrent tasks (in this case, 50 * 6 = 300 concurrent tasks).

You can also customize the HPA metrics specification according to your use case. In our case, we want to maximize node usage, so we want to scale up only when all available airflow-worker pods have reached their maximum number of concurrent tasks (i.e. 6 tasks per pod). Since all of our tasks are memory-bound, we opt to use memory as the indicator.

Based on observation, each of our tasks uses around 200 MB of memory, so a pod at full capacity uses about 1.6 GB (400 MB for the Celery worker + 6 tasks * 200 MB). In the end, we use 1.5 GB as the threshold for a more aggressive scale-up. We can ignore gcs-syncd since it has low memory usage most of the time.

When scaling down, the airflow-worker pod has a preStop hook that ensures each worker finishes whatever task it is currently performing before it is killed, so you do not need to worry about tasks being terminated halfway through.

For more info and additional metrics available for the Kubernetes HPA, see here.

(Optional) Based on the description here, there are some improvements to the pod scale-up delay algorithm in Kubernetes 1.12. Hence, if your Composer Kubernetes master/node pools run an older version, it is recommended to upgrade them to at least 1.12. Personally, I have seen the scale-up delay improve from ~5 minutes on versions <1.12 to ~2.5 minutes on versions >=1.12.
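
One possible way to do the upgrade, reusing the GKE_CLUSTER, CLUSTER_ZONE, and PROJECT variables from earlier (the version string is illustrative; pick a 1.12+ version actually available in your zone):

# Upgrade the master first; the node pool is then upgraded to the master's version.
gcloud container clusters upgrade ${GKE_CLUSTER} \
  --master \
  --cluster-version 1.12 \
  --zone ${CLUSTER_ZONE} \
  --project ${PROJECT}
gcloud container clusters upgrade ${GKE_CLUSTER} \
  --node-pool=default-pool \
  --zone ${CLUSTER_ZONE} \
  --project ${PROJECT}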

Third, Remove the Bottleneck from Airflow Configuration

Now that we have enabled autoscaling in the underlying Kubernetes cluster, we need to modify certain Airflow parameters so they do not become a bottleneck in the autoscaling process:

  1. Increase parallelism to 300, or whatever maximum number of concurrent tasks is achievable (i.e. maxReplicas * worker_concurrency).
  2. Increase dag_concurrency to the same value as parallelism so a single DAG can enjoy the full benefit of the autoscaling.
  3. Increase max_active_runs_per_dag to a high number so it does not become a bottleneck for your DAGs.
  4. Increase dagbag_import_timeout to 120. Based on our experience, Airflow can sometimes raise a timeout error when too many tasks are running concurrently, and this setting seems to fix the issue.

You can modify the Airflow configuration using the gcloud command below:

gcloud composer environments update ${COMPOSER_NAME} \
  --update-airflow-configs=core-max_active_runs_per_dag=150 \
  --update-airflow-configs=core-dag_concurrency=300 \
  --update-airflow-configs=core-dagbag_import_timeout=120 \
  --update-airflow-configs=core-parallelism=300 \
  --location ${COMPOSER_LOCATION} \
  --project ${PROJECT}

Additional Consideration

The default machine type for Composer is n1-standard-1, with 1 vCPU and 3.75 GB of memory. In our use case, since our tasks are memory-bound and each airflow-worker pod requests 2.5 GB of memory, we opt for a machine whose memory is a multiple of 2.5 GB to maximize usage (in this case, n1-standard-4 with 15 GB of memory).

Using this calculation, each node can hold a maximum of 6 worker pods (15 / 2.5). So, to accommodate 50 pod replicas (from maxReplicas in the HPA YAML), you need a maximum of 9 nodes (9 * 6 > 50). You can use this formula to determine the max-nodes parameter when turning on autoscaling at the node level.

Remember that this method will still leave at least 1 node running in default-pool, so weigh the idle-time cost of that machine against how many pods it can hold, since scaling up pods is much faster than scaling up nodes.

Putting it all together

I created a small gist combining all of the scripts above here. Make sure to put composer_airflow_worker_patch.yaml and composer_airflow_worker_hpa.yaml in the same directory.

Summary

Google Cloud Composer is a great product for quickly setting up an Airflow deployment. However, it does not currently support autoscaling. In this article, we introduced a way to enable autoscaling in Composer's underlying Kubernetes cluster, along with some calculations to estimate the resources needed. Hopefully this can help you speed up your tasks and save some cost on idle nodes. Let us know your thoughts in the comment section. Cheers!

If you like deep technical and optimization stuff, then we have a match! We are hiring for several exciting positions at Traveloka. Check out our openings here https://www.traveloka.com/en/careers

EDIT: added a simple script that ties it all together
