Airflow’s Magic Loop

A simple optimization that saves us significant running time

Itay Bittan
Published in Apache Airflow · 4 min read · Jul 7, 2022

Here at Dynamic Yield, we use several Airflow clusters to manage many different pipelines. Each of those clusters runs tens of thousands of tasks on a daily basis. We run on top of Kubernetes, and in a typical cluster we have an Airflow scheduler pod, an Airflow web-server pod, and a dedicated Kubernetes namespace that runs all of our Kubernetes executor pods (which handle all of the tasks).

Airflow on Kubernetes (image by author)

In one of our Airflow clusters, we have an isolated pipeline per customer (business), so each customer has its own encapsulated DAG. Thus, we have a few thousand dynamically created DAGs. Let's assume that we have the following customers' data file:

1000,Customer1,e-commerce,2017-30-01
1001,Customer2,publisher,2018-12-02
1002,Customer3,gaming,2019-20-05
...
3999,CustomerX,e-commerce,2022-07-07
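The read_customers_data_file() helper referenced below simply parses this file. As a rough, hedged sketch (the file name and plain-CSV parsing are illustrative assumptions, not our production code), it could look like this:

import csv

def read_customers_data_file(path="customers.csv"):  # assumed file name, for illustration only
    """Yield (customer_id, name, vertical, creation_date) tuples, one per customer."""
    with open(path, newline="") as f:
        for row in csv.reader(f):
            customer_id, name, vertical, creation_date = row
            yield customer_id, name, vertical, creation_date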

Our dag.py looks like:

import logging
from timeit import default_timer

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator

logger = logging.getLogger(__name__)

customers = read_customers_data_file()
start_time = default_timer()
for customer_id, name, vertical, creation_date in customers:
    dag_id = f"customer_{customer_id}"
    dag = DAG(
        dag_id,
        tags=[name],
        ...
    )
    with dag:
        task_1 = PythonOperator(...)
        task_2 = SparkSubmitOperator(...)
        task_3 = S3KeySensor(...)
        # more tasks
        ...
        task_1 >> [task_2, task_3]
    # expose the DAG at module level so Airflow's DagBag picks it up
    globals()[dag_id] = dag
took = default_timer() - start_time
logger.info(f"took: {took}")

By measuring the time it takes to load all of those DAGs, we found that it can take up to 120 seconds! To ensure that loading the DAGs won't time out, we set the following configuration:

AIRFLOW__CORE__DAGBAG_IMPORT_TIMEOUT=120.0  # default: 30.0
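For reference, the same value can equivalently be set in airflow.cfg (the AIRFLOW__CORE__* environment variable above simply overrides the [core] section):

[core]
# allow DAG file parsing up to 120 seconds before timing out (default: 30.0)
dagbag_import_timeout = 120.0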

Updating the scheduler's DAGs every few minutes is good enough for us, as each DAG runs once a day and it can take a few more minutes to add a new customer. On the other hand, after running with this setup for a while, we noticed that each Kubernetes executor pod (which starts for each task) loads all of those DAGs over and over again. That's because Airflow has a fast-follow (or "mini-scheduler") workflow that can trigger the following task. In other words, each and every worker (we have thousands of them going up and down all day long) spends up to 120 seconds loading thousands of DAGs, while only one DAG is relevant. Practically, if we have ~4,000 daily DAGs with 10 tasks each, we spend 4,000 × 10 × 120s = 4,800,000 seconds (in the worst case), every day!

Magic Condition

Since each of our dynamic DAGs is totally independent of the others (each DAG represents a different, isolated customer pipeline), our loop could load only the relevant DAG, assuming we are on a worker (Kubernetes executor) pod. We took advantage of the pod's command arguments to figure out whether our code runs within the scheduler or within a worker.

Scheduler args: ["scheduler"]

Worker args: ["airflow", "tasks", "run", "customer_1000", "daily", ..]

As you can see above, a worker starts with the dag_id as the fourth argument, so here is our updated dag.py:

import sys
from timeit import default_timer

customers = read_customers_data_file()

# Worker pods are started as: airflow tasks run <dag_id> ..., so argv[3] is the dag_id
current_dag = None
if len(sys.argv) > 3:
    current_dag = sys.argv[3]

start_time = default_timer()
for customer_id, name, vertical, creation_date in customers:
    dag_id = f"customer_{customer_id}"
    if current_dag is not None and current_dag != dag_id:
        continue  # on a worker, skip every DAG except the one being run
    dag = DAG(
        dag_id,
        tags=[name],
        ...
    )
    with dag:
        task_1 = PythonOperator(...)
        task_2 = SparkSubmitOperator(...)
        task_3 = S3KeySensor(...)
        # more tasks
        ...
        task_1 >> [task_2, task_3]
    globals()[dag_id] = dag
took = default_timer() - start_time
logger.info(f"took: {took}")

And now it takes only ~200ms to load the DAG in each worker!


Observations

  1. We rely on Airflow’s internal behavior (passing the dag_id in args) to optimize our loading time. It was tested with Airflow v2.3.2 (the latest as of Jul 7, 2022), but it might break in future versions. Pay attention if you upgrade to a newer version.
  2. The dag_id can also be retrieved from the pod’s labels/annotations. Like args, this is something that might break, as it’s not documented in Airflow’s official APIs, so be careful with that.
  3. We pulled the dag_id from args without any validation for the simplicity of this example. In reality, we added more validations to ensure that, in the worst case, we only lose the optimization and nothing more. A quick improvement would be to test that the prefix of the fourth argument meets our expectations: current_dag.startswith("customer_"), as in the sketch after this list.
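As a rough illustration of the third observation, here is a hedged sketch of what stricter argument handling could look like; the get_current_dag_id helper and the exact checks are illustrative assumptions, not the validations we run in production:

import sys

def get_current_dag_id(argv=None, prefix="customer_"):
    """Return the dag_id a worker was asked to run, or None to fall back to loading all DAGs."""
    argv = sys.argv if argv is None else argv
    # Worker pods are started as ["airflow", "tasks", "run", "<dag_id>", ...]
    if len(argv) > 3 and argv[1:3] == ["tasks", "run"] and argv[3].startswith(prefix):
        return argv[3]
    # Anything unexpected (scheduler, webserver, unusual args): load everything,
    # so the worst case is only losing the optimization.
    return None

In dag.py, current_dag = get_current_dag_id() would then replace the bare sys.argv[3] lookup.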

Summary

While our cluster takes Airflow to its limit with thousands of DAGs and tasks, we found that Airflow performs pretty well at that scale. Having said that, there is always room for improvement, as we demonstrated above. For the sake of generality, Airflow loads all DAGs in the workers all the time. Since the case of isolated DAGs (with no dependencies between them) is pretty common, this trick might help others as well by speeding up their pipelines and making them more efficient.

Itay Bittan
Dad | Husband | Principal Software Engineer at Dynamic Yield | Tech Geek | https://www.linkedin.com/in/itaybittan