A closer look at Airflow sensors
Apache Airflow is a popular open-source tool for orchestrating complex workflows and data pipelines. One of its key features is the use of sensors — special types of operators designed to wait for a certain condition to be met before proceeding with the next task. This article takes a closer look at Airflow sensors, exploring their types, uses, and best practices.
What Are Airflow Sensors?
Sensors in Airflow are operators that continuously check for a condition and, once the condition is met, allow the workflow to proceed. They are essential for creating dynamic and responsive workflows that depend on the availability of external resources, such as files, database records, or the completion of tasks in other systems.
How Sensors Work
Sensors work by “polling” or “poking” for a condition at regular intervals. They periodically check if the specified condition is met. If it is, the sensor task is marked as successful, and the subsequent tasks are triggered. If the condition is not met within a specified timeout period, the sensor task fails.
Key Parameters
- poke_interval: Time in seconds between checks for the condition.
- timeout: Maximum time in seconds that the sensor will wait for the condition to be met before failing.
- mode: Determines how the sensor waits. The valid values are:
- poke: The sensor task occupies a worker slot for its entire duration, checking the condition at each poke_interval.
- reschedule: The sensor task frees its worker slot between checks and reschedules itself to run again after the poke_interval.
- deferrable: Not a mode value, but a related option: many sensors accept deferrable=True (Airflow 2.2+), which suspends the task and hands the wait over to the triggerer process, freeing the worker entirely.
- soft_fail: If set to True, the sensor task is marked as skipped rather than failed when the timeout is reached.
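The interplay of poke_interval, timeout, and soft_fail can be sketched in plain Python (a simplified illustration of the poke loop, not Airflow's actual source):

```python
import time

def run_sensor(condition, poke_interval, timeout, soft_fail=False):
    """Simplified sketch of a 'poke'-mode sensor loop (illustrative only)."""
    started = time.monotonic()
    while time.monotonic() - started < timeout:
        if condition():              # the sensor's poke() check
            return "success"
        time.sleep(poke_interval)    # wait before the next check
    # Timeout reached: soft_fail turns the failure into a skip
    return "skipped" if soft_fail else "failed"

# A condition that becomes true on the third check
checks = iter([False, False, True])
print(run_sensor(lambda: next(checks), poke_interval=0.01, timeout=1))  # success
```

In reschedule mode the same logic applies, but instead of sleeping inside the loop the task exits and the scheduler re-queues it after the interval, so no worker slot is held while waiting.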
Types of Sensors
Airflow provides various built-in sensors to handle different types of conditions:
File Sensors
File sensors wait for a file or directory to appear in a specified location. They are useful for workflows that depend on data files being available before processing.
Example:
from airflow import DAG
from airflow.operators.empty import EmptyOperator  # DummyOperator in Airflow < 2.3
from airflow.sensors.filesystem import FileSensor
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 5, 22),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'file_sensor_example',
    default_args=default_args,
    description='A simple FileSensor DAG',
    schedule_interval=timedelta(days=1),
)

start = EmptyOperator(
    task_id='start',
    dag=dag,
)

wait_for_file = FileSensor(
    task_id='wait_for_file',
    filepath='/path/to/your/file.csv',
    fs_conn_id='fs_default',
    poke_interval=30,
    timeout=600,
    dag=dag,
)

process_file = EmptyOperator(
    task_id='process_file',
    dag=dag,
)

start >> wait_for_file >> process_file
Time Sensors
Time sensors wait for a specific time of day or an interval of time to pass before proceeding. They are useful for time-based scheduling.
Example:
from airflow.sensors.time_delta import TimeDeltaSensor
from datetime import timedelta

wait_for_time = TimeDeltaSensor(
    task_id='wait_for_5_minutes',
    delta=timedelta(minutes=5),
    dag=dag,
)
External Task Sensors
External task sensors wait for a task in a different DAG or a different execution of the same DAG to complete. They are useful for coordinating workflows across different DAGs.
Example:
from airflow.sensors.external_task import ExternalTaskSensor

wait_for_task = ExternalTaskSensor(
    task_id='wait_for_external_task',
    external_dag_id='external_dag',
    external_task_id='external_task',
    dag=dag,
)
HTTP Sensors
HTTP sensors wait for a specific response from an HTTP endpoint. They are useful for checking the availability of web services or APIs.
Example:
from airflow.providers.http.sensors.http import HttpSensor

wait_for_http = HttpSensor(
    task_id='wait_for_http',
    http_conn_id='http_default',
    endpoint='api/status',
    request_params={},
    response_check=lambda response: response.json()['status'] == 'success',
    poke_interval=30,
    timeout=600,
    dag=dag,
)
SQL Sensors
SQL sensors wait for a specific result from a SQL query. They are useful for checking conditions in databases, such as the availability of a record or the completion of a batch process.
Example:
from airflow.providers.common.sql.sensors.sql import SqlSensor

wait_for_sql = SqlSensor(
    task_id='wait_for_sql',
    conn_id='mysql_default',
    sql='SELECT COUNT(1) FROM my_table WHERE condition_met = TRUE',
    poke_interval=30,
    timeout=600,
    dag=dag,
)
Best Practices for Using Sensors
- Set Appropriate Intervals: Choose a poke_interval that balances resource usage and responsiveness. Frequent checks increase load, while infrequent checks may delay workflow execution.
- Timeouts and Retries: Configure sensible timeout values and consider enabling retries to handle transient issues.
- Use Resource-Friendly Modes: Prefer reschedule mode or deferrable sensors over poke for long-running waits, so worker slots are freed between checks.
- Logging and Monitoring: Enable detailed logging to troubleshoot sensor behavior, and use Airflow's monitoring tools to track sensor performance.
- Custom Sensors: If the built-in sensors don't meet your needs, create a custom sensor by subclassing BaseSensorOperator.
Conclusion
Sensors are a powerful feature in Apache Airflow, allowing workflows to dynamically respond to external conditions. By understanding the various types of sensors and their parameters, you can build more robust, efficient, and responsive data pipelines. Whether you are waiting for files, HTTP responses, or external tasks, Airflow sensors provide the flexibility and control needed to orchestrate complex workflows seamlessly.
I have personally used SQL sensors and file sensors, and they are good options for making data pipelines event-driven: the moment a file arrives or a condition is met in the database, downstream processing can be triggered automatically.
Below is a repo where I implemented Apache Spark processing for a file, triggered by an Amazon S3 file sensor.