How Voodoo did Airflow 101

Manuel Pozo
Voodoo Engineering
Published in
13 min readJan 13, 2022

--

The still unknown issue coming from Airflow WebServer UI

Have you ever set up an alarm to remind you to do anything? To wake up, maybe?

The need for task scheduling at some point in the life of a developer is inevitable. This is particularly true for batch data-oriented developers though, who need to run pieces of code periodically as part of their daily routines. We use workflow managers (a.k.a. orchestrators, or schedulers) to tackle this.

This article focuses on Apache Airflow and aims to give a return of experience on how Voodoo had it up and running, but also the pros and cons of this platform, and a few tips to work around some issues we found during our journey. We will especially focus on system maintenance, workflow delivery, workflow isolation, and workflow monitoring.

State of the art: pick your workflow manager

Voodoo is a gaming company that relies on many external third parties and tools, like GameAnalytics, Facebook, or our own internal Google Spreadsheet. The Data Analytics team wanted to have in-house access to these datasets in order to perform their correlations and work their magic. Technically speaking, we had to periodically pull data from these sources and insert or update it in some tables. The keyword here is “periodically” — the rest is only more or less complex coding.

From Wikipedia:

A workflow consists of an orchestrated and repeatable pattern of activity, enabled by the systematic organization of resources into processes that transform materials, provide services, or process information.

A workflow management system (WfMS) is a software system for setting up, performing, and monitoring a defined sequence of processes and tasks, with the broad goals of increasing productivity, reducing costs, increasing agility, and improving information exchange within an organization

There are many approaches and technologies for task scheduling. We can use system Cron or Crontab, Kubernetes CronJobs, AWS Lambda Scheduled, Notebooks scheduling in Jupyter, or other similar solutions.

The fact is that none of these are really designed to host and monitor complex workflows. We knew that we wanted much more flexibility and scalability than simple cronjobs because fetching data from Facebook was not a one-shot need. Pulling data from one or two sources and serving them to our data analysts was by far the simplest thing we could have done at that moment, but decided not to. Instead, we looked at it in a broader scope where more complex workflows would come one after the other.

The true story is that we did not have to look very far. Apache Airflow (by Airbnb) is currently the standard in this area, and I believe I am not alone when I say that this is the case because there is nothing better — for want of a better option — at the moment in the market. Other platforms do deserve to exist and they indeed have very interesting methodologies. For instance, Luigi (interesting comparison with Airflow here), Oozie, Azkaban, Prefect, Temporal.io, Argo Workflow, or in-house schedulers (as scary as it might seem, I confirm that a company or two are doing this). At the end of the day, one technology never fits all needs. There are plenty of articles and comparisons for these technologies. Instead, I would rather list the main aspects that, IMHO, a workflow scheduler must take into account:

  • Isolate workflows from each other
  • Isolate tasks from each other
  • Isolate workflows and tasks from the workflow manager
  • Deal with code-agnostic workflows
  • Include monitoring of workflows and tasks. Ideally, export monitoring metrics
  • Scale horizontally
  • Create workflows easily
  • Be fast
  • Be user-friendly

Spoiler alert, Airflow does not tick off all our needs. But it is able to quickly tick more than the others.

The architecture of Apache Airflow

For the sake of simplicity, below you can find a quick reminder of the purpose of every component of Airflow:

The architecture of Apache Airflow / Example graph view of a DAG
  • Airflow DAG (Directed Acyclic Graph), a.k.a. dag: it is the model that enables to create workflow definitions. It declares the dependencies between tasks and the order in which to execute them.
  • Airflow WebUI: the main user interaction with dags. It allows us to play, to pause, to replay dag runs, as well as to check tasks’ logs. Actually, it continuously looks into its dag folder repository in order to find out new dags and to show the current dag status.
  • Airflow Scheduler: talks directly to its inner database to understand when it is necessary to run new tasks.
  • Airflow Workers: receive the ticket to run a task, and they spawn a process to run it.
  • Airflow Executors: mechanisms by which tasks are run. This component is pluggable giving some flexibility to the user to pick it. For instance, local executors, celery executors, or Kubernetes executors.

Deploying Apache Airflow

Google Cloud Platform proposes managed Airflow under the name of Composer. In November 2020, AWS joined the club with MWAA. Astronomer also offers a SaaS framework around Airflow.

At Voodoo, we are full AWS, but back when we started the project Airflow MWAA was not a reality. We decided to go for deploying and maintaining an Airflow cluster on Kubernetes. We used Helm to take care of deploying the main Airflow components and we provided backend dependencies using AWS managed services RDS(PostgresDB), ElastiCache(Redis), and EFS (Elastic File system). This actually seemed pretty easy on paper. The only thing we had to think about was the strategy to deliver our workflow code to the Airflow cluster (we will discuss that in the next section)

