Understanding Open Source Airflow and MWAA Concurrency.

Sukul Mahadik
9 min read · Jul 15, 2023

In Open Source Airflow, there are multiple knobs available for controlling the concurrency of tasks and DAGs within your Airflow instance. It is crucial to comprehend these configurations in order to determine the maximum number of DAGs and tasks that can be executed simultaneously. Equally important is understanding the reasons behind task queuing and the factors contributing to it.

AWS Managed Workflows for Apache Airflow (MWAA) offers support for the same configurations as the open-source Airflow, albeit with different default values.

The table below provides a comprehensive overview of the available configurations, along with their descriptions and default values:

1] core.parallelism:

  • The core.parallelism configuration parameter allows you to control the maximum level of concurrency or parallelism for task execution across all workers in your Airflow instance. In other words, this configuration governs concurrency at the overall Airflow instance level.
  • Airflow 2.0 introduced the ability to run multiple active schedulers, and with MWAA we can have up to 5 active schedulers. The maximum number of task instances in the “running” state in the metadata database is generally this value multiplied by the number of schedulers in your cluster (core.parallelism * number of schedulers).
  • In the case of MWAA, the value is typically set to a very high number such as 10000. However, even with the largest possible MWAA instance, it is unlikely to reach such a high level of concurrency. Therefore, manual manipulation of this number is usually unnecessary when using MWAA.
  • On the other hand, in Open Source Airflow this number defaults to 32. When working with the open-source version, it is common to increase it based on the number of workers and the maximum number of tasks that can be executed on each worker. Adjust core.parallelism carefully: factors such as the number of workers, their capacity, and the available system resources should be taken into account to avoid resource contention and performance issues. The snippet below shows how to inspect the effective ceiling.
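
For illustration, here is a minimal sketch (assuming a self-managed Airflow 2.x deployment; the scheduler count below is a hypothetical value you would replace with your own) that reads the configured value and computes the instance-wide ceiling:

# Minimal sketch: inspect the instance-wide "running" task ceiling.
# Assumes self-managed Airflow 2.x; on MWAA this value is managed for you.
from airflow.configuration import conf

parallelism = conf.getint("core", "parallelism")  # 32 by default in open source
num_schedulers = 2  # hypothetical: however many scheduler replicas you run

print(f"Up to {parallelism * num_schedulers} task instances may be 'running' at once")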

2] core.max_active_tasks_per_dag:

  • The core.max_active_tasks_per_dag configuration parameter in Airflow determines the maximum number of active (running or queued) tasks allowed per DAG at any given time. It helps control the concurrency or parallelism of tasks within a specific DAG. It is very important to note that this value is calculated across all runs of a given DAG.
  • This configuration can be used as a tool to prevent resource overutilization or to manage the workload on your Airflow infrastructure.
  • If the number of active tasks exceeds the configured limit for a DAG, any additional tasks will be queued until the number of active tasks decreases. Once the number of active tasks falls below the maximum allowed limit, the queued tasks will be eligible for execution.
  • Adjusting the core.max_active_tasks_per_dag value should be done carefully, taking into consideration the resources available in your Airflow environment, such as the number of workers, their capacity, and the overall system resources. Setting a too high value may lead to resource contention and performance issues, while setting it too low may result in slower task execution or task starvation.
  • In the case of MWAA, the value is typically set to a very high number such as 10000. However, even with the largest MWAA instance, it is unlikely to reach such a high level of concurrency. In some cases, it may be necessary to reduce this value to ensure that certain runaway DAGs do not monopolize all available slots. Consider a scenario where a backfill DAG is being executed for several years of data. If left uncontrolled, these backfill DAG runs could occupy all the available slots, starving the Business-As-Usual (BAU) DAGs. To prevent this, it is important to manage the concurrency effectively. Assuming the maximum number of parallel tasks that can run on your Airflow instance is 100, you can strategically adjust the max_active_tasks parameter: setting it to 25 ensures that the backfill DAG runs use only 25% of the available slots, leaving the remaining 75% for your BAU processes.
  • Note that this configuration was earlier called core.dag_concurrency. In recent versions of Airflow, this was changed to core.max_active_tasks_per_dag.
  • Note that we can override this configuration at an individual DAG level using the max_active_tasks parameter. If not provided, it defaults to core.max_active_tasks_per_dag. The following shows how to configure it at the individual DAG level:
from airflow import DAG

dag = DAG(
    dag_id=DAG_ID,
    default_args=default_args,
    schedule="@daily",
    catchup=False,
    doc_md=__doc__,
    max_active_tasks=20,  # cap on concurrently active tasks across all runs of this DAG
)

3] core.max_active_runs_per_dag:

  • The core.max_active_runs_per_dag configuration parameter in Apache Airflow controls the maximum number of concurrent active runs allowed for a single DAG within the Airflow instance.
  • By default, both in Open Source Airflow and MWAA, the value of core.max_active_runs_per_dag is set to 16. If the number of concurrent DAG runs exceeds 16, any additional runs beyond this limit will be queued and executed once the active runs decrease.
  • It’s important to note that the max_active_runs_per_dag setting applies at the Airflow instance level, meaning it affects all DAGs. However, you have the flexibility to override this setting at the DAG level using the max_active_runs parameter, allowing you to set a different limit for specific DAGs.

For example, you can define a DAG with a specific max_active_runs value:

from airflow import DAG

dag = DAG(
    dag_id=DAG_ID,
    default_args=default_args,
    schedule="@daily",
    catchup=False,
    doc_md=__doc__,
    max_active_runs=31,  # allow up to 31 concurrent runs of this DAG
)
  • This allows you to control the maximum number of concurrent runs for that specific DAG, overriding the instance-level setting. This can be particularly useful when running backfills or when you require more than 16 concurrent DAG runs for a specific workflow.

4] core.default_pool_task_slot_count:

  • The core.default_pool_task_slot_count configuration parameter in Apache Airflow specifies the default number of task slots available in the “default” pool for task scheduling. Note that every Airflow instance automatically starts with a “default” pool; we can create additional pools as required (explained later).
  • Task slots represent the capacity or concurrency limit within the default pool, which is used when tasks are not assigned to any specific named pool. Each task consumes one slot when running, and the number of task slots determines the maximum number of tasks that can be executed concurrently within the default pool.
  • By default, the value of core.default_pool_task_slot_count is set to 128 in Open Source Airflow; beyond that, tasks get queued. Note, however, that in MWAA the slot count for this pool is set to a very high value, i.e., 10000 (which may be practically impossible to reach).
  • Keep in mind that the default pool’s task slot count is a global setting and applies to all tasks that are not assigned to any named pool, as the example below illustrates. If you need more granular control over task concurrency, you can create and configure multiple named pools with different slot counts to allocate resources according to specific task requirements.
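
For illustration, here is a hypothetical task that omits the pool argument and therefore consumes a slot in the default pool:

from airflow.operators.bash import BashOperator

# With no `pool` argument, this task runs in "default_pool" and uses one of its slots.
no_pool_task = BashOperator(
    task_id="no_pool_task",
    bash_command="sleep 60",
    dag=dag,
)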

5] celery.worker_concurrency:

  • The celery.worker_concurrency configuration parameter in Celery controls the level of concurrency or parallelism within a Celery worker process. It determines the number of concurrent tasks that a single worker can handle.
  • Setting a higher value for celery.worker_concurrency allows the worker to handle more tasks concurrently, potentially increasing the throughput of task execution. However, it’s important to consider the available system resources (CPU, memory) and the nature of the tasks being executed to avoid overloading the worker or causing performance issues.
  • On the other hand, setting a lower value for celery.worker_concurrency can limit the concurrent task execution within a worker, which may be useful when dealing with resource-intensive or long-running tasks that require more dedicated resources.
  • It’s worth noting that the optimal value for celery.worker_concurrency depends on factors such as the capabilities of your infrastructure, the nature of your tasks, and the expected workload.
  • Please note that in MWAA, even though it utilizes the Celery executor, the celery.worker_concurrency configuration parameter is ignored. This is due to MWAA’s autoscaling feature, which dynamically adjusts the number of worker nodes based on the workload. When autoscaling is enabled in MWAA, the celery.worker_autoscale configuration takes precedence over celery.worker_concurrency, allowing MWAA to automatically scale the number of worker nodes up or down based on the current task load. For self-managed deployments, the snippet below shows where the setting lives.
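
As a minimal sketch for a self-managed deployment (not MWAA), the snippet below simply reads the effective value; the setting itself lives under the [celery] section of airflow.cfg or the AIRFLOW__CELERY__WORKER_CONCURRENCY environment variable:

# Minimal sketch, self-managed Airflow with the Celery executor (MWAA ignores this).
# Set it in airflow.cfg under [celery], or export AIRFLOW__CELERY__WORKER_CONCURRENCY
# before starting the worker with `airflow celery worker`.
from airflow.configuration import conf

worker_concurrency = conf.getint("celery", "worker_concurrency")
print(f"Each Celery worker will run up to {worker_concurrency} tasks concurrently")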

6] celery.worker_autoscale:

  • The maximum and minimum concurrency that will be used when starting workers with the airflow celery worker command.
  • Note that Open Source Airflow does not set any defaults for this. MWAA, however, uses different values based on the environment class you choose. The environment class of your Amazon MWAA environment determines the size of the AWS-managed AWS Fargate containers where the Celery Executor runs, and of the AWS-managed Amazon Aurora PostgreSQL metadata database where the Apache Airflow scheduler creates task instances.
  • By default, an mw1.small worker can run 5 concurrent tasks, an mw1.medium 10 concurrent tasks, and an mw1.large 20 concurrent tasks.
  • You can use this option to free up resources by reducing the minimum and maximum task concurrency of Workers. The minimum and maximum values specified must be the same. Workers accept up to the maximum number of concurrent tasks configured, regardless of whether there are sufficient resources to run them; if tasks are scheduled without sufficient resources, they fail immediately. For resource-intensive tasks, we recommend reducing the values below the defaults to allow more capacity per task; the sketch below shows one way to apply such an override on MWAA.
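
As a hedged sketch of applying this override on MWAA (the environment name is hypothetical, and the call assumes boto3 credentials with permission to update the environment):

import boto3

# Sketch: lower Worker task concurrency on MWAA via a configuration override.
# "my-mwaa-env" is a hypothetical environment name.
mwaa = boto3.client("mwaa")
mwaa.update_environment(
    Name="my-mwaa-env",
    AirflowConfigurationOptions={
        # On MWAA the minimum and maximum must be the same value.
        "celery.worker_autoscale": "5,5",
    },
)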

7] max_active_tis_per_dag or task_concurrency:

  • We can also control the concurrency of a single task across all the DAG runs of its DAG. Note that we do not have an option to specify defaults at the instance or DAG level; this configuration is applied to an individual task.
  • Example:
from airflow.operators.bash import BashOperator

test_bash_task = BashOperator(
    task_id='test_task',
    bash_command="""
        sleep 1200
    """,
    dag=dag,
    max_active_tis_per_dag=23,  # at most 23 instances of this task across all DAG runs
)
  • Note that this configuration was earlier referred to as task_concurrency, which is deprecated. Thus, max_active_tis_per_dag is the correct parameter to use.

Ref: https://airflow.apache.org/docs/apache-airflow/2.5.1/_modules/airflow/models/baseoperator.html#BaseOperator

if task_concurrency and not max_active_tis_per_dag:
    # TODO: Remove in Airflow 3.0
    warnings.warn(
        "The 'task_concurrency' parameter is deprecated. Please use 'max_active_tis_per_dag'.",
        RemovedInAirflow3Warning,
        stacklevel=2,
    )
    max_active_tis_per_dag = task_concurrency

8] Use pools to control concurrency of an arbitrary set of tasks:

  • In a scenario where you have 20 tasks in your Airflow instance that extract data from an upstream Redshift database, it’s important to consider the concurrency limitations of the database’s workload management (WLM) configuration. For instance, if the Redshift WLM only allows 3 concurrent queries, running all 20 tasks simultaneously would result in only 3 tasks succeeding while the rest fail because WLM rejects the excess queries. To address this situation and control the concurrency of specific groups of tasks, Airflow provides a feature called “pools.”
  • By utilizing pools in Airflow, you can manage the concurrency of arbitrary groups of tasks. Every Airflow instance comes with a default pool, which has a predefined number of slots that determine its concurrency. In open-source Airflow, the default pool has a slot count of 128, while in MWAA, it is set to 10,000. It can be modified through the UI or CLI (but cannot be removed).
  • The list of pools is managed in the UI (Menu -> Admin -> Pools) by giving each pool a name and assigning it a number of worker slots. [Screenshot: the Pools admin page in an MWAA setup.]

Next, I create a new pool named “upstreamRedshift” and set the number of worker slots to 3.

  • Tasks can then be associated with one of the existing pools by using the pool parameter when creating them. The following shows how the pool parameter assigns a task to a pool:
from airflow.operators.bash import BashOperator

test_bash_task = BashOperator(
    task_id='test_task',
    bash_command="""
        sleep 1200
    """,
    dag=dag,
    pool="upstreamRedshift",  # run within the 3-slot pool created above
)

Tasks will be scheduled as usual while the slots fill up. The number of slots occupied by a task can be configured by pool_slots. Once capacity is reached, runnable tasks get queued and their state will show as such in the UI. As slots free up, queued tasks start running based on the Priority Weights of the task and its descendants.

  • Airflow tasks each occupy a single pool slot by default, but they can be configured to occupy more with the pool_slots argument if required. This is particularly useful when tasks that belong to the same pool don’t carry the same “computational weight”. Say we have one resource-intensive task: we can assign that heavy task 2 pool slots while the lighter ones take 1 slot each. In our 3-slot pool, whenever the heavy query is running, only one additional light query can run alongside it, i.e., at most 2 queries run at a time.
from airflow.operators.bash import BashOperator

test_bash_task = BashOperator(
    task_id='test_task',
    bash_command="""
        sleep 1200
    """,
    dag=dag,
    pool="upstreamRedshift",
    pool_slots=2,  # this heavy task occupies 2 of the pool's 3 slots
)
