Apache Airflow: Manage thousands of DAGs

Bentsi Leviav
5 min read · Mar 18, 2020


Apache Airflow is an open-source platform originally developed by Airbnb's Engineering & Data Science department. The platform lets you create, schedule and monitor data pipelines.

For those of you who are not familiar with Airflow, there is a good article (written by Airbnb) that covers the design and the important concepts of the platform.

When you have several or even dozens of DAGs, Airflow's built-in management tools (the UI, CLI and experimental API) seem adequate. But when you start to have hundreds and even thousands of them, these tools become insufficient. So, what can we do?

In this article, I will take you through the Airflow management solutions we've adopted at ControlUp (a fast-growing startup company), in an attempt to make your life much easier.

Airflow’s Backend

On the backend, Airflow uses SQLAlchemy, an open-source Python SQL toolkit that lets you work with a variety of RDBMSs.

One of SQLAlchemy's most popular features is its ORM.

ORM

ORM (object-relational mapping) is a programming technique for mapping RDBMS tables to the objects in your OOP code and keeping the two in sync.

With its ORM, SQLAlchemy creates a virtual object database that you can query directly from your code.

For instance, in the example below, we’ve created a class called “User” which is mapped to the “users” table.

ORM class example
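A minimal sketch of such a mapping (the column names here are illustrative):

```python
from sqlalchemy import Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()


class User(Base):
    # The class is mapped to the "users" table
    __tablename__ = "users"

    id = Column(Integer, primary_key=True)
    name = Column(String)
    fullname = Column(String)
```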

Now, we can create a User object and easily insert it into the DB:
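A sketch of the insert, reusing the User class from above (the SQLite URL is just a stand-in for whichever RDBMS you use):

```python
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

engine = create_engine("sqlite:///example.db")  # stand-in for any supported RDBMS
Base.metadata.create_all(engine)                # create the "users" table if it doesn't exist

Session = sessionmaker(bind=engine)
session = Session()

# Create a User object and persist it with the session
session.add(User(name="john", fullname="John Doe"))
session.commit()
```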

Airflow's models module (airflow.models) is responsible for mapping the code, via SQLAlchemy, to Airflow's database. Most of the solutions in this article use this module.

Managing Airflow Connections

When working with hooks and operators, or when running in remote logging mode, Airflow needs to know your external systems' connection details, including hostname, port, login username, password and more. These connections are stored in Airflow's metadata database.

Currently, Airflow offers two ways to manage these connections:

  1. Airflow UI - like the Airbnb site itself, Airflow's UI is an informative and well-designed site for managing and monitoring your DAGs, connections and configurations (basically, all of the system's components), with many useful features.
Airflow UI — Connections managing tab

2. Environment Variables - Airflow picks up connections from environment variables. To add a new connection, you create an environment variable with the AIRFLOW_CONN_ prefix followed by the connection id. The value needs to be in URI format, for example: postgres://user:password@localhost:5432/master (see the sketch below).
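A minimal sketch, assuming a made-up connection id my_postgres (in practice you would usually export the variable in the scheduler/worker environment rather than set it from Python):

```python
import os

# Hypothetical connection id "my_postgres"; Airflow resolves it from the
# AIRFLOW_CONN_MY_POSTGRES environment variable, whose value is a URI.
os.environ["AIRFLOW_CONN_MY_POSTGRES"] = "postgres://user:password@localhost:5432/master"
```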

What if you have a huge number of connections (dozens to hundreds, which are hard to handle)? What if multiple services in your project use the same connection configuration and you really don't want to duplicate it and keep syncing it between them all the time?

You can save the data on a central configuration server, and with a few lines of Python, easily keep all of your services in sync.

An additional way to manage connections is through Airflow's backend DB.

As mentioned earlier, the Airflow backend DB is mapped to Python objects (the ORM concept), so all we need to do is manage the connection through the Connection object, which looks as follows:

Airflow Connection ORM declaration
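An abridged sketch of that declaration, roughly as it appears in Airflow 1.10's models (check airflow/models/connection.py in your version for the exact fields):

```python
from sqlalchemy import Boolean, Column, Integer, String

from airflow.models.base import Base, ID_LEN


class Connection(Base):
    # Abridged: only the column declarations are shown here
    __tablename__ = "connection"

    id = Column(Integer(), primary_key=True)
    conn_id = Column(String(ID_LEN))
    conn_type = Column(String(500))
    host = Column(String(500))
    schema = Column(String(500))
    login = Column(String(500))
    _password = Column('password', String(5000))
    port = Column(Integer())
    is_encrypted = Column(Boolean, unique=False, default=False)
    is_extra_encrypted = Column(Boolean, unique=False, default=False)
    _extra = Column('extra', String(5000))
```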

Below is an example of adding a connection:

Add connection example
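A minimal sketch of such a script (connection ids, hosts and credentials are placeholders; the line numbers mentioned in the next paragraphs count from the top of the snippet):

```python
from airflow import settings
from airflow.models import Connection

session = settings.Session()

mssql_conn = Connection(conn_id="mssql_default",  # the MSSQL connection (line 6)
                        conn_type="mssql",
                        host="mssql.example.com",
                        login="airflow_user",
                        password="secret",
                        schema="master",
                        port=1433)
spark_conn = Connection(conn_id="spark_default",  # the Spark connection (line 13)
                        conn_type="spark",
                        host="spark://spark-master.example.com",
                        port=7077)

session.add(mssql_conn)
session.add(spark_conn)
session.commit()
```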

In the first two lines, we imported the Airflow settings module and the Connection model.

The settings module is responsible, among other things, for the SQLAlchemy settings (connection pool, pool size, etc.).

On the fourth line, we initialized our session object from the settings module (using the one that already runs on the current Airflow server). That variable now holds a SQLAlchemy session object that looks as follows (Python CLI example run on the Airflow server):

session object example

In lines 6 and 13, we created the actual connection objects (MSSQL and Spark connections) using the class that we imported from the models module.

Finally, we added the two objects to the session and committed the changes.

These connections will immediately be added to your Airflow DB, and you can see them in the Airflow UI.

Pause/Unpause multiple DAGs

Controlling your DAGs is one of the most important capabilities of a scheduling platform. You can pause and unpause DAGs through the Airflow UI.

At ControlUp, which has thousands of DAGs, we've reached the conclusion that we must be able to pause and unpause multiple DAGs at once. Following the Connections example, we implemented it with the ORM concept as follows:

Pause/Unpause multiple DAGs
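A minimal sketch of such a function (the function name is my own; the line numbers mentioned in the next paragraphs count from the def line):

```python
from airflow import settings
from airflow.models import DagModel


def set_dags_pause_state(dag_ids, is_paused):
    session = settings.Session()  # line 2: SQLAlchemy session
    for dag_id in dag_ids:
        dag = session.query(DagModel).filter(  # line 4: fetch the DAG's ORM object
            DagModel.dag_id == dag_id).first()
        if dag is None:
            continue  # skip unknown DAG ids
        dag.is_paused = is_paused  # line 8: True = pause, False = unpause
    session.commit()  # line 9: persist the changes
```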

In this example, we used the DagModel object, an ORM object that provides the basic data of a given DAG.

In lines 1–9, we defined a function that receives the list of DAG ids and the desired pause mode as parameters.

In the second line, we initialize the SQLAlchemy session object (as we did in the previous example).

After initialization, the function iterates over the DAG ids and, for each one, fetches its ORM object from the DB (line 4).

In line 8, we change the is_paused attribute to the desired pause mode (True for pausing, False for unpausing).

(A little note: we could query the DB with all the DAG ids at once rather than one by one, then iterate over the results and update the pause mode, which would be more efficient; see the sketch below.)
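A sketch of that bulk variant, under the same assumptions as above:

```python
from airflow import settings
from airflow.models import DagModel


def set_dags_pause_state_bulk(dag_ids, is_paused):
    # One UPDATE for all the given DAG ids instead of one query per DAG
    session = settings.Session()
    session.query(DagModel).filter(DagModel.dag_id.in_(dag_ids)).update(
        {DagModel.is_paused: is_paused}, synchronize_session=False)
    session.commit()
```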

Note that you can initialize the SQLAlchemy session object only on the Airflow server/workers (or by connecting to the backend DB remotely and importing the airflow module).

All of these solutions are meant to make your Airflow operations much easier.

Eventually, these are code snippets/scripts. What is the best way to make them accessible?

Well, that depends. Airflow is an open-source project, so you can fork the Airflow repository and extend the Airflow CLI/experimental API, or even create your own.

Airflow Plugins

Another way, which I have found to be very easy, is to add and extend Airflow plugins.

Airflow gives you the ability to add custom plugins. There is a variety of plugin types: APIs, CLIs, and internal Airflow helpers.

So if you don't want to fork the Airflow project (and get messy with the Airflow versions and upgrades), you can extend and use an existing API plugin.

How do you do that?

You just need to drop your .py files into your $AIRFLOW_HOME/plugins folder.

You can see an example in this REST API plugin, and extend it or add routes of your own.
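For illustration, here is a minimal sketch of a custom plugin that exposes the bulk pause/unpause logic over HTTP (the plugin name, route and payload format are my own assumptions, not an Airflow convention):

```python
# $AIRFLOW_HOME/plugins/dag_tools_plugin.py
from airflow import settings
from airflow.models import DagModel
from airflow.plugins_manager import AirflowPlugin
from flask import Blueprint, jsonify, request

# A hypothetical Flask blueprint that the Airflow webserver will register
dag_tools = Blueprint("dag_tools", __name__, url_prefix="/dag-tools")


@dag_tools.route("/pause", methods=["POST"])
def pause_dags():
    # Expected JSON body: {"dag_ids": ["dag_a", "dag_b"], "is_paused": true}
    payload = request.get_json(force=True)
    dag_ids = payload.get("dag_ids", [])
    is_paused = payload.get("is_paused", True)

    session = settings.Session()
    session.query(DagModel).filter(DagModel.dag_id.in_(dag_ids)).update(
        {DagModel.is_paused: is_paused}, synchronize_session=False)
    session.commit()
    return jsonify({"dag_ids": dag_ids, "is_paused": is_paused})


class DagToolsPlugin(AirflowPlugin):
    name = "dag_tools_plugin"
    flask_blueprints = [dag_tools]
```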

It's all about code

The power of Airflow, as an open-source project, is that everything is reachable with a few lines of code. You can integrate with your internal organizational services, build actions and develop incredible and creative solutions.

I recommend reading the models module first to understand how it can serve you and make your life easier.

Feel free to share your opinion, correct me, or just share your experience in the comments below.


Bentsi Leviav

Big Data Engineer skilled with Apache Spark, Airflow, the Hadoop ecosystem and more. Contact: bentzi.leviav@gmail.com | LinkedIn: www.linkedin.com/in/bentsi-leviav