Understanding Apache Airflow’s key concepts

Part Three of a Four-part Series

In Part I and Part II of Quizlet’s Hunt for the Best Workflow Management System Around, we motivated the need for workflow management systems (WMS) in modern business practices, and provided a wish list of features and functions that led us to choose Apache Airflow as our WMS of choice. This post aims to give the curious reader a detailed overview of Airflow’s components and operation. We’ll cover Airflow’s key concepts by implementing the example workflow introduced in Part I of the series (see Figure 3.1).

Figure 3.1: An example data processing workflow.

Airflow is a WMS that defines tasks and and their dependencies as code, executes those tasks on a regular schedule, and distributes task execution across worker processes. Airflow offers an excellent UI that displays the states of currently active and past tasks, shows diagnostic information about task execution, and allows the user to manually manage the execution and state of tasks.

Workflows are “DAGs”

Workflows in Airflow are collections of tasks that have directional dependencies. Specifically, Airflow uses directed acyclic graphs — or DAG for short — to represent a workflow. Each node in the graph is a task, and edges define dependencies amongst tasks (The graph is enforced to be acyclic so that there are no circular dependencies that can cause infinite execution loops).

The top of Figure 3.2 demonstrates how our example workflow is represented in Airflow as a DAG. Notice the similarity in the structure of the execution plan for our example workflow tasks in Figure 1.1 and the structure of the DAG in Figure 3.2.

Figure 3.2 Screenshots from the Airflow UI, Representing the example workflow DAG. Top Subpanel: The Graph View of the DagRun for Jan. 25th. Dark green nodes indicate TaskInstances with “success” states. The light green node depicts a TaskInstance in the “running” state. Bottom Subpanel: The Tree View of the example_workflow DAG. The main components of Airflow are highlighted in screen shot, including Sensors, Operators, Tasks, DagRuns, and TaskInstances. DagRuns are represented as columns in the graph view — the DagRun for Jan. 25th is outlined in cyan. Each square in the graph view represents a TaskInstance — the TaskInstance for the (“running”) perform_currency_conversion task on Jan. 25th is outlined in blue.

At a high level, a DAG can be thought of as a container that holds tasks and their dependencies, and sets the context for when and how those tasks should be executed. Each DAG has a set of properties, most important of which are its dag_id, a unique identifier amongst all DAGs, its start_date, the point in time at which the DAG’s tasks are to begin executing, and the schedule_interval, or how often the tasks are to be executed. In addition to the dag_id, start_date, and schedule_interval, each DAG can be initialized with a set of default_arguments. These default arguments are inherited by all tasks in the DAG.

In the code block below, we define a DAG that implements our gaming company example workflow in Airflow.

Operators, Sensors, and Tasks

Although the DAG is used to organize tasks and set their execution context, DAGs do not perform any actual computation. Instead, tasks are the element of Airflow that actually “do the work” we want performed. Tasks can have two flavors: they can either execute some explicit operation, in which case they are an Operator, or they can pause the execution of dependent tasks until some criterion has been met, in which case they are a Sensor. In principle, Operators can perform any function that can be executed in Python. Similarly, Sensors can check the state of any process or data structure.

The code block below shows how we would define some (hypothetical) Operator and Sensor classes to implement our example workflow.

The code defines a subclass of BaseSensorOperator, the ConversionRatesSensor. This class implements a poke method, which is required for all BaseSensorOperator objects. The poke method must return True if the downstream tasks are to continue and False otherwise. In our example, this sensor would be used to determine when exchange rates from the external API have become available.

The two classes, ExtractAppStoreRevenueOperator and TransformAppStoreJSONDataOperator are inherited from Airflow’s BaseOperator class and implement an execute method. In our example, these two classes’ execute methods pull data from the app store APIs, and transform them into the company’s preferred storage format. Notice that the ExtractAppStoreRevenueOperator also takes a custom parameter, app_store_name,which tells the class the app store from which to request data.

Note that generally Operators and Sensors are defined in separate files and imported into the same namespace we define the DAG. However, we could have added these class definitions to the same DAG-definition file as well.

Formally, Airflow defines a task as an instantiations of either the Sensor or Operator classes. Instantiating a task requires providing a unique task_id and DAG container in which to add the task (Note: in versions ≥ 1.8, there is no longer a DAG object requirement). The code block below shows how we would instantiate all the tasks needed to perform our example workflow. (Note: We assume that all Operators that are being referenced in our examples have been defined in or imported into our namespace).

This task instantiation code is executed in the same file/namespace as the DAG definition. We can see that the code for adding tasks is concise and allows for in-line documentation via comments. Lines 10–19 demonstrate one of the strengths of defining workflows in code. We are able to dynamically define three separate tasks for extracting data from each of the app stores using a for loop. Though this approach may not buy us much in this small example, the benefits are huge as the number of app stores increases.

Defining Task Dependencies

A key strength of Airflow is the concise and intuitive conventions for defining dependencies among tasks. The code below shows how we would define the task dependency graph for our example workflow:

Again, this code is run in the same file/namespace as the DAG definition. Task dependencies are set using the set_upstream and set_downstream operators (Though, in version ≥ 1.8, it’s also possible to use the bitshift operators << and >> to perform the same operations more concisely). A task can have multiple dependencies (e.g. combine_revenue_data), or none at all (e.g. all extract_* tasks).

The Top Subpanel of Figure 3.2 shows the Airflow DAG created by the above code, as rendered by Airflow’s UI (we’ll soon get to the UI in more detail). The DAG has a dependency structure that is very similar the execution plan we came up with for our example workflow shown in Figure 1.1. When the DAG is being executed, Airflow will also use this dependency structure to automagically figure out which tasks can be run simultaneously at any point in time (e.g. all the extract_* tasks).

