DAG Dependencies and the Apache Airflow Scheduler

Krzysztof Sniezynski
FLYR Engineering
May 13, 2020

Apache Airflow is a great workflow manager, but its scheduler is unintuitive. In this article I will show you how Airflow works in two situations: when a single DAG is run and when one DAG depends on another.

When DAGs Are Actually Executed

The most important aspect of the Airflow scheduler to be aware of is that it executes a DAG when the DAG’s schedule period ends, not when it begins. In other words, if you have a DAG scheduled to run every two hours, the DAG instance scheduled at 10 AM will be executed two hours later, at 12 PM. Let’s take a look at the default Airflow variables at the moment the DAG is executed, i.e. execution_date, prev_execution_date, and next_execution_date.
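As a concrete example, here is a minimal sketch of such a DAG for Airflow 1.10 (the DAG id, start date, and task name are placeholders of my choosing):

from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator

dag = DAG(
    "dag_1",
    start_date=datetime(2020, 3, 1),
    schedule_interval="0 */2 * * *",  # every two hours, on the hour
)

def print_dates(execution_date, prev_execution_date, next_execution_date, **kwargs):
    # With provide_context=True, Airflow passes these values in
    # through the task context.
    print("execution_date:      {}".format(execution_date))
    print("prev_execution_date: {}".format(prev_execution_date))
    print("next_execution_date: {}".format(next_execution_date))

print_dates_task = PythonOperator(
    task_id="print_dates",
    python_callable=print_dates,
    provide_context=True,
    dag=dag,
)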

If you look at the logs of the DAG, you will see that the execution_date is indeed two hours behind the log timestamp:

[2020-03-08 12:00:05,348] {logging_mixin.py:112} INFO - execution_date:      2020-03-08T10:00:00+00:00

On the other hand, if you run the DAG manually using the Airflow backfill command, the DAG is executed right away because it was you and not the scheduler who triggered it. For example, say you were to run the command:

airflow backfill dag_1 -s 2020-03-08T10:00:00 -e 2020-03-08T10:00:00

In that case, the execution_date would simply be 2020-03-08T10:00:00.

Two DAGs with Different Schedules

Things get a little more complicated when we have two DAGs with different schedules that depend on one another. Say DAG1 processes incoming data every two hours and saves the data in a database along with a timestamp, which is the execution_date. This is a common practice used to timestamp processed data. If DAG1 is scheduled at 8 AM, it will be executed at 10 AM and the data will be stored with its execution_date timestamp set to 8 AM.

Next, let’s add another DAG, which we’ll refer to as DAG2, scheduled once a day at 11 AM to process the latest data produced by DAG1. The latest DAG1 data available at 11 AM comes from the run executed at 10 AM, but the execution_date of that run is really 8 AM, so DAG2 should process the data with the execution_date of 8 AM. The question is how to configure DAG2 so it picks up the latest output of DAG1.

Airflow has an ExternalTaskSensor that can be used to make DAG2 wait for DAG1. The easiest way to tell DAG2 that it should wait for DAG1 is through the execution_delta that we pass to the ExternalTaskSensor. Unfortunately, this will not work here because the execution_delta is a fixed time offset from DAG2’s execution_date, which is the time DAG2 is scheduled rather than the time it is actually executed. Since DAG2 is scheduled to run once a day, the DAG run that executes at 11 AM today will actually have an execution_date from twenty-four hours earlier, i.e. 11 AM yesterday. Instead of using execution_delta, we need a timestamp set to three hours before DAG2’s actual execution rather than three hours before it is scheduled. We can write a generic method that calculates this value based on the schedules of DAG1 and DAG2 and pass it to the ExternalTaskSensor as the execution_date_fn. Let’s take a look at why execution_delta doesn’t solve our problem while execution_date_fn fits our use case perfectly.
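To make the problem concrete, the execution_delta version of the sensor would look something like this (a sketch; the external_task_id is a placeholder for a task in DAG1):

from datetime import timedelta

from airflow.sensors.external_task_sensor import ExternalTaskSensor

# A fixed three-hour offset from DAG2's execution_date. As explained
# below, this points at the wrong DAG1 run.
wait_for_dag_1 = ExternalTaskSensor(
    task_id="wait_for_dag_1",
    external_dag_id="dag_1",
    external_task_id="process_data",  # placeholder task id
    execution_delta=timedelta(hours=3),
    dag=dag_2,
)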

Since DAG2’s execution_date is the day before DAG2 is actually executed, subtracting a three-hour execution_delta would point the sensor at the DAG1 run from 8 AM yesterday, which is incorrect and would result in DAG2 using outdated data.

In order to find the most recently stored DAG1 data in the database, we work backwards from DAG2’s execution_date:

1. Calculate the time DAG2 is actually executed: its execution_date plus one schedule interval, i.e. twenty-four hours later (11 AM today).
2. Find the next execution of DAG1 that follows DAG2’s actual execution, which is 12 PM.
3. Step back to the previous execution of DAG1, which is 10 AM.
4. Since the data stored by the 10 AM run carries an execution_date one schedule interval earlier, step back one more interval, landing two runs prior to 12 PM: 8 AM today.

Here is the code for the two DAGs and the get_previous_execution_date() method implementation. Pay attention to the comments in the code that match the step numbers above.
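A sketch along these lines, leaning on Airflow 1.10’s DAG.following_schedule() and DAG.previous_schedule() helpers (the task ids are placeholders):

from datetime import datetime

from airflow import DAG
from airflow.sensors.external_task_sensor import ExternalTaskSensor

default_args = {"owner": "airflow", "start_date": datetime(2020, 3, 1)}

# DAG1 processes incoming data every two hours.
dag_1 = DAG("dag_1", default_args=default_args, schedule_interval="0 */2 * * *")

# DAG2 processes the latest DAG1 output once a day at 11 AM.
dag_2 = DAG("dag_2", default_args=default_args, schedule_interval="0 11 * * *")

def get_previous_execution_date(dag2_execution_date):
    # Step 1: DAG2 actually runs one schedule interval after its
    # execution_date (11 AM today).
    dag2_run_time = dag_2.following_schedule(dag2_execution_date)
    # Step 2: the first DAG1 schedule that follows DAG2's run (12 PM).
    next_dag1_schedule = dag_1.following_schedule(dag2_run_time)
    # Step 3: one interval back, the most recent DAG1 run before DAG2 (10 AM).
    latest_dag1_run = dag_1.previous_schedule(next_dag1_schedule)
    # Step 4: one more interval back, that run's execution_date, which is
    # the timestamp stored with the data in the database (8 AM today).
    return dag_1.previous_schedule(latest_dag1_run)

wait_for_dag_1 = ExternalTaskSensor(
    task_id="wait_for_dag_1",
    external_dag_id="dag_1",
    external_task_id="process_data",  # placeholder task id in DAG1
    execution_date_fn=get_previous_execution_date,
    dag=dag_2,
)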

If you examine the method thoroughly, you may think that step 2 is not necessary and that we could calculate the DAG1 execution_date directly. However, step 2 is needed to make the method work both for schedules that don’t overlap and for schedules that do overlap, i.e. schedules where some of the execution dates coincide. Let’s write a unit test to check it out.
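A test along these lines, using Airflow’s timezone helper to build UTC-aware datetimes, checks the example from this article against the method sketched above:

from airflow.utils import timezone

def test_get_previous_execution_date():
    # DAG2's 11 AM run on March 8 carries an execution_date of
    # 11 AM on March 7, one schedule interval earlier.
    dag2_execution_date = timezone.datetime(2020, 3, 7, 11, 0)
    # The latest DAG1 data available at 11 AM on March 8 is stamped 8 AM.
    expected = timezone.datetime(2020, 3, 8, 8, 0)
    assert get_previous_execution_date(dag2_execution_date) == expected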

Summary

The Airflow scheduler is tricky, and understanding how it works will save you hours of debugging your code.

You can find the source code here.
