Apache Airflow Useful Practices: Sensor Operator

This article is part of a series on useful Apache Airflow practices that I have collected after using it for a while.

Chanon Krittapholchai
6 min read · Mar 16, 2024
Apache Airflow’s Logo

Apache Airflow is an open-source platform created by the community to programmatically author, schedule and monitor workflows.

In this article, I would like to share some practices for using the Sensor Operator in Apache Airflow.

What is a SensorOperator?

A SensorOperator is an operator that blocks our DAG by repeatedly checking a certain condition until that condition is met.

There are many kinds of SensorOperators; in this article, we will only focus on these basic ones.

  1. FileSensor
  2. DateTimeSensor
  3. ExternalTaskSensor
  4. PythonSensor

You can find out more about SensorOperators in these links.

  1. Sensors — Airflow Documentation (apache.org)
  2. Airflow sensors | Astronomer Documentation
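
Under the hood, every sensor boils down to a poke method that returns a Boolean: Airflow calls it once per poke interval until it returns True or the sensor times out. Here is a minimal sketch of a custom sensor (the PathExistsSensor class and its path check are my own illustration, not part of Airflow):

from pathlib import Path
from airflow.sensors.base import BaseSensorOperator

class PathExistsSensor(BaseSensorOperator):
    """Minimal custom sensor sketch: Airflow calls poke() once per
    poke_interval until it returns True (or the timeout is hit)."""

    def __init__(self, path: str, **kwargs):
        super().__init__(**kwargs)
        self.path = path

    def poke(self, context) -> bool:
        # True ends the wait; False makes Airflow poke again later.
        return Path(self.path).exists()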

FileSensor

FileSensor is a sensor that keeps checking whether the target file exists.

This is an example that uses the FileSensor to check for /home/hello.txt .

from airflow.models import DAG
from airflow.sensors.filesystem import FileSensor
from airflow.operators.empty import EmptyOperator
from pendulum import datetime

# Create Simple DAG
with DAG(
    dag_id='medium_file_sensor',
    schedule='@daily',
    catchup=False,
    start_date=datetime(2024, 3, 1),
    max_active_runs=1,
):
    # Start
    start = EmptyOperator(task_id='start')
    # Add Sensor
    waiting_for_file = FileSensor(
        task_id='waiting_for_file',
        filepath='/home/hello.txt',
    )
    # End
    end = EmptyOperator(task_id='end')
    # Set Dependencies Flow
    start >> waiting_for_file >> end

The task waiting_for_file will keep running until the target file exists.

FileSensor in Airflow UI.
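
A detail worth knowing: as far as I know, FileSensor resolves the path through a filesystem connection (the fs_conn_id parameter, which defaults to fs_default), so a relative filepath is resolved against that connection's base path. A sketch:

waiting_for_file = FileSensor(
    task_id='waiting_for_file',
    fs_conn_id='fs_default',  # filesystem connection; its configured path acts as the base directory
    filepath='hello.txt',  # relative paths are resolved against the connection's base path
)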

DateTimeSensor

DateTimeSensor is a sensor that keeps checking whether the current time has passed the target datetime.

This is an example that uses the DateTimeSensor to check whether the current time has passed 2024-03-10 4:36 PM (UTC+7) .

from airflow.models import DAG
from airflow.sensors.date_time import DateTimeSensor
from airflow.operators.empty import EmptyOperator
from pendulum import datetime

# Create Simple DAG
with DAG(
    dag_id='medium_datetime_sensor',
    schedule='@daily',
    catchup=False,
    start_date=datetime(2024, 3, 1),
    max_active_runs=1,
):
    # Start
    start = EmptyOperator(task_id='start')
    # Add Sensor
    waiting_for_datetime = DateTimeSensor(
        task_id='waiting_for_datetime',
        target_time=datetime(2024, 3, 10, 16, 36, tz='Asia/Bangkok'),
    )
    # End
    end = EmptyOperator(task_id='end')
    # Set Dependencies Flow
    start >> waiting_for_datetime >> end

The task waiting_for_datetime will keep running until the target time has passed.

DateTimeSensor in Airflow UI.

PythonSensor

PythonSensor is a sensor that executes Python code that returns a Boolean value; if it is True, the DAG proceeds to the next step.

Additionally, the PythonSensor is also able to pass a value to Airflow's XCom.

This is an example of how to use a PythonSensor to check whether the current time has passed 2024-03-10 4:36 PM (UTC+7), just like the DateTimeSensor; it will also send the string hello world to Airflow's XCom for the next task.

from airflow.models import DAG
from airflow.decorators import task
from airflow.sensors.base import PokeReturnValue
from airflow.operators.empty import EmptyOperator
from pendulum import datetime, now

# Create Simple DAG
with DAG(
    dag_id='medium_python_sensor',
    schedule='@daily',
    catchup=False,
    start_date=datetime(2024, 3, 1),
    max_active_runs=1,
):
    # Start
    start = EmptyOperator(task_id='start')

    # Add Sensor
    @task.sensor(task_id='check_datetime_python')
    def check_datetime_python_task() -> PokeReturnValue:
        # Check whether the current time has passed the target time
        condition_met = now() >= datetime(2024, 3, 10, 16, 36, tz='Asia/Bangkok')
        if condition_met:
            # Return something
            operator_return_value = 'hello world'
        else:
            # Return None if the condition isn't met
            operator_return_value = None
        # Return the poke value
        return PokeReturnValue(
            is_done=condition_met,
            xcom_value=operator_return_value,
        )

    # Print the sensor's value
    @task(task_id='print_value')
    def print_value_task(content):
        print(content)

    check_datetime_python = check_datetime_python_task()
    print_value = print_value_task(check_datetime_python)

    # End
    end = EmptyOperator(task_id='end')
    # Set Dependencies Flow
    start >> check_datetime_python >> print_value >> end

PythonSensor in Airflow UI.
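
For completeness, the same check can also be written with the classic (non-decorator) PythonSensor from airflow.sensors.python, which takes a python_callable that returns a Boolean; here is a sketch of the equivalent setup:

from airflow.sensors.python import PythonSensor
from pendulum import datetime, now

def _check_datetime() -> bool:
    # True once the current time passes the target time
    return now() >= datetime(2024, 3, 10, 16, 36, tz='Asia/Bangkok')

check_datetime_python = PythonSensor(
    task_id='check_datetime_python',
    python_callable=_check_datetime,
)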

Personally, I wouldn't recommend using the PythonSensor unless we have no other choice.

ExternalTaskSensor

ExternalTaskSensor is a sensor that keeps checking one of the following:

  1. Whether a certain task in the upstream DAG has finished.
  2. Whether the upstream DAG has finished.

*The DAG run date of both the upstream DAG and the sensor must be the same.

This is an example that uses the ExternalTaskSensor to check whether the upstream DAG named medium_datetime_sensor from the previous example has finished.

from airflow.models import DAG
from airflow.sensors.external_task import ExternalTaskSensor
from airflow.operators.empty import EmptyOperator
from pendulum import datetime

# Create Simple DAG
with DAG(
    dag_id='medium_external_sensor',
    schedule='@daily',
    catchup=False,
    start_date=datetime(2024, 3, 1),
    max_active_runs=1,
):
    # Start
    start = EmptyOperator(task_id='start')
    # Add Sensor
    waiting_for_upstream = ExternalTaskSensor(
        task_id='waiting_for_upstream',
        external_dag_id='medium_datetime_sensor',
        external_task_id=None,  # None waits for the whole DAG, a task_id for a specific task
    )
    # End
    end = EmptyOperator(task_id='end')
    # Set Dependencies Flow
    start >> waiting_for_upstream >> end

One good thing about this sensor is that we can redirect to the upstream DAG using the External DAG button.

ExternalTaskSensor in Airflow UI and Re-direct button.

Something to be aware of is that, by default, the ExternalTaskSensor will check the upstream DAG's status only when the current DAG and the upstream DAG have exactly the same DAG execution date.

But we can make some adjustments with the execution_date_fn parameter.

This is an example if we want the current DAG to check the upstream DAG run from the previous date.

waiting_for_upstream = ExternalTaskSensor(
    task_id='waiting_for_upstream',
    external_dag_id='medium_datetime_sensor',
    external_task_id=None,  # None waits for the whole DAG, a task_id for a specific task
    # The function receives the DAG execution date (a pendulum datetime)
    execution_date_fn=lambda dt: dt.add(days=-1),
)
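
As far as I know, ExternalTaskSensor also accepts an execution_delta parameter (a timedelta subtracted from the current execution date), which covers this fixed-offset case without writing a callable; note that execution_delta and execution_date_fn are mutually exclusive:

from datetime import timedelta
from airflow.sensors.external_task import ExternalTaskSensor

waiting_for_upstream = ExternalTaskSensor(
    task_id='waiting_for_upstream',
    external_dag_id='medium_datetime_sensor',
    external_task_id=None,
    execution_delta=timedelta(days=1),  # look at the upstream run from 1 day earlier
)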

Idempotent SensorOperator

My previous article about Idempotent DAGs is HERE.

We also want our SensorOperator to have idempotent behavior.

That can be done with the same templating method as in the previous article.

This is an example of a simple DAG with Idempotent FileSensor and Idempotent DateTimeSensor.

from airflow.models import DAG
from airflow.sensors.date_time import DateTimeSensor
from airflow.sensors.filesystem import FileSensor
from airflow.operators.empty import EmptyOperator
from pendulum import datetime

# Create Simple DAG
with DAG(
    dag_id='medium_idempotent_sensor',
    schedule='@daily',
    catchup=False,
    start_date=datetime(2024, 3, 1),
    max_active_runs=1,
):
    # Start
    start = EmptyOperator(task_id='start')
    # Add DateTime Sensor
    waiting_for_datetime = DateTimeSensor(
        task_id='waiting_for_datetime',
        # At 11 PM
        target_time='{{ data_interval_end.in_tz("Asia/Bangkok").replace(hour=23) }}',
    )
    # Add File Sensor
    waiting_for_file = FileSensor(
        task_id='waiting_for_file',
        # File name is hello_YYYYMMDD.txt
        filepath='/home/hello_{{ data_interval_end.in_tz("Asia/Bangkok").strftime("%Y%m%d") }}.txt',
    )
    # End
    end = EmptyOperator(task_id='end')
    # Set Dependencies Flow
    start >> [waiting_for_datetime, waiting_for_file] >> end

It will create a DAG that applies the idempotent concept to its sensors.

The idempotent concept is also applied to the target of the FileSensor.
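
To make the rendering concrete, here is a small standalone sketch (plain pendulum, no Airflow; the variable stands in for the value Airflow injects) of what those two templates evaluate to for a run whose data_interval_end is 2024-03-10 00:00 UTC:

from pendulum import datetime

# Stand-in for the run's data_interval_end (normally injected by Airflow)
data_interval_end = datetime(2024, 3, 10, 0, 0, tz='UTC')

# '{{ data_interval_end.in_tz("Asia/Bangkok").replace(hour=23) }}'
print(data_interval_end.in_tz('Asia/Bangkok').replace(hour=23))
# 2024-03-10 23:00:00+07:00

# '/home/hello_{{ data_interval_end.in_tz("Asia/Bangkok").strftime("%Y%m%d") }}.txt'
print(f"/home/hello_{data_interval_end.in_tz('Asia/Bangkok').strftime('%Y%m%d')}.txt")
# /home/hello_20240310.txt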

Other Useful Things with SensorOperator

There are many useful things that we can apply to optimize our sensors.

SensorOperator PokeInterval & Timeout

Every SensorOperator comes with these built-in parameters.

  1. poke_interval: After a check, how long the sensor should wait before checking again.
  2. timeout: How long the sensor can wait before raising an error.

Here is a sample of these parameters.

FileSensor(
    task_id='waiting_for_file',
    filepath='/home/hello.txt',
    poke_interval=30,  # Check every 30 seconds
    timeout=3600,  # After the 1st poke, wait up to 1 hour before raising an error
)
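
One related parameter worth mentioning: if a timeout should not be treated as a hard failure, sensors also accept (to the best of my knowledge) a soft_fail flag, which marks the task as skipped instead of failed when the timeout is reached.

FileSensor(
    task_id='waiting_for_file',
    filepath='/home/hello.txt',
    poke_interval=30,
    timeout=3600,
    soft_fail=True,  # on timeout, mark this task as skipped instead of failed
)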

SensorOperator Mode

Mode is the behavior of the sensor while it waits between pokes; there are 3 different approaches.

poke : the sensor stays active (occupying a worker slot) the whole time; it's fast but consumes resources.

reschedule : the sensor releases its worker slot between checks; it is slower but consumes fewer resources.

deferrable : consumes even fewer resources but is even slower than reschedule. Note that poke and reschedule are values of the mode parameter, while deferrable sensors are separate Async operator classes.
*Don't forget to run the airflow triggerer component before using deferrable sensors.

Deferrable is more complicated than poke and reschedule. If you want to understand how it works, I suggest taking this free course from Astronomer: Airflow: Deferrable Operators (astronomer.io)

Here is an example of how to use each approach with the DateTimeSensor.

from airflow.models import DAG
from airflow.sensors.date_time import DateTimeSensor, DateTimeSensorAsync
from pendulum import datetime

# Create Simple DAG with POKE
with DAG(
    dag_id='medium_poke',
    schedule='@daily',
    catchup=True,
    start_date=datetime(2024, 1, 1),
):
    # Add Sensor
    poke = DateTimeSensor(
        task_id='waiting_for_datetime',
        target_time="{{ data_interval_end.add(years=1) }}",
        mode='poke',
    )

# Create Simple DAG with RESCHEDULE
with DAG(
    dag_id='medium_reschedule',
    schedule='@daily',
    catchup=True,
    start_date=datetime(2024, 1, 1),
):
    # Add Sensor
    reschedule = DateTimeSensor(
        task_id='waiting_for_datetime',
        target_time="{{ data_interval_end.add(years=1) }}",
        mode='reschedule',
    )

# Create Simple DAG with DEFERRABLE (a separate Async operator class)
with DAG(
    dag_id='medium_deferrable',
    schedule='@daily',
    catchup=True,
    start_date=datetime(2024, 1, 1),
):
    # Add Sensor
    deferrable = DateTimeSensorAsync(
        task_id='waiting_for_datetime',
        target_time="{{ data_interval_end.add(years=1) }}",
    )

And that is all about the SensorOperator in Apache Airflow.

Hopefully, this article will help!!
