Parallel and sequential tasks topology in the Airflow Task Flow Paradigm

Pavel Pustoshnyi
Published in The Deep Hub
5 min read · Aug 31, 2024

In this article, I'll show you how to write as little code as possible in Airflow DAGs for arbitrarily complicated topologies, using the task and task group decorators of the Airflow Task Flow Paradigm. There is more: in the Airflow web UI, the DAGs will look clear, and the inner sequences will be easy to read.

Simple topologies

There are two basic cases.

Tasks can run sequentially: the second task runs only after the first. When there are not many tasks, we can specify their order using ">>". The DAG's code for this case looks like this:

import os

from airflow.decorators import dag, task

dag_id = os.path.basename(__file__).split('.')[0]


@dag(
    dag_id=dag_id,
    schedule=None,
)
def task_flow():
    """
    This dag contains two dependent tasks: the second task
    will run only after the first task.
    """

    @task(task_id='dependent_first_task')
    def first_task():
        print('First task')

    @task(task_id='dependent_second_task')
    def second_task():
        print('Second task')

    first_task() >> second_task()


task_flow()
Web UI: two sequential tasks
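As an aside, ">>" works because Airflow's operators overload Python's right-shift operator (`__rshift__`) to record the downstream dependency and return the right-hand operand, which is what lets chains like `a >> b >> c` work. A minimal sketch with a hypothetical `Node` class (plain Python, not Airflow code) illustrates the idea:

```python
# Minimal sketch of how ">>" can record dependencies. Airflow's real
# operators do something similar via __rshift__ / set_downstream;
# this Node class is a toy stand-in.
class Node:
    def __init__(self, name):
        self.name = name
        self.downstream = []

    def __rshift__(self, other):
        # "a >> b" records b as running after a, then returns b
        # so that chains like "a >> b >> c" keep working.
        self.downstream.append(other)
        return other

a, b, c = Node('a'), Node('b'), Node('c')
a >> b >> c
print([n.name for n in a.downstream])  # ['b']
print([n.name for n in b.downstream])  # ['c']
```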

Tasks can run in parallel: not necessarily concurrently, but independently. When there are only a few tasks, we can simply not specify their order; we just call their functions. The DAG's code for this case looks like this:

import os

from airflow.decorators import dag, task

dag_id = os.path.basename(__file__).split('.')[0]


@dag(
    dag_id=dag_id,
    schedule=None,
)
def task_flow():
    """
    This dag contains two independent tasks.
    """

    @task(task_id='independent_task_a')
    def run_task_a():
        print('Task A')

    @task(task_id='independent_task_b')
    def run_task_b():
        print('Task B')

    run_task_a()
    run_task_b()


task_flow()
Web UI: two parallel or independent tasks

Using task groups, you can easily combine these simple topologies. In the example below, the DAG contains two dependent groups: the second group, with independent tasks inside, will run only after the first group, with dependent tasks inside. We again use ">>" for the dependency; without it, the groups would run independently, just like independent tasks.

import os

from airflow.decorators import dag, task, task_group


@dag(dag_id=os.path.basename(__file__).split('.')[0], schedule=None)
def task_flow():
    """
    This dag contains two dependent groups: the second group with
    independent tasks inside will run only after the first group
    with dependent tasks inside.
    """

    @task_group(group_id='dependent_tasks')
    def run_dependent_tasks():
        @task(task_id='first_task')
        def first_task():
            print('First task')

        @task(task_id='second_task')
        def second_task():
            print('Second task')

        first_task() >> second_task()

    @task_group(group_id='independent_tasks')
    def run_independent_tasks():
        @task(task_id='task_a')
        def run_task_a():
            print('Task A')

        @task(task_id='task_b')
        def run_task_b():
            print('Task B')

        run_task_a()
        run_task_b()

    run_dependent_tasks() >> run_independent_tasks()


task_flow()
Web UI: two sequential task groups

DAGs with many tasks

What if we need to repeat the same action many times? With many tasks, the DAG code becomes a mess. We can simplify our life by using loops. In the Task Flow Paradigm, the code for a dependent sequence looks like this:

import os

from airflow.decorators import dag, task

dag_id = os.path.basename(__file__).split('.')[0]

steps = list(range(10))


@dag(
    dag_id=dag_id,
    schedule=None,
)
def task_flow():
    """
    This dag contains N=10 dependent tasks.
    """
    tasks = list()
    for step in steps:
        @task(task_id=f'task_{step}')
        def run_task(step=step):
            # bind the current value as a default argument: a plain
            # closure would see only the last value of the loop variable
            print(f'Task {step}')

        tasks.append(run_task())

    for task_i in range(len(tasks) - 1):
        tasks[task_i] >> tasks[task_i + 1]


task_flow()
Web UI: ten sequential tasks

We don't have to write the decorator ten times and ten dependencies with ">>", only once.
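One Python subtlety worth knowing when defining task callables inside a loop: a closure captures the loop variable itself, not its value at that iteration, so every callable would see the final value when it eventually runs. Binding the current value as a default argument avoids this. A plain-Python illustration (no Airflow involved):

```python
# Closures capture the variable, not its value: by the time these
# functions run, the loop has finished and step == 2 for all of them.
late = [lambda: f'Task {step}' for step in range(3)]
print([f() for f in late])  # ['Task 2', 'Task 2', 'Task 2']

# A default argument is evaluated at definition time, so each
# function keeps its own value of step.
bound = [lambda step=step: f'Task {step}' for step in range(3)]
print([f() for f in bound])  # ['Task 0', 'Task 1', 'Task 2']
```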

We use the same approach for independent tasks, but without ">>":

import os

from airflow.decorators import dag, task

dag_id = os.path.basename(__file__).split('.')[0]

entities = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j']


@dag(
    dag_id=dag_id,
    schedule=None,
)
def task_flow():
    """
    This dag contains N=10 independent tasks.
    """
    for entity in entities:
        @task(task_id=f'task_{entity}')
        def run_task(entity=entity):
            # bind the current value as a default argument: a plain
            # closure would see only the last value of the loop variable
            print(f'Task {entity.capitalize()}')

        run_task()


task_flow()
Web UI: ten parallel tasks

Again, we can combine these topologies using task groups:

import os

from airflow.decorators import dag, task, task_group

entities = ['a', 'b', 'c', 'd', 'e']
steps = list(range(3))


@dag(dag_id=os.path.basename(__file__).split('.')[0], schedule=None)
def task_flow():
    """
    This dag contains two dependent groups: the second group with N=3
    dependent tasks inside will run only after the first group
    with M=5 independent tasks inside.
    """

    @task_group(group_id='independent_tasks')
    def run_independent_tasks():
        for entity in entities:
            @task(task_id=f'task_{entity}')
            def run_task(entity=entity):
                print(f'Task {entity.capitalize()}')

            run_task()

    @task_group(group_id='dependent_tasks')
    def run_dependent_tasks():
        tasks = list()
        for step in steps:
            @task(task_id=f'task_{step}')
            def run_task(step=step):
                print(f'Task {step}')

            tasks.append(run_task())

        for task_i in range(len(tasks) - 1):
            tasks[task_i] >> tasks[task_i + 1]

    run_independent_tasks() >> run_dependent_tasks()


task_flow()
Web UI: two sequential task groups

Again, you can make these groups independent by removing “>>.”
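As a small refinement, the pairwise wiring loop `for task_i in range(len(tasks) - 1)` can also be expressed with `zip`, which pairs each element with its successor. In plain Python (outside Airflow) the pattern looks like this:

```python
# zip(tasks, tasks[1:]) yields (t0, t1), (t1, t2), ... which is
# exactly the chain of pairwise dependencies we need.
tasks = ['t0', 't1', 't2', 't3']
pairs = list(zip(tasks, tasks[1:]))
print(pairs)  # [('t0', 't1'), ('t1', 't2'), ('t2', 't3')]
```

Inside the DAG, the wiring loop would then read `for upstream, downstream in zip(tasks, tasks[1:]): upstream >> downstream`, which some find easier to scan than index arithmetic.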

Arbitrary topology

Using the approaches above and combining task groups, you can easily write DAGs with arbitrary topologies, for example:

import os

from airflow.decorators import dag, task, task_group

groups = list(range(2))
steps = list(range(3))
entities = ['a', 'b']


@dag(dag_id=os.path.basename(__file__).split('.')[0], schedule=None)
def task_flow():
    """
    This dag contains two dependent groups. The first group contains
    two subgroups, each with two nested groups inside. After the big
    group, there is one small group with only one task.
    """

    @task_group(group_id='group_of_groups')
    def run_group_of_groups():
        for group in groups:
            @task_group(group_id=f'group_{group}')
            def run_sub_group():
                @task_group(group_id='dependent_tasks')
                def run_dependent_tasks():
                    tasks = list()
                    for step in steps:
                        @task(task_id=f'task_{step}')
                        def run_task(step=step):
                            print(f'Task {step}')

                        tasks.append(run_task())

                    for task_i in range(len(tasks) - 1):
                        tasks[task_i] >> tasks[task_i + 1]

                @task_group(group_id='independent_tasks')
                def run_independent_tasks():
                    for entity in entities:
                        @task(task_id=f'task_{entity}')
                        def run_task(entity=entity):
                            print(f'Task {entity.capitalize()}')

                        run_task()

                run_dependent_tasks() >> run_independent_tasks()

            run_sub_group()

    @task_group(group_id='small_group')
    def run_small_group():
        @task(task_id='one_task')
        def run_one_task():
            print('Only one task')

        run_one_task()

    run_group_of_groups() >> run_small_group()


task_flow()
Web UI: DAG with arbitrary topology

The Airflow Task Flow Paradigm gives us powerful abilities: we can write simple, clear code and see the DAGs' topology clearly in the web UI. So we should use it.
