Use conditional tasks with Apache Airflow

Guillaume Payen
6 min read · Jul 27, 2018


One of the great things about Apache Airflow is that it allows you to create simple but also very complex pipelines, with a design and a scripting language that remain very accessible. It therefore becomes very easy to build mind-blowing workflows that match a great many use cases.

While it is pretty straightforward to develop static pipelines, you will see that it is a bit trickier to add dynamics to your pipelines and to connect tasks together so that they share the information that drives their execution.

Why would you need a dynamic pipeline?

Static pipelines are practical, but the drawback of this approach is that execution of the tasks is linear. You would typically encode the tasks and link them together, and they would then be executed one after the other. The question is: how will you handle a situation where the execution of some tasks is conditioned by the execution result of others?

A dynamic workflow allows you to handle particular events, such as logging and workarounds. Imagine that you want to launch a specific task depending on the day of the week. On Monday, you want to load the data. On Tuesday, you want to process it. And so on, until Sunday, when the reports will be created. In this situation, the execution of the workflow cannot be linear, as it depends on an external parameter (the week day).

Such a need looks quite trivial; the thing is that its implementation is not. I was hoping to find some documentation about task conditioning, but after a while I had to admit that Airflow's API does not provide such a feature out of the box. No obvious solution, then. Yet Airflow's API does offer other components that, when combined, will do the job just fine!

The purpose of this blog is to give you a quick and simple implementation of a dynamic pipeline, where the execution of some tasks is conditioned by the result of another.

Implementation

To keep things simple, let’s stick to my previous — very basic — example where I wanted to launch a task depending on the week day.

from airflow import DAG
from airflow.operators.python_operator import PythonOperator, BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'guillaume',
    'depends_on_past': False,
    'start_date': datetime(2018, 6, 18),
    'email': ['hello@moonshots.ai'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1)
}

dag = DAG(
    'Weekday',
    default_args=default_args,
    schedule_interval="@once")

# used to factorize the code and avoid repetition
tabDays = ["monday", "tuesday", "wednesday", "thursday", "friday", "saturday", "sunday"]

Nothing outstanding here: we are just setting up our Airflow pipeline, and we are creating an array with 7 items, our 7 week days.

# returns the week day (monday, tuesday, etc.)
def get_day(**kwargs):
    kwargs['ti'].xcom_push(key='day', value=datetime.now().weekday())

# PythonOperator will compute the week day and push it to XCom
get_weekday = PythonOperator(
    task_id='weekday',
    python_callable=get_day,
    provide_context=True,
    dag=dag
)

The purpose of this first task is pretty simple: get the week day of our system and store it. A Python function (get_day) is used by a PythonOperator. What is important here is that we are storing and sharing the week day. This is what Airflow calls cross-communication, also known as XCom. kwargs is a common name used in many Python scripts; it stands for keyword arguments. ti stands for task instance. The get_day function uses the task instance found under the ti key of the keyword arguments to push a key/value pair, where the key is day and the value is the week day computed with datetime.now().weekday() (0 for Monday up to 6 for Sunday, which is why tabDays starts with monday).

These names are conventions rather than requirements. Your script will also work if you name the variable args instead of kwargs, and the context exposes the same task instance under the longer task_instance key as well as ti.
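As a minimal sketch of that equivalence (same behavior as get_day above, only the names change):

# equivalent to get_day above: the context exposes the same TaskInstance
# object under both the 'ti' and 'task_instance' keys
def get_day_verbose(**context):
    context['task_instance'].xcom_push(key='day', value=datetime.now().weekday())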

There is one more thing to say about PythonOperator: for XCom to work this way, provide_context must be set to True. Here is what the documentation says about provide_context.

if set to true, Airflow will pass a set of keyword arguments that can be used in your function. This set of kwargs correspond exactly to what you can use in your jinja templates. For this to work, you need to define `**kwargs` in your function header.
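Among those keyword arguments is the execution date of the run. A variant of get_day built on it (a sketch, assuming the standard execution_date context key) would pick the branch from the scheduled date rather than the wall clock, so re-running a past date reproduces the same decision:

# variant of get_day: derive the week day from the run's execution date
# instead of datetime.now(), so backfills and reruns stay deterministic
def get_day_from_schedule(**kwargs):
    kwargs['ti'].xcom_push(key='day', value=kwargs['execution_date'].weekday())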

# returns the name id of the task to launch (task_for_monday, task_for_tuesday, etc.)
def branch(**kwargs):
    return 'task_for_' + tabDays[kwargs['ti'].xcom_pull(task_ids='weekday', key='day')]

# BranchPythonOperator will read the "day" XCom value, and decide which task to launch next
fork = BranchPythonOperator(
    task_id='branching',
    python_callable=branch,
    provide_context=True,
    dag=dag)

After we have stored our week day, we can use another very useful Airflow component called BranchPythonOperator. What it does is pretty straightforward: the return value of the python_callable function is the id of a task, and our BranchPythonOperator task will then trigger it (provided the way tasks are wired together makes it reachable), while every other directly downstream task gets skipped.

In our case, the branching task calls the branch function. You will notice that we also set the provide_context parameter to True, in order to fetch the data stored by the previous task. The branch function returns a task id of the form task_for_monday, task_for_tuesday, and so on: it uses the task instance from the keyword arguments to pull the value of the day key that was pushed by the weekday task.
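One practical refinement, a sketch rather than part of the original tutorial: xcom_pull returns None when nothing was pushed (for instance if the weekday task failed), which would crash the indexing above. A defensive variant with a hypothetical default branch:

# defensive variant of branch: fall back to a default task id when the
# XCom value is missing, instead of raising a TypeError on tabDays[None]
def branch_safe(**kwargs):
    day = kwargs['ti'].xcom_pull(task_ids='weekday', key='day')
    if day is None:
        return 'task_for_monday'  # hypothetical default branch
    return 'task_for_' + tabDays[day]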

We are almost done; we just need to create a final DummyOperator task for each day of the week, and wire everything together.

# task 1, get the week day, and then use the branch task
get_weekday.set_downstream(fork)

# one dummy operator for each week day, all downstream of the fork
for day in range(7):
    fork.set_downstream(DummyOperator(task_id='task_for_' + tabDays[day], dag=dag))

Here is what our pipeline looks like.

Discussion

The purpose of this example was to show you how it is possible to condition tasks with XCom and BranchPythonOperator. By mixing those two components, we are able to store some data (the result of a task execution) and use it to guide the execution of the following tasks.
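One thing to keep in mind if you extend this pattern, sketched below with a hypothetical downstream join task: branching skips every non-selected path, and a task with the default all_success trigger rule would be skipped too as soon as one of its parents is skipped. Giving the join task the one_success trigger rule lets the pipeline converge again:

# hypothetical 'join' task that runs after whichever branch was chosen;
# 'one_success' fires as soon as any parent succeeds, ignoring the
# skipped branches (the default 'all_success' rule would skip the join)
join = DummyOperator(task_id='join', trigger_rule='one_success', dag=dag)
for day in range(7):
    day_task = DummyOperator(task_id='task_for_' + tabDays[day], dag=dag)
    fork.set_downstream(day_task)
    day_task.set_downstream(join)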

Indeed, I could have factorized the script further. Some of you may say that the weekday task is useless, as I could have computed the week day of the system directly within the branch function. This is true, but for the sake of this tutorial I preferred splitting the steps and staying general, so that you understand how branching and XCom can work together.
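For completeness, here is what that factorized version could look like (a sketch: it drops the weekday task and the XCom round trip entirely):

# factorized variant: compute the week day inside the branch callable,
# removing the need for the separate weekday task and for XCom
def branch_direct():
    return 'task_for_' + tabDays[datetime.now().weekday()]

fork = BranchPythonOperator(
    task_id='branching',
    python_callable=branch_direct,
    dag=dag)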

Let’s rock

Now is the time to see this working!

So far so good. Tasks with a green border are those that were executed; those with a pink border were skipped. As you can see, I am testing this blog on a shiny Thursday :)

Now it is your turn. This tutorial was meant to be very simple, so that you understand the concepts of XCom and branching. It is up to you now to adapt the code to your own needs. If this blog helped you in any way, feel free to leave a comment :)

Full code of the tutorial

from airflow import DAG
from airflow.operators.python_operator import PythonOperator, BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime, timedelta

# used to factorize the code and avoid repetition
tabDays = ["monday", "tuesday", "wednesday", "thursday", "friday", "saturday", "sunday"]

default_args = {
    'owner': 'guillaume',
    'depends_on_past': False,
    'start_date': datetime(2018, 6, 18),
    'email': ['hello@moonshots.ai'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1)
}

dag = DAG(
    'Weekday',
    default_args=default_args,
    schedule_interval="@once")

# returns the week day (monday, tuesday, etc.)
def get_day(**kwargs):
    kwargs['ti'].xcom_push(key='day', value=datetime.now().weekday())

# returns the name id of the task to launch (task_for_monday, task_for_tuesday, etc.)
def branch(**kwargs):
    return 'task_for_' + tabDays[kwargs['ti'].xcom_pull(task_ids='weekday', key='day')]

# PythonOperator will compute the week day and push it to XCom
get_weekday = PythonOperator(
    task_id='weekday',
    python_callable=get_day,
    provide_context=True,
    dag=dag
)

# BranchPythonOperator will read the "day" XCom value, and decide which task to launch next
fork = BranchPythonOperator(
    task_id='branching',
    python_callable=branch,
    provide_context=True,
    dag=dag)

# task 1, get the week day
get_weekday.set_downstream(fork)

# one dummy operator for each week day, all downstream of the fork
for day in range(7):
    fork.set_downstream(DummyOperator(task_id='task_for_' + tabDays[day], dag=dag))


Guillaume Payen

I live in Paris, France, and I am a Data/Cloud architect and machine learning engineer. I’m glad I can share my projects and experiments with you on Medium :)