Implement DAG Dependencies Using the TriggerDagRunOperator

Aashish · GumGum Tech Blog · Jun 30, 2022

Workflow management systems represent workflows as a collection of tasks and the dependencies between them. Depending on the use case and the Quality of Service requirements, many variations of workflow management systems exist. The most common representation is the Directed Acyclic Graph (DAG), in which tasks are represented by nodes and dependencies between tasks are represented by directed edges connecting the nodes. Fig. 1 shows an example of a task workflow in the form of a DAG.

Fig. 1: Example of a DAG

Apache Airflow is an open-source workflow management platform that facilitates the composition and scheduling of all workflows. Mainly popular for data engineering pipelines, it has a built-in user interface to monitor the progress of workflow execution.

DAG scheduling

The scheduling of workflows in Airflow is based on a time interval. The interval can be specified as a timedelta object (e.g., timedelta(days=2)) if the job is to run on a fixed cadence, as a convenience string (e.g., @daily for execution every day at midnight), or as a cron expression (e.g., 20 15 * * *, which runs the job at 3:20 pm every day). Thus, workflows can be triggered at a certain time or on a recurring interval.
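As a quick illustration, here is a minimal sketch of the three scheduling styles. The dag_id, task, and dates are illustrative rather than from the original post, and schedule_interval is the Airflow 2.x parameter name.

```
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator

# 1. timedelta object: run every 2 days
with DAG(
    dag_id="interval_example",            # illustrative name
    start_date=datetime(2022, 1, 1),
    schedule_interval=timedelta(days=2),
    catchup=False,                        # don't backfill past intervals
) as dag:
    BashOperator(task_id="noop", bash_command="echo 'run'")

# 2. convenience string: run daily at midnight
#    schedule_interval="@daily"

# 3. cron expression: run at 3:20 pm every day
#    schedule_interval="20 15 * * *"
```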

Other ways to trigger workflows

In contrast to the time-based execution of workflows, there is often a need to trigger actions based on some external event; for example, the arrival of input files from multiple sources on a shared drive could be the cue to start a workflow. Since the input data can be delivered asynchronously, a safe play would be to initiate the workflow only when it is certain that all input sources have delivered their data. However, this kind of approach could result in waiting times ranging from hours to days.

One way to solve this in Airflow is with sensors. A sensor is a subclass of operator that continuously polls for a certain condition to hold. If the condition is not yet fulfilled, the sensor keeps waiting for it to become true (e.g., for the data to be available at the given path) until a timeout is eventually reached. As a result, the data gets processed immediately after delivery, without unnecessary waiting. The default timeout for a sensor is seven days. However, if our DAG depends on some other DAG, sensors might not be the optimal solution.
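Here is a hedged sketch of the pattern, using Airflow's FileSensor; the dag_id and the delivery path /data/incoming/input.csv are assumptions for illustration.

```
from datetime import datetime

from airflow import DAG
from airflow.sensors.filesystem import FileSensor

with DAG(
    dag_id="sensor_example",                  # illustrative name
    start_date=datetime(2022, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    wait_for_input = FileSensor(
        task_id="wait_for_input",
        filepath="/data/incoming/input.csv",  # assumed delivery path
        poke_interval=60,                     # re-check every minute
        timeout=7 * 24 * 60 * 60,             # the seven-day default, made explicit
    )
```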

But what if the previous DAG is not completed and the next DAG executes? If any DAG fails, will all corresponding DAGs in the pipeline fail, and do we have to manually re-run all of them?

It might happen that a DAG set to execute once a day fails for some reason that is not resolved on the same day, so new runs of the same DAG, along with their sensors, are initiated every day. The sensors keep polling for the given condition to become true, and more and more tasks end up running. It is important to note, however, that Airflow limits the maximum number of running tasks: there are caps on the number of tasks executed per DAG, the number of tasks at the global level, and the number of DAG runs allowed per DAG. Once the global limit is reached, the entire system may stall. This kind of behavior is also referred to as sensor deadlock.
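One common mitigation, sketched below under illustrative names, is to cap concurrent runs at DAG-definition time with max_active_runs and to run sensors in reschedule mode so they release their worker slot between pokes; this is an assumption about how one might contain the pile-up, not part of the original post.

```
from datetime import datetime

from airflow import DAG
from airflow.sensors.filesystem import FileSensor

with DAG(
    dag_id="capped_sensor_example",           # illustrative name
    start_date=datetime(2022, 1, 1),
    schedule_interval="@daily",
    catchup=False,
    max_active_runs=1,                        # at most one active run of this DAG
) as dag:
    FileSensor(
        task_id="wait_for_input",
        filepath="/data/incoming/input.csv",  # assumed path
        mode="reschedule",                    # free the worker slot between pokes
    )
```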

Fig. 2: DAG Dependency

In the above image, we can see that DAG d requires both DAG b and DAG c to be completed. We can implement sensors at the task level for these DAGs, but for an entire DAG there is no straightforward way to check whether it completed. In the above case, if either DAG b or DAG c fails, then DAG d will fail.

Trigger DAG Run Operator

By default, DAG execution in Airflow does not follow a primary/secondary architecture, but such a controller pattern can be implemented using the TriggerDagRunOperator. The TriggerDagRunOperator is an effective way to solve the problem mentioned above and to implement cross-DAG dependencies. The operator allows us to trigger other DAGs in the same Airflow environment. It thus also facilitates decoupling parts of a workflow and is an alternative to SubDAGs. The TriggerDagRunOperator is used when one upstream DAG needs to trigger one or more downstream DAGs, or when a dependent DAG sits in the middle of the upstream DAG's workflow. The key argument to the operator is trigger_dag_id, which must match the dag_id of the DAG to trigger.

Fig. 3: Triggering a DAG using TriggerDagRunOperator

In the above image, Task A2 uses the TriggerDagRunOperator to call DAG B. This way we can create a DAG solely for calling other DAGs in the order our pipeline requires. The next task will wait for the triggered DAG to complete if wait_for_completion is set to True. This solves our DAG dependency problem.
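A minimal sketch of the pattern in Fig. 3 follows, assuming the downstream DAG's dag_id is dag_b; all names here are illustrative.

```
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

with DAG(
    dag_id="dag_a",                   # the controlling DAG from Fig. 3
    start_date=datetime(2022, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    task_a1 = BashOperator(task_id="task_a1", bash_command="echo 'A1'")

    # Task A2 triggers DAG B; with wait_for_completion=True, task A3
    # starts only after DAG B's run has finished.
    task_a2 = TriggerDagRunOperator(
        task_id="task_a2",
        trigger_dag_id="dag_b",       # must match DAG B's dag_id
        wait_for_completion=True,
        poke_interval=30,             # how often to check DAG B's status
    )

    task_a3 = BashOperator(task_id="task_a3", bash_command="echo 'A3'")

    task_a1 >> task_a2 >> task_a3
```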

Following are the key parameters for the TriggerDagRunOperator (of these, only trigger_dag_id is strictly required):

  • trigger_dag_id — The dag_id of the DAG to trigger.
  • execution_date — Execution date for the triggered DAG run.
  • reset_dag_run — Whether or not to clear the existing DAG run if it already exists. This is useful when backfilling or rerunning an existing DAG run.
  • wait_for_completion — Whether or not to wait for the triggered DAG run to complete.
  • poke_interval — Poke interval (in seconds) to check the DAG run status when wait_for_completion=True.

Some important points to note while using this operator:

  • The config input and a specific execution date for the dependent DAG can be specified using the conf and execution_date parameters, respectively (see the sketch after this list).
  • Consider a scenario in which DAG A performs some tasks and then triggers another DAG B. DAG A is successful only if DAG B is successful, i.e., DAG A’s completion depends on the completion of DAG B. In such situations, the wait_for_completion parameter should be set to True. This ensures that the downstream DAG has successfully completed before the upstream DAG continues.
  • Triggering DAG B from another DAG A can be achieved as long as the dag_id of DAG B is known to DAG A.
  • When the result of one DAG’s processing needs to be passed to another DAG through a variable, this can be done by subclassing the operator and injecting code into the execute method before the call to the trigger_dag function.
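To make the first point concrete, here is a hedged sketch of passing a payload via conf and reading it back through dag_run.conf in the triggered DAG; all dag_ids and keys are illustrative assumptions.

```
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

# Upstream DAG: passes a payload via the conf parameter.
with DAG(
    dag_id="upstream_conf_example",           # illustrative name
    start_date=datetime(2022, 1, 1),
    schedule_interval=None,                   # triggered manually
) as upstream:
    TriggerDagRunOperator(
        task_id="trigger_downstream",
        trigger_dag_id="downstream_conf_example",
        conf={"input_path": "/data/incoming/input.csv"},  # assumed key/value
        wait_for_completion=True,
    )

# Downstream DAG: reads the payload from dag_run.conf.
def process(**context):
    input_path = context["dag_run"].conf.get("input_path")
    print(f"Processing {input_path}")

with DAG(
    dag_id="downstream_conf_example",         # illustrative name
    start_date=datetime(2022, 1, 1),
    schedule_interval=None,                   # runs only when triggered
) as downstream:
    PythonOperator(task_id="process", python_callable=process)
```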

Conclusion

Summing up, the TriggerDagRunOperator can be used to run complex or computation-intensive DAGs that need to execute only when certain conditions are true. By default, it works in a fire-and-forget way: the parent DAG doesn't wait until the triggered DAGs are complete before starting the next task. Both the controlling and triggered DAGs must be in the same Airflow environment. When the dependency does matter, complex cross-DAG dependencies can be implemented using the wait_for_completion parameter.

Credits

Special thanks to @bhagyashri.kelkar for writing this blog with me.
