High Performance Airflow Dags
This write-up describes how to tune an Airflow cluster for your own use cases. It is based on my personal experience working with Airflow. I had the chance to brainstorm these issues with the Google team, and what follows is the outcome of a joint exercise between the Google Composer Engineering Team and me.
Airflow is an open-source distributed workflow engine for scheduling and monitoring workflows. Workflows are defined as Directed Acyclic Graphs (DAGs), and DAGs are written as Python scripts.
Airflow, whether consumed as a PaaS offering (Google Composer) or as a custom open-source deployment on a VM, typically runs into a lot of performance issues if it is not configured properly.
A few issues I have seen in the past:
- Airflow UI crashing frequently.
- DAGs taking a long time to get triggered.
- Unexpected wait time between two steps inside a DAG.
Airflow's default configuration is based on certain generic, real-world assumptions. Those assumptions do not hold when Airflow is used at production scale, so the configuration needs to be reviewed and tuned for your use cases. For the examples and screenshots I will be using Google Composer; however, the same considerations apply to other modes of deployment.
The article is broken down into 3 subsections:
- Airflow overview
- Scalability Considerations
- Steps to scale up
AIRFLOW OVERVIEW :
Airflow Scheduler : The scheduler checks all DAGs and tasks at a predefined interval to see whether the prerequisites for triggering the next step are met. It stays in sync with a folder containing all DAG objects and periodically (every minute or so) inspects active tasks to see whether they can be triggered.
The scheduler takes action by processing a DAG file:
a. Starts/ends a DAG run
b. Triggers the next task
How fast the scheduler takes action depends on:
a. Total time to process all DAG files: #files * time_each_file. Putting multiple DAGs (~100) in one file is more efficient
b. Other files in the DAG folder: Airflow processes everything it finds there, e.g., .jar, .sql
c. The number of processes used to process DAG files, set by max_threads
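To get a feel for how these factors interact, here is a rough back-of-the-envelope sketch in Python. It is not how Airflow measures anything internally: the function name is mine, and it assumes DAG files are spread evenly across max_threads parser processes, which is an idealisation.

```python
import math


def estimate_parse_pass_seconds(num_files: int,
                                seconds_per_file: float,
                                max_threads: int) -> float:
    """Rough time for one full pass over the DAG folder.

    Assumption (mine, not Airflow's): files are split evenly across
    max_threads processes and every file takes the same time to parse.
    """
    if num_files < 0 or seconds_per_file < 0 or max_threads < 1:
        raise ValueError("counts must be non-negative and max_threads >= 1")
    # Each process handles at most ceil(num_files / max_threads) files.
    files_per_process = math.ceil(num_files / max_threads)
    return files_per_process * seconds_per_file


# Hypothetical folder: 1,000 files that each take 0.5 s to parse.
one_process = estimate_parse_pass_seconds(1000, 0.5, max_threads=1)
eight_processes = estimate_parse_pass_seconds(1000, 0.5, max_threads=8)
print(one_process, eight_processes)
```

With these made-up numbers a single process needs about 500 seconds per pass, while eight processes bring it down to roughly 62.5 seconds. Every non-DAG file in the folder adds to num_files, which is why stray .jar and .sql files hurt.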
Task Queue : All tasks that the scheduler has marked as ready for execution are submitted to a task queue. In Google Composer, the task queue is built on top of Redis.
Executors : There are multiple strategies for executing queued tasks. The most commonly used is the Celery executor.
CeleryExecutor is one of the ways you can scale out the number of workers. With K8s running in the background, it is a natural fit for data-intensive applications.
worker_concurrency sets the number of Celery processes on a worker.
● For short tasks:
○ The DAG file is read before the task runs
○ A large DAG file slows down the task
● For long tasks (e.g., blocking, sleeping tasks):
○ Each task holds on to a Celery process
○ Other tasks cannot run if no Celery process is available
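The effect of long tasks is easy to see with some simple arithmetic. The sketch below is only an illustration with hypothetical numbers and function names of my own; it treats every Celery process as one slot and ignores pools and priorities.

```python
def total_celery_slots(worker_concurrency: int, num_workers: int) -> int:
    """Celery processes available across all workers."""
    return worker_concurrency * num_workers


def tasks_left_waiting(ready_tasks: int,
                       total_slots: int,
                       slots_held_by_long_tasks: int) -> int:
    """How many ready tasks cannot start because long tasks hold the slots."""
    free_slots = max(total_slots - slots_held_by_long_tasks, 0)
    return max(ready_tasks - free_slots, 0)


# Hypothetical cluster: 3 workers with worker_concurrency = 32.
slots = total_celery_slots(worker_concurrency=32, num_workers=3)
# 90 sleeping or blocking tasks are running; 20 short tasks become ready.
waiting = tasks_left_waiting(ready_tasks=20,
                             total_slots=slots,
                             slots_held_by_long_tasks=90)
print(slots, waiting)
```

Here 96 slots exist, 90 are tied up by long tasks, and 14 of the 20 short tasks have to wait even though the machines themselves may be mostly idle.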
gcsFuse : A Google Composer-specific mount that makes Google Cloud Storage appear as a local drive.
Airflow Database : In most cases it is a MySQL database, either self-managed or MySQL as PaaS. The responsibility of this component is to store the monitoring, history and audit details of job executions.
AIRFLOW SCALABILITY CONSIDERATIONS
Scalability : Airflow Configuration
Airflow configurations can be changed on the fly. However, it takes several minutes for new changes to apply.
A full list of all Airflow configurations: Airflow Config
Key Airflow Configurations: Each Airflow deployment should go through the properties below and fine-tune them for its use cases.
a. max_threads = 8
- It's a scheduler configuration
- Number of processes to process DAG files
- estimate = num_cpu_per_node
b. worker_concurrency = 32
- Number of celery processes per Airflow worker
- estimate = num_dags * num_tasks_per_dag * execution_duration_per_task / dag_scheduling_period / num_airflow_workers
- estimate = num_cpu_per_node * 6
- use the lesser of the two estimates
c. parallelism = 96
- It's a core configuration
- The amount of parallelism given to the executor as a setting. It defines the maximum number of task instances that should run simultaneously
- estimate = worker_concurrency * num_airflow_workers
d. dag_concurrency = 96
- The number of task instances allowed to run concurrently by the scheduler
- estimate = parallelism
e. non_pooled_task_slot_count = 96
- When not using pools, tasks are run in the “default pool”, whose size is guided by this config element
- estimate = parallelism
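The estimates above can be put together in a few lines of Python. This is only a sketch of the rules of thumb listed here, not an official sizing tool; the Workload class, the function name, the rounding up and the example numbers are my own assumptions.

```python
import math
from dataclasses import dataclass


@dataclass
class Workload:
    num_dags: int
    tasks_per_dag: int
    task_duration_s: float        # execution_duration_per_task
    scheduling_period_s: float    # dag_scheduling_period
    num_workers: int              # num_airflow_workers
    cpus_per_node: int            # num_cpu_per_node


def suggest_settings(w: Workload) -> dict:
    """Apply the estimates from the list above to a workload description."""
    # Estimate 1: concurrent tasks each worker must sustain.
    demand = (w.num_dags * w.tasks_per_dag * w.task_duration_s
              / w.scheduling_period_s / w.num_workers)
    # Estimate 2: a cap derived from the CPUs on each node.
    cpu_cap = w.cpus_per_node * 6
    # Use the lesser of the two, rounded up, and never below 1.
    worker_concurrency = max(1, min(math.ceil(demand), cpu_cap))
    parallelism = worker_concurrency * w.num_workers
    return {
        "max_threads": w.cpus_per_node,
        "worker_concurrency": worker_concurrency,
        "parallelism": parallelism,
        "dag_concurrency": parallelism,
        "non_pooled_task_slot_count": parallelism,
    }


# Hypothetical workload: 1,000 hourly DAGs, 5 one-minute tasks each,
# running on 3 workers with 4 CPUs per node.
settings = suggest_settings(Workload(1000, 5, 60.0, 3600.0, 3, 4))
print(settings)
```

In this example the demand-based estimate is about 28 processes per worker, but the CPU-based cap is 24, so the sketch picks 24 and derives parallelism = 72 from it. Treat the result as a starting point and adjust after watching the cluster under real load.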
Scalability : RDBMS Database Configuration
● Scheduler, workers, and webserver communicate with Airflow Database to make progress
● Cloud SQL database hosted in Google managed project with a 2 vCPU, 7.5G memory instance by default
● For most cases, customers don’t have to worry about DB as it’s sufficient for O(1000) dags.
● Database may be a bottleneck for a high amount of concurrent DAGs, e.g., > 10K DAGs
Scalability : Webserver Configuration
- worker_refresh_interval = 3600
- async_dagbag_loader = True
- workers = 1 (default is 3)
If the webserver goes down after you add more DAGs, it is because loading all the DAGs requires more than 2 GB of memory. Contact the team to increase the memory of the webserver instance.
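For a self-managed deployment, the three webserver settings above would live in the [webserver] section of airflow.cfg. The snippet below is a small sketch that renders that section with Python's standard configparser module so it can be reviewed before it is applied. The helper name is mine, and async_dagbag_loader may only be recognised by Composer's Airflow build, so check your own version before relying on it.

```python
import configparser
import io

# Values taken from the list above.
WEBSERVER_SETTINGS = {
    "worker_refresh_interval": 3600,
    "async_dagbag_loader": True,   # possibly Composer-specific (assumption)
    "workers": 1,
}


def render_webserver_section(settings: dict) -> str:
    """Return the settings as an airflow.cfg-style [webserver] section."""
    parser = configparser.ConfigParser()
    # configparser only stores strings, so convert every value.
    parser["webserver"] = {key: str(value) for key, value in settings.items()}
    buffer = io.StringIO()
    parser.write(buffer)
    return buffer.getvalue()


section_text = render_webserver_section(WEBSERVER_SETTINGS)
print(section_text)
```

On Google Composer you would not edit airflow.cfg directly; the same keys are entered as Airflow configuration overrides for the environment.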
STEPS OF SCALING
Steps for Scaling : Machine Type
a. Decide machine type: how many CPUs does the scheduler need?
○ Throughput: e.g., 1K DAGs complete in 1 hour
○ Delay: e.g., task triggering delay < 1 min
○ Select number of CPUs based on current Composer use cases
○ Assume linear performance gain: 2x CPUs = 2x throughput, ½ Delay
b. Number of nodes can be changed on-the-fly
c. Set proper Airflow configurations
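The linear assumption in step (a) can be turned into a quick sizing calculation. The sketch below is only that, a sketch: the function names, the list of machine sizes and the baseline numbers are hypothetical, and real clusters rarely scale perfectly linearly.

```python
def required_scale_factor(baseline_throughput: float,
                          target_throughput: float,
                          baseline_delay_s: float,
                          target_delay_s: float) -> float:
    """How much more CPU is needed, assuming 2x CPUs = 2x throughput, 1/2 delay."""
    by_throughput = target_throughput / baseline_throughput
    by_delay = baseline_delay_s / target_delay_s
    # Both goals must be met, and we never scale below the baseline.
    return max(by_throughput, by_delay, 1.0)


def pick_machine_cpus(baseline_cpus: int,
                      factor: float,
                      machine_sizes=(2, 4, 8, 16, 32, 64)) -> int:
    """Smallest machine size (hypothetical list) that covers the requirement."""
    required = baseline_cpus * factor
    for size in sorted(machine_sizes):
        if size >= required:
            return size
    raise ValueError("no machine size is large enough; add schedulers or nodes")


# Hypothetical baseline: 4 CPUs handle 250 DAGs/hour with a 120 s trigger delay.
# Target: 1K DAGs/hour with a trigger delay under 60 s.
factor = required_scale_factor(250, 1000, 120, 60)
cpus = pick_machine_cpus(baseline_cpus=4, factor=factor)
print(factor, cpus)
```

Here the throughput goal asks for 4x and the delay goal for 2x, so the larger factor wins and the sketch lands on a 16 CPU machine.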
Steps for Scaling : Example — 10 k DAGS
● Machine type: 16 vCPU
● Machine number: 8
● Number of DAGs: 10K DAGs with 1 bash task
● SQL instance: 2vCPUs (default, 90% SQL CPU usage)
● Completion time: ~ 45 min (~20min, if SQL CPU=4, 60% SQL CPU usage)
● Throughput: 7 tasks/s (10 tasks/s, if SQL CPU=4)
worker_concurrency = 96 # Celery process per worker
non_pooled_task_slot_count = 1000 # tasks sent for running at most
dag_concurrency = 1000 # workflows scheduled for running at most
parallelism = 1000 # total number of concurrent tasks on all workers
max_threads = 32 # number of processes to process DAG files
Steps of scaling up: Example — 100K DAGs
● Machine type: 64 vCPU
● Machine number: 8
● Number of DAGs: 100K DAGs with 1 bash task
● SQL instance: 8vCPUs (need to contact support to modify it)
● Completion time: ~ 2 hours
● Throughput: 15 tasks/s
worker_concurrency = 256 # Celery process per worker
non_pooled_task_slot_count = 2000 # tasks sent for running at most
dag_concurrency = 2000 # workflows scheduled for running at most
parallelism = 2000 # total number of concurrent tasks on all workers
max_threads = 64 # number of processes to process DAG files
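On a self-managed deployment, one way to apply values like these without editing airflow.cfg is through environment variables of the form AIRFLOW__SECTION__KEY. The sketch below builds those names for both examples. The section mapping is what I believe applies to the Airflow 1.10 line; newer releases have renamed or removed some of these keys, so treat the mapping as an assumption and verify it against your version.

```python
# Section each key lives in (assumed for Airflow 1.10.x).
SECTION_OF = {
    "worker_concurrency": "celery",
    "non_pooled_task_slot_count": "core",
    "dag_concurrency": "core",
    "parallelism": "core",
    "max_threads": "scheduler",
}

# Values copied from the two examples above.
TEN_K_DAGS = {
    "worker_concurrency": 96,
    "non_pooled_task_slot_count": 1000,
    "dag_concurrency": 1000,
    "parallelism": 1000,
    "max_threads": 32,
}
HUNDRED_K_DAGS = {
    "worker_concurrency": 256,
    "non_pooled_task_slot_count": 2000,
    "dag_concurrency": 2000,
    "parallelism": 2000,
    "max_threads": 64,
}


def to_env_vars(settings: dict) -> dict:
    """Map plain setting names to AIRFLOW__SECTION__KEY environment variables."""
    return {
        f"AIRFLOW__{SECTION_OF[key].upper()}__{key.upper()}": str(value)
        for key, value in settings.items()
    }


for name, value in to_env_vars(TEN_K_DAGS).items():
    print(f"export {name}={value}")
```

On Google Composer the same keys are set as configuration overrides on the environment rather than as environment variables.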
Steps of scaling up: monitoring
Set up Stackdriver monitoring:
○ Delay/throughput of tasks/DAGs:
Use the regular expression =~ “your-environment-name.*” over workflow related metrics (“composer.googleapis.com/workflow/*”) to select all workflows related to this environment, then apply aggregators “SUM” over metric “/run_count”, and “MEAN” over metric “/run_duration”.
○ Monitor task_queue_length
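To make the aggregation concrete, here is a local Python sketch of what the regular-expression filter plus the SUM and MEAN aggregators compute. It does not call the Stackdriver API; the sample points, field names and environment names are entirely made up for illustration.

```python
import re
from statistics import mean

# Made-up metric points; real ones come from Stackdriver.
SAMPLE_POINTS = [
    {"environment": "prod-env", "run_count": 3, "run_duration_s": 120.0},
    {"environment": "prod-env-eu", "run_count": 5, "run_duration_s": 180.0},
    {"environment": "staging", "run_count": 7, "run_duration_s": 30.0},
]


def aggregate(points: list, environment_pattern: str) -> dict:
    """SUM of run_count and MEAN of run_duration for matching environments."""
    regex = re.compile(environment_pattern)
    selected = [p for p in points if regex.fullmatch(p["environment"])]
    if not selected:
        return {"run_count_sum": 0, "run_duration_mean_s": None}
    return {
        "run_count_sum": sum(p["run_count"] for p in selected),
        "run_duration_mean_s": mean(p["run_duration_s"] for p in selected),
    }


summary = aggregate(SAMPLE_POINTS, r"prod-env.*")
print(summary)
```

The pattern keeps the two "prod-env" entries and drops "staging", giving a run count of 8 and a mean run duration of 150 seconds for this sample.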
Steps of scaling up: my DAG is slow?
● Check whether suggested Airflow configurations are applied
● Check resource usage:
○ If the scheduler uses 100% CPU while other nodes are under-utilized:
- Scheduler-bound: upgrade the machine type to have more CPUs per node
○ If all nodes are under-utilized:
- Are there many long-running tasks? There may not be enough Celery processes; try increasing worker_concurrency
- Are there many DAGs (e.g., 10K)? The database may be the bottleneck; contact the team to upgrade the database
○ Otherwise, if all nodes are fully utilized, check task_queue_length in Stackdriver monitoring:
- If it keeps growing without bound, the deployment is worker-bound
- Increase the number of nodes and re-check
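The checklist above reads naturally as a small decision function. The version below is a sketch of that checklist only; the function name, the 90% "busy" threshold and the 10K DAG threshold are assumptions of mine, and the inputs are numbers you would read off your own monitoring dashboards.

```python
def diagnose(scheduler_cpu: float,
             worker_cpu_avg: float,
             queue_length_growing: bool,
             many_long_tasks: bool,
             num_dags: int,
             busy_threshold: float = 0.9,
             db_dag_threshold: int = 10_000) -> str:
    """Return a short label for the most likely bottleneck."""
    scheduler_busy = scheduler_cpu >= busy_threshold
    workers_busy = worker_cpu_avg >= busy_threshold

    if scheduler_busy and not workers_busy:
        return "scheduler_bound"    # more CPUs per node
    if not scheduler_busy and not workers_busy:
        if many_long_tasks:
            return "celery_slots"   # increase worker_concurrency
        if num_dags >= db_dag_threshold:
            return "database"       # ask for a bigger database instance
        return "unclear"
    if workers_busy and queue_length_growing:
        return "worker_bound"       # add nodes, then re-check
    return "unclear"


# Hypothetical readings: scheduler pegged, workers mostly idle.
print(diagnose(scheduler_cpu=1.0, worker_cpu_avg=0.3,
               queue_length_growing=False, many_long_tasks=False,
               num_dags=2000))
```

The labels map back to the actions in the list: more CPUs per node, a higher worker_concurrency, a larger database, or more nodes.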
I hope this helps people who want to optimize their Airflow deployments.
- Images used in this blog have been taken from multiple Google platforms.