Helm charts help a lot when bootstrapping projects, but more fine-tune and digging about the possible configurations are often necessary. Also, it creates a little dependency between the already deployed system and newer versions for further maintenance. Do not forget to pin your helm chart version, as well as your docker image version properly. If by any chance any of those components happen to update unwisely, you may end up with an unstable or corrupted environment.

We passed one day or two playing around with this setup, which allowed us to seize its reliability and spot a few weaknesses. We are covering some right here:

  • Configuration turned out to be inconsistent. The helm chart suggested going with ENV variables, which is actually very flexible and easy to set up. Airflow version 1.10.9 claims to use a configuration loading chain based on ENV > airflow.cfg > defaults built-ins which applies to all the airflow components. However, airflow-cli does not follow this. This command-line interface reads first from airflow.cfg. Making use of this CLI was something exceptional at first, but it is also very interesting if we want to have access to some other features, e.g. task test, or dag run backward.
  • Prior to Airflow 2.0, the scheduler component was not designed to support High Availability. This meant that only one pod could run at a time to ensure workflow consistency. Eventually losing this pod would mean that new tasks would not be scheduled.
  • The WebUI component scans all the dag bags constantly by seeking all python files and looking for DAG instances in a global python file context (global scope in python scripts). This double role of WebUI and dag discovery is in fact a weakness that leads to many frustrations: “the dag X is missing”, “Ooops, something bad happened”, or my favorite system.exit(). In fact, Airflow is built using python, and in order to recognize if the dag is in a global or scoped context, the WebUI passes through every line and function in the bytecode. That means if you write a print line in the script, the WebUI will print it during dag scanning. Hence, if you perform a system.exit(0), the WebUI pod will go to take a break as well. This happens simply because DAGs are not isolated from the Airflow core components. Digging to find that issue was not complex following up all our git commits. Discovering this behaviour and that this could occur was the first time we asked ourselves if we made the correct decision.
  • Installing extra packages can be done in several ways. For instance, we can extend the docker image hosting airflow packages and pip install the new requirements. Also, we can do it using scripts in our pods. In any case, the real issue we found here was to understand what was strictly necessary by Airflow, and what was not. Furthermore, what was potentially breaking the inner airflow dependency tree. For instance, Airflow 1.10.9 required a special version of SqlAlchemy. We made the mistake of letting the system update this package, which created an inconsistency in how Airflow uses the SQL engine. Again, this can be solved with higher dependency isolation
  • The airflow scheduler is in charge of pushing workflows’ tasks into a queue, which will then be read and run by airflow workers. The number of workers can be configured, as well as the number of concurrent dags, concurrent tasks running from the same dags, or concurrent tasks across all dags that these workers are polling and running. These are parameters you set when you deploy your cluster and they can change by redeploying a new configuration. There are cases in which automatic horizontal scaling is very useful to handle a big spike of queued tasks. This was part of advanced configurations that we struggled to set up properly due to a non-stable deployment process.
This is how a sudden metadata dag issue looks like

Delivering Workflows

Most Airflow components need direct access to the dag repository. It is important to seize how your organization and workflow code repositories would materialize into your final Kubernetes deployment. Hence, the scope behind this section hides many questions. How do we organize our airflow projects? How do we share common custom modules and operators? How do we ensure a simple and consistent code delivery within the Kubernetes environment?

We have decided to handle all workflows in one centralized git repository. This would simplify the CDCI and would make it easier in our early stages to use common shared code across these dags (e.g. custom operators or helper functions). At that time, we were only a few people working on the project and this made us more agile and increase our business impact in the very early stages. The code versioning was handled by Github and we used a single master strategy. Using a simple CDCI with Jenkins could enable us to deliver the code fast. On the Kubernetes side, we wanted to use side-car git containers within pods, but we realized this could lead to potential inconsistencies in case of pods failures. We preferred to ensure the state of our code by having a centralized code delivery with AWS EFS connected to one Kubernetes Persistent Volume for that. Then, our pods only need to mount the EFS. This was actually very useful because we could use it to store Airflow logs as well.

Almost two years later, we still have a similar pattern. Our codebase and the number of contributors have increased greatly. We started with around 30 dags in the first weeks of the life of our deployment, and today we are maintaining more than 200 dags. We have realized that this approach has started to generate frustrations and pain points. We are considering using other git workflows, better handling common modules and operators using external python packages, and better exploiting the DagBags feature. We consider this topic a very interesting one, and we will write a dedicated article for it.

Isolating Workflows

The Airflow Scheduler continuously checks on incoming new tasks to assign to Airflow Workers. Workers can take tasks from any workflow in progress. This means two tasks of the same workflow can run in two different workers in parallel, at least in default simple airflow setups. This is fine in many cases, but ideally, we aim to isolate tasks as well in order to reduce side effects. There are some isolation methods already proposed in Airflow.

