Sensing the completion of external Airflow tasks via ExternalTaskSensors
apache-airflow==1.10.4
The dilemma?
Let’s assume you want Task_A in DAG_A to sense the completion of Task_B in DAG_B.
If these two tasks were in the same DAG, we could simply add this line of code to the DAG file: `Task_A.set_upstream(Task_B)`. However, since they aren’t in the same DAG, we cannot do this.
The solution?
Define an ExternalTaskSensor in DAG_A that senses the completion of Task_B in DAG_B. According to the docs, an external task sensor waits for a different DAG, or a task in a different DAG, to complete for a specific `execution_date`.
The main issue is making sure the sensor checks the `success` state of the right run of Task_B in DAG_B.
Let’s use this example:
- DAG_A runs at 00:00:00 daily
- DAG_B runs at 01:00:00 daily
When writing the sensing code, the trick is in defining `execution_delta` or `execution_date_fn`.
In the Python file where DAG_A and Task_A are defined, add an `ExternalTaskSensor` that senses the completion of Task_B in DAG_B.
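When you use `execution_date_fn`, the function receives `dt`, the current execution date of DAG_A, and must return the execution date of the DAG_B run to check. So if DAG_A has an execution date (which is different from its run date) of `Timestamp('2020-03-04 00:00:00')` and DAG_B has an execution date of `Timestamp('2020-03-04 01:00:00')`, we simply add `timedelta(hours=1)`.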
If you opt to use `execution_delta` instead of `execution_date_fn`, be aware that its sign is counter-intuitive, as mentioned in the Airflow docs: the sensor subtracts the delta from DAG_A’s execution date. So even though DAG_B runs an hour later, at 01:00 daily, we give it a delta of -1 hour, i.e. `execution_delta=timedelta(hours=-1)`.
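To see why the sign flips, here is a small stdlib-only sketch that roughly mirrors how the 1.10.x sensor picks the execution date to look for: it subtracts `execution_delta` from DAG_A’s execution date, otherwise calls `execution_date_fn` on it, and otherwise uses the same date. `resolve_target_date` is a hypothetical helper written for illustration; it is not part of Airflow’s API.

```python
from datetime import datetime, timedelta
from typing import Callable, Optional


def resolve_target_date(
    execution_date: datetime,
    execution_delta: Optional[timedelta] = None,
    execution_date_fn: Optional[Callable[[datetime], datetime]] = None,
) -> datetime:
    """Hypothetical approximation of the sensor's date resolution."""
    if execution_delta is not None and execution_date_fn is not None:
        raise ValueError("Set only one of execution_delta or execution_date_fn")
    if execution_delta is not None:
        # The delta is *subtracted*, hence the counter-intuitive sign.
        return execution_date - execution_delta
    if execution_date_fn is not None:
        return execution_date_fn(execution_date)
    # Default: look for the external run with the same execution_date.
    return execution_date


dag_a_date = datetime(2020, 3, 4, 0, 0)

via_delta = resolve_target_date(dag_a_date, execution_delta=timedelta(hours=-1))
via_fn = resolve_target_date(
    dag_a_date, execution_date_fn=lambda dt: dt + timedelta(hours=1)
)

print(via_delta)  # 2020-03-04 01:00:00
print(via_fn)     # 2020-03-04 01:00:00
```

Passing a positive `timedelta(hours=1)` as `execution_delta` would instead point the sensor at 23:00 on the previous day, an execution date DAG_B never has, so the sensor would keep poking until it times out.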
If you use `execution_date_fn`, which returns an execution date, you can optionally return a pandas `Timestamp`. For example, if I wanted to sense something that happens at the end of the month: `execution_date_fn=lambda dt: pd.Timestamp(utils.get_end_month(dt), tz='UTC')`, where `utils.get_end_month` is a custom helper rather than part of Airflow.
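A possible stand-in for such an end-of-month function, assuming pandas is installed. `end_of_month_utc` is hypothetical (it is not the author’s `utils.get_end_month`): it rolls a date forward to the last day of its month with `pd.offsets.MonthEnd(0)` and returns a UTC-aware `Timestamp`, localizing naive inputs and converting aware ones.

```python
from datetime import datetime

import pandas as pd


def end_of_month_utc(dt: datetime) -> pd.Timestamp:
    """Hypothetical helper: last day of dt's month as a UTC Timestamp.

    MonthEnd(0) rolls forward to the month end and leaves dates that are
    already on the last day unchanged; the time of day is preserved.
    """
    ts = pd.Timestamp(dt) + pd.offsets.MonthEnd(0)
    # Localize naive values; convert timezone-aware ones to UTC.
    if ts.tzinfo is None:
        return ts.tz_localize("UTC")
    return ts.tz_convert("UTC")


# In the sensor this could be passed as: execution_date_fn=end_of_month_utc
print(end_of_month_utc(datetime(2020, 3, 4)))
```

Whether the returned time of day matches DAG_B’s execution dates depends on DAG_B’s schedule, so it is worth checking that relationship before relying on a helper like this.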
Conclusion
Hopefully this article gave some insight into how to sense external tasks. The trick is to first understand what the current `execution_date` of DAG_A will be in relation to the `execution_date` of DAG_B, and then determine how to set your deltas. Most of the time, I first ascertain this relationship via the Airflow UI. Keep in mind that the run date and the `execution_date` are different things.
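A tiny stdlib sketch of the run date versus execution date distinction, assuming Airflow 1.x scheduling semantics: a run is stamped with the start of its schedule interval and is only triggered once that interval has ended. `earliest_run_start` is a hypothetical helper that ignores scheduler latency.

```python
from datetime import datetime, timedelta

DAILY = timedelta(days=1)


def earliest_run_start(execution_date: datetime, interval: timedelta) -> datetime:
    """Hypothetical helper: the run stamped `execution_date` is triggered
    no earlier than the end of its schedule interval."""
    return execution_date + interval


dag_a_exec = datetime(2020, 3, 4, 0, 0)
dag_b_exec = datetime(2020, 3, 4, 1, 0)

# Each run is triggered no earlier than a day after its execution date...
print(earliest_run_start(dag_a_exec, DAILY))  # 2020-03-05 00:00:00
print(earliest_run_start(dag_b_exec, DAILY))  # 2020-03-05 01:00:00

# ...but the sensor only cares about the gap between execution dates.
print(dag_b_exec - dag_a_exec)  # 1:00:00
```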
Other Resources