Sensing the completion of external Airflow tasks via ExternalTaskSensors

apache-airflow==1.10.4

Flavia Ninsiima
Mar 13, 2020

The dilemma?

Let’s assume you want Task_A in DAG_A to sense the completion of Task_B in DAG_B.

If these two tasks were in the same DAG, we could simply add this line of code to the DAG file: `Task_A.set_upstream(Task_B)`. However, since they aren’t in the same DAG, we cannot do this.

The solution?

Define an ExternalTaskSensor in DAG_A that senses the completion of Task_B in DAG_B. According to the docs, an external task sensor waits for a different DAG, or a task in a different DAG, to complete for a specific execution_date.

The main issue is actually sensing the `success` state of Task_B in DAG_B.

Let’s use this example:
- DAG_A runs at 00:00:00 daily
- DAG_B runs at 01:00:00 daily

When writing the sensing code, the trick is in defining the execution_delta or execution_date_fn. Only one of the two should be passed to the sensor.

https://airflow.apache.org/docs/stable/_modules/airflow/sensors/external_task_sensor.html

In the Python file where DAG_A and Task_A are defined, add an external sensor that senses the completion of Task_B in DAG_B.
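A minimal sketch of what that sensor could look like, assuming apache-airflow 1.10.x and the example schedules above. The helper names `dag_b_execution_date` and `make_task_b_sensor` and the task id `wait_for_task_b` are made up for illustration, and the Airflow import sits inside the function only so the date arithmetic can be tried without an Airflow installation.

```python
from datetime import datetime, timedelta


def dag_b_execution_date(dt):
    """Map DAG_A's execution date to the DAG_B execution date to wait for."""
    # DAG_B is scheduled one hour after DAG_A, so shift forward by one hour.
    return dt + timedelta(hours=1)


def make_task_b_sensor(dag):
    """Build the sensor for DAG_A (hypothetical helper, not from the article)."""
    # Import path as of apache-airflow 1.10.x.
    from airflow.sensors.external_task_sensor import ExternalTaskSensor

    return ExternalTaskSensor(
        task_id="wait_for_task_b",  # assumed task id
        external_dag_id="DAG_B",
        external_task_id="Task_B",
        execution_date_fn=dag_b_execution_date,
        dag=dag,
    )

# In the DAG_A file, Task_A would then depend on the sensor, e.g.:
#   Task_A.set_upstream(make_task_b_sensor(dag))
```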

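Here, dt is the argument Airflow passes to execution_date_fn, and it represents the current execution date of DAG_A. So if DAG_A has an exec_date (different from run_date) of Timestamp('2020-03-04 00:00:00') and DAG_B has an exec_date of Timestamp('2020-03-04 01:00:00'), we simply add a timedelta(hours=1), for example execution_date_fn=lambda dt: dt + timedelta(hours=1).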
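To make the exec_date versus run_date distinction concrete, here is a small sketch using only the standard library. It relies on the scheduler's usual behaviour for scheduled runs: a run starts once its schedule interval has ended, so the earliest run date is the execution date plus the interval. The function name `earliest_run_date` is made up for illustration.

```python
from datetime import datetime, timedelta

SCHEDULE_INTERVAL = timedelta(days=1)  # both DAGs in the example run daily


def earliest_run_date(execution_date, interval=SCHEDULE_INTERVAL):
    """Earliest wall-clock time at which the scheduled run can start."""
    return execution_date + interval


dag_a_exec = datetime(2020, 3, 4, 0, 0)
dag_b_exec = datetime(2020, 3, 4, 1, 0)

print(earliest_run_date(dag_a_exec))  # 2020-03-05 00:00:00
print(earliest_run_date(dag_b_exec))  # 2020-03-05 01:00:00
```

So the sensor in DAG_A starts poking around 2020-03-05 00:00 and can only succeed after DAG_B's run for exec_date 2020-03-04 01:00 has finished Task_B, roughly an hour later.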

If you opt to use execution_delta instead of execution_date_fn, you have to be aware of its counter-intuitive sign, as mentioned in the Airflow docs: the sensor subtracts the delta from the current execution date. Even though DAG_B runs at 01:00 daily, an hour after DAG_A, we give it a delta of -1 hour, i.e. timedelta(hours=-1).
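The same sensor written with execution_delta might look like the sketch below, again assuming apache-airflow 1.10.x. `sensed_execution_date` is a hypothetical helper that only mirrors the subtraction the sensor performs, so the sign can be checked by hand; `make_task_b_sensor` and the task id are also illustrative.

```python
from datetime import datetime, timedelta

# DAG_B runs one hour AFTER DAG_A, so the delta is negative: the sensor
# subtracts it from DAG_A's execution date.
EXECUTION_DELTA = timedelta(hours=-1)


def sensed_execution_date(dt, delta=EXECUTION_DELTA):
    """Mirror the sensor's arithmetic: current execution date minus delta."""
    return dt - delta


def make_task_b_sensor(dag):
    """Build the sensor for DAG_A (hypothetical helper, not from the article)."""
    # Import path as of apache-airflow 1.10.x.
    from airflow.sensors.external_task_sensor import ExternalTaskSensor

    return ExternalTaskSensor(
        task_id="wait_for_task_b",  # assumed task id
        external_dag_id="DAG_B",
        external_task_id="Task_B",
        execution_delta=EXECUTION_DELTA,
        dag=dag,
    )
```

Checking `sensed_execution_date(datetime(2020, 3, 4, 0, 0))` by hand gives 2020-03-04 01:00:00, which is the DAG_B exec_date we want.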

If you use execution_date_fn, which returns an execution_date, you could optionally return a pandas Timestamp. For example, if I wanted to sense something that happens at the end of the month: execution_date_fn=lambda dt: pd.Timestamp(utils.get_end_month(dt), tz='UTC')
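The helper utils.get_end_month isn't shown in this article, so the sketch below uses a made-up stand-in, `end_of_month`, and assumes it should return midnight on the last calendar day of dt's month. It uses only the standard library; attaching UTC with `replace(tzinfo=...)` plays the role that `pd.Timestamp(..., tz='UTC')` plays for a naive value.

```python
import calendar
from datetime import datetime, timezone


def end_of_month(dt):
    """Hypothetical stand-in: naive midnight on the last day of dt's month."""
    last_day = calendar.monthrange(dt.year, dt.month)[1]
    return datetime(dt.year, dt.month, last_day)


def end_of_month_utc(dt):
    """Candidate execution_date_fn: the month-end date, marked as UTC."""
    return end_of_month(dt).replace(tzinfo=timezone.utc)
```

A function like `end_of_month_utc` could then be passed as `execution_date_fn=end_of_month_utc`, provided the external DAG really has a run with exactly that execution_date.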

Conclusion
Hopefully this article gave some insight into how to sense external tasks. The trick is to first understand what the current execution_date of DAG_A will be in relation to the execution_date of DAG_B and then you can determine how to set your deltas. Most of the time, I first ascertain this relationship via the Airflow UI. Keep in mind run_date and execution_date are different things.
