Manage a dynamic Airflow DAG using MongoDB (or any other database)

Jun Wei Ng
5 min read · Mar 14, 2022


Sometimes, the workflow, or data pipeline, that we are trying to model in an Airflow DAG is not static: it changes under varying conditions. For example, an Extract-Transform-Load (ETL) pipeline may extract from a varying number of input sources. In these situations, it would be impractical to recreate the DAG each time the conditions change, as that would be highly manual and taxing for the team maintaining the Airflow DAGs. A better way is to build dynamism into the DAG. What if you could make the DAG change depending on a variable?

In this article, we will explore using an external database to make the DAG change its workflow based on such a variable. Note that the following discussion is based on Airflow version 2.

Before we begin, note that using an external database is not the only way to achieve a dynamic workflow, and it comes with its own set of pros and cons, which we shall dive deeper into as we go along. Here are other methods, each with their own pros and cons, that you can consider in place of using an external database:

Here’s an article summarising the comparison of this method against the above 5.


The how

An external database, such as, but not limited to, MongoDB, allows you to define a custom structure for your dynamic configuration.

Consider the following example workflow. The source files might all be dropped in a central location, and the DAG is responsible for relocating them before running the ETL pipeline for each source.

As the sources are only determined at runtime, the DAG needs to dynamically create an ETL task group for each source present in that run. There could even be no source files available on some days.

Our dynamic configuration can take the form of:

[
  {
    "name": "sources",
    "sources": ["list", "of", "strings", "here"]
  }
]
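Such a document could be seeded with plain pymongo, for example. This is only a sketch: the connection string and the database/collection names airflow_config and dag_configs are hypothetical placeholders, not names from the original code.

from pymongo import MongoClient

# Hypothetical connection string, database, and collection names.
client = MongoClient("mongodb://localhost:27017")
client["airflow_config"]["dag_configs"].insert_one(
    {"name": "sources", "sources": ["list", "of", "strings", "here"]}
)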

We can then use a MongoHook to retrieve the dynamic configuration from our external MongoDB database, as follows:
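A minimal sketch of such a helper is shown below, assuming the connection id mongo_default and the same hypothetical database/collection names as in the seeding example above. Note that newer versions of the Mongo provider package rename the conn_id parameter to mongo_conn_id.

from airflow.providers.mongo.hooks.mongo import MongoHook


def get_sources() -> list:
    # Fetch the dynamic configuration document and return the list of sources.
    hook = MongoHook(conn_id="mongo_default")  # hypothetical connection id
    client = hook.get_conn()  # returns a pymongo MongoClient
    collection = client["airflow_config"]["dag_configs"]
    document = collection.find_one({"name": "sources"})
    return document["sources"] if document else []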

In the etl_using_external_mongo_db_dag.py DAG file, the get_sources function uses a MongoHook to retrieve the dynamic configuration from the external MongoDB database and returns the appropriate list of sources.

You might have noticed the use of an environment check before we create the DAG in globals() in the etl_using_external_mongo_db_dag.py DAG file. This was done to make it easier to write unit tests for the DAG. Without the environment check, importing the DAG file in the tests would execute the create_dag function, which in turn opens a connection to the Mongo database to fetch the dynamic configuration. As this import takes place before any test setup is done, we would not be able to properly mock out the external dependencies. Thus, the only way out is to make sure the create_dag function is only called within the scope of a test, where we can control the mocking of the external dependencies.
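To make the pattern concrete, here is a sketch of the parse-time DAG creation with such an environment check, reusing the get_sources sketch above. The ENVIRONMENT variable name, the DAG id, and the DummyOperator placeholder tasks are assumptions for illustration only.

import os
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.utils.task_group import TaskGroup


def create_dag(dag_id: str) -> DAG:
    # Queries MongoDB at parse time to decide how many ETL task groups to build.
    sources = get_sources()
    with DAG(
        dag_id=dag_id,
        start_date=datetime(2022, 1, 1),
        schedule_interval=None,
    ) as dag:
        split_files_by_source = DummyOperator(task_id="split_files_by_source")
        for source in sources:
            with TaskGroup(group_id=f"etl_{source}") as etl:
                extract = DummyOperator(task_id="extract")
                transform = DummyOperator(task_id="transform")
                load = DummyOperator(task_id="load")
                extract >> transform >> load
            split_files_by_source >> etl
    return dag


# Skip DAG creation when the file is imported by unit tests,
# so the MongoDB call inside create_dag can be mocked first.
if os.getenv("ENVIRONMENT", "production") != "test":
    dag_id = "etl_using_external_mongo_db_dag"
    globals()[dag_id] = create_dag(dag_id)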

For example, when we create the MongoDB document as:

[
  {
    "name": "sources",
    "sources": ["foo", "bar"]
  }
]

We get the following DAG:

The value in the dynamic configuration could also be set by prior tasks; in our case, the split_files_by_source task, which knows how many sources are present for the particular DAG run, could do so, as sketched below.
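For illustration, if split_files_by_source were implemented as, say, a PythonOperator, its callable could update the configuration document along these lines. The discovery logic, connection id, and database/collection names are again hypothetical placeholders.

from airflow.providers.mongo.hooks.mongo import MongoHook


def split_files_by_source_callable(**context):
    # Placeholder for the real logic that moves files into per-source folders
    # and collects the source names present in this run.
    discovered_sources = ["foo", "bar"]
    hook = MongoHook(conn_id="mongo_default")
    collection = hook.get_conn()["airflow_config"]["dag_configs"]
    # Upsert the configuration document so the next scheduler parse picks up the new sources.
    collection.update_one(
        {"name": "sources"},
        {"$set": {"sources": discovered_sources}},
        upsert=True,
    )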

Benefits

The biggest benefit is the ease of dynamic configuration. Using the appropriate tools, one can update the dynamic configuration in the external database, and the DAG gets updated with the new workflow (that is, after the Airflow scheduler has parsed and serialised the changes). One can then easily view the effect of the change in dynamic configuration on the DAG via the Airflow UI.

Drawbacks

The biggest drawback of this method is the additional load on your external database. Each time the Airflow scheduler parses the DAG file for updates, the create_dag function is called, which in turn queries the external database to determine the dynamic workflow. The frequency of parsing depends on the configuration of the Airflow scheduler, namely the min_file_process_interval setting. By default, it is set to 30 seconds, which means a database connection is opened and a query is executed every 30 seconds. Depending on the setup of the external database, there could also be additional costs based on the number of requests made.

Secondly, dynamic changes might not be reflected instantaneously. These changes are only picked up by Airflow after the scheduler has parsed and serialised the DAG. In Airflow v2, the scheduler serialises the DAG and saves it into the metadata database. The webserver then retrieves the serialised DAGs from the database and de-serialises them. These de-serialised DAGs then show up on the UI, along with any updates to their workflow or schedule. As mentioned before, the frequency of these updates depends on the min_file_process_interval setting of the scheduler.

Lastly, performing an external database connection in the top-level code of the DAG file is not considered best practice by Airflow. The main reason is that it affects the performance and scalability of Airflow, since the parsing and loading of the DAG file is slowed down. This in turn affects how fast DAGs get updated in the Airflow UI.

Conclusion

Although using an external database to dynamically configure a DAG is simple, it has a few drawbacks that need to be weighed carefully.

The code snippets used are also available in this GitHub repository.

Happy coding! (:

