Using Airflow variables to create a dynamic DAG

Jun Wei Ng
4 min read · Mar 14, 2022


Sometimes, the workflow or data pipeline that we are trying to model in an Airflow DAG is not static: it changes under varying conditions. For example, an Extract-Transform-Load (ETL) pipeline might extract from a varying number of input sources. In these situations, it would be impractical to recreate the DAG each time the conditions change, as that would be highly manual and taxing for the team maintaining the Airflow DAGs. A better way is to build dynamism into the DAG. What if you could make the DAG change depending on a variable?

In this article, we will explore using Airflow variables to make the DAG change its workflow based on such a variable. Note that the following discussion is based on Airflow version 2.

Before we begin, note that using Airflow variables is not the only way to achieve a dynamic workflow, and it comes with its own set of pros and cons, which we shall dive deeper into as we go along. Here are other methods, each with their own sets of pros and cons, that you can consider in place of using Airflow variables:

Here’s an article summarising the comparison of this method against the above 5.

The how

Using Airflow variables is probably one of the easiest methods to achieve a dynamic Airflow DAG. The Airflow UI already provides us with a way to create and update Airflow variables.

Alternatively, we can use the airflow variables set or airflow variables import CLI commands to create new variables, or to import a set of variables into the Airflow metadata database.
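For example (the variable name sources and the file name variables.json below are just illustrative):

airflow variables set sources '["foo", "bar"]'
airflow variables import variables.json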

Consider the following example workflow. The source files might all be dropped in a central location, and the DAG is responsible for re-locating them before performing the Extract-Transform-Load (ETL) pipeline for each source.

As the sources are only determined at runtime, the DAG will need to dynamically create the ETL task groups for each source present during runtime. There could even be no source files available on some days.

We can use Airflow variables to create the dynamic workflow as such:
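Below is a minimal sketch of such a DAG (the full snippet is available in the repository linked at the end of this article). The DAG id, the variable name sources, and the DummyOperator placeholders are illustrative assumptions; the placeholders stand in for the real extract, transform, and load logic.

from datetime import datetime

from airflow import DAG
from airflow.models import Variable
from airflow.operators.dummy import DummyOperator
from airflow.utils.task_group import TaskGroup


def create_dag():
    with DAG(
        dag_id="dynamic_etl_dag",
        start_date=datetime(2022, 3, 14),
        schedule_interval="@daily",
        catchup=False,
    ) as dag:
        split_files_by_source = DummyOperator(task_id="split_files_by_source")

        # Retrieve the dynamic configuration from the Airflow metadata database.
        # The variable is expected to hold a JSON list, e.g. ["foo", "bar"].
        sources = Variable.get("sources", default_var=[], deserialize_json=True)

        # Create one ETL task group per source found in the variable.
        for source in sources:
            with TaskGroup(group_id=f"etl_{source}") as etl:
                extract = DummyOperator(task_id="extract")
                transform = DummyOperator(task_id="transform")
                load = DummyOperator(task_id="load")
                extract >> transform >> load

            split_files_by_source >> etl

    return dag


dag = create_dag()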

In the create_dag function, we use the Variable.get function to retrieve the dynamic configuration stored in the Airflow metadata database. Using the list of sources stored in the variable, we can create an ETL task group for each source.

For example, when we set the Airflow variable to ["foo", "bar"], as seen in the screenshot above, we get the following DAG:

The Airflow variable could also be set by a prior task; in our case, the split_files_by_source task could set it, since that task knows how many sources are present for the particular DAG run.
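A sketch of how that task might set the variable, assuming the same sources variable name and with placeholder discovery logic:

from airflow.models import Variable


def _split_files_by_source():
    # Placeholder for the real logic that scans the landing zone and
    # groups the dropped files by source.
    sources = ["foo", "bar"]
    # Store the list as JSON so that the DAG-building code can read it back
    # with deserialize_json=True on the next parse.
    Variable.set("sources", sources, serialize_json=True)

This function would then be used as the python_callable of the split_files_by_source task.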

Benefits

The biggest benefit is the ease of dynamic configuration. One just needs to update the Airflow variable, and the DAG gets updated with the new workflow (that is, after the Airflow scheduler has parsed and serialised the changes).

One can also easily view the effect of a change in the dynamic configuration via the Airflow UI. Since both the dynamic configuration and the resulting DAG can be viewed in the UI, debugging the dynamic workflow becomes simple.

Drawbacks

The biggest drawback of this method is that it puts additional load on your Airflow metadata database. Under the hood, the Variable.get function retrieves the value from the Airflow metadata database, which means it needs to open a database connection in order to perform the query. Each time the Airflow scheduler parses the DAG file for updates, the create_dag function is called, which in turn executes the Variable.get function to determine the dynamic workflow. The frequency of parsing depends on the configuration of the Airflow scheduler, namely the min_file_process_interval setting. By default, it is set to 30 seconds, which means that a database connection is opened and a query executed every 30 seconds. Depending on how the metadata database is hosted, there could also be additional costs based on the number of requests made.
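If this parsing load becomes a concern, the interval can be raised in airflow.cfg (the value of 300 seconds below is just an arbitrary example):

[scheduler]
# Parse each DAG file at most once every 5 minutes instead of the default 30 seconds.
min_file_process_interval = 300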

Secondly, dynamic changes might not be reflected instantaneously. These changes are only picked up once the scheduler has parsed and serialised the DAG. In Airflow v2, the scheduler needs to serialise the DAG and save it into the metadata database. The webserver then retrieves the serialised DAGs from the database and de-serialises them. These de-serialised DAGs then show up on the UI, along with any updates to their workflow or schedule. As mentioned before, the frequency of updates depends on the min_file_process_interval setting of the scheduler.

Lastly, using Variable.get in top-level code in the DAG file is not considered best practice by Airflow. The main reason is that it affects the performance and scalability of Airflow: as the Variable.get call involves a database connection and query, the parsing and loading of the DAG file is slowed down. This in turn affects how quickly DAGs get updated in the Airflow UI.
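For values that are only needed when a task runs (rather than to shape the DAG structure itself, as in this article), one alternative recommended by the Airflow documentation is to defer the lookup to runtime, for example via Jinja templating. A small sketch, again assuming a variable named sources:

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="runtime_variable_example",
    start_date=datetime(2022, 3, 14),
    schedule_interval=None,
) as dag:
    # "{{ var.value.sources }}" is rendered only when the task runs, so parsing
    # this file does not open a database connection for the variable.
    print_sources = BashOperator(
        task_id="print_sources",
        bash_command="echo '{{ var.value.sources }}'",
    )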

Conclusion

Although using Airflow variables to dynamically configure a DAG is simple, it has a few drawbacks that need to be weighed carefully.

The code snippet used is also available in this GitHub repository.

Happy coding! (:


Jun Wei Ng

Software developer @ Thoughtworks. Opinions are my own