Airflow: Building DAGs

Victor Caceres Chian
Machine Learning Reply DACH
8 min readSep 22, 2022

In the previous article we introduced the concept of orchestration, its importance, and how Airflow can be the right tool to assist us in running our processes.

You can read the previous article here.

Now let’s take a closer look into Airflow and how it handles workflows.

Airflow utilizes Directed Acyclic Graphs (DAGs) as its workflow representation. Each node in the graph represents a task. The tasks are isolated from one another and can be data transformation, an API query, or a trigger to another workflow. The order of execution and dependencies are imprinted in the structure of the graph.

Airflow does not restrict what these tasks do. So they can be anything we imagine! As long as tasks are python or shell scripts, Airflow will happily run them. This gives us limitless application use cases!

Figure 1. Example of an Airflow Workflow

Here we can see how a DAG elegantly describes some tasks and their dependencies in a workflow. We can clearly understand the order in which tasks are executed, going from left to right indicated by arrow direction, previous tasks must be successful for the next ones to follow. The Graph’s architecture also shows which tasks need to be executed sequentially and which can be executed in parallel. Colors around the tasks show how a pipeline progresses.

We will dive deep into this workflow later in this article. To understand it better, we should explore Airflow’s architecture and how it is able to manage the execution and monitoring of workflows.

Figure 2. Airflow Architecture (Adapted from Airflow blog)

Airflow has usually the following components:

  • Webserver: Provides the User Interface for easy viewing and interaction with workflows.
  • DAG Folder: Contains the Code for the Workflows and is read by the Scheduler and Executor.
  • Scheduler: Reads the DAG folders, triggers scheduler workflows and indicates to the Executor which tasks should run.
  • Executor: Handles the tasks. In default Airflow, the executor also runs the tasks. However, in executors used for production, the tasks are pushed to workers. In our architecture example we will use Celery Flower as the Executor and Celery Workers, which are going to run the tasks.
  • Metadata Database: Used by the other components to store DAG state data.

With this architecture, Airflow collects data about the executed tasks. We can use the User Interface to find information about the progress of tasks as well as data and statistics about previous runs of workflows.

Let’s get an overview of the Airflow User Interface:

We are first met with a screen like this one. Here we have an overview of the existing DAGs in our DAG repository. We have an active/paused DAG indicator that we can toggle and some information about the recent runs of the task.

Figure 3. Airflow User Interface

Clicking in a DAG name will send us to the Grid view of the task. It depicts the recent runs, as well as some basic insights of the workflow.

Figure 4. Airflow DAG Grid View

Hint: If you are curious what is the “luck” task and why it failed…It is a script that randomly picks a number and if it is even, the task fails. But why only 1 of 9 fail if has 50% chance? Well, Airflow has a retry option too! We will see this later as well, but with one retry, two tasks that had failed previously ran successfully and the workflow continues.

Other tabs provide a different view of the information. A particularly interesting one is the “Task Duration Tab”, which provides a view of the average duration of tasks. It can be utilized to identify anomalies and bottlenecks. Another is “Landing Time”, which provides the real time a workflow took to finish, measuring from the scheduled time to the finishing time.

Figure 5. Task Duration View

With this interface, we can also interact with tasks! If a task is faulty or requires reprocessing, we can simply click on a task in Grid or Graph view and a display menu will appear. Here we can find the logs which contain the details of the script that a task uses. Any logging or prints made by our scripts are displayed in the logs and this will make it easier to debug a program.

We can also utilize other options to manually dictate the next steps of the workflow.

Figure 6. Task Options

If we attempt to manually change the state of a task, Airflow will let us know what we are changing before anything happens!

Figure 7. Confirmation Prompt

After clearing the tasks, Airflow will rerun them. Looks like we did have some luck and the pipeline continued successfully!

Figure 8. Airflow Successful Execution

Now, let’s see how we can write these DAGs.

We will be using the apache-airflow library code, so launch your favorite IDE and install apache-airflow with the help of pip. Since we won’t run the scripts directly, this step is entirely optional. It will only assist us in identifying semantic mistakes and give us some suggestions. Here is the link: https://airflow.apache.org/docs/apache-airflow/stable/installation/installing-from-pypi.html.

Let’s create a basic DAG.

There are many more options in a DAG that can be found here: https://airflow.apache.org/docs/apache-airflow/stable/tutorial.html.

Let’s now explore some basic settings.

