Optimizing Workflows with Apache Airflow: Sequencing and Dependency Management Using the External Task Sensor

Yousef Alkhanafseh
TurkNet Technology
7 min read · Feb 20, 2024

Consolidating the idea of using the Apache Airflow ExternalTaskSensor to schedule two distinct, dependent DAGs.

Figure 1. Image generated by DALL-E 3 illustrating scheduling processes.

I. INTRODUCTION

Typically, sequential or parallel tasks are organized into a specific workflow that runs from a starting point through to completion. In some cases, a job must wait for other jobs to finish, whether because of its importance, its dependencies on those jobs, or resource limitations that prevent running all jobs simultaneously. In this article, two real workflows are designed in Airflow to illustrate how one job can be made to start immediately after another completes, by adding a sensor at its starting point. This sensor plays a crucial role: it retrieves the datetime of the other DAG's most recent run and, using this information, lets the waiting DAG proceed only once that run is no longer in progress. All the necessary information and code components are included and discussed in detail.

II. Apache Airflow

Apache Airflow can be defined as an open-source workflow tool built around the concept of the directed acyclic graph (DAG). This implies that workflows defined in Apache Airflow consist only of vertices and edges, with each edge directed from one vertex to another, under the condition that no closed loops are formed. In general, Apache Airflow is used to develop, schedule, and monitor batch-oriented workflows [1]. It ships with operators and sensors for many task types, such as Python, Bash, Email, and Datetime [2]. Furthermore, it can run on a single node or in cluster mode, and it features an interactive web interface that enables the management of defined workflows.
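To make the graph idea concrete, the following minimal sketch (the DAG and task names here are hypothetical, and the imports follow the same style as the scripts later in this article) defines a two-task workflow with a single directed edge, so task_b runs only after task_a:

from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

# Two vertices (tasks) connected by one directed edge: task_a -> task_b
with DAG('EXAMPLE_GRAPH_DAG',
         start_date=datetime(2024, 1, 1),
         schedule_interval=None,
         catchup=False) as dag:
    task_a = BashOperator(task_id='task_a', bash_command='echo a')
    task_b = BashOperator(task_id='task_b', bash_command='echo b')
    task_a >> task_b  # adding task_b >> task_a as well would form a cycle, which Airflow rejects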

In order to establish a workflow in Apache Airflow, the following two groups of arguments must be well understood: DAG default arguments and DAG arguments. The first group consists of owner, start_date, depends_on_past, email, email_on_failure, email_on_retry, retries, retry_delay, and execution_timeout [3]. The type and purpose of each argument are stated below in Table 1.

Table 1. DAG default arguments.

| Argument | Type | Purpose |
|---|---|---|
| owner | str | Owner of the DAG, displayed in the Airflow UI |
| start_date | datetime | First date for which the DAG can be scheduled |
| depends_on_past | bool | Run a task only if its previous scheduled run succeeded |
| email | list | Addresses that receive alert emails |
| email_on_failure | bool | Send an email when a task fails |
| email_on_retry | bool | Send an email when a task is retried |
| retries | int | Number of retries before a task is marked as failed |
| retry_delay | timedelta | Waiting time between retries |
| execution_timeout | timedelta | Maximum allowed runtime for a task |

Moreover, the most important DAG arguments are dag_id, default_args, description, max_active_runs, schedule_interval, tags, catchup, and doc_md [4]. The type and purpose of each item are explicitly outlined in the subsequent table, Table 2.

Table 2. DAG arguments.

| Argument | Type | Purpose |
|---|---|---|
| dag_id | str | Unique identifier of the DAG |
| default_args | dict | Default arguments applied to all of the DAG's tasks |
| description | str | Short description shown in the Airflow UI |
| max_active_runs | int | Maximum number of active runs of this DAG at a time |
| schedule_interval | cron str / timedelta | How often the DAG is triggered |
| tags | list | Labels used to filter DAGs in the UI |
| catchup | bool | Whether missed intervals since start_date are backfilled |
| doc_md | str | Markdown documentation shown on the DAG page |
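As a minimal sketch of how the two argument groups fit together (the names and values here are illustrative only; the case-study scripts below follow the same pattern):

from datetime import datetime, timedelta
from airflow import DAG

# Table 1 arguments: defaults applied to every task in the DAG
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2024, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
}

# Table 2 arguments: settings of the DAG itself
dag = DAG(
    'EXAMPLE_DAG',
    default_args=default_args,
    description='Illustrative DAG definition',
    max_active_runs=1,
    schedule_interval='*/5 * * * *',  # every five minutes
    tags=['tutorial'],
    catchup=False,
)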

III. SENSORS

In Apache Airflow, a defined DAG/workflow can wait for another DAG until it reaches the success, failed, or queued state by placing an ExternalTaskSensor task, pointed at a specific execution date, at the beginning of the waiting DAG [5]. The same mechanism also applies when a specific task inside one DAG should wait for another complete DAG or for some of its tasks [6].

To effectively define an external task sensor, it is essential to have a comprehensive understanding of its key arguments, including but not limited to task_id, external_dag_id, external_task_id, execution_date_fn, allowed_states, failed_states, mode, and timeout [7][8]; see Table 3.

Table 3. ExternalTaskSensor arguments.

| Argument | Type | Purpose |
|---|---|---|
| task_id | str | Identifier of the sensor task itself |
| external_dag_id | str | dag_id of the DAG to wait for |
| external_task_id | str or None | task_id to wait for; None waits for the whole DAG run |
| execution_date_fn | callable | Returns the execution date(s) of the external run(s) to wait on |
| allowed_states | list | External states the sensor treats as success |
| failed_states | list | External states that make the sensor fail |
| mode | str | 'poke' holds the worker slot between checks; 'reschedule' releases it |
| timeout | int | Seconds after which the sensor times out |
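Putting the arguments of Table 3 together, a sensor definition might look like the following sketch (the dag object and the UPSTREAM_DAG id are hypothetical assumptions; the actual sensor used in this article appears in the case study below):

from airflow.sensors.external_task_sensor import ExternalTaskSensor

wait_for_upstream = ExternalTaskSensor(
    task_id='WAIT_FOR_UPSTREAM',
    external_dag_id='UPSTREAM_DAG',   # hypothetical DAG to wait for
    external_task_id=None,            # None = wait for the whole DAG run
    allowed_states=['success'],       # states treated as done
    failed_states=['failed'],         # states that fail the sensor immediately
    mode='reschedule',                # release the worker slot between checks
    timeout=60 * 60,                  # give up after one hour
    dag=dag,                          # assumes a DAG object defined as above
)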

IV. CASE STUDY

In this section, the preparation, detailed explanation, and Python scripts for two distinct DAGs are provided. These DAGs are referred to as the main and the subsidiary DAG. The subsidiary DAG is designed to wait for the completion of the main DAG, functioning as a dependent workflow that remains on hold until the main one is finished. Figure 2 summarizes the complete process from beginning to end, illustrating how the subsidiary DAG waits for the main DAG to complete.

Figure 2. Animated example of the given case study.

The two DAGs are designed for tutorial purposes; as a result, all their tasks simply sleep for 2 seconds each. Their Python scripts are given below.

A. Main DAG:

####
# Needed Libraries
####

from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import BranchPythonOperator

# Flag controlling which branch TASK4 selects
t5_status = True

def t5_t5p():
    if t5_status:
        return "TASK5"
    else:
        return "TASK5_p"  # must match the task_id 'TASK5_p' defined below

####
# DAG Definition
####

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2020, 12, 9, 12, 0, 0),
    'depends_on_past': False,
    'email': ["yousef.alkhanafseh@turk.net"],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
}

dag = DAG(
    'MAIN_DAG',
    default_args=default_args,
    description='The important DAG which runs without Sensor',
    max_active_runs=1,
    schedule_interval='*/5 * * * *',
    catchup=False
)

####
# Tasks Definition
####

ts = BashOperator(
    task_id='START',
    bash_command='sleep 2',
    dag=dag,
    execution_timeout=timedelta(minutes=1),
    retries=0,
)

t1a = BashOperator(
    task_id='TASK1_a',
    bash_command='sleep 2',
    dag=dag,
    execution_timeout=timedelta(minutes=1),
    retries=0,
)

t1b = BashOperator(
    task_id='TASK1_b',
    bash_command='sleep 2',
    dag=dag,
    execution_timeout=timedelta(minutes=1),
    retries=0,
)

t1c = BashOperator(
    task_id='TASK1_c',
    bash_command='sleep 2',
    dag=dag,
    execution_timeout=timedelta(minutes=1),
    retries=0,
)

t1d = BashOperator(
    task_id='TASK1_d',
    bash_command='sleep 2',
    dag=dag,
    execution_timeout=timedelta(minutes=1),
    retries=0,
)

t2a = BashOperator(
    task_id='TASK2_a',
    bash_command='sleep 2',
    dag=dag,
    execution_timeout=timedelta(minutes=1),
    retries=0,
)

t2b = BashOperator(
    task_id='TASK2_b',
    bash_command='sleep 2',
    dag=dag,
    execution_timeout=timedelta(minutes=1),
    retries=0,
)

t2c = BashOperator(
    task_id='TASK2_c',
    bash_command='sleep 2',
    dag=dag,
    execution_timeout=timedelta(minutes=1),
    retries=0,
)

t2d = BashOperator(
    task_id='TASK2_d',
    bash_command='sleep 2',
    dag=dag,
    execution_timeout=timedelta(minutes=1),
    retries=0,
)

t3 = BashOperator(
    task_id='TASK3',
    bash_command='sleep 2',
    dag=dag,
    execution_timeout=timedelta(minutes=1),
    retries=0,
)

# Branch task: selects TASK5 or TASK5_p depending on t5_status
t4 = BranchPythonOperator(
    task_id="TASK4",
    python_callable=t5_t5p,
    dag=dag,
    execution_timeout=timedelta(minutes=1),
    retries=2,
)

t5 = BashOperator(
    task_id='TASK5',
    bash_command='sleep 2',
    dag=dag,
    execution_timeout=timedelta(minutes=1),
    retries=0,
)

t5p = BashOperator(
    task_id='TASK5_p',
    bash_command='sleep 2',
    dag=dag,
    execution_timeout=timedelta(minutes=1),
    retries=0,
)

t6 = BashOperator(
    task_id='TASK6',
    bash_command='sleep 2',
    dag=dag,
    execution_timeout=timedelta(minutes=1),
    retries=0,
)

t6p = BashOperator(
    task_id='TASK6_p',
    bash_command='sleep 2',
    dag=dag,
    execution_timeout=timedelta(minutes=1),
    retries=0,
)

t7 = BashOperator(
    task_id='TASK7',
    bash_command='sleep 2',
    dag=dag,
    execution_timeout=timedelta(minutes=1),
    retries=0,
)

# END runs as soon as at least one upstream branch has succeeded
te = BashOperator(
    task_id='END',
    bash_command='sleep 2',
    trigger_rule='one_success',
    dag=dag,
    execution_timeout=timedelta(minutes=1),
    retries=0,
)

####
# Tasks Scheduling
####

ts.set_downstream(t1a)
ts.set_downstream(t1b)
ts.set_downstream(t1c)
ts.set_downstream(t1d)
t1a.set_downstream(t2a)
t1b.set_downstream(t2b)
t1c.set_downstream(t2c)
t1d.set_downstream(t2d)
t2a.set_downstream(t3)
t2b.set_downstream(t3)
t2c.set_downstream(t3)
t2d.set_downstream(t3)
t3.set_downstream(t4)
t4.set_downstream(t5)
t4.set_downstream(t5p)
t5.set_downstream(t6)
t5.set_downstream(t7)
t5p.set_downstream(t6p)
t6.set_downstream(te)
t7.set_downstream(te)
t6p.set_downstream(te)

The DAG graph obtained from the previous script can be clearly seen in Figure 4.

Figure 4. Airflow Main DAG
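As a side note, the set_downstream calls above can be written equivalently with Airflow's bitshift operators, which some find easier to read. A sketch of the first part of the schedule:

# Equivalent to the corresponding set_downstream calls above
ts >> [t1a, t1b, t1c, t1d]
t1a >> t2a
t1b >> t2b
t1c >> t2c
t1d >> t2d
[t2a, t2b, t2c, t2d] >> t3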

B. Subsidiary DAG:

####
# Needed Libraries
####

from datetime import timedelta, datetime
from airflow import DAG
from airflow.models import DagRun
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.sensors.external_task_sensor import ExternalTaskSensor

# Branch flags used by the BranchPythonOperators below
t1_status = True
t4_status = False

def t1_t1p():
    if t1_status:
        return "TASK1"
    else:
        return "TASK1_P"


def t4_t4p():
    if t4_status:
        return "TASK4"
    else:
        return "TASK4_P"


def get_most_recent_dag_run(dt):
    # Called by the sensor with its own execution date (dt), which is
    # intentionally ignored: instead, the execution date of the most
    # recent MAIN_DAG run is returned, so the sensor waits on that run.
    dag_runs = DagRun.find(dag_id="MAIN_DAG")
    dag_runs.sort(key=lambda x: x.execution_date, reverse=True)
    if dag_runs:
        return dag_runs[0].execution_date

####
# DAG Definition
####

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2020, 12, 9, 12, 0, 0),
    'depends_on_past': False,
    'email': ["yousef.alkhanafseh@turk.net"],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
}

dag = DAG(
    'SUBSIDIARY_DAG',
    default_args=default_args,
    description='The less important DAG which runs with Sensor',
    max_active_runs=1,
    schedule_interval='*/5 * * * *',
    catchup=False
)

####
# Sensor Definition
####

sensor = ExternalTaskSensor(
    task_id='SENSOR',
    external_dag_id='MAIN_DAG',
    external_task_id=None,  # wait for the whole MAIN_DAG run, not a single task
    allowed_states=["failed", "success", "queued"],
    execution_date_fn=get_most_recent_dag_run,
    mode='reschedule',  # release the worker slot between checks
    dag=dag,
)

####
# Tasks Definition
####

# Branch task: selects TASK1 or TASK1_P depending on t1_status
ts = BranchPythonOperator(
    task_id="START",
    python_callable=t1_t1p,
    dag=dag,
    execution_timeout=timedelta(minutes=1),
    retries=2,
)

t1 = BashOperator(
    task_id='TASK1',
    bash_command='sleep 2',
    dag=dag,
    execution_timeout=timedelta(minutes=1),
    retries=0,
)

t1p = BashOperator(
    task_id='TASK1_P',
    bash_command='sleep 2',
    dag=dag,
    execution_timeout=timedelta(minutes=1),
    retries=0,
)

t1pp = BashOperator(
    task_id='TASK1_PP',
    bash_command='sleep 2',
    dag=dag,
    execution_timeout=timedelta(minutes=1),
    retries=0,
)

# TASK2 joins the two branches; one_success lets it run when either finishes
t2 = BashOperator(
    task_id='TASK2',
    bash_command='sleep 2',
    dag=dag,
    trigger_rule='one_success',
    execution_timeout=timedelta(minutes=1),
    retries=0,
)

# Branch task: selects TASK4 or TASK4_P depending on t4_status
t3 = BranchPythonOperator(
    task_id="TASK3",
    python_callable=t4_t4p,
    dag=dag,
    execution_timeout=timedelta(minutes=1),
    retries=2,
)

t4 = BashOperator(
    task_id='TASK4',
    bash_command='sleep 2',
    dag=dag,
    execution_timeout=timedelta(minutes=1),
    retries=0,
)

t4p = BashOperator(
    task_id='TASK4_P',
    bash_command='sleep 2',
    dag=dag,
    execution_timeout=timedelta(minutes=1),
    retries=0,
)

te = BashOperator(
    task_id='END',
    bash_command='sleep 2',
    dag=dag,
    trigger_rule='one_success',
    execution_timeout=timedelta(minutes=1),
    retries=0,
)

####
# Tasks Scheduling
####

ts.set_upstream(sensor)
ts.set_downstream(t1)
ts.set_downstream(t1p)
t1p.set_downstream(t1pp)
t1pp.set_downstream(t2)
t1.set_downstream(t2)
t2.set_downstream(t3)
t3.set_downstream(t4)
t3.set_downstream(t4p)
t4p.set_downstream(te)
t4.set_downstream(te)

The waiting process of the subsidiary DAG is clearly displayed in Figure 5a below. Afterward, the complete flow of the subsidiary DAG is represented in Figure 5b.

Figure 5.a. Airflow Subsidiary DAG waiting for the Main DAG due to the Sensor.
Figure 5.b. Airflow Subsidiary DAG.

V. CONCLUSION

In conclusion, this tutorial covers the essential DAG and ExternalTaskSensor arguments and briefly explains the working principle of the latter. Building on this understanding, a case study involving two distinct DAGs is presented to consolidate how ExternalTaskSensor operates, complemented by an animated GIF, thereby enhancing the reader's ability to develop, schedule, and monitor batch-oriented workflows within Airflow. The complete Python scripts for the designed scenarios are provided for reference.

Image generated by DALL-E 3 with text added by hand.

VI. REFERENCES

[1] Apache Airflow (n.d.). What is Airflow?. Accessed on 19.02.2024. Retrieved from: https://airflow.apache.org/docs/apache-airflow/stable/index.html

[2] Apache Airflow (n.d.). airflow.operators. Accessed on 20.02.2024. Retrieved from: https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/operators/index.html

[3] Apache Airflow (n.d.). Sensors. Accessed on 19.02.2024. Retrieved from: https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/sensors.html

[4] Apache Airflow (n.d.). airflow.models.dag. Accessed on 19.02.2024. Retrieved from: https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/models/dag/index.html

[5] Apache Airflow (n.d.). airflow.sensors.external_task_sensor. Accessed on 20.02.2024. Retrieved from: https://airflow.apache.org/docs/apache-airflow/1.10.3/_api/airflow/sensors/external_task_sensor/index.html

[6] Apache Airflow (n.d.). Cross-DAG Dependencies. Accessed on 18.02.2024. Retrieved from: https://airflow.apache.org/docs/apache-airflow/stable/howto/operator/external_task_sensor.html

[7] Apache Airflow (n.d.). airflow.sensors.external_task. Accessed on 19.02.2024. Retrieved from: https://airflow.apache.org/docs/apache-airflow/stable/_api/airflow/sensors/external_task/

[8] Apache Airflow (n.d.). Sensors. Accessed on 18.02.2024. Retrieved from: https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/sensors.html
