Improving Performance of Apache Airflow Scheduler

Tomasz Urbaszek
Databand, an IBM Company
Mar 31, 2020

Apache Airflow is an open-source tool for creating and managing complex workflows. Recently, Airflow has been gaining a lot of traction and popularity among data scientists for managing machine learning workflows. As usage grows, user expectations grow too. Like all users, we expect tools to be reliable, scalable and to work out of the box. The Apache Airflow community is working on improving all of these aspects. The recent joint effort of Databand and Polidea has brought many performance improvements to the core of Airflow.

Why? Tackling Airflow Inefficiencies

Airflow is a big project with many features and moving parts. Performance issues often start to appear when you need to run hundreds of workflows with dozens of tasks. This is a major impediment to scaling Airflow for big use cases. The performance of the Airflow scheduler has long been a challenge for users, and it leaves a lot of room for improvement. Why? Probably the main reason is the complexity and coupling of Airflow core internals. The scheduler codebase, together with the DAG parsing subprocess, takes weeks to understand. The underlying cause is the complexity of the domain model, which includes in-memory structures and database models that are tightly coupled, so it is easy to lose track of which is which.

How? Our Approach

Our work on Airflow performance was not the first. One of the most recent efforts was made by Ash Berlin-Taylor (Apache Airflow PMC), who focused on improving the performance of Python code execution. In our contributions, we focused on the time when the interpreter was idle. We were especially interested in the time the scheduler spends waiting for a response from the Airflow metadata database. To investigate this, we registered an event listener on the SQLAlchemy engine, which let us measure the number of queries performed by a function or a process and, more importantly, the time those queries took.
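A listener of this kind can be sketched as follows. This is a minimal illustration and not the code used in the Airflow pull requests: the names `QueryStats` and `trace_queries` are hypothetical, and an in-memory SQLite engine stands in for the Airflow metadata database. It relies on SQLAlchemy's `before_cursor_execute` and `after_cursor_execute` engine events.

```python
import time
from contextlib import contextmanager
from dataclasses import dataclass

from sqlalchemy import create_engine, event, text


@dataclass
class QueryStats:
    """Hypothetical container for the measurements."""
    count: int = 0
    total_seconds: float = 0.0


@contextmanager
def trace_queries(engine):
    """Count statements sent through `engine` and sum their execution time."""
    stats = QueryStats()

    def before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
        # Remember when this statement started (a stack handles nested calls).
        conn.info.setdefault("query_start", []).append(time.perf_counter())

    def after_cursor_execute(conn, cursor, statement, parameters, context, executemany):
        stats.count += 1
        stats.total_seconds += time.perf_counter() - conn.info["query_start"].pop()

    event.listen(engine, "before_cursor_execute", before_cursor_execute)
    event.listen(engine, "after_cursor_execute", after_cursor_execute)
    try:
        yield stats
    finally:
        event.remove(engine, "before_cursor_execute", before_cursor_execute)
        event.remove(engine, "after_cursor_execute", after_cursor_execute)


# Assumption: in-memory SQLite instead of the real metadata database.
engine = create_engine("sqlite://")

# Warm-up connection so dialect setup is not part of the measurement.
with engine.connect() as conn:
    conn.execute(text("SELECT 1"))

# Connections must be opened after the listeners are registered.
with trace_queries(engine) as stats:
    with engine.connect() as conn:
        for _ in range(3):
            conn.execute(text("SELECT 1"))

print(f"{stats.count} queries, {stats.total_seconds * 1000:.3f} ms in the database")
```

Wrapping a single function call in such a context manager is enough to see how many queries it issues and how long the process waits for the database.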

The numbers we got at first were unbelievable. For example, running the DagFileProcessor process on 200 DAGs with 10 tasks each, we observed 1801 queries! This number was clearly disproportionate to the setup. We looked at the logs and found that some queries were executed many times. The natural next step was to find the places where those queries were issued and analyze whether something could be improved. That was not hard.

The problems are especially visible in the pull request dealing with the DAG.is_paused attribute. This attribute specifies whether to create a new DAG Run for the given DAG. The code for this attribute is shown below; at first glance, nothing looks wrong.

@provide_session
def _get_is_paused(self, session=None):
    # Runs one SELECT against the metadata database on every call
    qry = session.query(DagModel).filter(
        DagModel.dag_id == self.dag_id
    )
    return qry.value(DagModel.is_paused)

@property
def is_paused(self) -> bool:
    """
    Returns a boolean indicating whether this DAG is paused
    """
    return self._get_is_paused()

However, it became a serious performance issue. Where this attribute was used, nothing in the calling code revealed that reading it triggers a database query. Developers are used to attribute reads being cheap operations that cause no problems. That is not true in this case.

The N+1 problem is a situation when data is processed in a loop and another database query is performed for each iteration of the loop. This situation can look like this.

paused_dag_ids = {
    dag.dag_id for dag in dagbag.dags.values() if dag.is_paused
}

For a DagBag containing 200 DAGs, this snippet resulted in 200 queries, although a single query to the database would have been enough. It is worth remembering that the same problem may occur when updating objects. Even with an ORM, it is sometimes necessary to consider whether an update performed object by object in a loop is efficient, or whether we should rewrite the code to stop iterating and execute one hand-written UPDATE query instead.
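The difference between the two access patterns can be reproduced outside Airflow. The sketch below is an illustration only: it uses Python's built-in sqlite3 module instead of SQLAlchemy, a made-up `dag` table with 200 rows, and `set_trace_callback` to record every executed statement.

```python
import sqlite3

# Assumption: a simplified stand-in for the DagModel table.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dag (dag_id TEXT PRIMARY KEY, is_paused INTEGER)")
dag_ids = [f"dag_{i}" for i in range(200)]
conn.executemany(
    "INSERT INTO dag VALUES (?, ?)",
    [(dag_id, int(i % 4 == 0)) for i, dag_id in enumerate(dag_ids)],
)
conn.commit()

# Record every statement SQLite executes from now on.
executed = []
conn.set_trace_callback(executed.append)

# N+1 pattern: one query per DAG, as with the is_paused property in a loop.
paused_n_plus_one = set()
for dag_id in dag_ids:
    row = conn.execute(
        "SELECT is_paused FROM dag WHERE dag_id = ?", (dag_id,)
    ).fetchone()
    if row[0]:
        paused_n_plus_one.add(dag_id)
n_plus_one_queries = len(executed)

# Batched pattern: a single query returns all paused DAG ids.
executed.clear()
rows = conn.execute("SELECT dag_id FROM dag WHERE is_paused = 1").fetchall()
paused_batched = {dag_id for (dag_id,) in rows}
batched_queries = len(executed)
conn.set_trace_callback(None)

print(n_plus_one_queries, batched_queries)  # 200 1
```

Both variants produce the same set of paused DAG ids; only the number of round trips to the database differs.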

Airflow uses many processes to ensure high performance. This makes it hard to tell whether a given query is really required: a value may be saved on one object and passed to another, so tracing it requires a deep understanding of most of the code. Unlike classic web applications, where objects are short-lived and die after handling a user request, Airflow also has objects that stay stateful for a long time.

Avoid regression

The last takeaway from our Airflow performance story is how to avoid regressions. Once we have improved performance, we want to prevent unnecessary or unfounded changes that would degrade it again. To achieve that, we created additional tests that perform an operation inside a context manager (the code is available on GitHub) that counts the queries, for example:

with assert_queries_count(3):
    DAG.bulk_sync_to_db(dags)

Thanks to those tests, we will be able to catch changes that could affect Airflow performance. Such tests are critical in open-source projects like Apache Airflow because contributors come and go, and the knowledge is not always passed on.
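A query-counting guard of this kind might look roughly like the sketch below. It is not the helper from the Airflow test suite: `assert_query_count` is a hypothetical name, it takes the engine as an explicit argument, and an in-memory SQLite engine is assumed for the demonstration.

```python
from contextlib import contextmanager

from sqlalchemy import create_engine, event, text


@contextmanager
def assert_query_count(engine, expected):
    """Fail if the wrapped block executes a different number of statements."""
    statements = []

    def record(conn, cursor, statement, parameters, context, executemany):
        statements.append(statement)

    event.listen(engine, "after_cursor_execute", record)
    try:
        yield statements
    finally:
        event.remove(engine, "after_cursor_execute", record)
    # Only reached when the block itself did not raise.
    if len(statements) != expected:
        raise AssertionError(
            f"expected {expected} queries, got {len(statements)}: {statements}"
        )


# Assumption: in-memory SQLite stands in for the test database.
engine = create_engine("sqlite://")
with engine.connect() as conn:  # warm-up, keeps dialect setup out of the count
    conn.execute(text("SELECT 1"))

# Passes: the block runs exactly two statements.
with assert_query_count(engine, 2):
    with engine.connect() as conn:
        conn.execute(text("SELECT 1"))
        conn.execute(text("SELECT 2"))
```

If a later change adds a query inside the block, the context manager raises an AssertionError listing the statements it saw, which points straight at the regression.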

The result

We did 7 PRs related to solving these problems. We have also prepared other changes that prevent performance regression or allow us to easily repeat our research on another piece of Airflow.

Finally, we measured a DAG file with the following characteristics:

  • 200 DAG objects in one file
  • All DAGs have 10 tasks
  • Schedule interval is set to ‘None’ for all DAGs
  • Tasks have no dependencies

Testing the DagFileProcessor.process_file method on this file gave the following results:

Before (commit):

  • Query count: 1801
  • DAG processing time: 8 275 ms

After (commit):

  • Query count: 5
  • DAG processing time: 814 ms

Difference:

  • Query count: -1 796 (-99.7%)
  • Processing time: -7 461 ms (-90%)

What does it mean? There are far fewer queries! This speeds up the whole scheduler and improves the overall performance of Airflow. In our conditions, DAG parsing time improved 10x, and the change puts much less load on the Airflow database. Of course, this does not translate to a 10x speed-up in running tasks. The changes are currently available only in the master branch and will be part of Airflow 2.0.
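The percentages and the 10x figure follow directly from the measurements reported above, as this short check shows (the variable names are ours):

```python
# Measurements reported for DagFileProcessor.process_file
queries_before, queries_after = 1801, 5
time_before_ms, time_after_ms = 8275, 814

query_reduction = (queries_before - queries_after) / queries_before
time_reduction = (time_before_ms - time_after_ms) / time_before_ms
speedup = time_before_ms / time_after_ms

print(f"queries: -{query_reduction:.1%}")  # queries: -99.7%
print(f"time:    -{time_reduction:.1%}")   # time:    -90.2%
print(f"speedup: {speedup:.1f}x")          # speedup: 10.2x
```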

The N+1 query problem is usually easy to spot and to fix. Some tools can help you spot it, and registering an event listener on your database engine is the simplest of them. However, rather than fixing this problem after the fact, we should try to avoid it in the first place. Most ORMs provide batch operations that are easy to use, and we should use them whenever possible. The Airflow case shows that numerous small inefficiencies add up to big bottlenecks which, once removed, improve the performance of the whole tool.

Authors

Kamil Breguła — Software Engineer at Polidea | Apache Airflow Committer
Tomek Urbaszek — Software Engineer at Polidea | Apache Airflow Committer
Evgeny Shulman — CTO at Databand