We first created a JSON object with a couple of defining attributes.

  • Start_date: specifies when this DAG starts its processing. If we set it in the past, it will begin from the current point in time.
  • Retries: How many times should a task try again if it fails the first time. Perhaps it did not have enough memory, then it might have better luck on the second time.
  • Retry_delay: How much time should it wait until it is executed again.

We utilize this JSON object in the DAG function. Here we specify some parameters:

  • Description: Name of the DAG
  • Schedule_interval: This is a CRON format. It specifies the scheduling of the DAG. In our example it is every 4 hours. We can find a converter in internet like: https://crontab.guru/.
  • Tags: Any additional tags to the DAG.

Inside the function we can start defining the tasks.

Tasks are defined by using Airflow Operators, here are some of them that we utilize in our example:

  • BashOperator: We can use this to execute bash commands directly on the container.
  • PythonOperator: Our main Operator, it calls python functions that can be defined in the same DAG script or in another script.
  • TriggerDagRunOperator: What if we need to trigger a DAG based on a task? We can use this Operator for that purpose, where the trigger_dag_id is the id of another DAG.
  • Perhaps we need a place holder, or a consolidation of tasks? We can use the Dummy Operator for this.

To execute tasks in parallel we need an additional step. We can either use a TaskGroup to create a visible group or use a simple array.

Now that we have all tasks defined, let’s define a sample sequence. Airflow utilizes an Airflow-specific notation to define DAG sequences. We can define dependencies utilizing a “>>” notation which gives an easy left to right way of reading dependencies.

So, after having an overview of the Airflow coding features, what is the actual code that defines the example pipeline?

As we said before, tasks execute isolated from each other. Sometimes, they will work on the same items. In most cases, we will normally interact with a 3rd party to read a file, query an API, or write a document. Nevertheless, what if we only need a variable or simple input from a previous task? We can also leverage Airflow to pass information between them!

We will user Airflow’s XComs (short for “cross-communications”) that let’s our tasks communicate with each other.

When writing a Python script, we can “push” data to XCom in two ways.

Simply by using the return function like this:

or by explicitly pushing a variable with a key (note, we need to define the **kwargs argument in the function):

We can retrieve these variables in later tasks.

The first one by using the task id.

and the second one by using the key and task id.

The communication using XCom is recommended to only be done in specific cases where variables are simple, and there are no sensitive data. For other cases, we can use storages such as S3, where we can connect utilizing the boto3 library.

Deploying Airflow

Now that we have seen the User Interface and how to build a DAG, we need to think about the infrastructure that will host all the Airflow components. We will start with a local deployment to freely test our DAGs and to have the freedom to explore more Airflow capabilities.

We will use Docker to simulate the containers needed. Refer to https://docs.docker.com/engine/install/ to install docker locally. Note: Windows Users should utilize the WSL/Ubuntu Subsystem and use the Docker Integration available.

We can then follow this guide to deploy our airflow locally: https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html.

We will get the docker-compose file with the following command:

curl -LfO ‘https://airflow.apache.org/docs/apache-airflow/2.3.4/docker-compose.yaml'

This file specifies all images required and has their specific launch sequences.

We now need to create the logs, plugins, and DAGs folder. In this folder, we can place our DAGs that will be read by the scheduler and shown in the user interface. The “docker-compose up” command, will download the required images and launch the respective containers.

Figure 9. Local Airflow Folder Structure

We can access the User Interface via localhost:8080 afterwards, where the airflow login screen will pop up. The default username/password is airflow/airflow.

Figure 10. Airflow Welcome Screen

If the DAGs are correctly written, they will display in the home screen in addition to the example’s DAGs (if not disabled) that come with Airflow. Then we are free to interact with the DAGs however we like and explore the Airflow’s capabilities ourselves!

Now, what if we want to disable the example DAGs? Airflow utilizes a configuration file, where we can configure this parameter. We will explore some further features of the configuration file in our next deployment example! For now, in this docker example, we pass the configuration parameters as environmental variables in the environment part of the file.

AIRFLOW__CORE__LOAD_EXAMPLES: 'false'

After much testing and exploring, if we ever want a hard reset on our local Airflow instances, we can empty the logs folder and run the command that will wipe out the metadata database.

docker-compose down --volumes --rmi all

In the next article we will explore another Airflow deployment utilizing AWS Elastic Container Service: A much more robust deployment that will be more suitable for a production environment with some additional adjustments.

At Machine Learning Reply, we guide and support all our customers in the development of their IT capabilities towards Machine Learning, data or cloud Use Cases, regardless of their current phase.

--

--