Understanding Apache Airflow’s key concepts
Part Three of a Four-part Series
In Part I and Part II of Quizlet’s Hunt for the Best Workflow Management System Around, we motivated the need for workflow management systems (WMS) in modern business practices, and provided a wish list of features and functions that led us to choose Apache Airflow as our WMS of choice. This post aims to give the curious reader a detailed overview of Airflow’s components and operation. We’ll cover Airflow’s key concepts by implementing the example workflow introduced in Part I of the series (see Figure 3.1).
Airflow is a WMS that defines tasks and their dependencies as code, executes those tasks on a regular schedule, and distributes task execution across worker processes. Airflow offers an excellent UI that displays the states of currently active and past tasks, shows diagnostic information about task execution, and allows the user to manually manage the execution and state of tasks.
Workflows are “DAGs”
Workflows in Airflow are collections of tasks that have directional dependencies. Specifically, Airflow uses directed acyclic graphs (DAGs for short) to represent workflows. Each node in the graph is a task, and edges define dependencies amongst tasks. (The graph must be acyclic so that there are no circular dependencies that could cause infinite execution loops.)
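To make the acyclicity requirement concrete, here is a minimal sketch that uses Python's standard-library graphlib module (Python 3.9+) rather than Airflow itself. The task names are hypothetical. Each key maps a task to the set of tasks it depends on, and a circular dependency is rejected because no valid execution order exists for it.

```python
from graphlib import CycleError, TopologicalSorter


def execution_order(dependencies):
    """Return one valid execution order, or raise CycleError if none exists."""
    return list(TopologicalSorter(dependencies).static_order())


# Each task maps to the set of tasks that must finish before it can start.
acyclic = {"extract": set(), "transform": {"extract"}, "load": {"transform"}}
print(execution_order(acyclic))  # upstream tasks always come first

# Making "extract" depend on "load" closes a loop:
# extract -> transform -> load -> extract
cyclic = {**acyclic, "extract": {"load"}}
try:
    execution_order(cyclic)
except CycleError as err:
    print("rejected:", err.args[0])
```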
The top of Figure 3.2 demonstrates how our example workflow is represented in Airflow as a DAG. Notice the similarity in the structure of the execution plan for our example workflow tasks in Figure 1.1 and the structure of the DAG in Figure 3.2.
Figure 3.2: … DagRun for Jan. 25th. Dark green nodes indicate TaskInstances with “success” states. The light green node depicts a TaskInstance in the “running” state. Bottom Subpanel: The Tree View of the example_workflow DAG. The main components of Airflow are highlighted in the screenshot, including Sensors, Operators, Tasks, DagRuns, and TaskInstances. DagRuns are represented as columns in the tree view; the DagRun for Jan. 25th is outlined in cyan. Each square in the tree view represents a TaskInstance; the TaskInstance for the (“running”) perform_currency_conversion task on Jan. 25th is outlined in blue.
At a high level, a DAG can be thought of as a container that holds tasks and their dependencies, and sets the context for when and how those tasks should be executed. Each DAG has a set of properties, the most important of which are its dag_id, a unique identifier amongst all DAGs; its start_date, the point in time at which the DAG’s tasks are to begin executing; and its schedule_interval, or how often the tasks are to be executed. In addition to the dag_id, start_date, and schedule_interval, each DAG can be initialized with a set of default_arguments. These default arguments are inherited by all tasks in the DAG.
In the code block below, we define a DAG that implements our gaming company example workflow in Airflow.
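As a rough sketch, the properties described above might be gathered as follows. The dag_id, start_date, and daily interval follow the example used in this post; the contents of default_arguments (owner, retries, retry_delay) are illustrative assumptions. The sketch is plain Python so that it runs without Airflow installed. In a DAG-definition file these keyword arguments would be passed to Airflow's DAG class, with the defaults going in through its default_args parameter. The task_arguments helper is hypothetical and only mimics how a task falls back on the DAG's defaults.

```python
from datetime import datetime, timedelta

# Defaults inherited by every task in the DAG (illustrative values).
default_arguments = {
    "owner": "data-team",
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
}

# The three key DAG properties, plus the defaults.
dag_kwargs = {
    "dag_id": "example_workflow",            # unique amongst all DAGs
    "start_date": datetime(2017, 1, 1),      # first execution time
    "schedule_interval": timedelta(days=1),  # run once per day
    "default_args": default_arguments,
}
# In a DAG file (Airflow releases that accept schedule_interval):
#     dag = DAG(**dag_kwargs)


def task_arguments(dag_kwargs, **overrides):
    """Hypothetical helper: DAG defaults first, then the task's own overrides."""
    return {**dag_kwargs["default_args"], **overrides}


print(task_arguments(dag_kwargs, task_id="combine_revenue_data"))
print(task_arguments(dag_kwargs, task_id="perform_currency_conversion", retries=0))
```

A task that sets its own value (retries=0 in the second call) overrides the inherited default; every other default carries over unchanged.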
Operators, Sensors, and Tasks
Although the DAG is used to organize tasks and set their execution context, DAGs do not perform any actual computation. Instead, tasks are the elements of Airflow that actually “do the work” we want performed. Tasks come in two flavors: they can either execute some explicit operation, in which case they are an Operator, or they can pause the execution of dependent tasks until some criterion has been met, in which case they are a Sensor. In principle, Operators can perform any function that can be executed in Python. Similarly, Sensors can check the state of any process or data structure.
The code block below shows how we would define some (hypothetical) Operator and Sensor classes to implement our example workflow.
The code defines a subclass of BaseSensorOperator, the ConversionRatesSensor. This class implements a poke method, which is required for all BaseSensorOperator objects. The poke method must return True if the downstream tasks are to continue and False otherwise. In our example, this sensor would be used to determine when exchange rates from the external API have become available.
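The poke contract can be illustrated without Airflow. In the sketch below, ConversionRatesSensor is a plain Python class rather than a real BaseSensorOperator subclass, and the rates_available callable stands in for a request to the external exchange-rate API; both are assumptions made for illustration. The wait_for function is a simplified, hypothetical model of what happens to a sensor at run time: poke is called repeatedly, with a pause between attempts, until it returns True or a timeout passes.

```python
import time


class ConversionRatesSensor:
    """Plain-Python stand-in for a sensor; only the poke contract is modeled."""

    def __init__(self, rates_available):
        # rates_available: callable returning True once the rates are published
        self.rates_available = rates_available

    def poke(self, context):
        # True  -> downstream tasks may continue
        # False -> keep waiting
        return bool(self.rates_available(context["execution_date"]))


def wait_for(sensor, context, poke_interval=0.01, timeout=1.0):
    """Call sensor.poke until it succeeds; raise if the timeout passes first."""
    deadline = time.monotonic() + timeout
    while not sensor.poke(context):
        if time.monotonic() >= deadline:
            raise TimeoutError("sensor timed out")
        time.sleep(poke_interval)
    return True


# Fake API: the rates become available on the third check.
attempts = []


def fake_rates_available(execution_date):
    attempts.append(execution_date)
    return len(attempts) >= 3


sensor = ConversionRatesSensor(fake_rates_available)
wait_for(sensor, {"execution_date": "2017-01-25"})
print(len(attempts))  # 3
```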
The two classes, ExtractAppStoreRevenueOperator and TransformAppStoreJSONDataOperator, are inherited from Airflow’s BaseOperator class and implement an execute method. In our example, these two classes’ execute methods pull data from the app store APIs and transform them into the company’s preferred storage format. Notice that the ExtractAppStoreRevenueOperator also takes a custom parameter, app_store_name, which tells the class the app store from which to request data.
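A hedged sketch of this pattern follows. To stay runnable without Airflow, BaseOperator here is a tiny local stand-in rather than Airflow's class, and fetch_revenue is a hypothetical callable that replaces the real app store API request. The JSON field names are invented, and handing the payload to the transform step through the context dictionary is a simplification.

```python
import json


class BaseOperator:
    """Local stand-in for Airflow's BaseOperator (just enough for this sketch)."""

    def __init__(self, task_id):
        self.task_id = task_id

    def execute(self, context):
        raise NotImplementedError


class ExtractAppStoreRevenueOperator(BaseOperator):
    def __init__(self, app_store_name, fetch_revenue, **kwargs):
        super().__init__(**kwargs)
        self.app_store_name = app_store_name  # custom parameter
        self.fetch_revenue = fetch_revenue    # hypothetical API client

    def execute(self, context):
        # Pull the raw JSON payload for this store and execution date.
        return self.fetch_revenue(self.app_store_name, context["execution_date"])


class TransformAppStoreJSONDataOperator(BaseOperator):
    def execute(self, context):
        # Convert the raw JSON into flat (date, revenue) rows.
        records = json.loads(context["raw_json"])
        return [(r["date"], float(r["revenue"])) for r in records]


def fake_fetch(store, execution_date):
    """Fake API response used only for this example."""
    return json.dumps([{"date": execution_date, "revenue": "12.50"}])


extract = ExtractAppStoreRevenueOperator(
    app_store_name="app_store_a",
    fetch_revenue=fake_fetch,
    task_id="extract_app_store_a",
)
raw = extract.execute({"execution_date": "2017-01-25"})
transform = TransformAppStoreJSONDataOperator(task_id="transform_app_store_a")
print(transform.execute({"raw_json": raw}))  # [('2017-01-25', 12.5)]
```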
Note that Operators and Sensors are generally defined in separate files and imported into the namespace in which we define the DAG. However, we could have added these class definitions to the DAG-definition file as well.
Formally, Airflow defines a task as an instantiation of either a Sensor or an Operator class. Instantiating a task requires providing a unique task_id and the DAG container to which the task is added (Note: in versions ≥ 1.8, a DAG object is no longer required). The code block below shows how we would instantiate all the tasks needed to perform our example workflow. (Note: We assume that all Operators referenced in our examples have been defined in or imported into our namespace.)
This task instantiation code is executed in the same file/namespace as the DAG definition. We can see that the code for adding tasks is concise and allows for in-line documentation via comments. Lines 10–19 demonstrate one of the strengths of defining workflows in code. We are able to dynamically define three separate tasks for extracting data from each of the app stores using a for loop. Though this approach may not buy us much in this small example, the benefits are huge as the number of app stores increases.
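The loop-based pattern might look like the following sketch. ExtractTask is a small stand-in dataclass, not an Airflow class, and the app store names are hypothetical placeholders. The point is only that one loop produces one uniquely named task per store.

```python
from dataclasses import dataclass


@dataclass
class ExtractTask:
    """Stand-in for an instantiated extract task (not an Airflow class)."""
    task_id: str
    app_store_name: str


APP_STORES = ["app_store_a", "app_store_b", "app_store_c"]  # hypothetical names

extract_tasks = {}
for store in APP_STORES:
    task_id = f"extract_{store}"  # task_ids must be unique within the DAG
    extract_tasks[store] = ExtractTask(task_id=task_id, app_store_name=store)

print(sorted(task.task_id for task in extract_tasks.values()))
```

Supporting a fourth store is then a one-line change to the APP_STORES list.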
Defining Task Dependencies
A key strength of Airflow is its concise and intuitive convention for defining dependencies among tasks. The code below shows how we would define the task dependency graph for our example workflow:
Again, this code is run in the same file/namespace as the DAG definition. Task dependencies are set using the set_upstream and set_downstream methods (though, in versions ≥ 1.8, it’s also possible to use the bitshift operators << and >> to perform the same operations more concisely). A task can have multiple dependencies (e.g. combine_revenue_data), or none at all (e.g. all extract_* tasks).
The Top Subpanel of Figure 3.2 shows the Airflow DAG created by the above code, as rendered by Airflow’s UI (we’ll soon get to the UI in more detail). The DAG has a dependency structure that is very similar to the execution plan we came up with for our example workflow shown in Figure 1.1. When the DAG is being executed, Airflow will also use this dependency structure to automagically figure out which tasks can be run simultaneously at any point in time (e.g. all the extract_* tasks).
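One way to see how a dependency structure determines what can run simultaneously is to group tasks into batches whose dependencies are already satisfied. The sketch below does this with the standard-library graphlib module (Python 3.9+); it is not how Airflow is implemented, and the task names and wiring are assumptions based on the example workflow.

```python
from graphlib import TopologicalSorter

stores = ["a", "b", "c"]  # hypothetical app stores

# Map each task to the set of tasks that must finish before it.
upstream = {f"extract_{s}": set() for s in stores}
upstream.update({f"transform_{s}": {f"extract_{s}"} for s in stores})
upstream["combine_revenue_data"] = {f"transform_{s}" for s in stores}
upstream["wait_for_conversion_rates"] = set()
upstream["perform_currency_conversion"] = {
    "combine_revenue_data",
    "wait_for_conversion_rates",
}


def parallel_batches(upstream):
    """Group tasks into batches whose members have no unmet dependencies."""
    sorter = TopologicalSorter(upstream)
    sorter.prepare()
    batches = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # everything runnable right now
        batches.append(ready)
        sorter.done(*ready)                 # pretend the whole batch succeeded
    return batches


for batch in parallel_batches(upstream):
    print(batch)
```

The first batch contains all the extract_* tasks (plus the sensor), since none of them waits on anything else.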
DagRuns and TaskInstances
Once we’ve defined a DAG (i.e. we’ve instantiated tasks and defined their dependencies), we can then execute the tasks based on the parameters of the DAG. A key concept in Airflow is that of an execution_time. When the Airflow scheduler is running, it will define a regularly-spaced schedule of dates for which to execute a DAG’s associated tasks. The execution times begin at the DAG’s start_date and repeat every schedule_interval. For our example the scheduled execution times would be ('2017-01-01 00:00:00', '2017-01-02 00:00:00', ...). For each execution_time, a DagRun is created and operates under the context of that execution time. Thus a DagRun is simply a DAG with some execution time (see the Bottom Subpanel of Figure 3.2).
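The schedule of execution times can be sketched as a simple generator. This is a simplified model that assumes a fixed timedelta interval; it ignores cron-style schedules and other details of the real scheduler.

```python
from datetime import datetime, timedelta
from itertools import islice


def execution_times(start_date, schedule_interval):
    """Yield start_date, start_date + interval, start_date + 2 * interval, ..."""
    current = start_date
    while True:
        yield current
        current += schedule_interval


schedule = execution_times(datetime(2017, 1, 1), timedelta(days=1))
for execution_time in islice(schedule, 3):
    print(execution_time)
# 2017-01-01 00:00:00
# 2017-01-02 00:00:00
# 2017-01-03 00:00:00
```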
All the tasks associated with a DagRun are referred to as TaskInstances. In other words, a TaskInstance is a task that has been instantiated and has an execution_date context (see Bottom Subpanel of Figure 3.2). DagRuns and TaskInstances are central concepts in Airflow. Each DagRun and TaskInstance is associated with an entry in Airflow’s metadata database that logs its state (e.g. “queued”, “running”, “failed”, “skipped”, “up for retry”). Reading and updating these states is key for Airflow’s scheduling and execution processes.
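The idea of one database entry per TaskInstance can be sketched with an in-memory SQLite table. The schema below is a heavily simplified assumption made for illustration, not Airflow's actual metadata schema; the key point is that a TaskInstance is identified by its task together with an execution date, and carries a state that gets read and updated.

```python
import sqlite3

db = sqlite3.connect(":memory:")  # stand-in for the metadata database
db.execute(
    "CREATE TABLE task_instance ("
    " dag_id TEXT, task_id TEXT, execution_date TEXT, state TEXT,"
    " PRIMARY KEY (dag_id, task_id, execution_date))"
)

task_ids = ["combine_revenue_data", "perform_currency_conversion"]
execution_dates = ["2017-01-24 00:00:00", "2017-01-25 00:00:00"]

# One TaskInstance per (task, execution date) pair.
for execution_date in execution_dates:
    for task_id in task_ids:
        db.execute(
            "INSERT INTO task_instance VALUES (?, ?, ?, ?)",
            ("example_workflow", task_id, execution_date, "queued"),
        )


def set_state(task_id, execution_date, state):
    """Update the recorded state of a single TaskInstance."""
    db.execute(
        "UPDATE task_instance SET state = ?"
        " WHERE dag_id = ? AND task_id = ? AND execution_date = ?",
        (state, "example_workflow", task_id, execution_date),
    )


set_state("perform_currency_conversion", "2017-01-25 00:00:00", "running")

query = (
    "SELECT task_id, execution_date, state FROM task_instance"
    " ORDER BY execution_date, task_id"
)
for row in db.execute(query):
    print(row)
```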
Airflow’s Architecture
At its core, Airflow is simply a queuing system built on top of a metadata database. The database stores the state of queued tasks and a scheduler uses these states to prioritize how other tasks are added to the queue. This functionality is orchestrated by four primary components (refer to the Left Subpanel of Figure 3.2):
1. Metadata Database: This database stores information regarding the state of tasks. Database updates are performed using an abstraction layer implemented in SQLAlchemy. This abstraction layer cleanly separates the function of the remaining components of Airflow from the database.
2. Scheduler: The Scheduler is a process that uses DAG definitions in conjunction with the state of tasks in the metadata database to decide which tasks need to be executed, as well as their execution priority. The Scheduler is generally run as a service.
3. Executor: The Executor is a message queuing process that is tightly bound to the Scheduler and determines the worker processes that actually execute each scheduled task. There are different types of Executors, each of which uses a specific class of worker processes to execute tasks. For example, the LocalExecutor executes tasks with parallel processes that run on the same machine as the Scheduler process. Other Executors, like the CeleryExecutor, execute tasks using worker processes that exist on a separate cluster of worker machines.
4. Workers: These are the processes that actually execute the logic of tasks, and are determined by the Executor being used.
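The hand-off between these components can be pictured with a small stand-alone sketch. Here a thread pool plays the role of the Executor and its threads play the role of workers (the LocalExecutor described above uses parallel processes instead), while a dictionary stands in for the metadata database. All names are illustrative, and run_task is a hypothetical placeholder for real task logic.

```python
from concurrent.futures import ThreadPoolExecutor


def run_task(task_id):
    """Worker-side logic: execute the task and report its final state."""
    return task_id, "success"


queued = ["extract_app_store_a", "extract_app_store_b", "extract_app_store_c"]
states = {task_id: "queued" for task_id in queued}  # stand-in for the metadata database

# The pool plays the Executor's role: it decides which worker runs each task.
with ThreadPoolExecutor(max_workers=2) as pool:
    for task_id, state in pool.map(run_task, queued):
        states[task_id] = state  # the outcome is written back to the "database"

print(states)
```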
Scheduler Operation
At first, the operation of Airflow’s scheduler can seem more like black magic than a logical computer program. That said, understanding the workings of the scheduler can save you a ton of time if you ever find yourself debugging its execution. To save the reader from having to dig through Airflow’s source code (though we DO highly recommend it!), we outline the basic operation of the scheduler in pseudo-code:
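One way to picture such a loop is the toy simulation below. It is a deliberately simplified model built from the concepts in this post, not Airflow's actual scheduler code: it handles a single DagRun, uses a dictionary in place of the metadata database, and runs tasks one at a time. The task wiring is an assumption.

```python
from collections import deque

# Upstream dependencies for one DagRun (assumed wiring, two stores for brevity).
UPSTREAM = {
    "extract_a": set(),
    "extract_b": set(),
    "combine_revenue_data": {"extract_a", "extract_b"},
    "perform_currency_conversion": {"combine_revenue_data"},
}


def run_scheduler(upstream, run_task):
    """Toy scheduler loop for one DagRun; returns (task states, DagRun state)."""
    states = {task_id: None for task_id in upstream}  # stand-in for the metadata database
    queue = deque()
    while True:
        # Step 1: queue every unscheduled task whose upstream tasks all succeeded.
        for task_id, deps in upstream.items():
            if states[task_id] is None and all(states[d] == "success" for d in deps):
                states[task_id] = "queued"
                queue.append(task_id)
        # Step 2: stop once nothing is left to run.
        if not queue:
            break
        # Step 3: a worker takes one task, runs it, and records the outcome.
        task_id = queue.popleft()
        states[task_id] = "running"
        states[task_id] = "success" if run_task(task_id) else "failed"
    # Step 4: the DagRun succeeds only if every task did.
    all_ok = all(state == "success" for state in states.values())
    return states, "success" if all_ok else "failed"


print(run_scheduler(UPSTREAM, lambda task_id: True)[1])                    # success
print(run_scheduler(UPSTREAM, lambda task_id: task_id != "extract_b")[1])  # failed
```

In the second call, extract_b fails, so combine_revenue_data never has all of its upstream tasks in the “success” state and is never queued.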
Web UI
In addition to the primary scheduling and execution components, Airflow also includes components that support a full-featured Web UI (refer to Figure 3.2 for some UI examples), including:
1. Webserver: This process runs a simple Flask application which reads the state of all tasks from the metadata database and renders these states for the Web UI.
2. Web UI: This component allows a client-side user to view and edit the state of tasks in the metadata database. Because of the coupling between the Scheduler and the database, the Web UI allows users to manipulate the behavior of the scheduler.
3. Execution Logs: These logs are written by the worker processes and stored either on disk or a remote file store (e.g. GCS or S3). The Webserver accesses the logs and makes them available to the Web UI.
Though these additional components are not necessary to the basic operation of Airflow, they offer functionality that really sets Airflow apart from other current workflow managers. Specifically, the UI and integrated execution logs allow users to inspect and diagnose task execution, as well as view and manipulate task state.
Command Line Interface
In addition to the Scheduler and Web UI, Airflow offers robust functionality through a command line interface (CLI). In particular, we found the following commands to be helpful when developing Airflow:
- airflow test DAG_ID TASK_ID EXECUTION_DATE: Allows the user to run a task in isolation, without affecting the metadata database or being concerned about task dependencies. This command is great for testing the basic behavior of custom Operator classes in isolation.
- airflow backfill DAG_ID TASK_ID -s START_DATE -e END_DATE: Performs backfills of historical data between START_DATE and END_DATE without the need to run the scheduler. This is great when you need to change some business logic of a currently-existing workflow and need to update historical data. (Note that backfills do not create DagRun entries in the database, as they are not run by the SchedulerJob class.)
- airflow clear DAG_ID: Removes TaskInstance records in the metadata database for the DAG_ID. This can be useful when you’re iterating on the functionality of a workflow/DAG.
- airflow resetdb: Though you generally do not want to run this command often, it can be very helpful if you ever need to create a “clean slate,” a situation that may arise when setting up Airflow initially. (Note: this command only affects the database, and does not remove logs.)
Above, we provided an outline of some of the more abstract concepts underlying Airflow’s operation. In the final installment of our series, we’ll touch on some of the more practical considerations when deploying Airflow in production.