Primal Data Advent Calendar #1: How to skip Airflow tasks conditionally

Andrej Švec
Slido developers blog
3 min read · Dec 2, 2020

This piece is a part of the Primal Data Advent Calendar, a series of miscellaneous articles written by the Slido Data Team. We release one on each Advent day that happens to be a prime (like 2 for the 2nd of December). Enjoy!

Apache Airflow is an open-source platform for scheduling and running workflows. At Slido, we use it to orchestrate processing pipelines, such as extracting data from various sources into our data warehouse or combining different data sources to create new views of the data.

The individual workflows in Airflow are called Directed Acyclic Graphs, or DAGs for short. Each DAG can contain numerous tasks that depend on one another. A task is the atomic unit of the workflow and is created by instantiating a BaseOperator subclass.

An example task. BashOperator tasks can run any bash command, this one prints the current date.
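
In code, the task might look as follows. This is a minimal sketch assuming Airflow 1.10.x (current at the time of writing); the DAG name and schedule are illustrative:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator  # airflow.operators.bash in Airflow 2+

# An illustrative DAG holding our single example task
dag = DAG(
    dag_id="example_dag",
    start_date=datetime(2020, 11, 1),
    schedule_interval="@daily",
)

# BashOperator tasks can run any bash command; this one prints the current date
print_date = BashOperator(
    task_id="print_date",
    bash_command="date",
    dag=dag,
)
```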

Tasks normally run in the order specified by the dependencies between them. However, there are cases when you want to skip a task based on a condition that can only be evaluated at runtime, such as skipping task execution on holidays. This is the situation we address in this post.

Why doesn't a simple decorator work?

The main logic of any Airflow task takes place inside the BaseOperator.execute method. If we want to skip the task, this is where our changes need to happen.

Normally, if you wanted to change an instance method of a Python object, you'd use a decorator. In the case of our BashOperator from above, the decorator could look as follows:

A decorator that only runs the task if some_condition is fulfilled
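
A sketch of such a decorator, reusing the print_date task from above. Here some_condition is a hypothetical placeholder for whatever runtime check you need, and raising AirflowSkipException is one way to have Airflow mark the task as skipped:

```python
import functools

from airflow.exceptions import AirflowSkipException

def some_condition(context):
    # Hypothetical runtime check, e.g. "today is not a holiday"
    return True

def run_conditionally(execute_method):
    """Wrap a task's execute method so it only runs when some_condition holds."""
    @functools.wraps(execute_method)
    def wrapper(context):
        if not some_condition(context):
            # AirflowSkipException marks the task as skipped in the UI
            raise AirflowSkipException("some_condition not fulfilled")
        return execute_method(context)
    return wrapper

# Patch the execute method of the task instance; note that the wrapped
# method is bound to this particular object
print_date.execute = run_conditionally(print_date.execute)
```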

When you run the operator, everything seems fine: the task is skipped if some_condition is not fulfilled, and otherwise it prints the current date:

Mon 30 Nov 2020 03:09:00 PM UTC

template_fields

The problem appears once you use template_fields. This class-level list names the operator attributes whose values Airflow renders with Jinja. If we change our example task so that it prints the current execution date using the macro {{ ds }}, Airflow fails to evaluate the template and only prints the raw string {{ ds }}.

An example BashOperator task printing the current execution date.
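
The templated variant of the task could look like this; bash_command is listed in BashOperator.template_fields, so Airflow is expected to render the macro before execution:

```python
# bash_command is a template field, so Airflow should replace {{ ds }}
# with the execution date before execute() is called
print_date = BashOperator(
    task_id="print_execution_date",
    bash_command="echo {{ ds }}",
    dag=dag,
)
```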

The template fields don't work because Airflow internally copies the task object before resolving the templates and calling the execute method. However, the execute method passed to the run_conditionally decorator is bound to the old object (it operates on the old object) and thus does not see the resolved values, which are stored in the new object. You can read more about the issue on StackOverflow.
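
A standalone toy reproduction of the problem, with a plain Python class standing in for the operator:

```python
import copy

class Task:
    """A toy stand-in for an Airflow operator."""
    def __init__(self):
        self.command = "{{ ds }}"  # unrendered template

    def execute(self):
        print(self.command)

task = Task()

# Patch execute the way the run_conditionally decorator does; the
# wrapped method stays bound to the *original* object
old_execute = task.execute
task.execute = lambda: old_execute()

# Airflow copies the task and renders templates on the copy...
task_copy = copy.copy(task)
task_copy.command = "2020-11-30"

# ...but the patched execute still reads from the original object
task_copy.execute()  # prints "{{ ds }}", not "2020-11-30"
```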

How to keep template_fields working

To keep the template_fields working, we need to subclass the operator class prior to instantiating it and override the execute method:

A decorator subclassing the operator_cls and overriding the execute method.
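
A sketch of this approach; skip_unless and is_not_holiday are illustrative names, and the condition receives the Airflow context so it can be evaluated at runtime:

```python
from airflow.exceptions import AirflowSkipException

def skip_unless(condition):
    """Return a class decorator that subclasses an operator class and
    overrides execute() to skip the task unless condition(context) holds."""
    def decorator(operator_cls):
        class ConditionalOperator(operator_cls):
            def execute(self, context):
                if not condition(context):
                    raise AirflowSkipException("condition not fulfilled")
                return super().execute(context)
        return ConditionalOperator
    return decorator

def is_not_holiday(context):
    # Hypothetical runtime check
    return True

# Subclass *before* instantiating, so Airflow's copy-and-render step
# still resolves template_fields before our execute() runs
ConditionalBashOperator = skip_unless(is_not_holiday)(BashOperator)

print_date = ConditionalBashOperator(
    task_id="print_execution_date",
    bash_command="echo {{ ds }}",
    dag=dag,
)
```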

The code above finally works as we would want it to. It evaluates the macro {{ ds }} and prints the current execution date:

2020-11-30

Final words

By creating a decorator that subclasses the operator class, we can easily skip Airflow tasks of any type based on conditions evaluated at task runtime. In the real world, some_condition can represent checking the state of another task (if your task dependency structure is too complex to be modelled by standard Airflow tools), checking the day of the week (if you want to run some tasks more frequently than others but still keep them in the same DAG), or checking the values of the DAG Run configuration provided when triggering the DAG.