The main contributors during the early stages of Airflow were more or less fluent in python. For that end, Airflow proposes using PythonVirtualEnvOperator, which spawns a python process that takes in a requirements.txt and creates an isolated Python virtual environment. This is very useful to isolate the task from the Airflow core dependencies, but still the process is running with the same Airflow Worker system and processes could affect each other.

Many Voodoo developers prefer using JavaScript. This means that sooner or later workflow complexities would lead them to write their own code in their preferred language and environments. In addition, after our experience with system.exit() we wanted to isolate our workflow tasks as much as we possibly could. We considered starting using DockerOperator but since our cluster was already in Kubernetes, we opted to use the KubePodOperator.

The integration of this operator was very simple and actually improved a lot the way we design workflows. This also means that we have split the lifecycle of our workflow project into business-code and workflow-code: first, we develop all our business code tasks, second, we use Docker containers to package and version this codebase, and finally, we make plain simple Airflow operators call and deploy our versioned containers. One could argue that we lose some benefits of Airflow, like xcom exchanges or existing operators. On the other hand, we can benefit from much finer control of our tasks: we can track the resources consumed by tasks in our spawned pods, being able to establish limits in CPU and Memory, which was before shared in-worker. In this mode, workers would be like a simple Kubernetes client launching pods and waiting for them to finish.

We can push this idea a step further by enabling auto-scaling and AWS Fargate instances: airflow can in fact trigger new tasks which in turn will make the cluster provision more one-shot resources. This is handy more a bit more intensive workloads that could not yet require distributed computing.

Monitoring Workflows

Airflow proposes various tools around logging and metrics from workflows. The WebUI makes easily accessible a lot of information of task statuses and logs in every dag run. From the workflow code, it is also possible to use callback functions to perform actions around these statuses in order to set up alerts. However, having more complex monitoring is up to the user and the workflow code.

For instance, imagine you have an ETL workflow to pull data from an API and upsert it into a database. The basic monitoring can tell that the whole workflow has finished with success, the runtime was 10 minutes, and other interesting metrics. How can you be sure that the data has actually been properly upserted?

At Voodoo, we have developed a custom monitoring operator which runs at the end of our data workflows. This operator works a bit like a simple data quality tool: it simply performs SQL operations around the final tables in our data-lake and data-warehouse aiming to collect global metrics during dag runs. Also, we have set up an anomaly detection system on top of these collected metrics. For instance, we monitor the data volume of the table between different dag runs, and using volume anomaly detection we can spot unexpected data gaps. We also installed the (airflow-exporter) on our monitoring stack, which enables us to enlarge the technical monitoring around our workflows. For instance, it was much easier to compute data freshness and to represent data arrival in a Grafana dashboard. These custom metrics could not be easily accessed via de WebUI, hence we decided to connect them to our monitoring stack (Prometheus, Grafana, but this is another different topic).

Global monitoring of Apache Airflow in Grafana
Data volume monitoring integrated into all our data Airflow DAGs

The learnings from this part of the project are not necessarily mind-blowing. We did set up Slack notifications to alert us in case of failures. It turned out that — in the early stages of our deployments — this happened a lot and it very quickly caused alert fatigue. Notifications went unread or dismissed, and jobs remained in failure states. We decided to go for a more explicit approach and use dag scoped email notifications. This really helped the team to refocus and better maintain their workflows. Today, while failures still exist, they are rare and very meaningful. We have enabled back Slack notifications and we are considering a more multichannel approach for this.

Conclusions and next steps

Voodoo has a very strong mindset for quick testing and delivery of projects. We had to make some early decisions in order to bring business value as soon as possible. We deployed Airflow into production in a matter of days and we could start contributing to the code base and adding workflows very soon as well.

Today things have changed and we continuously iterate to improve the way we work with Airflow focusing on creating standard workflows, common operators, and a useful codebase. We have passed from really scattered and uncorrelated data services to a high data nutrition approach around business needs and insights, and we use Airflow to orchestrate it all. I think that there are certain aspects of Airflow itself that can be improved, especially around workflow parsing consistency and task isolation. But the technology and community are evolving and many of these are now easier to tackle. Airflow 2.0 was released in December 2020, and with it came very exciting features and enhancements. Actually, we have adopted this new version and migrated our Airflow cluster into AWS MWAA service (we will present tips for this in another article).

Personally, I do not regret having installed Airflow today. As usual, I just wish I had known much sooner all the things I know now =]

In the next articles, we will present some of the workflow standards we use on a daily basis as well as how we propose to push workflow isolation to the next level using Kubernetes, our MWAA setup, and so much more.

Stay tuned!

References

--

--

Manuel Pozo
Voodoo Engineering

DataOps — I love to apply what I learnt in the every-day-life