Orchestration and DAG Design in Apache Airflow — Two Approaches

John Aven
Hashmap, an NTT DATA Company
9 min read · Jun 25, 2019

Orchestration of ETL processes — aka data pipelines — is a conceptually simple exercise; it’s the implementation that gets in the way.

With many tools/frameworks on the market, the build-it-in-house community, and so on, it is not always easy to make a decision. This is, of course, true for nearly any tech stack. From time to time, certain tools do come to the forefront, making the decision to use or not use them much easier — much of the testing will have been done a priori for you, and the provided guided tutorials will help you better understand how to use them.

Apache Airflow Bridges a Gap

In the process/workflow orchestration space, Apache Airflow (also packaged as Cloud Composer on Google Cloud Platform) has become a go-to framework. It bridges the gap between GUI-based and purely programmatic orchestration — pipelines are defined as code (Python) and pipeline management is via a GUI written in Flask.

Apache Airflow decouples the processing stages from the orchestration.

A collection of inter-dependent processes is managed primarily through a dependency DAG (Directed Acyclic Graph) — a graph where data flows in only one direction among the connected processes. Jobs that fail will not trigger jobs that were dependent on their success. Any number of arbitrarily complex dependencies can be modeled using the pythonic DAG design API.

The examples shown here are only for ‘bash’ processes. They are simple and designed so that you can test them within your own environment with little more than an installation of Apache Airflow — assuming you are working in a Linux-like environment.

Beyond the basics of configuring the DAGs, little to no attention will be given to the more detailed configurations of an Apache Airflow DAG, such as starting dates and retries, as we assume that the user has this background.

2 Approaches — Single DAG and Triggered DAGs

In presenting the possible solutions, two different configurations are considered: Single DAG and Triggered DAGs. In both approaches, the pipeline will cease if any portion along the way fails.

DAG Description

The flow we are using in this example is as follows:

  • save-bash — print ‘Hello World’ to STDOUT and redirect it to a file called out.txt
  • print-file — print the output of out.txt file to STDOUT
  • copy-file — copy out.txt to out_copy.txt
  • delete-files — delete the out.txt and out_copy.txt files

As a DAG the workflow can be visualized as below. The rectangle around the diagram is meant to represent that what is inside is a self-contained DAG.

Simple DAG with workflow completely orchestrated by a single pipeline

Single DAG approach

When deploying a collection of Spark jobs which are interrelated, Apache Airflow can be used to orchestrate the dependent execution of these pipelines. These dependencies are naturally expressed as a DAG (Directed Acyclic Graph) with the Apache Airflow Python API.

First, create the DAG header — imports, configuration, and initialization of the DAG. The single DAG instantiated here will appear in the GUI.

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

# Define the DAG...
# Create the default arguments
default_args = {
    'owner': 'hashmap-airflow',
    'depends_on_past': False,
    'start_date': datetime(2019, 5, 23),
    'retries': 0,
    'retry_delay': timedelta(minutes=1),
}

# create the DAG instance
dag = DAG('hw_bash', default_args=default_args, schedule_interval=timedelta(1))

# start_date is passed in via default_args, but it does not appear to be
# picked up from there (likely a bug), so it is set explicitly here.
dag.start_date = default_args['start_date']

Note: Setting dag.start_date explicitly shouldn’t be required. Since it is set in default_args, it should be picked up by the DAG class constructor.
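
If you would rather avoid setting the attribute after construction, one workaround is to pass start_date directly to the DAG constructor, which accepts it as a keyword argument. A minimal sketch, reusing the default_args defined above:

# Alternative workaround: pass start_date explicitly to the DAG constructor
dag = DAG(
    'hw_bash',
    default_args=default_args,
    start_date=default_args['start_date'],
    schedule_interval=timedelta(1)
)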

Next, define any parameters that are used in the script. Here, this is a path that is used to identify where an output file will be stored and manipulated.

# This path is used in the code below. It should identify where the code is being executed from.
path = '/PATH_TO_DAG/bash_dag_example'

Next, create the nodes of the DAG — these are the BashOperator instances.

# STDOUT 'Hello World' with redirect to out.txt
create_file = BashOperator(task_id='save-bash', bash_command='echo "Hello World" > {path}/out.txt'.format(path=path))

# print the contents of out.txt to STDOUT
print_file = BashOperator(task_id='print-file', bash_command='cat {path}/out.txt'.format(path=path))

# clone/copy the data into another file
bash_cmd = 'cp {path}/out.txt {path}/out_copy.txt'.format(path=path)
copy_file = BashOperator(task_id='copy-file', bash_command=bash_cmd)

# delete the files that were created
bash_cmd = 'rm -f {path}/out.txt && rm -f {path}/out_copy.txt'.format(path=path)
delete_files = BashOperator(task_id='delete-files', bash_command=bash_cmd)

Once the nodes have been created they need to be assigned to a DAG.

# Assign the operators to a DAG
create_file.dag = dag
print_file.dag = dag
copy_file.dag = dag
delete_files.dag = dag
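
As an aside, the same association can be made at construction time by passing the dag keyword argument to each operator. A minimal sketch for the first task, using the same task_id and bash_command as above:

# Alternative: attach the task to the DAG when the operator is constructed
create_file = BashOperator(
    task_id='save-bash',
    bash_command='echo "Hello World" > {path}/out.txt'.format(path=path),
    dag=dag
)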

And, finally, create the DAG topology by explicitly building out the edges (dependencies) of the DAG.

# Create the DAG topology
print_file.set_upstream(task_or_task_list=[create_file])
copy_file.set_upstream(task_or_task_list=[create_file])
delete_files.set_upstream(task_or_task_list=[print_file, copy_file])
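
Equivalently, the same topology can be declared with Airflow's bitshift dependency operators (>> and <<). A minimal sketch of the same diamond-shaped flow:

# Same topology expressed with bitshift dependency operators
create_file >> print_file >> delete_files
create_file >> copy_file >> delete_files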

Once deployed, this will be executable as a single job in Airflow, even though it is constructed from four different jobs.

This approach is useful when rerunning all data in the pipeline at once is not an issue for you, or when a full rerun is actually what you need. It is an especially good fit for simpler pipelines without many ETL stages (Spark jobs).

Triggered DAGs approach

This approach is more advanced, but not overly so. This additional complexity is necessary if:

  1. You have complicated flows you need to break down
  2. There are flows that should not reprocess computationally expensive stages
  3. It is generally desirable to decouple the execution and orchestration of a flow

For simplicity, only those parts of the design that are different from the above will be discussed here.

The first step is to realize that each of the processing steps sits in a layer, where all steps within a layer can conceptually be processed concurrently and can therefore be assigned to the same DAG. Decomposing the DAG above indicates the need for three DAGs.

# create the DAG instances
dag_layer_1 = DAG(dag_id='hw_bash_layer_1', default_args=default_args, schedule_interval=timedelta(1))
dag_layer_2 = DAG(dag_id='hw_bash_layer_2', default_args=default_args, schedule_interval=None)
dag_layer_3 = DAG(dag_id='hw_bash_layer_3', default_args=default_args, schedule_interval=None)

# Set start date
# start_date is passed in via default_args, but it does not appear to be
# picked up from there (likely a bug), so it is set explicitly here.
dag_layer_1.start_date = default_args['start_date']
dag_layer_2.start_date = default_args['start_date']
dag_layer_3.start_date = default_args['start_date']

Note: In the downstream DAGs, schedule_interval is set to None — this ensures that they are not triggered by Airflow automatically.

Other than the DAGs, you will also have to create TriggerDagRunOperator instances, which are used to trigger the execution of another DAG, or collection of DAGs, if necessary. These serve as the linking mechanism between layers and will trigger the execution of downstream jobs.

from airflow.operators.dagrun_operator import TriggerDagRunOperator

# Create Triggers
trigger_layer_2 = TriggerDagRunOperator(task_id='trigger-layer-2', trigger_dag_id='hw_bash_layer_2')
trigger_layer_3 = TriggerDagRunOperator(task_id='trigger-layer-3', trigger_dag_id='hw_bash_layer_3')

Instead of assigning each node the same DAG, the layer-specific DAGs created above will now be assigned to the nodes.

# Assign the operators to a DAG
create_file.dag = dag_layer_1
trigger_layer_2.dag = dag_layer_1
print_file.dag = dag_layer_2
copy_file.dag = dag_layer_2
trigger_layer_3.dag = dag_layer_2
delete_files.dag = dag_layer_3

It may be the case in some situations that a layer is thick (meaning it consists of 2 or more nodes), but here every layer is only one node thick. This means that the only downstream components (here, single nodes) are the trigger operators. They are set accordingly.

# Set any upstream requirements — e.g. especially for the triggers
trigger_layer_2.set_upstream(task_or_task_list=[create_file])
trigger_layer_3.set_upstream(task_or_task_list=[print_file, copy_file])

If we were to look at this topologically, we would have the situation depicted below, which is in stark contrast to the topology addressed above. Each of the rectangles is again a self-contained DAG, and the dashed arrows indicate the triggering event of the next DAG.

Triggered DAG example with workflow broken down into three layers in series.

In order to execute this version of the flow from within Apache Airflow, only the initial job is executed. This will trigger a cascade that terminates either when all jobs have succeeded or when one of them fails. If an intermediate layer fails, the issue can be resolved and that layer can be manually or automatically re-executed without rerunning any of its upstream requirements.

An Additional Case To Be Aware Of

A third case, not discussed here, is how to start a DAG based upon an external event. This could be the completion of a collection of disconnected upstream DAGs.

This more advanced approach will require the use of Sensor operators, and as it requires much more detail than we are afforded here, it will be covered in its own white paper.
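
As a teaser, a minimal sketch of the idea: an ExternalTaskSensor waits for a task in another DAG to complete before the downstream work starts. The DAG and task names below ('upstream_dag', 'last-task') are hypothetical placeholders.

from airflow.sensors.external_task_sensor import ExternalTaskSensor

# Wait for a task in another (hypothetical) DAG to complete before proceeding.
wait_for_upstream = ExternalTaskSensor(
    task_id='wait-for-upstream',
    external_dag_id='upstream_dag',   # hypothetical upstream DAG id
    external_task_id='last-task',     # hypothetical final task in that DAG
    poke_interval=60                  # check once per minute
)
# Assign the sensor to the downstream DAG and set it upstream of that DAG's
# first task, just like any other operator.

Note that, by default, the sensor matches on the same execution date as its own DAG run, so in practice some tuning (e.g. the execution_delta argument) is usually needed.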

Code Examples

Single DAG

Complete example for the single DAG example:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

# Define the DAG...
# Create the default arguments
default_args = {
    'owner': 'hashmap-airflow',
    'depends_on_past': False,
    'start_date': datetime(2019, 5, 23),
    'retries': 0,
    'retry_delay': timedelta(minutes=1),
}

# create the DAG instance
dag = DAG('hw_bash', default_args=default_args, schedule_interval=timedelta(1))

# start_date is passed in via default_args, but it does not appear to be
# picked up from there (likely a bug), so it is set explicitly here.
dag.start_date = default_args['start_date']

# This path is used in the code below. It should identify where the code is being executed from.
path = '/PATH_TO_DAG/bash_dag_example'

# STDOUT 'Hello World' with redirect to out.txt
create_file = BashOperator(
    task_id='save-bash',
    bash_command='echo "Hello World" > {path}/out.txt'.format(path=path)
)

# print the contents of out.txt to STDOUT
print_file = BashOperator(task_id='print-file', bash_command='cat {path}/out.txt'.format(path=path))

# clone/copy the data into another file
copy_file = BashOperator(task_id='copy-file', bash_command='cp {path}/out.txt {path}/out_copy.txt'.format(path=path))

# delete the files that were created
delete_files = BashOperator(task_id='delete-files', bash_command='rm -f {path}/out.txt && rm -f {path}/out_copy.txt'.format(path=path))

# Assign the operators to a DAG
create_file.dag = dag
print_file.dag = dag
copy_file.dag = dag
delete_files.dag = dag

# Create the DAG topology
print_file.set_upstream(task_or_task_list=[create_file])
copy_file.set_upstream(task_or_task_list=[create_file])
delete_files.set_upstream(task_or_task_list=[print_file, copy_file])

Triggered DAGs

Complete example in a single file for the Triggered DAGs example:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from datetime import datetime, timedelta

# Define the DAG...
# Create the default arguments
default_args = {
    'owner': 'hashmap-airflow',
    'depends_on_past': False,
    'start_date': datetime(2019, 5, 23),
    'retries': 0,
    'retry_delay': timedelta(minutes=1),
}

# create the DAG instances
dag_layer_1 = DAG(dag_id='hw_bash_layer_1', default_args=default_args, schedule_interval=timedelta(1))
dag_layer_2 = DAG(dag_id='hw_bash_layer_2', default_args=default_args, schedule_interval=None)
dag_layer_3 = DAG(dag_id='hw_bash_layer_3', default_args=default_args, schedule_interval=None)

# Set start date
# start_date is passed in via default_args, but it does not appear to be
# picked up from there (likely a bug), so it is set explicitly here.
dag_layer_1.start_date = default_args['start_date']
dag_layer_2.start_date = default_args['start_date']
dag_layer_3.start_date = default_args['start_date']

# This path is used in the code below. It should identify where the code is being executed from.
path = '/PATH_TO_DAG/bash_dag_example'

# STDOUT 'Hello World' with redirect to out_tr.txt
create_file = BashOperator(task_id='save-bash', bash_command='echo "Hello World" > {path}/out_tr.txt'.format(path=path))

# print the contents of out_tr.txt to STDOUT
print_file = BashOperator(task_id='print-file', bash_command='cat {path}/out_tr.txt'.format(path=path))

# clone/copy the data into another file
copy_file = BashOperator(task_id='copy-file', bash_command='cp {path}/out_tr.txt {path}/out_tr_copy.txt'.format(path=path))

# delete the files that were created
delete_files = BashOperator(task_id='delete-files', bash_command='rm -f {path}/out_tr.txt && rm -f {path}/out_tr_copy.txt'.format(path=path))

# Create Triggers
trigger_layer_2 = TriggerDagRunOperator(task_id='trigger-layer-2', trigger_dag_id='hw_bash_layer_2')
trigger_layer_3 = TriggerDagRunOperator(task_id='trigger-layer-3', trigger_dag_id='hw_bash_layer_3')

# Assign the operators to a DAG
create_file.dag = dag_layer_1
trigger_layer_2.dag = dag_layer_1
print_file.dag = dag_layer_2
copy_file.dag = dag_layer_2
trigger_layer_3.dag = dag_layer_2
delete_files.dag = dag_layer_3

# Set any upstream requirements — e.g. especially for the triggers
trigger_layer_2.set_upstream(task_or_task_list=[create_file])
trigger_layer_3.set_upstream(task_or_task_list=[print_file, copy_file])

Shout Out If You Are Using Apache Airflow

I hope these 2 design approaches have helped you better understand orchestration and DAG design in Apache Airflow — it’s a vibrant, active project pushing 1,000 contributors and it would be great to hear from you, whether you are a contributor to the project or just starting out using Airflow.

Are you using Apache Airflow today? What has your experience been like so far, and what have you learned? What else would you like us to cover in future posts?

Please comment below and let us hear from you!

Here are some other recent stories from the team at Hashmap that we hope will help you in your data and cloud initiatives…

John Aven, PhD, is the Director of Engineering at Hashmap providing Data, Cloud, IoT, and AI/ML solutions and consulting expertise across industries with a group of innovative technologists and domain experts accelerating high value business outcomes for our customers.

Be sure to connect with John on LinkedIn and reach out for more perspectives and insight into accelerating your data-driven business outcomes.
