Cross-DAG dependencies in Apache Airflow

Lucas Fonseca
Blog Técnico QuintoAndar
8 min read · Apr 20, 2021

Cross-DAG dependencies can reduce cohesion in data pipelines and, without an explicit solution in Airflow or in a third-party plugin, those pipelines tend to become complex to handle. That is why we at QuintoAndar created an intermediate component, called Mediator, to handle relationships across data pipelines and keep them scalable and maintainable by any team.

At QuintoAndar we pursue automation and modularization in our data pipelines and believe that breaking them into single-responsibility modules (DAGs) improves maintainability, reusability, and the understanding of how data moves from one point to another. However, growing interconnections between DAGs tend to erode those gains and make the pipelines complex, and, above all, Airflow has no explicit built-in solution for them. To overcome this obstacle, companies adopt different approaches, some of which we had already used ourselves. Even so, we tried to find a more suitable way to solve this cross-DAG dependency problem.

Context: a quick overview of the QuintoAndar data pipeline

Our pipeline comprises hundreds of DAGs, each specialized according to its data stage layer. Some extract data from sources and load it into our data lake; others are responsible for transforming the data already loaded (changing the file format, applying naming conventions, or adding general metrics to enrich it). And there are the business modeling ones, which take the transformed data and shape it so that insights can be generated to support decision making. With this DAG architecture, we keep the configuration specifics and the business domain of each DAG restricted to its data stage layer.

Workflow with dedicated DAGs

Problem: handling dependencies between the DAGs

Since Airflow does not have a native feature for controlling cross-DAG dependencies, having many dedicated DAGs is a problem data engineering teams need to struggle with: it can be hard to keep the workflow integrity of the pipeline, i.e., to assure that a DAG will run only after its dependencies have finished.

Some possible solutions

Among other trivial approaches to this challenge we could think, for example, of manually managing the DAGs’ schedules, or of having an Über-DAG (a single DAG containing the entire workflow, in which the dependencies are managed between internal tasks). These approaches are not suitable once we consider software engineering concerns such as scalability, monitoring, and usability.

Reading Airflow’s documentation on cross-DAG dependencies and digging deeper into articles and use cases, we find approaches such as using sensor operators, the TriggerDagRun operator, or XComs to manage the DAGs’ dependencies. Again, from our point of view, such approaches are not the most suitable for solving this issue, because they are still rather reactive solutions, which leads to an unproductive workflow.
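For illustration, here is a minimal sketch of what these reactive approaches can look like, assuming Airflow 2.x; all DAG and task ids below are hypothetical:

    # A sketch of the sensor- and trigger-based approaches, assuming Airflow 2.x.
    from datetime import datetime

    from airflow import DAG
    from airflow.operators.dummy import DummyOperator
    from airflow.operators.trigger_dagrun import TriggerDagRunOperator
    from airflow.sensors.external_task import ExternalTaskSensor

    with DAG(
        dag_id="transform_dag",
        start_date=datetime(2021, 1, 1),
        schedule_interval="@daily",
        catchup=False,
    ) as dag:
        # Reactive option 1: poll until a task of the upstream DAG succeeds,
        # occupying a worker slot for the whole waiting time.
        wait_for_extraction = ExternalTaskSensor(
            task_id="wait_for_extraction",
            external_dag_id="extract_dag",    # hypothetical upstream DAG
            external_task_id="load_to_lake",  # hypothetical upstream task
            poke_interval=300,
        )

        transform = DummyOperator(task_id="transform")

        # Reactive option 2: this DAG explicitly fires the downstream one,
        # coupling the two DAG definitions together.
        trigger_modeling = TriggerDagRunOperator(
            task_id="trigger_modeling",
            trigger_dag_id="modeling_dag",    # hypothetical downstream DAG
        )

        wait_for_extraction >> transform >> trigger_modeling

Both options embed the dependency knowledge inside the DAGs themselves, which is exactly the kind of coupling we wanted to avoid.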

Even if we use a custom trigger operator to trigger the dependent DAG after every dependency has executed, this is not an optimal solution when we think of a scenario where, for example, a DAG depends on 5 other DAGs. In that case, which of the dependencies would be responsible for triggering the dependent DAG?

In summary, with the aforementioned approaches the DAGs communicate directly with one another, and we wanted to avoid this chaotic scenario. So we looked for a more reasonable solution that handles these cases in a more robust, scalable, fault-tolerant, and automatic way, without the data engineers’ daily interference.

DAGs Mediator

The Mediator in Airflow is responsible for looking for successfully finished DAG executions that may represent the previous step of another DAG. That is, if a DAG depends on another, the Mediator takes care of checking the dependencies and triggering whatever is needed for the data flow to continue.

Simulation of the Mediator operating in case of success and failure of a dependency

The concept behind it

There is a well-known software engineering design pattern that suggests using a third-party component to manage objects’ dependencies. This design pattern, called Mediator, reduces the complexity of the dependencies between objects by handling them itself.

A good example is the image below: instead of each driver telling the others when to continue or stop, there is a traffic light that continuously stops the drivers or lets them continue on their way.

Traffic light acting as the drivers’ mediator. Source: Refactoring Guru.

That said, bringing this back to our case: instead of having one DAG communicate directly with another (as in some of the approaches suggested above), we can have a mediator component responsible for handling the DAGs’ dependencies and starting them at the appropriate times.

In short, our solution consists of an independent process that handles the DAGs’ dependencies and triggers each DAG according to them. We may think of it as an alternative to the Scheduler process, one that does not rely on a DAG’s schedule to trigger it.

Implementation

The “Mediator”, as we named this service, was implemented as an Airflow DAG in this first version. Although it is a DAG that controls others, we shouldn’t think of it strictly as a DAG, but as a process that takes advantage of Airflow’s features.

The Mediator DAG is composed of two sets of tasks: the dependencies checker and the dependent triggers. The dependencies checker is a single task that identifies the dependencies of each DAG and evaluates their statuses, whereas each dependent DAG has a dedicated trigger task, which is pretty straightforward: it is responsible for triggering the respective dependent DAG.

Visualization of the Mediator DAG graph
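To make that structure concrete, here is a minimal sketch of how such a Mediator-style DAG could be wired, assuming Airflow 2.x. The built-in ShortCircuitOperator and TriggerDagRunOperator stand in for our custom operators (described later), one checker is paired with each trigger for simplicity (the actual Mediator uses a single dependencies-checker task), and all DAG and task ids are hypothetical:

    # A sketch of a Mediator-style DAG, assuming Airflow 2.x.
    from datetime import datetime

    from airflow import DAG
    from airflow.operators.python import ShortCircuitOperator
    from airflow.operators.trigger_dagrun import TriggerDagRunOperator

    # Hypothetical mapping: dependent DAG -> list of (dag_id, task_id) dependencies.
    # A task_id of None means the dependency is on the entire DAG. In practice this
    # would live in a separate mapping file.
    DEPENDENCIES = {
        "transform_dag": [("extract_dag", None)],
        "modeling_dag": [("transform_dag", "apply_naming_conventions"),
                         ("extract_other_dag", None)],
    }


    def dependencies_succeeded(dependent_dag_id, dependencies, **_):
        """Placeholder: the real checker inspects the Airflow metadata database and
        returns True only when every dependency has finished successfully."""
        raise NotImplementedError


    with DAG(
        dag_id="mediator",
        start_date=datetime(2021, 1, 1),
        schedule_interval="*/10 * * * *",  # run at short intervals, independent of the DAGs' own schedules
        catchup=False,
    ) as dag:
        for dependent_dag_id, deps in DEPENDENCIES.items():
            # Checker: short-circuits (skips the trigger) while dependencies are not ready.
            check = ShortCircuitOperator(
                task_id=f"check_{dependent_dag_id}",
                python_callable=dependencies_succeeded,
                op_kwargs={"dependent_dag_id": dependent_dag_id, "dependencies": deps},
            )
            # Trigger: starts the dependent DAG once the checker lets it through.
            trigger = TriggerDagRunOperator(
                task_id=f"trigger_{dependent_dag_id}",
                trigger_dag_id=dependent_dag_id,
            )
            check >> trigger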

Since the Airflow engine and architecture treat the DAG as the unit to be triggered, the dependent will always and only be a DAG. The dependency, on the other hand, can be set either on an entire DAG or on a single task; i.e., each dependent DAG handled by the Mediator has a set of dependencies composed of a bundle of other DAGs or specific tasks.

Example of DAGs with dependency on tasks

Every time the Mediator DAG runs, the dependencies checker analyses the status of each dependency, checking whether it has finished successfully or failed. Understandably, if some dependency in a DAG’s dependency set has failed, the Mediator will not trigger that DAG, and the respective trigger task will be marked as failed (shown in red). On the other hand, if the dependent DAG does not need to be triggered, the trigger will be skipped (shown in pink).
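As a rough illustration of the kind of evaluation the dependencies checker performs, the sketch below queries Airflow’s metadata models; the real implementation also restricts the lookup window and handles more states:

    # A sketch of a dependency check against Airflow's metadata database.
    from airflow.models import DagRun
    from airflow.utils.state import State


    def dependency_succeeded(dag_id, task_id=None, execution_date=None):
        """Return True when the dependency (an entire DAG, or a single task of it)
        has finished successfully for the given execution date."""
        dag_runs = DagRun.find(dag_id=dag_id, execution_date=execution_date)
        if not dag_runs:
            return False  # the dependency has not run yet
        dag_run = dag_runs[-1]
        if task_id is None:
            # Dependency set on the whole DAG.
            return dag_run.get_state() == State.SUCCESS
        # Dependency set on a single task of that DAG.
        ti = dag_run.get_task_instance(task_id)
        return ti is not None and ti.state == State.SUCCESS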

Different statuses of the triggers

If you are interested in how the dependencies checker works, this flow illustrates the constraints precisely:

Some digging into the code of the Mediator DAG

We created an external plugin with two custom operators that are responsible for the mediation engine:

  • the QuintoAndarShortCircuitExternalOperator: responsible for the dependency evaluation logic. It receives a dependencies dictionary to recognize the DAGs and their dependencies. The dependencies dictionary is stored in a mapping file:
Dependencies dictionary
  • and the QuintoAndarCustomTriggerDagOperator: triggers each DAG of the dependencies dictionary. Before triggering a DAG, this custom operator identifies the execution date that the DAG should have, based on metadata shared by the previous task via an XCom (a sketch of this idea follows below).
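As a rough illustration of the idea behind the trigger side, the fragment below uses the built-in TriggerDagRunOperator (assuming a recent Airflow 2.x, where execution_date is a templated field); the XCom key and task ids are hypothetical, and in the real Mediator this logic lives inside the custom operator:

    # A sketch of a trigger task inside a Mediator-style DAG: reuse the execution
    # date computed by the dependencies checker and shared via XCom, so the
    # dependent DAG runs for the same logical date as its dependencies.
    from airflow.operators.trigger_dagrun import TriggerDagRunOperator

    trigger_transform_dag = TriggerDagRunOperator(
        task_id="trigger_transform_dag",
        trigger_dag_id="transform_dag",
        execution_date="{{ ti.xcom_pull(task_ids='dependencies_checker', key='execution_date') }}",
    )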

In short, we send a dependencies dictionary to the Mediator, which in turn runs at short intervals, triggering the next DAG in the workflow once the previous one has successfully finished.

A vision of the workflow with the DAGs’ Mediator

The first-layer DAGs are started by Airflow, and the Mediator triggers the downstream DAGs at each run

Next steps

We understand that over time the Mediator could become a God object that does too much by controlling everything, and we are aware of such problems. Hence, there are some improvements we are considering as next steps, so that our solution can evolve into a better one:

  • Backfill support: currently, the Mediator only manages the DAGs’ most recent executions, based on a daily lookup window.
  • Support broader scenarios: the Mediator currently handles only daily DAGs. DAGs that run multiple times a day, or less often than daily, are not yet supported by the Mediator engine.
  • Build it natively into the Airflow Scheduler: with a more mature use of the mediation concept for DAGs and broader scenario coverage, we could consider moving this mediation intelligence into the scheduler core.
  • Stop using an Airflow DAG: as mentioned, we used the Airflow DAG concept to validate this first version of the Mediator. However, we intend to take a different approach in the Mediator’s future versions.
  • Open it to the community =)

Achievements

Parallelization: our pipeline has become more parallelized, since DAGs no longer run at fixed times but on demand.

Scalability: the pipeline has become more scalable, in the sense that more DAGs can be added without much effort.

Maintainability: the pipeline has become easier to maintain because everything has its proper context, requirements, and constraints. With low coupling between steps, the code gets less complex and, as a result, we are able to avoid human errors.

Velocity: in terms of pipeline processing performance, each DAG now starts as soon as possible and the pipeline is processed in the shortest time, delivering data sooner.

Fresh data: when a DAG is re-executed, its downstream workflow is automatically triggered by the Mediator, guaranteeing that the data is continuously refreshed every day without any manual interference.

Conclusion

To create high-quality systems that together compose a robust data platform, it is important that data engineers apply software engineering concepts in their daily work. In our case, we observed that it is sometimes not practical to combine multiple DAGs into one. Hence, our solution was to define a Mediator to handle dependencies and bring cohesion to the data pipeline without losing its purpose.


If you’d like to be part of a high-performance data engineering team that shapes awesome processes towards building a scalable data platform, like the ones described here, come and join us!
