Airflow features — Callback, Trigger & Cluster Policy
Lesser discussed features of Airflow
In this blog, we will discuss the following features of Airflow
- Callbacks
- Cluster Policy
- Task Dependency
- Trigger Rules
Let us roll!
Callbacks
Callbacks allow us to attach custom actions to tasks at different stages of their lifecycle (start, success, retry, failure). There are three ways to register a callback:
- Through DAG initialization — the action/function is called after all leaf tasks are done.
- Through task initialization — the action/function is called after that task is done.
- Through default_args, which are passed to all operators as keyword arguments.
There are five major callbacks:
- on_execute_callback
- on_success_callback
- on_retry_callback
- on_failure_callback
- sla_miss_callback
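A callback is just a Python function that Airflow invokes with a context dictionary describing the run. A minimal sketch (the function names and alerting logic are made-up examples; the run_id context key follows Airflow's documented callback contract):

```python
# Hypothetical callback functions. Airflow calls each with a `context`
# dict carrying run metadata (task instance, DAG, logical date, ...).

def notify_failure(context):
    # Wired up as on_failure_callback at the task level.
    return f"Task failed in run {context['run_id']}"

def notify_dag_success(context):
    # Wired up as on_success_callback on the DAG itself;
    # fires once all leaf tasks are done.
    return f"DAG run {context['run_id']} succeeded"

# Passed to every operator via default_args:
default_args = {"on_failure_callback": notify_failure}

# DAG wiring (sketch, requires Airflow):
# dag = DAG("example", default_args=default_args,
#           on_success_callback=notify_dag_success)
```

Registering through default_args applies the callback to every task in the DAG, while the DAG-level on_success_callback fires only once per run.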
Operator Hooks
The callbacks above are defined at the task/DAG level. At the operator level, Airflow provides hooks that run around the operator's execute method.
There are mainly three such hooks:
- pre_execute — invokes the given function just before the task's execute method runs. This is useful if we wish to skip some tasks for certain DAG runs without breaking dependencies.
- post_execute — invokes the given function immediately after the execute method returns.
- on_kill — called when the task is killed; used to clean up resources like threads or subprocesses.
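For instance, a pre_execute hook can skip a task for certain runs. A minimal sketch (the weekend rule is hypothetical; in a real DAG you would raise airflow.exceptions.AirflowSkipException rather than the placeholder exception below):

```python
from datetime import datetime

def skip_on_weekends(context):
    # pre_execute hook: runs before the operator's execute method.
    # In Airflow, raising AirflowSkipException here marks the task
    # `skipped` instead of `failed`, so downstream dependencies stay intact.
    if context["logical_date"].weekday() >= 5:  # Saturday or Sunday
        raise RuntimeError("weekend run, skipping")  # stand-in for AirflowSkipException

# Wiring (sketch, requires Airflow):
# BashOperator(task_id="load", bash_command="...", pre_execute=skip_on_weekends)
```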
Cluster Policy
Cluster policies are functions, conventionally defined in airflow_local_settings.py, that let administrators check or mutate DAGs and tasks cluster-wide. There are three kinds:
- Dag policy — runs at DAG load time and receives the whole DAG.
- Task policy — runs at DAG load time and receives each task.
- Task instance mutation hook — mutates task-instance properties only when the task is scheduled; running a task manually does not trigger it. The hook is called only when the DAG run is created, and the mutation happens just before task execution.
Task Dependencies
In a DAG, a node is a task and dependencies are the directed edges. Edges determine how we move through the graph. Dependencies are key to defining flexible pipelines.
We can set dependencies between tasks, and thus define the pipeline flow, using the following two methods:
- set_downstream
- set_upstream
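As a runnable illustration of what these calls record (a toy model of the task graph, not Airflow's actual API), set_downstream on a task adds a directed edge from it to the other task:

```python
class Task:
    # Toy stand-in for an Airflow task that only tracks graph edges.
    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream_ids = set()
        self.upstream_ids = set()

    def set_downstream(self, other):
        # self -> other: `other` runs after `self`.
        self.downstream_ids.add(other.task_id)
        other.upstream_ids.add(self.task_id)

    def set_upstream(self, other):
        # other -> self: the same edge, declared from the other direction.
        other.set_downstream(self)

extract, transform, load = Task("extract"), Task("transform"), Task("load")
extract.set_downstream(transform)
load.set_upstream(transform)
```

In real Airflow code, extract >> transform is shorthand for extract.set_downstream(transform), and load << transform for load.set_upstream(transform).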
We can also set dependency between tasks in the current run and the previous run of the DAG.
- depends_on_past — If set to true, the task in the current DAG run will only run if the same task succeeded or was skipped in the previous run.
- wait_for_downstream — If set to true, the task in the current run will only run if, in the previous run, the same task succeeded or was skipped and its immediate downstream task also succeeded or was skipped.
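Both are plain operator arguments, typically set through default_args. A sketch (the DAG name is hypothetical):

```python
default_args = {
    "depends_on_past": True,      # wait for this task's previous-run instance
    "wait_for_downstream": True,  # ...and for its immediate downstream tasks
}

# DAG wiring (sketch, requires Airflow):
# dag = DAG("daily_load", default_args=default_args, schedule="@daily")
```

Note that enabling wait_for_downstream implicitly enables depends_on_past as well.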
Trigger Rules
- all_success (default): Dependent task runs only when all upstream tasks have succeeded. A failed upstream task marks the dependent task upstream_failed; a skipped upstream task causes it to be skipped. In other words:
count of upstream tasks == count of successful upstream tasks
- all_failed: Dependent task runs when all upstream tasks are in a failed or upstream_failed state.
- all_done: Dependent task runs when all upstream tasks are done with their execution; success, failure, or skipped does not matter, only that execution has completed. In this case every upstream task is in one of the succeeded, failed, upstream_failed, or skipped states.
- one_failed: Dependent task runs when at least one upstream task has failed (does not wait for all upstream tasks to be done).
- one_success: Dependent task runs when at least one upstream task has succeeded (does not wait for all upstream tasks to be done).
- none_failed: Dependent task runs only when no upstream task is in a failed or upstream_failed state, that is, all upstream tasks have succeeded or been skipped.
- none_failed_min_one_success: Dependent task runs only when no upstream task is in a failed or upstream_failed state, and at least one upstream task has succeeded.
- none_skipped: Dependent task runs only when no upstream task is in a skipped state, that is, every upstream task is in a success, failed, or upstream_failed state.
- always: Dependent task runs at any time, regardless of upstream states.
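A toy evaluation of a few of these rules over the terminal states of a task's upstreams (illustrative only; this is not Airflow's internal implementation):

```python
FAILED_STATES = {"failed", "upstream_failed"}

def should_run(trigger_rule, upstream_states):
    # Decide whether a dependent task fires, given the terminal states
    # ("success", "failed", "upstream_failed", "skipped") of its upstreams.
    if trigger_rule == "all_success":
        return all(s == "success" for s in upstream_states)
    if trigger_rule == "all_failed":
        return all(s in FAILED_STATES for s in upstream_states)
    if trigger_rule == "none_failed":
        return not any(s in FAILED_STATES for s in upstream_states)
    if trigger_rule == "none_failed_min_one_success":
        return (not any(s in FAILED_STATES for s in upstream_states)
                and any(s == "success" for s in upstream_states))
    if trigger_rule == "always":
        return True
    raise ValueError(f"rule {trigger_rule!r} not modelled in this sketch")

# In a real DAG the rule is just an operator argument:
# PythonOperator(..., trigger_rule="none_failed")
```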
Note: In the case of on_failure_callback, the update_state method in the DagRun class checks whether all tasks are finished and then whether any task is in a failed state. If one is, it calls on_failure_callback directly. If multiple tasks have failed, the callback may receive an arbitrary one of the failed task ids.
Happy Workflows!!!