A closer look at Airflow sensors

Anand Satheesh
4 min read · May 23, 2024


Apache Airflow is a popular open-source tool for orchestrating complex workflows and data pipelines. One of its key features is the use of sensors — special types of operators designed to wait for a certain condition to be met before proceeding with the next task. This article takes a closer look at Airflow sensors, exploring their types, uses, and best practices.

What Are Airflow Sensors?

Sensors in Airflow are operators that continuously check for a condition and, once the condition is met, allow the workflow to proceed. They are essential for creating dynamic and responsive workflows that depend on the availability of external resources, such as files, database records, or the completion of tasks in other systems.

How Sensors Work

Sensors work by “polling” or “poking” for a condition at regular intervals. They periodically check if the specified condition is met. If it is, the sensor task is marked as successful, and the subsequent tasks are triggered. If the condition is not met within a specified timeout period, the sensor task fails.

Key Parameters

  • poke_interval: Time in seconds between checks for the condition.
  • timeout: Maximum time in seconds that the sensor will wait for the condition to be met before failing.
  • mode: Determines how the sensor waits (see the sketch after this list). The two supported values are:
    • poke: the sensor occupies a worker slot for its entire runtime, sleeping between checks until the condition is met or the timeout is reached.
    • reschedule: the sensor frees its worker slot between checks and is rescheduled to run again after poke_interval, which is much lighter on resources for long waits.
  • soft_fail: If set to True, the sensor task will mark itself as skipped rather than failed after a timeout.
  • Deferrable sensors (Airflow 2.2+): rather than a mode value, some sensors ship async/deferrable variants that hand the wait over to the triggerer process, consuming no worker slot at all while waiting.
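
To see how these parameters fit together, here is a minimal, hypothetical sketch (the file path and connection id are placeholders; place it inside a DAG definition as in the examples below):

from datetime import timedelta

from airflow.sensors.filesystem import FileSensor

# Hypothetical configuration: check every 2 minutes for up to 1 hour.
# 'reschedule' frees the worker slot between checks, and soft_fail=True
# marks the task as skipped (not failed) if the hour passes with no file.
wait_for_report = FileSensor(
    task_id='wait_for_report',
    filepath='/data/incoming/report.csv',  # placeholder path
    fs_conn_id='fs_default',
    poke_interval=120,
    timeout=3600,
    mode='reschedule',
    soft_fail=True,
)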

Types of Sensors

Airflow provides various built-in sensors to handle different types of conditions:

File Sensors

File sensors wait for a file or directory to appear in a specified location. They are useful for workflows that depend on data files being available before processing.

Example:

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.empty import EmptyOperator
from airflow.sensors.filesystem import FileSensor

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 5, 22),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'file_sensor_example',
    default_args=default_args,
    description='A simple FileSensor DAG',
    schedule_interval=timedelta(days=1),
)

start = EmptyOperator(
    task_id='start',
    dag=dag,
)

# Wait for the file to appear, checking every 30 seconds for up to 10 minutes.
wait_for_file = FileSensor(
    task_id='wait_for_file',
    filepath='/path/to/your/file.csv',
    fs_conn_id='fs_default',
    poke_interval=30,
    timeout=600,
    dag=dag,
)

process_file = EmptyOperator(
    task_id='process_file',
    dag=dag,
)

start >> wait_for_file >> process_file

Time Sensors

Time sensors wait for a specific time of day or an interval of time to pass before proceeding. They are useful for time-based scheduling.

Example:

from airflow.sensors.time_delta import TimeDeltaSensor
from datetime import timedelta

wait_for_time = TimeDeltaSensor(
    task_id='wait_for_5_minutes',
    delta=timedelta(minutes=5),
    dag=dag,
)
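
Waiting for a specific time of day rather than a delta is handled by TimeSensor. A minimal sketch, assuming Airflow 2.x and the same dag object as above:

from datetime import time

from airflow.sensors.time_sensor import TimeSensor

# Wait until 18:00 (in the DAG's timezone) before running downstream tasks.
wait_until_evening = TimeSensor(
    task_id='wait_until_6pm',
    target_time=time(18, 0),
    dag=dag,
)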

External Task Sensors

External task sensors wait for a task in a different DAG or a different execution of the same DAG to complete. They are useful for coordinating workflows across different DAGs.

Example:

from airflow.sensors.external_task import ExternalTaskSensor

wait_for_task = ExternalTaskSensor(
    task_id='wait_for_external_task',
    external_dag_id='external_dag',
    external_task_id='external_task',
    dag=dag,
)
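
By default, ExternalTaskSensor waits for the external task run that shares its own logical (execution) date, so the two DAGs' schedules must line up. If the upstream DAG runs on an offset schedule, execution_delta shifts the date the sensor looks for; the one-hour offset below is purely illustrative:

from datetime import timedelta

# Hypothetical: the upstream DAG is scheduled one hour earlier than this one.
wait_for_offset_task = ExternalTaskSensor(
    task_id='wait_for_offset_task',
    external_dag_id='external_dag',
    external_task_id='external_task',
    execution_delta=timedelta(hours=1),
    dag=dag,
)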

HTTP Sensors

HTTP sensors wait for a specific response from an HTTP endpoint. They are useful for checking the availability of web services or APIs.

Example:

from airflow.providers.http.sensors.http import HttpSensor

wait_for_http = HttpSensor(
    task_id='wait_for_http',
    http_conn_id='http_default',
    endpoint='api/status',
    request_params={},
    response_check=lambda response: response.json()['status'] == 'success',
    poke_interval=30,
    timeout=600,
    dag=dag,
)

SQL Sensors

SQL sensors wait for a specific result from a SQL query. They are useful for checking conditions in databases, such as the availability of a record or the completion of a batch process.

Example:

from airflow.providers.common.sql.sensors.sql import SqlSensor

wait_for_sql = SqlSensor(
    task_id='wait_for_sql',
    conn_id='mysql_default',
    sql='SELECT COUNT(1) FROM my_table WHERE condition_met = TRUE',
    poke_interval=30,
    timeout=600,
    dag=dag,
)

Best Practices for Using Sensors

  1. Set Appropriate Intervals: Choose a poke_interval that balances resource usage and responsiveness. Very frequent checks increase load, while infrequent checks may delay workflow execution.
  2. Timeouts and Retries: Configure sensible timeout values and consider enabling retries to handle transient issues.
  3. Use Resource-Friendly Modes: Prefer reschedule mode (or a deferrable sensor variant, where one exists) over poke for long-running sensors to free up worker slots.
  4. Logging and Monitoring: Enable detailed logging to troubleshoot sensor behavior. Use Airflow’s monitoring tools to track sensor performance.
  5. Custom Sensors: If built-in sensors don’t meet your needs, consider creating custom sensors by subclassing BaseSensorOperator, as sketched below.
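
As a rough sketch of point 5 (the marker-file check and class name are invented for illustration), a custom sensor only needs to subclass BaseSensorOperator and implement poke(), returning True once the condition is met:

import os

from airflow.sensors.base import BaseSensorOperator


class SuccessMarkerSensor(BaseSensorOperator):
    """Hypothetical sensor that waits for a _SUCCESS marker file in a directory."""

    def __init__(self, directory, **kwargs):
        super().__init__(**kwargs)
        self.directory = directory

    def poke(self, context):
        # Airflow calls poke() every poke_interval; returning True marks the
        # task successful, returning False keeps it waiting until timeout.
        marker = os.path.join(self.directory, '_SUCCESS')
        self.log.info('Checking for %s', marker)
        return os.path.exists(marker)

It is wired into a DAG like any built-in sensor, e.g. SuccessMarkerSensor(task_id='wait_for_marker', directory='/data/export', mode='reschedule', dag=dag).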

Conclusion

Sensors are a powerful feature in Apache Airflow, allowing workflows to dynamically respond to external conditions. By understanding the various types of sensors and their parameters, you can build more robust, efficient, and responsive data pipelines. Whether you are waiting for files, HTTP responses, or external tasks, Airflow sensors provide the flexibility and control needed to orchestrate complex workflows seamlessly.

I have personally used SQL sensors and file sensors, and they are great options for making your data pipelines event-driven: the moment a file arrives or a certain condition is met in the database, downstream processing can be triggered automatically.

Below is a repo where I have implemented Apache Spark processing of a file, triggered by an Amazon S3 file sensor:

https://github.com/anands282/airflow_sensors