DagRuns and TaskInstances

Once we’ve defined a DAG — i.e. we’ve instantiated tasks and defined their dependencies — we can then execute the tasks based on the parameters of the DAG. A key concept in Airflow is that of an execution_time. When the Airflow scheduler is running, it will define a regularly-spaced schedule of dates for which to execute a DAG’s associated tasks. The execution times begin at the DAG’s start_date and repeat every schedule_interval. For our example the scheduled execution times would be (‘2017–01–01 00:00:00’, ‘2017–01–02 00:00:00’, ...). For each execution_time, a DagRun is created and operates under the context of that execution time. Thus a DagRun is simply a DAG with some execution time (see the Bottom Subpanel of Figure 3.2).

All the tasks associated with a DagRun are referred to as TaskInstances. In other words a TaskInstance is a task that has been instantiated and has an execution_date context (see Bottom Subpanel of Figure 3.2). DagRuns and TaskInstances are central concepts in Airflow. Each DagRun and TaskInstance is associated with an entry in Airflow’s metadata database that logs their state (e.g. “queued”, “running”, “failed”, “skipped”, “up for retry”). Reading and updating these states is key for Airflow’s scheduling and execution processes.

Airflow’s Architecture

At its core, Airflow is simply a queuing system built on top of a metadata database. The database stores the state of queued tasks and a scheduler uses these states to prioritize how other tasks are added to the queue. This functionality is orchestrated by four primary components (refer to the Left Subpanel of Figure 3.2):

  1. Metadata Database: this database stores information regarding the state of tasks. Database updates are performed using an abstraction layer implemented in SQLAlchemy. This abstraction layer cleanly separates the function of the remaining components of Airflow from the database.
    2. Scheduler: The Scheduler is a process that uses DAG definitions in conjunction with the state of tasks in the metadata database to decide which tasks need to be executed, as well as their execution priority. The Scheduler is generally run as a service. 
    3. Executor: The Executor is a message queuing process that is tightly bound to the Scheduler and determines the worker processes that actually execute each scheduled task. There are different types of Executors, each of which uses a specific class of worker processes to execute tasks. For example, the LocalExecutor executes tasks with parallel processes that run on the same machine as the Scheduler process. Other Executors, like the CeleryExecutor execute tasks using worker processes that exist on a separate cluster of worker machines. 
    4. Workers: These are the processes that actually execute the logic of tasks, and are determined by the Executor being used.
Figure 3.2: Airflow’s General Architecture. Airflow’s operation is built atop a Metadata Database which stores the state of tasks and workflows (i.e. DAGs). The Scheduler and Executor send tasks to a queue for Worker processes to perform. The Webserver runs (often-times running on the same machine as the Scheduler) and communicates with the database to render task state and Task Execution Logs in the Web UI. Each colored box indicates that each component can exist in isolation from the other components, depending on the type of deployment configuration.

Scheduler Operation

At first, the operation of Airflow’s scheduler can seem more like black magic than a logical computer program. That said, understanding the workings of the scheduler can save you a ton of time if you ever find yourself debugging its execution. To save the reader from having to dig through Airflow’s source code (though we DO highly recommend it!), we outline the basic operation of the scheduler in pseudo-code:

Web UI

In addition to the primary scheduling and execution components, Airflow also includes components that support a full-featured Web UI (refer to Figure 3.2 for some UI examples), including:

1. Webserver: This process runs a simple Flask application which reads the state of all tasks from the metadata database and renders these states for the Web UI.
2. Web UI: This component allows a client-side user to view and edit the state of tasks in the metadata database. Because of the coupling between the Scheduler and the database, the Web UI allows users to manipulate the behavior of the scheduler.
3. Execution Logs: These logs are written by the worker processes and stored either on disk or a remote file store (e.g. GCS or S3). The Webserver accesses the logs and makes them available to the Web UI.

Though these additional components are not necessary to the basic operation of Airflow, they offer functionally that really sets Airflow apart from other current workflow managers. Specifically the UI and integrated execution logs allows users to inspect and diagnose task execution, as well as view and manipulate task state.

Command Line Interface

In addition to the Scheduler and Web UI, Airflow offers robust functionality through a command line interface (CLI). In particular, we found the following commands to be helpful when developing Airflow:

  • airflow test DAG_ID TASK_ID EXECUTION_DATE. Allows the user to run a task in isolation, without affecting the metadata database, or being concerned about task dependencies. This command is great for testing basic behavior of custom Operator classes in isolation.
  • airflow backfill DAG_ID TASK_ID -s START_DATE -e END_DATE. Performs backfills of historical data between START_DATE and END_DATE without the need to run the scheduler. This is great when you need to change some business logic of a currently-existing workflow and need to update historical data. (Note that backfills do not create DagRun entries in the database, as they are not run by the SchedulerJob class).
  • airflow clear DAG_ID. Removes TaskInstance records in the metadata database for the DAG_ID. This can be useful when you’re iterating on the functionality of a workflow/DAG.
  • airflow resetdb: though you generally do not want to run this command often, it can be very helpful if you ever need to create a “clean slate,” a situation that may arise when setting up Airflow initially (Note: this command only affects the database, and does not remove logs).

Above, we provided an outline of some of the more abstract concepts underlying Airflow’s operation. In the Final installment of our series, we’ll touch on some of the more practical considerations when deploying Airflow in production.