Airflow features — Callback, Trigger & Cluster Policy

Amit Singh Rathore
Published in Nerd For Tech
4 min read · Aug 18, 2022

Lesser-discussed features of Airflow

In this blog, we will discuss the following features of Airflow:

  • Callbacks
  • Cluster Policy
  • Task Dependencies
  • Trigger Rules

Let us roll!

Callbacks

Callbacks allow us to attach custom actions to tasks at different stages of their lifecycle (start, success, or failure). There are three ways to use callbacks:

  1. Through DAG initialization: the function is called after all leaf tasks are done.
  2. Through task initialization: the function is called after that task is done.
  3. Through default_args, which are passed to ALL operators as keyword arguments.

There are five major callbacks:

  • on_execute_callback
  • on_success_callback
  • on_retry_callback
  • on_failure_callback
  • sla_miss_callback
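The dispatch mechanism can be sketched without Airflow at all: the runner executes the task, then fires whichever callback matches the outcome. The following is a minimal illustrative sketch; `run_task` and its context dict are my own names, not Airflow's API.

```python
# Illustrative sketch of callback dispatch; NOT Airflow's actual code.
def run_task(task_fn, on_success_callback=None, on_failure_callback=None,
             on_retry_callback=None, retries=0):
    """Run task_fn and fire the callback matching the final state."""
    context = {"try_number": 1}
    while True:
        try:
            task_fn()
        except Exception as exc:
            context["exception"] = exc
            if context["try_number"] <= retries:
                # Attempt failed but retries remain: fire the retry callback.
                if on_retry_callback:
                    on_retry_callback(context)
                context["try_number"] += 1
                continue
            # Retries exhausted: fire the failure callback.
            if on_failure_callback:
                on_failure_callback(context)
            return "failed"
        if on_success_callback:
            on_success_callback(context)
        return "success"
```

The real scheduler passes a much richer context (task instance, execution date, etc.), but the success/retry/failure branching follows this shape.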

Operator Hooks

The callbacks above are defined at the task/DAG level. At the operator level, Airflow also provides a mechanism to run actions around a task's execute method. These hooks get called before (or after) the actual execute function runs.

We have mainly three such hooks:

  • pre_execute: invokes the desired function just before the task starts. This can be useful if we wish to skip some tasks for certain DAG runs without breaking dependencies.
  • post_execute: invoked immediately after the task's execute method completes; it receives the result of execute.
  • on_kill: called when the task is externally killed; used to clean up resources like threads or subprocesses.
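In Airflow these are methods on BaseOperator that subclasses override, and the call order is pre_execute, then execute, then post_execute. A framework-free sketch of that contract (the class and its `run` driver are illustrative; a real operator inherits from BaseOperator and the executor drives the calls):

```python
# Sketch of the operator-level hook order; mirrors the
# pre_execute -> execute -> post_execute contract without Airflow.
class SketchOperator:
    def __init__(self):
        self.calls = []

    def pre_execute(self, context):
        # Runs just before execute(); a real operator could decide
        # here to skip the task for certain DAG runs.
        self.calls.append("pre_execute")

    def execute(self, context):
        self.calls.append("execute")
        return "payload"

    def post_execute(self, context, result=None):
        # Runs right after execute(), receiving its result.
        self.calls.append(f"post_execute:{result}")

    def on_kill(self):
        # Called if the task is externally killed; clean up threads,
        # subprocesses, temp files, etc. here.
        self.calls.append("on_kill")

    def run(self, context=None):
        # Stand-in for the executor's task-run loop.
        context = context or {}
        self.pre_execute(context)
        result = self.execute(context)
        self.post_execute(context, result=result)
        return result
```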

Cluster Policy

Cluster policies let administrators apply checks or mutations to DAGs and tasks cluster-wide. There are three types:

  • DAG policy: runs at DAG load time and can inspect or mutate DAG-level attributes.
  • Task policy: also runs at DAG load time and can inspect or mutate task attributes.
  • Task instance mutation: mutates task-instance properties only when the task is scheduled; running the task manually does not trigger it. It is called once the DAG run is created, and the mutation happens just before task execution.
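A task policy is just a function that receives each task as DAG files are parsed and may mutate or reject it. Below is an illustrative sketch of a policy capping retries and requiring an owner; the `Task` class is a stand-in for Airflow's operators, and the specific limits are my own example values (in real Airflow the function is placed in airflow_local_settings.py).

```python
# Sketch of a cluster-wide task policy; the Task class stands in
# for an Airflow operator so the example runs without Airflow.
class Task:
    def __init__(self, task_id, owner=None, retries=0):
        self.task_id = task_id
        self.owner = owner
        self.retries = retries

def task_policy(task):
    # Enforce an upper bound on retries cluster-wide (example limit).
    if task.retries > 3:
        task.retries = 3
    # Reject any task that does not declare an owner.
    if not task.owner:
        raise ValueError(f"task {task.task_id} must declare an owner")
```

Because the policy runs at DAG load time, a violation surfaces as a DAG import error rather than a runtime failure.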

Task Dependencies

In a DAG, each node is a task and the directed edges are dependencies. Edges determine how execution moves through the graph, and dependencies are key to defining flexible pipelines.

We can set dependencies between tasks (more precisely, define the pipeline flow) using the following two methods:

  • set_downstream
  • set_upstream
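The two methods are symmetric: a.set_downstream(b) and b.set_upstream(a) create the same edge a → b (Airflow's >> and << operators are shorthand for them). A minimal graph sketch showing that symmetry; the `Node` class here is illustrative, not Airflow's implementation:

```python
# Minimal sketch of set_downstream / set_upstream edge semantics.
class Node:
    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = set()
        self.upstream = set()

    def set_downstream(self, other):
        # a.set_downstream(b): edge a -> b (a >> b in Airflow).
        self.downstream.add(other.task_id)
        other.upstream.add(self.task_id)

    def set_upstream(self, other):
        # a.set_upstream(b): edge b -> a (a << b in Airflow),
        # i.e. the mirror image of set_downstream.
        other.set_downstream(self)
```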

We can also set dependency between tasks in the current run and the previous run of the DAG.

  • depends_on_past: if set to true, the task in the current DAG run will only run if the same task succeeded or was skipped in the previous run.
  • wait_for_downstream: if set to true, the task in the current run will only run if, in the previous run, the same task succeeded or was skipped and its immediate downstream tasks also succeeded or were skipped.
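Both flags reduce to predicates over the previous DAG run's states. A sketch under illustrative names (`can_run`, the state strings, and the `downstream_of` map are mine, not Airflow's API); note that in Airflow setting wait_for_downstream also implies depends_on_past:

```python
# Sketch of depends_on_past / wait_for_downstream gating logic.
OK = {"success", "skipped"}

def can_run(task_id, prev_run, depends_on_past=False,
            wait_for_downstream=False, downstream_of=None):
    """prev_run maps task_id -> state in the previous DAG run;
    downstream_of maps task_id -> its immediate downstream task ids."""
    if prev_run is None:
        # First run of the DAG: both flags are no-ops.
        return True
    if wait_for_downstream:
        # wait_for_downstream implies depends_on_past in Airflow.
        depends_on_past = True
        for d in (downstream_of or {}).get(task_id, []):
            if prev_run.get(d) not in OK:
                return False
    if depends_on_past and prev_run.get(task_id) not in OK:
        return False
    return True
```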

Trigger Rules

  • all_success (default): the dependent task runs only if all upstream tasks have succeeded; a failed or skipped upstream task prevents it from running. Count of upstream tasks == count of successful upstream tasks.
  • all_failed: the dependent task runs when all upstream tasks are in a failed or upstream_failed state.
  • all_done: the dependent task runs when all upstream tasks have finished executing; whether they succeeded, failed, or were skipped does not matter. Count of upstream tasks == count of succeeded + failed + upstream_failed + skipped tasks.
  • one_failed: the dependent task runs as soon as at least one upstream task has failed (it does not wait for all upstream tasks to finish).
  • one_success: the dependent task runs as soon as at least one upstream task has succeeded (it does not wait for all upstream tasks to finish).
  • none_failed: the dependent task runs only when no upstream task is failed or upstream_failed; that is, all upstream tasks have succeeded or been skipped.
  • none_failed_min_one_success: the dependent task runs only when no upstream task is failed or upstream_failed and at least one upstream task has succeeded.
  • none_skipped: the dependent task runs only when no upstream task is in a skipped state; that is, all upstream tasks are in a success, failed, or upstream_failed state.
  • always: the dependent task runs regardless of upstream task states.
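Each rule is just a predicate over the multiset of upstream states. The sketch below evaluates the rules listed above; the state strings match Airflow's, but the evaluator itself is illustrative, not the scheduler's actual code:

```python
# Sketch: evaluating trigger rules over a list of upstream task states.
def should_trigger(rule, states):
    n = len(states)
    ok = states.count("success")
    failed = states.count("failed") + states.count("upstream_failed")
    skipped = states.count("skipped")
    if rule == "all_success":
        return ok == n
    if rule == "all_failed":
        return failed == n
    if rule == "all_done":
        # All upstream tasks finished, in whatever terminal state.
        return ok + failed + skipped == n
    if rule == "one_failed":
        return failed >= 1
    if rule == "one_success":
        return ok >= 1
    if rule == "none_failed":
        return failed == 0
    if rule == "none_failed_min_one_success":
        return failed == 0 and ok >= 1
    if rule == "none_skipped":
        return skipped == 0
    if rule == "always":
        return True
    raise ValueError(f"unknown rule: {rule}")
```

A still-running upstream task would simply not count toward any terminal state, which is why all_success and all_done require the counts to reach n.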

Note: in the case of on_failure_callback, the update_state method in the DagRun class checks whether all tasks are finished and then whether any task is in a failed state. If one is, it calls on_failure_callback directly. If multiple tasks have failed, the task id passed to the callback may be chosen nondeterministically.

Happy Workflows!!!


Staff Data Engineer @ Visa — Writes about Cloud | Big Data | ML