Airtunnel: A blueprint for workflow orchestration using Airflow

Jörg Schneider
Dec 5, 2019

By Jörg Schneider and Niels Degrande of BCG GAMMA Engineering

Airflow is the undeniable champion of Python-based scheduling and, as of 2019, an Apache top-level project. Its great flexibility, however, makes it challenging to structure an Airflow codebase around sustainable patterns. In this article, we introduce Airflow and airtunnel, a set of principles and an open-source library that tames your Airflow!

Introduction: Workflow Orchestration Necessity and Use Cases

As part of our work at a data science consultancy, we build and operate systems to assist in various analytics projects. These are often large-scale projects involving thousands of jobs per day processing hundreds of data assets, along with sizeable developer teams collaborating on workflows. In the following discussion we capture some of the learnings on workflow orchestration for (big) data processing.

For any workflow¹ that might be repeated more than a handful of times, one should typically consider automating the process. In this article we will argue that for reasons of productivity, repeatability and documentation you need some degree of automation via workflow orchestration. To be clear:

Productivity in this context refers to the ability to easily and efficiently perform a series of steps as many times as required. The IT saying to “automate and modularize early on” is especially true for workflows.

Repeatability relates to executing tasks in the exact same order and in a consistent manner. Data processing applications, for example, require repeatability to make sure that findings can be reproduced.

Documentation for a workflow, in this context represented as the code or configuration that orchestrates it, is as deterministic as it can get and therefore a powerful tool for knowledge sharing and transfer. This form of “documentation” does not — and should not — exclude human-readable documentation that enhances understanding of the workflow.

A good example of the need for workflow orchestration is an ETL process. An ETL process is often repeated daily and, as such, requires consistency. Failing to maintain consistency can corrupt decision making, business processes, and other business tasks. (Note: Unless you are certain that a process is a one-off event, consider orchestration from the start.)

An ETL process workflow often includes the following steps:

1. Connecting to the data source.

2. Fetching and temporarily storing raw data.

3. Performing a pre-defined list of transformations such as deduplication and correction of known data issues.

4. Loading the final data into the target system.
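The four steps can be sketched in a few lines of standard-library Python. Everything specific here is a hypothetical stand-in: the in-memory CSV source, the `orders` target table and the rule that clamps negative amounts to zero. In an orchestrated pipeline, each function would typically become its own task, so that it can be retried and monitored separately.

```python
import csv
import io
import sqlite3

# Hypothetical raw source; in practice a database, an API or a file share.
SOURCE_CSV = """order_id,amount
1,10.0
2,5.5
2,5.5
3,-1.0
"""


def connect_to_source() -> io.StringIO:
    """Step 1: connect to the data source (here, an in-memory CSV)."""
    return io.StringIO(SOURCE_CSV)


def fetch_raw(source: io.StringIO) -> list[dict]:
    """Step 2: fetch the raw records and keep them temporarily."""
    return list(csv.DictReader(source))


def transform(rows: list[dict]) -> list[tuple[int, float]]:
    """Step 3: deduplicate and correct a known data issue."""
    seen = set()
    cleaned = []
    for row in rows:
        order_id = int(row["order_id"])
        if order_id in seen:
            continue  # drop duplicate orders
        seen.add(order_id)
        amount = max(float(row["amount"]), 0.0)  # hypothetical rule: negative amounts become 0
        cleaned.append((order_id, amount))
    return cleaned


def load(rows: list[tuple[int, float]], target: sqlite3.Connection) -> int:
    """Step 4: load the final data into the target system; return the row count."""
    target.execute("CREATE TABLE IF NOT EXISTS orders (order_id INTEGER PRIMARY KEY, amount REAL)")
    target.executemany("INSERT OR REPLACE INTO orders VALUES (?, ?)", rows)
    target.commit()
    return target.execute("SELECT COUNT(*) FROM orders").fetchone()[0]


if __name__ == "__main__":
    warehouse = sqlite3.connect(":memory:")
    print(load(transform(fetch_raw(connect_to_source())), warehouse))  # 3
```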

Another workflow example, this one not involving data, is building, testing and deploying software (cf. CI/CD). This is another often-repeated, multi-step process that requires constant scrutiny over how the different steps are performed. This process typically involves checking out the code from version control, performing quality control, compiling artifacts, running tests, inspecting the results, and deploying to multiple environments. Given this complexity, a manual approach would be very tedious and prone to error. Furthermore, lack of clear workflow orchestration can significantly hinder the ability of multiple developers to work together on a single application.


Current State of Workflow-Orchestration Tools

A vast range of tools for workflow orchestration exists today, including open-source tools, cloud-based proprietary tooling for data processing orchestration, and lightweight, extensible schedulers. Below, we map out each tool’s approach to the topic at hand. These tools are largely focused on batch data processing and less applicable to near-real-time or other types of workflow applications.

Open-source workflow orchestration tools

Many of these open-source tools have their roots in utilities built as part of larger projects, such as Hadoop, and have since evolved to become full-fledged workflow managers. Below are the most mature tools, offering great out-of-the-box experiences. These frameworks are the most opinionated with respect to building workflows.

Cloud-based proprietary tooling for data processing orchestration

The following tools are either custom-built by large cloud-service providers or are managed versions of the above-mentioned workflow orchestrators.

Lightweight, extensible schedulers

The next set of tools are simple schedulers that, in combination with shell scripts or a larger code base, can replicate some of the functionality offered by the above-mentioned tools. They are useful for small-scale projects that do not require a framework, but do not offer more advanced features such as a UI for managing and monitoring workflows, plug-ins for generic functionality, or the ability to easily handle task dependencies.

Note: The data for the tables above was gathered in August 2019. Popularity refers to the number of GitHub stars.

When to use Airflow as your workflow manager

Reasons to consider Airflow for managing your workflows include:

1. Flexibility and scalability: With Airflow you can get started quickly and continue using it when workflows become more numerous and complex.

2. Resiliency: Airflow helps you handle delayed or failed tasks or workflows by providing advanced scheduling features (e.g. depends_on_past), a retry framework and a built-in alerting mechanism.

3. Write workflows programmatically: Airflow is especially helpful when you are already using Python for your application or algorithm.

4. Easy-to-use web interface: Airflow makes it easier to monitor and control workflow executions, such as triggering runs, and to gain insight into (historical) runs and failures.

5. Future-proof: Airflow is a rapidly maturing framework with a large and growing user base, both of which drive a fast release schedule.

Additionally, Airflow comes with a range of advanced features: triggering one workflow from another, branching into downstream paths at execution time, backfilling support, and resource management with support for SLAs.
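The resiliency features from point 2 are configured per task, commonly through a `default_args` dictionary that every task in a DAG inherits. The sketch below is plain Python so that it runs without Airflow installed. To our knowledge, the keys are standard `BaseOperator` arguments in Airflow 1.10; the owner, the e-mail address and all values are illustrative assumptions.

```python
from datetime import datetime, timedelta

# Keys are BaseOperator arguments (Airflow 1.10.x); values are illustrative only.
default_args = {
    "owner": "data-engineering",          # hypothetical team name
    "start_date": datetime(2019, 12, 1),
    "depends_on_past": True,              # run only if this task succeeded in the previous run
    "retries": 3,                         # retry framework: re-run a failed task up to 3 times
    "retry_delay": timedelta(minutes=5),  # pause between retries
    "email_on_failure": True,             # built-in alerting (needs SMTP configured)
    "email": ["oncall@example.com"],      # hypothetical recipient
    "sla": timedelta(hours=2),            # report task instances that miss a 2-hour SLA
}
```

In a DAG file, this dictionary would be passed as `DAG(dag_id, default_args=default_args, schedule_interval=...)`, so individual operators only need to override what differs.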

When Airflow might not be a great fit:

1. When no scheduling is needed, in which case lightweight alternatives can be used.

2. If your developers are unfamiliar with Python, their use of Airflow may partially void the advantages of workflows as code.

3. When you are dealing with mission-critical jobs that require high availability and support, in which case enterprise-grade solutions such as Control-M may be more appropriate.

4. When you need fine-grained access control and authentication. This capability is slowly being added to Airflow, as is often the case with open-source software.

5. When you need to orchestrate real-time pipelines or processes (Airflow is more batch-oriented).

Airflow primer

Airflow is a Python-based workflow orchestrator, also known as a workflow management system (WMS). Originating from Airbnb, it is an Apache top-level project with nearly 900 contributors² to date. Its mission is:

The creation and maintenance of software related to workflow automation and scheduling for authoring and managing data pipelines

(In the next section we briefly introduce Airflow to ensure a common baseline. The experienced reader can skip this section.)


Airflow: Core concepts

Airflow’s core building blocks include the DAG, Operator, Task and Task Instance.

Directed Acyclic Graph (DAG)

A workflow in Airflow is represented by a directed acyclic graph (DAG). This graph consists of tasks and the (directed) dependencies between them. To create a DAG, define a DAG object in a Python script and place it in the DAGs folder (the dags_folder setting), from which the so-called DagBag collects the most recent definitions. A DAG object only describes the graph structure, complemented by a set of properties for scheduling and execution. The Airflow scheduler periodically evaluates the definitions in the DagBag and creates DAG runs from them, ensuring timely workflow execution.

An example of an Airflow DAG definition.
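Stripped of Airflow's API, a DAG is just tasks plus directed dependencies, and any valid execution order must respect every edge. The following stdlib sketch (Python 3.9+, for `graphlib`) illustrates this idea with hypothetical task names; it is not how Airflow's scheduler is implemented. In an Airflow DAG file, the same chain would be declared on operator instances with the bitshift syntax, e.g. `connect >> fetch_raw >> transform >> load`.

```python
from graphlib import CycleError, TopologicalSorter

# Each (hypothetical) task maps to the set of tasks it depends on.
etl_dag = {
    "connect": set(),
    "fetch_raw": {"connect"},
    "transform": {"fetch_raw"},
    "load": {"transform"},
}


def execution_order(dag: dict[str, set[str]]) -> list[str]:
    """Return one order that respects every dependency; raise CycleError otherwise."""
    return list(TopologicalSorter(dag).static_order())


print(execution_order(etl_dag))  # ['connect', 'fetch_raw', 'transform', 'load']

# Making 'connect' depend on 'load' closes a loop, which the "acyclic" in DAG forbids.
cyclic = dict(etl_dag, connect={"load"})
try:
    execution_order(cyclic)
except CycleError:
    print("not a DAG")
```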

Operators and tasks

Taking all graph dependencies into account, the scheduler creates task instances from tasks, which are themselves instantiations of operators. Various operators exist, each performing a specific type of work. Sensors are a special type of operator that waits until a condition is met, thereby blocking the downstream part of the graph. You can extend Airflow's functionality by writing custom operators. Task instances run in isolation from the DAG context and can be allocated to different workers. As a consequence, tasks cannot communicate through standard Python constructs such as shared variables; use XComs to share information between task instances instead.

An example of an Airflow operator implementation.
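Why XComs rather than plain Python variables? Task instances may run in different processes or on different machines, so any value they share has to live somewhere both can reach. The toy model below stores values in a database, keyed by DAG, task, execution date and key. This mirrors the idea behind XComs, but it is a simplified illustration, not Airflow's implementation. Inside a real operator, you would instead call `ti.xcom_push(key=..., value=...)` and `ti.xcom_pull(task_ids=..., key=...)` on the task instance taken from the execution context.

```python
import json
import sqlite3

# Toy model: a shared store addressed by (dag_id, task_id, execution_date, key).
db = sqlite3.connect(":memory:")
db.execute(
    "CREATE TABLE xcom (dag_id TEXT, task_id TEXT, execution_date TEXT, key TEXT, value TEXT,"
    " PRIMARY KEY (dag_id, task_id, execution_date, key))"
)


def xcom_push(dag_id: str, task_id: str, execution_date: str, key: str, value) -> None:
    # Values are serialized because they leave the producing process.
    db.execute(
        "INSERT OR REPLACE INTO xcom VALUES (?, ?, ?, ?, ?)",
        (dag_id, task_id, execution_date, key, json.dumps(value)),
    )
    db.commit()


def xcom_pull(dag_id: str, task_id: str, execution_date: str, key: str):
    row = db.execute(
        "SELECT value FROM xcom WHERE dag_id = ? AND task_id = ? AND execution_date = ? AND key = ?",
        (dag_id, task_id, execution_date, key),
    ).fetchone()
    return None if row is None else json.loads(row[0])


# An upstream task records how many rows it ingested ...
xcom_push("daily_sales", "ingest", "2019-12-05", "row_count", 1042)
# ... and a downstream task, possibly on another worker, reads it back.
print(xcom_pull("daily_sales", "ingest", "2019-12-05", "row_count"))  # 1042
```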

Architecture

There are three main components to the Airflow architecture:

1. The scheduler/executor, as explained above, collects DAG objects, starts DAG runs and assigns task execution to worker processes. Additionally, it handles DAG and task failures, as configured in the DAG definition.

2. The webserver runs the Airflow user interface, allowing the end user to interact with scheduling and task execution and to monitor runs.

3. The database captures metadata on DAG and task runs and is therefore essential to the scheduler. Among other operations, the database will hold the state of DAG and task execution (queued, success, failed or up_for_retry) by execution date.
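To make the role of the metadata database more tangible, here is a heavily simplified, hypothetical model of per-task-instance state. Airflow's actual `task_instance` table has many more columns. Only the state names (`queued`, `success`, `failed`, `up_for_retry`) are taken from Airflow; the DAG, the tasks and the dates are made up.

```python
import sqlite3

# Simplified stand-in for the metadata database: one row per task instance.
db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE task_instance (dag_id TEXT, task_id TEXT, execution_date TEXT, state TEXT)")
db.executemany(
    "INSERT INTO task_instance VALUES (?, ?, ?, ?)",
    [
        ("daily_sales", "ingest", "2019-12-04", "success"),
        ("daily_sales", "aggregate", "2019-12-04", "success"),
        ("daily_sales", "ingest", "2019-12-05", "up_for_retry"),
        ("daily_sales", "aggregate", "2019-12-05", "queued"),
    ],
)


def run_states(dag_id: str, execution_date: str) -> dict[str, str]:
    """Return {task_id: state} for the DAG run of one execution date."""
    rows = db.execute(
        "SELECT task_id, state FROM task_instance"
        " WHERE dag_id = ? AND execution_date = ? ORDER BY task_id",
        (dag_id, execution_date),
    ).fetchall()
    return dict(rows)


print(run_states("daily_sales", "2019-12-05"))  # {'aggregate': 'queued', 'ingest': 'up_for_retry'}
```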


Deployment scenarios

It is extremely easy to get started with Airflow. You can install it as a package from PyPI or download the source code from GitHub. Alternatively, you can pull an existing Docker image or install one of the available Helm charts. Without further configuration, your Airflow instance will likely run in standalone mode, with the Airflow SequentialExecutor configured as executor, relying on an SQLite database. Due to SQLite limitations, there will be no task parallelization. For most purposes, this mode should not be used in production, though it is great for learning and experimentation.

You can step up the functionality by using the Airflow LocalExecutor with a more robust database like MySQL or PostgreSQL. In this scenario, the Airflow instance will parallelize task instances locally by spinning up several worker processes. Although this approach is more scalable, all task handling happens on the machine running the scheduler/executor process. Consider this setup when task execution is not too computationally heavy, or when data processing can be pushed to a Spark cluster or similar.

When scaling up does not cover your need, Airflow offers a variety of options for scaling out, running in distributed mode. Using the Airflow CeleryExecutor, tasks are added to a queue with a set priority. Distributed workers will pop and execute the tasks, isolated from the scheduler. Adding workers to the resource pool enables you to further parallelize task execution. The Airflow KubernetesExecutor will create a pod for each task instance, further isolating execution. The Airflow DaskExecutor allows you to run tasks on a Dask cluster; the MesosExecutor will use Mesos slaves.
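Moving between these scenarios is largely a configuration change. The sketch below parses an illustrative `airflow.cfg` excerpt with Python's `configparser`. It mimics Airflow's convention that an environment variable named `AIRFLOW__{SECTION}__{KEY}` overrides the value from the file. The connection string and the `check_executor` helper are hypothetical; the helper simply encodes the advice above that only the SequentialExecutor should run on SQLite.

```python
import configparser
import os

# Illustrative airflow.cfg excerpt; in Airflow 1.10.x both options live in [core].
# The PostgreSQL connection string is a made-up example.
AIRFLOW_CFG = """
[core]
executor = LocalExecutor
sql_alchemy_conn = postgresql+psycopg2://airflow:airflow@localhost:5432/airflow
"""


def get_option(cfg: configparser.ConfigParser, section: str, key: str) -> str:
    """Mimic Airflow's precedence: AIRFLOW__{SECTION}__{KEY} beats the config file."""
    env_var = f"AIRFLOW__{section.upper()}__{key.upper()}"
    if env_var in os.environ:
        return os.environ[env_var]
    return cfg.get(section, key)


def check_executor(executor: str, conn: str) -> None:
    """Hypothetical sanity check: parallel executors need a client/server database."""
    if conn.startswith("sqlite") and executor != "SequentialExecutor":
        raise ValueError(f"{executor} requires e.g. MySQL or PostgreSQL, not SQLite")


cfg = configparser.ConfigParser()
cfg.read_string(AIRFLOW_CFG)
executor = get_option(cfg, "core", "executor")
check_executor(executor, get_option(cfg, "core", "sql_alchemy_conn"))
print(f"executor: {executor}")
```

Keeping such settings in environment variables, rather than in a hand-edited file, also makes them easy to template from deployment tooling.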

Additionally, Airflow has integrations with the major cloud providers (mostly AWS), offering a range of Operators, Sensors, Hooks and storage options with respect to their services. Since running a large-scale Airflow instance might require a great deal of configuration, we recommend that you automate deployment early on, such as by using Ansible or Helm.


Getting the most out of Airflow: A blueprint for data pipelining in analytics

While the Airflow project gives its user a diverse set of tools and options to use them, these tools are shipped without guidance on how to assemble and structurally lay out a larger Airflow-driven codebase.

It is helpful that Airflow supports so many deployment scenarios. Even so, as long as everything just works, there is a risk of ending up with no overarching or consistent structure at all. Think of a small codebase that starts out cleanly structured, only to become inconsistent and fragmented over time as different developers contribute to it without strongly affirmed and enforced conventions.

This section is a blueprint for Airflow that can help you to avoid the latter. It is tailored to arguably one of the most common use-cases of the tool, which is batched data pipelining in analytics.

We are well aware that there are many such approaches available, each born out of particular situations and having their specific place. We also understand that some cases fall outside our proposed structure. In fact, we have encountered them ourselves. In our view, this is totally fine – as long as the exception does not become the rule.


Design Principles

Workflows for data pipelining in analytics tend to revolve around the data asset, this being the main subject of interest for which work is carried out. Ultimately, workflows serve to automate the conditions and transformations that apply to data assets along their lifecycle: primarily, receiving external data updates and propagating them downstream to all dependent data assets. Due to this, the data asset is the most important concept in our blueprint and is at the center of all building blocks. The following three sections introduce design principles that can guide you in using building blocks to assemble a holistic data pipelining stack.

Uncompromised consistency

Consistency is at the heart of any successful software project. In data processing, consistency is achieved primarily by strict naming conventions and a clear codebase structure. Such consistency enables new team members to quickly onboard, and has the added benefit of highlighting inconsistencies. Unit tests are a good way to actually ensure consistency.

Given that the data asset is at the center of the project, consistency should span:

  • naming (& placement) of data assets
  • naming of script file(s) to process said data asset
  • naming of workflows; as in Airflow DAG & operator IDs
  • a strict 1:1 relationship between data scripts/tasks and data assets. In other words, do not load three tables using one script, rather use three scripts (refer to: ‘Declaration & data scripts store’ below to see how this improves consistency)

Consistency will help team members form a complete mental image of how aspects of the project are connected. Ideally, it also yields a structure, such that no one has to do time-consuming searches for scripts or perform similar actions. Instead, developers are enabled to quickly infer names and locations just from a data asset name at hand.
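Such conventions are mechanical, so unit tests can enforce them. Below is a minimal sketch using the standard-library `unittest` module. It assumes a hypothetical convention in which asset names are lower snake_case without double underscores and task IDs take the form `<asset>__<action>`. In a real project, the two sets would be collected from the declaration store and the DagBag instead of being hard-coded.

```python
import re
import unittest

# Hypothetical convention: lower snake_case asset names without "__",
# and task IDs of the form "<asset>__<action>".
ASSET_NAME = re.compile(r"^[a-z][a-z0-9]*(_[a-z0-9]+)*$")

# In a real project, collect these from the declaration store and the DagBag.
ASSETS = {"daily_sales_header", "daily_sales_line_item", "product_master"}
TASK_IDS = {"daily_sales_header__ingest", "daily_sales_line_item__ingest", "product_master__ingest"}


def asset_of(task_id: str) -> str:
    """The asset a task works on is everything before the first double underscore."""
    return task_id.split("__", 1)[0]


class NamingConventionTest(unittest.TestCase):
    def test_asset_names_are_snake_case(self):
        for asset in ASSETS:
            self.assertRegex(asset, ASSET_NAME)

    def test_every_task_belongs_to_a_declared_asset(self):
        for task_id in TASK_IDS:
            self.assertIn(asset_of(task_id), ASSETS, msg=f"orphaned task {task_id}")

# Run with: python -m unittest <this_module>
```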

Declarative first and low-code redundancy

We have introduced Airflow as a “workflows as code” type framework. While we absolutely love this flexibility, it may lead developers to write redundant code and/or bury configuration properties in it. There is no need to write DAGs and processing scripts completely from scratch. In fact, doing so can cause problems — especially if you have hundreds of such DAGs and scripts.

As is the nature of data pipelining, process-oriented task types such as ingesting, loading, and moving data are constantly recurring along the data asset lifecycle. The only difference per instance is what a particular data asset requires: Ingest from where and using which mechanism? Load delta or in batch? Etc.

This is why we propose a central declaration file (as in YAML or JSON) per data asset that captures all properties required to run a generalized task (carried out by a custom operator). In other words, operators are designed generically: they receive the name of a data asset, from which they can locate its declaration file and learn how to parameterize and carry out the specific task.
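A minimal sketch of this lookup, under stated assumptions. Declarations are stored as JSON so that the example needs only the standard library (with PyYAML, `yaml.safe_load` would replace `json.loads`). Schema-qualified asset names map to nested folders. The required keys `type`, `source` and `load_mode` are hypothetical, not airtunnel's actual declaration schema.

```python
import json
from pathlib import Path

# Hypothetical minimum set of properties every declaration must provide.
REQUIRED_KEYS = {"type", "source", "load_mode"}


def declaration_path(store: Path, asset_name: str) -> Path:
    """One file per asset, named like the asset: 'fact.sales' -> <store>/fact/sales.json."""
    return store.joinpath(*asset_name.split(".")).with_suffix(".json")


def load_declaration(store: Path, asset_name: str) -> dict:
    """Read and validate the single source of truth for one data asset."""
    declaration = json.loads(declaration_path(store, asset_name).read_text())
    missing = REQUIRED_KEYS - set(declaration)
    if missing:
        raise ValueError(f"{asset_name}: missing keys {sorted(missing)}")
    return declaration


def describe_ingest(store: Path, asset_name: str) -> str:
    """A generic task: everything asset-specific comes from the declaration."""
    d = load_declaration(store, asset_name)
    return f"{d['load_mode']} load of {asset_name} from {d['source']['system']}"
```

For example, if `declarations/product_master.json` contained `{"type": "ingested", "source": {"system": "sap"}, "load_mode": "full"}`, then `describe_ingest(Path("declarations"), "product_master")` would return `'full load of product_master from sap'`. The same validation function doubles as the basis for the declaration unit tests mentioned below.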

Key benefits of this approach are that it:

  • Enables standardization of basic data pipelining tasks, which makes it possible for even novice developers to build workflows simply by providing the correct declaration.
  • Leads to the creation of a “single source of truth” through extraction of data asset properties into declaration.
  • Allows easy declaration-testing for consistency and validity using unit tests.
  • Enables cross-programming language use of declaration files.

Metadata driven

Metadata is knowledge, and knowledge is power. In this case, “power” refers to knowing how to best execute a workflow, analyze it later and maybe even optimize it. We propose a low-overhead metadata interface that extends Airflow’s internal data model.

In particular, we need to add the dimension of the data asset, since Airflow’s data model is based only on its own objects such as DAGs, Tasks, and others. Here, a link between data assets and operators or DAGs needs to be established because a stakeholder will probably never ask you: “<Your Name>, when did DAG load_all_the_data finish today?” Instead, they have a particular data asset in mind.

To be truly metadata driven, one should also collect information on input files (such as sizes and modification times) as well as on internally constructed data assets (such as which partitions were updated and at what time). Downstream jobs then can easily leverage this information to construct efficient delta mechanisms such as recalculating and replacing a partition whose input data has changed, or running efficient data-quality checks based on updated data.
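Below is a sketch of the input-file metadata described above. It assumes a hypothetical `input_files` table in SQLite that records the size and modification time of each file after every successful load, so that the next run can tell which inputs are new or changed. Mapping a changed file to the partition it feeds would be the next step toward a partition-level delta load.

```python
import sqlite3
import tempfile
from pathlib import Path

# Hypothetical metadata store: one row per input file as seen at the last successful load.
meta = sqlite3.connect(":memory:")
meta.execute("CREATE TABLE input_files (path TEXT PRIMARY KEY, size INTEGER, mtime_ns INTEGER)")


def changed_files(folder: Path) -> list[str]:
    """Return names of files that are new or whose size/mtime differ from the last snapshot."""
    changed = []
    for f in sorted(folder.iterdir()):
        st = f.stat()
        seen = meta.execute(
            "SELECT size, mtime_ns FROM input_files WHERE path = ?", (str(f),)
        ).fetchone()
        if seen != (st.st_size, st.st_mtime_ns):
            changed.append(f.name)
    return changed


def record_snapshot(folder: Path) -> None:
    """After a successful load, remember size and modification time of every consumed file."""
    rows = []
    for f in folder.iterdir():
        st = f.stat()
        rows.append((str(f), st.st_size, st.st_mtime_ns))
    meta.executemany("INSERT OR REPLACE INTO input_files VALUES (?, ?, ?)", rows)
    meta.commit()


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        landing = Path(tmp)
        (landing / "sales_2019-12-05.csv").write_text("id,amount\n1,10\n")
        print(changed_files(landing))  # ['sales_2019-12-05.csv']: never seen before
        record_snapshot(landing)
        print(changed_files(landing))  # []: nothing changed since the last load
        (landing / "sales_2019-12-05.csv").write_text("id,amount\n1,10\n2,20\n")
        print(changed_files(landing))  # ['sales_2019-12-05.csv']: size changed
```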

It is worth considering how much developer time needs to be invested in a framework like this. If, for example, all imaginable jobs on a given day can work as a full load since data volumes are low, there is no need to optimize delta updates based on metadata.


Airtunnel Building blocks

We now complement the three design principles listed above with three building blocks that make up our Airflow-based data pipelining system Airtunnel.

Physical data store

Each data layer must store data persistently and efficiently. This is why the physical data store — be it a SQL system, an S3 bucket or cloud-enabled data lake service — is a key building block, worthy of a rigorous structure.

In a scalable, filesystem-like storage medium, for example, a common approach to structure includes:

ingest: Raw files as received from sources

  • landing: Incoming data files will be 1:1 loaded in here and suffixed with a timestamp of arrival
  • archive: Files from landing that have been consumed by a pipeline will be moved in here

ready: Processed data assets that are ready to be read from, each one being in its own folder. Ready assets may possibly be further partitioned using a Hive/Spark style folder convention, according to which each subfolder is named using the partition predicate (useful to seamlessly read data assets using Hive, Spark or PyArrow).

staging: Files in staging are not for general consumption because they are incomplete or currently being worked on. Staging is further broken down into pickedup (data that was moved there from ingest/landing), intermediate (any kind of temporary data for intermediate processing steps) and ready (where the next version for the ready layer is produced). Use an atomic move operation (or SQL transaction) to push finished data from staging/ready to ready, so consumers will never run into access issues or tap into half-finished files.

archive: Whenever a new version of an asset in ready has been computed and it is valuable to keep a copy of the previous run, move it here under [asset-name]/[load-time]/.

export: This is for files to be exported to other consumers and that will never be re-introduced into data assets contained in the folders above. Examples include final csv-exports, front-end specific data and reports.
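On local disk, the layout and the hand-over from staging to ready can be sketched with `pathlib`. Several details are assumptions: the helper names, the timestamp format, and the choice to archive the previous version under the load time of the run that replaces it. The final rename is atomic only within one POSIX filesystem. There is also a short window in which the asset folder is absent, so readers may briefly miss it but never see half-written files. Object stores such as S3 offer no rename at all and therefore need a different mechanism.

```python
from datetime import datetime
from pathlib import Path


def landing_name(filename: str, arrived: datetime) -> str:
    """Suffix an incoming file with its arrival time, e.g. sales.csv -> sales_20191205T093000.csv."""
    p = Path(filename)
    return f"{p.stem}_{arrived:%Y%m%dT%H%M%S}{p.suffix}"


def partition_dir(root: Path, asset: str, **predicates: str) -> Path:
    """Hive/Spark-style partition folder, e.g. <root>/ready/daily_sales/year=2019/month=12."""
    path = root / "ready" / asset
    for key, value in predicates.items():
        path /= f"{key}={value}"
    return path


def promote(root: Path, asset: str, load_time: datetime) -> Path:
    """Publish staging/ready/<asset> to ready/<asset>, archiving the previous version."""
    staged = root / "staging" / "ready" / asset
    target = root / "ready" / asset
    if target.exists():
        # Keep the old version under archive/<asset>/<load-time>/.
        archived = root / "archive" / asset / f"{load_time:%Y%m%dT%H%M%S}"
        archived.parent.mkdir(parents=True, exist_ok=True)
        target.rename(archived)
    target.parent.mkdir(parents=True, exist_ok=True)
    # A directory rename within one POSIX filesystem is atomic: readers see
    # either no folder or the complete new version, never half-written files.
    staged.rename(target)
    return target


if __name__ == "__main__":
    print(landing_name("sales.csv", datetime(2019, 12, 5, 9, 30)))  # sales_20191205T093000.csv
    print(partition_dir(Path("/data"), "daily_sales", year="2019", month="12"))
```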

Declaration & data scripts store

Part of the version-controlled codebase should be a declaration store as noted above. The goal here is to have one declaration file per data asset, named exactly like the data asset:

/declarations/daily_sales_header.yaml
/declarations/daily_sales_line_item.yaml
/declarations/product_master.yaml

If the physical data store is an RDBMS that makes heavy use of schemas, we recommend nesting the file names accordingly, e.g. /declarations/fact/daily_sales_line_item.yaml for the data asset fact.daily_sales_line_item.

Similarly, you should structure data scripts as part of the codebase and according to language, using primarily the data asset name that is produced or updated as the filename:

/scripts/py/daily_sales_header__ingest.py
/scripts/sql/ddl/daily_sales.sql
/scripts/sql/dml/daily_sales_header__postprocessing.sql
/scripts/sql/ddl/daily_sales_aggregated.sql
/scripts/sql/dml/daily_sales_aggregated.sql

A script-name suffix following the data asset name, such as “__ingest” above, can indicate what the script does. This way, we can grasp (without even reading any code!) that data ingestion is done in Python, followed by post-processing in SQL and an aggregation that produces the new data asset daily_sales_aggregated. Also note that SQL scripts are categorized into dml and ddl scripts.
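Because the convention is strict, a script's purpose can even be inferred programmatically. Below is a small sketch that assumes paths of the form `/scripts/<language>/[ddl|dml/]<asset>[__<action>].<ext>`, as in the listing above. The `ScriptInfo` type and `parse_script` function are hypothetical.

```python
from pathlib import PurePosixPath
from typing import NamedTuple, Optional


class ScriptInfo(NamedTuple):
    asset: str             # data asset produced or updated by the script
    action: Optional[str]  # suffix after "__", e.g. "ingest"; None if absent
    language: str          # folder below /scripts, e.g. "py" or "sql"
    kind: Optional[str]    # "ddl" or "dml" for categorized SQL scripts, else None


def parse_script(path: str) -> ScriptInfo:
    """Infer what a script does from its location and file name alone."""
    p = PurePosixPath(path)
    parts = p.parts
    i = parts.index("scripts")  # raises ValueError for paths outside /scripts
    language = parts[i + 1]
    folder = parts[i + 2] if len(parts) > i + 3 else None
    asset, _, action = p.stem.partition("__")
    return ScriptInfo(asset, action or None, language, folder if folder in ("ddl", "dml") else None)


for script in ("/scripts/py/daily_sales_header__ingest.py",
               "/scripts/sql/dml/daily_sales_header__postprocessing.sql"):
    print(parse_script(script))
# ScriptInfo(asset='daily_sales_header', action='ingest', language='py', kind=None)
# ScriptInfo(asset='daily_sales_header', action='postprocessing', language='sql', kind='dml')
```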

An even cleaner approach is to structure imperative Python scripts around a data asset abstraction layer, provided by a Python class. Check out “Putting it all together” below to see this in action!
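The following is a hypothetical sketch of such a class, not airtunnel's actual API. It derives the declaration file, the store locations and the ingest script from the asset name alone, so imperative scripts never hard-code paths. The store root `/data` is an assumption.

```python
from dataclasses import dataclass
from pathlib import PurePosixPath


@dataclass(frozen=True)
class DataAsset:
    """Hypothetical data asset abstraction: every location is derived from the name."""

    name: str
    store_root: PurePosixPath = PurePosixPath("/data")  # assumed root of the physical store

    @property
    def declaration_file(self) -> PurePosixPath:
        return PurePosixPath("declarations").joinpath(*self.name.split(".")).with_suffix(".yaml")

    @property
    def ready_path(self) -> PurePosixPath:
        return self.store_root / "ready" / self.name

    @property
    def staging_ready_path(self) -> PurePosixPath:
        return self.store_root / "staging" / "ready" / self.name

    @property
    def ingest_script(self) -> PurePosixPath:
        return PurePosixPath("scripts/py") / f"{self.name}__ingest.py"


def postprocess(asset: DataAsset) -> str:
    # An imperative script only handles the asset object, never hard-coded paths.
    return f"read {asset.staging_ready_path}, publish to {asset.ready_path}"


print(postprocess(DataAsset("daily_sales_header")))
# read /data/staging/ready/daily_sales_header, publish to /data/ready/daily_sales_header
```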

Custom operators

Out of the box, Airflow comes with an impressive set of operators and hooks. While we advocate making as much use of them as possible, you will have to create custom operators to facilitate the framework we have introduced up to this point.

The core idea is that most custom operators with distinct logic (let’s call one PySparkIngest) would primarily receive the name of the data asset affected by the operation. Using the unique data asset name, the operator is able to load the declaration file from the declaration store. To carry out its task, the operator would depend on the properties (such as source system, filename regex, and header row) coming from the declaration file, while its code remains generic. If required, the operator might be complemented or partly overridden by data-asset-specific imperative actions. For example, these could be defined in /scripts/py/daily_sales_header__ingest.py. Other operators, such as SQLOperator, will simply receive the path of a template-able script under /scripts along with parameters.
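The pattern can be sketched without Airflow installed by replacing `BaseOperator` with a tiny stub. In a DAG file, the class would instead subclass `airflow.models.BaseOperator` and implement `execute(self, context)` in the same way. The operator name, the JSON declaration format and the optional `customize(declaration)` hook in `/scripts/py/<asset>__ingest.py` are all hypothetical.

```python
import importlib.util
import json
from pathlib import Path
from typing import Callable, Optional


class BaseOperatorStub:
    """Stand-in for airflow.models.BaseOperator so this sketch runs without Airflow."""

    def __init__(self, task_id: str):
        self.task_id = task_id


class DeclarativeIngestOperator(BaseOperatorStub):
    """Hypothetical generic operator: receives only an asset name, reads that asset's
    declaration and optionally applies an asset-specific hook from the scripts folder."""

    def __init__(self, asset_name: str, declarations: Path, scripts: Path):
        # The task ID is derived from the asset name, following the naming convention.
        super().__init__(task_id=f"{asset_name}__ingest")
        self.asset_name = asset_name
        self.declarations = declarations
        self.scripts = scripts

    def _load_hook(self) -> Optional[Callable[[dict], dict]]:
        """Import customize() from <scripts>/<asset>__ingest.py if that file exists."""
        script = self.scripts / f"{self.asset_name}__ingest.py"
        if not script.exists():
            return None
        spec = importlib.util.spec_from_file_location(script.stem, str(script))
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
        return getattr(module, "customize", None)

    def execute(self, context: dict) -> dict:
        declaration = json.loads((self.declarations / f"{self.asset_name}.json").read_text())
        hook = self._load_hook()
        if hook is not None:
            declaration = hook(declaration)  # asset-specific refinement of the generic behaviour
        # A real operator would now ingest the files described by the declaration
        # (source system, filename regex, header row, ...); here we return the plan.
        return {"asset": self.asset_name, "plan": declaration}
```

The generic code path stays identical for every asset; only the declaration and, rarely, a small hook differ, which keeps the number of hand-written operators low.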


Putting it all together

You have made it to the end of this post, so congratulations are in order! Your patience will soon pay off, because everything described above is ready for you to use on your next project. Check out airtunnel for:

  • The Airflow blueprint codebase using data asset abstractions
  • The declaration store, along with a Python interface to bootstrap data asset abstractions from declaration files, load and process them, and more.
  • The physical data store structure exemplified on local disk and easily extendable to any storage through an interface.
  • Several custom operators that showcase the strengths of our airtunnel blueprint.
  • Metadata extensions to the Airflow data model, including pain-free data asset lineage collection

We are confident that, when you apply what we have described above to your next data pipelining project, you will be successful. Please send us your thoughts and contributions so we can continue to improve this open-source solution for the good of all. Thank you, and happy pipelining!

[1] A workflow is a sequence of tasks or steps to be performed in order, similar to a process. For the purpose of this article, workflows will consume and transform IT resources such as systems and data.

[2] As published on GitHub in August 2019.

BCG GAMMA

GAMMAscope - The Blog

Written by Jörg Schneider

Senior Machine Learning Engineer @BCG GAMMA Engineering
