Data Traffic Control with Apache Airflow

leboncoin tech Blog
Jan 9, 2019

by Nicolas GOLL PERRIER (Data Engineer)

Leboncoin is the 7th most accessed website in France; this traffic generates a lot of different events that need to be collected, transformed, reconciled, and analysed in order to fully understand our customers and market, and to improve our classified ads services.

In order to handle the diversity, volume, and complexity of events, we build workflows to ensure that our data is processed in an orderly and coherent way.

A good analogy for this job is the air traffic control at the core of any airport. Each transiting dataset is like a plane carrying valuable information that needs to go from point A to point B. Our responsibility is to ensure the on-time and accurate transit of those “planes”, but we can neither control the “weather”, nor the behavior of those precious “passengers”.

So we need to be vigilant and establish the best flight paths for the streams and batches of data we ingest and transform, re-shuffle priorities, stall some in-flight datasets at a certain “altitude” (postponing the availability of non-critical information), while re-allocating “fuel reserves” (CPU, memory, bandwidth…) depending on the current business priorities of the company.

From Airstrip to Airport

In the beginning, we were able to somewhat cope with the growth of the site with traditional Business Intelligence tools and practices (good old Extract Transform Load tools, cron-based orchestration, limited dashboards…). But as Leboncoin kept growing, we had to adapt our data-operations accordingly.

Therefore, we had to completely rethink our data architecture several years ago to:

  • Scale our data pipelines to process workloads of up to several terabytes a day efficiently
  • Adapt to the platform’s technological and organisational shift to a micro-service architecture
  • Quickly detect failures and inconsistencies in the many data processes run each day/hour
  • Respond to internal or external faults without impacting the quality, conformity, and availability of actionable information to our business users

Flying through the data-hurricane

Nowadays, we have different kinds of data processing batches running every hour of the day: Spark transformations, S3 archiving of Kafka topics, Data Warehouse loads, dataset indexing, business extracts, regulatory extracts, Machine Learning training and predictions… far too many to manually check that all our data stores are properly up to date and missing no data points.

Moreover, data engineering and analytics sit downstream of the information system. Every incident, failure, bug, or inconsistency will at some point end up in our pipelines: an input dataset may change its format without notice, a third-party data source might become unavailable during the night, a real-life event might trigger an unexpected increase in a specific customer activity, which produces more events and takes longer to process…

An information system is a living organism. We might prevent some of those things from happening, and we have contingency plans, but we can’t predict everything, especially in a quickly growing and evolving organisation.

So when dealing with our large set of data transformations, we need to guarantee to our customers, both internal and external, that those transformations will run consistently and quickly enough to provide relevant, accurate, and up-to-date insights. And if something were to fail, we need to quickly assess the impact of the failure on all dependent systems, reports, and dashboards.

Several tools such as Amazon S3, Apache Spark or Redshift have helped us tackle most of the scaling issues, but as data processing workflows kept accumulating and getting more and more complex, we needed a proper way to ensure that they ran like clockwork. And we were fortunate enough to try out Apache Airflow early on, just after it was open-sourced.

Airflow: Pre-flight checklist

Airflow has been developed with data-engineering challenges in mind. It is above all a DAG (Directed Acyclic Graph, a fancy way of saying “a workflow that does not loop”) management platform, or a workflow orchestrator if you prefer.


Airflow enables you to define, schedule, run, monitor and manage your workflows in pure Python code, while also providing the tools and UI to handle those workflow operations.

It provides:

  • Retry mechanisms to ensure that each and every anomaly can be detected, and automatically or manually healed over time (with as little human intervention as possible)
  • Priority aware work queue management, ensuring that the most important tasks are run first and complete as soon as possible
  • Resource pooling system to ensure that, in a high concurrency environment, thresholds can be set to avoid overloading input or output systems
  • Backfill capabilities to identify “missing” past runs, and automatically re-create and run them
  • Full history of metrics and statistics to view the evolution of each task performance over time, and even assess data-delivery SLAs over time
  • A horizontally scalable set of alternatives to the way tasks are dispatched and run on a distributed infrastructure
  • A centralized, secure place to store and view logs and configuration parameters for all task runs

Fig 1 — Example representation of a DAG in Airflow

All these features allow us to run more than 10,000 automated tasks each day without breaking a sweat. Even in the case of a major failure, recovery requires very little human labor, as the system eventually heals itself automatically.

Airflow’s architecture relies on several components, and a few auxiliary tools:

  • A Task Scheduler, the heart of Airflow, which keeps track of and prioritizes which tasks to run, retry, or abandon, and when to start them…
  • One or more Task Executor(s), steered by the scheduler, and taking care of actually running individual tasks
  • A Web User Interface for easier overview and manipulation of DAG statuses and history
  • A Command Line Interface to manage the scheduler in a more automation-friendly way than the UI
  • A central database that stores all stateful information

In order to fully grasp how Airflow can be used in a data-processing environment, we’ll review, step by step, a (largely simplified) example of the process that we follow to develop a minimalist workflow on a data sample.

Installation: Aligning with the runway

Airflow is written in Python (2.7 and 3.5+ compatible), so you’ll need a working Python environment with pip installed. Then the base package can be installed as follows:

pip install apache-airflow

There are many extensions which provide additional features, and depending on your ecosystem, you may want to include them or not. For the purpose of this article, you should add the following extra packages:

pip install apache-airflow[postgres,s3]

Then, we need to tell Airflow where to store its metadata, logs and configuration. This is done through the AIRFLOW_HOME environment variable.
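For example (matching the /tmp/airflow path used by the default configuration shown below):

export AIRFLOW_HOME=/tmp/airflow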

We can then trigger the initialization of its back-end database:

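airflow initdb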
This will create a default configuration file and initialize an empty back-end DB. In the freshly created configuration file ${AIRFLOW_HOME}/airflow.cfg, you’ll find all the configuration parameters for every component of Airflow. We’ll look deeper into two very important parameters from this file:

[core]
# Back-end storage url
sql_alchemy_conn = sqlite:////tmp/airflow/airflow.db
# Task execution model
executor = SequentialExecutor

The first one is the back-end metadata storage database. The default is SQLite, which is almost only useful for unit testing; even in a development environment, we strongly recommend switching to something with a little more kick.

At Leboncoin, we use PostgreSQL, because it simply is one of the most rock-solid, full-featured Open Source RDBMS, and we have a long happy history with it.

And since Airflow uses SQLAlchemy (an Object Relational Mapper) and Alembic (database schema migration management), switching over is as simple as:

  • Installing and setting up an empty PostgreSQL instance using your method of choice
  • Changing the connection string in airflow.cfg to point to your DB instance setup (e.g. sql_alchemy_conn = postgresql+psycopg2://airflow:password@localhost:5432/airflowdb)
  • Re-running the db initialization (this time with the new setup) with airflow initdb
  • That’s it, we now have a more sturdy back-end for our Airflow setup ready to be used

The second most important choice is which kind of executor(s) you will use, set via the executor configuration property. Airflow proposes several executors out of the box, from the simplest to the most full-featured:

  • SequentialExecutor: a very basic, single-task-at-a-time executor that is also the default one. You do NOT want to use this one for anything but unit testing
  • LocalExecutor: also very basic, it runs the tasks on the same host as the scheduler, and is quite simple to set up. It’s the best candidate for small, non-distributed deployments and development environments, but won’t scale horizontally
  • CeleryExecutor: here we are beginning to scale out over a distributed cluster of Celery workers to cope with a large task set. Still quite easy to set up and use, it’s the recommended setup for production
  • MesosExecutor: if you’re one of the cool kids and have an existing Mesos infrastructure, you will surely want to leverage it as a destination for your task executions
  • KubernetesExecutor: if you’re an even cooler kid, support for Kubernetes has been added in version 1.10.0

For the purpose of this tutorial and brevity’s sake, we shall pick the LocalExecutor, which is well suited for development and small deployments, and does not require any additional setup.
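With those two choices made, the relevant lines of ${AIRFLOW_HOME}/airflow.cfg would look something like this (re-using the example connection string from above):

[core]
sql_alchemy_conn = postgresql+psycopg2://airflow:password@localhost:5432/airflowdb
executor = LocalExecutor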

You may now start your Airflow instance, by launching its scheduler (which will embed the LocalExecutor in this example setup), and the web UI (optional but quite handy):

airflow scheduler &
airflow webserver &

Obviously you’ll want to have proper systemd/upstart/docker services in a real deployment environment.

You should now have a working local Airflow instance, and can view its Web UI at http://localhost:8080, with a set of default examples (unless you disabled them in the configuration file).

This instance runs locally, however, if you want to start experimenting with Airflow inside docker, we strongly recommend Matthieu Roisil’s public Airflow docker image. We have had no issue running Airflow in a dockerized environment at Leboncoin in production using a similar setup.

Core concepts: Aerodynamics

Before we start coding our first DAG, we must first understand a few core concepts about the main user-defined objects that Airflow handles:

  • DAG: The most high level object, it’s a Python object defining a workflow’s organisation, structure, and schedule. All tasks of a specific workflow will be attached to it.
  • DAG Run: This is the run-time version of a specific DAG, thus it is provided with additional context (such as when it was triggered, for which date/time period), and has a state (is it currently running? when was it started? is it finished? did it succeed? how long did it take? …)
  • Operator: A Python class defining how to perform a specific operation, which can then be used in a DAG. Airflow comes with a set of standard operators (run a SQL query, execute a bash command or a Python function, perform an HDFS transfer, run a check, a Docker container, or a Spark job), but you can also define your own implementations
  • Task: This is a step in the DAG, a configured usage of a specific Operator, that you configure to define which upstream task precedes it, thus organizing the DAG’s workflow logic, dependencies and processing order
  • Task Instance: This is a contextualized run-time execution of a specific task, just like a DAG Run is the contextualized run-time version of a DAG. Task Instances will be pushed by the scheduler to the worker and trigger the execution of a specific job

Preparing your first data DAG

We’ll now build a very basic yet typical “wait for data/process data” workflow handling a simple raw click-stream events aggregation scenario.

For this we’ll need a second database, some kind of “mini” Data Warehouse, typical of a data-processing environment. Since we should already have a PostgreSQL instance running for the Airflow back-end, we’ll re-use it and create another database, named miniwarehousedb, along with a dedicated user/password pair etl/password for testing purposes.

Then, we can load the following DDL into the miniwarehousedb’s public schema:

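The exact DDL does not matter much for this walkthrough; a minimal schema along these lines is enough (table and column names are placeholders re-used in the snippets below): a raw events table for the sensor to poll, and the clickstream_aggregated target table.

-- Raw click-stream events landing table (placeholder schema)
CREATE TABLE clickstream_events (
    event_id        BIGSERIAL PRIMARY KEY,
    event_timestamp TIMESTAMP NOT NULL,
    user_id         BIGINT,
    event_type      TEXT NOT NULL
);

-- Daily aggregates computed by our DAG
CREATE TABLE clickstream_aggregated (
    event_date  DATE   NOT NULL,
    event_type  TEXT   NOT NULL,
    event_count BIGINT NOT NULL
);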
In order to properly connect to this database from our Airflow instance we will also need to declare a connection to our newly created database, using the CLI (or the UI):

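One way to do it with the CLI (flags may vary slightly between Airflow versions; the connection details match the etl/password user and miniwarehousedb database created above):

airflow connections --add \
    --conn_id mini-warehouse-db \
    --conn_uri postgres://etl:password@localhost:5432/miniwarehousedb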
This creates a database hook in Airflow that our DAGs can use without having to know the details of the connection itself.

Then we should be ready to dive into the core of Airflow: writing your own DAG.

Writing a minimalist data workflow DAG

Airflow does not have a specific DSL to define DAGs — you just write Python. This enables easy versioning, collaboration, and above all, dynamic DAG generation (not to be mistaken with dynamic DAG execution, which we’ll talk about later).

So let’s just start by declaring a simple daily-run DAG. Create a new Python file in your ${AIRFLOW_HOME}/dags folder (the default folder scanned by Airflow for DAGs):

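A minimal version could look like the following sketch (the DAG id and owner are just placeholders; the scheduling and retry values match the description below):

from datetime import datetime, timedelta

from airflow import DAG

default_args = {
    "owner": "data-engineering",          # placeholder owner name
    "retries": 3,                         # retry each failed task 3 times
    "retry_delay": timedelta(minutes=30)  # wait 30 minutes between retries
}

dag = DAG(
    dag_id="clickstream_daily_aggregation",  # placeholder DAG name
    default_args=default_args,
    start_date=datetime(2018, 10, 1),
    schedule_interval="@daily",              # one run per day, at midnight
)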
This creates an empty DAG, planned to start every day (at midnight) from 2018–10–01 onward. If any of its tasks fails, it will be retried 3 times, at 30-minute intervals.

Note that you can also specify a more precise, cron-like string as the schedule_interval, but we tend to refrain from it, as it’s less readable and, in our opinion, a misuse of the scheduler’s capabilities.

But our DAG is just an empty shell so far, so let’s add a couple of tasks to it. First, let’s add a task using a sensor operator.

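For example, with the built-in SqlSensor (the task id and the clickstream_events table come from our placeholder schema):

from airflow.sensors.sql_sensor import SqlSensor

wait_for_clickstream = SqlSensor(
    task_id="wait_for_clickstream_data",
    conn_id="mini-warehouse-db",  # the connection declared earlier
    sql="""
        SELECT 1
        FROM clickstream_events
        WHERE event_timestamp >= '{{ execution_date + macros.timedelta(days=1) }}'
        LIMIT 1
    """,
    poke_interval=30,  # check every 30 seconds
    timeout=3600,      # give up on this attempt after 3600 seconds
    dag=dag,
)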
This sensor will be the starting-point of our DAG, and when run, will check every 30 seconds whether or not the query returns any line. If no lines were returned by the query after 3600 seconds, the task will fail.

Since the DAG’s default retry setup is 3 retries with a 30-minute delay between them, this whole process will be repeated up to 3 times before the task is marked as failed.

The {{ execution_date + macros.timedelta(days=1) }} part here is a Jinja template that will be rendered by Airflow at DAG runtime (with a different value of execution_date for each DAG run). Note that the execution date here is the beginning of the period of each DAG run, and not the actual start_date of the execution. So in a daily DAG, it will be D-1, the date of the data to process, for a DAG run triggered on date D. Hence, we add 1 day to check whether the period D-1 is actually complete.

It’s important to understand that the Python code we wrote is not meant to actually do any computations in itself, it’s merely the construction of a DAG object as a graph of Tasks. So no runtime context is available to the Python code here: you can’t expect the DAG creation code to know about the future execution dates of the DAG run it will spawn.

Also, note that we did not pass any credentials describing how the database instance shall be reached: we only pass the label "mini-warehouse-db" (which we defined earlier) as a conn_id, which the SqlSensor operator uses to retrieve a handle to the DB instance. This way we do not rely on knowing the environment-specific host/port/credentials database setup, and can leverage Airflow’s connection pooling capabilities.

Now let’s add another task to this DAG:

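For instance, with the built-in PostgresOperator (again using our placeholder schema):

from airflow.operators.postgres_operator import PostgresOperator

aggregate_clickstream = PostgresOperator(
    task_id="aggregate_clickstream_daily",
    postgres_conn_id="mini-warehouse-db",
    sql="""
        INSERT INTO clickstream_aggregated (event_date, event_type, event_count)
        SELECT DATE(event_timestamp), event_type, COUNT(*)
        FROM clickstream_events
        WHERE DATE(event_timestamp) = '{{ ds }}'
        GROUP BY 1, 2;
    """,
    dag=dag,
)

# The aggregation only runs once the sensor has succeeded for this DAG Run
aggregate_clickstream.set_upstream(wait_for_clickstream)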
This code defines a second task that actually performs a basic daily data aggregation on the database. However, since the sensor is set as an upstream dependency for the task, it won’t run until the sensor actually succeeds for the specific DAG Run.

But let’s see how this actually flows once it gets picked up by the scheduler.

Cruise control

If we connect to the Airflow UI, we’ll end-up on the following board (if you kept the examples, you’ll have more DAGs in your scheduler):

Fig 2 — UI state after the discovery of our first DAG (de-activated by default)

Without getting into the details of the UI, we can see here that Airflow has already picked-up our DAG. That’s because Airflow scans the DAG folder periodically for changes and refreshes them dynamically. This allows for hot-deployment of DAGs without stopping the scheduler or any of the workers.

We can also see that, being new, the DAG is OFF by default, which means that the scheduler won’t queue any new task instances and won’t create any new DAG Runs. We want to rectify that, so let’s switch it on.

Instantly, the scheduler will start to perform what is called a back-fill: every run from the start date to now for which Airflow has no recollection of a run is triggered and queued. Contrary to cron, Airflow is aware of its past state, and can recall previous runs (completion state, metrics, dates, logs…).

There are many parameters that can tune the behavior of the scheduler (which tasks should be run first? How many in parallel? Per task? Per DAG? Per resource consumed? In total?), but paraphrasing the documentation is beyond the scope of this article. Just know that all of these can be tuned either in the parameters of each DAG object, or globally in the airflow.cfg file.

We now should start to see a few executions pop-up on the dashboard:

Fig 3 — Our first DAG is on and failing “successfully” as expected

And if we click on the DAG’s name, we’ll get a more detailed interface and set of controls for this particular DAG:

Fig 4 — Historical view of our DAG’s back-fill instance and their task states

Here we can see each DAG run getting back-filled, from the DAG’s `start_date`. The layout is visually very compact and should be read as follows:

  • On the first line, a paginated sequence of currently existing DAG runs, in chronological order, and their status (displayed as a color dot)
  • On each row, a task of the DAG currently being viewed, organised as a dependency tree
  • At each row/column intersection, a task instance, with a colored square to display the current state of the task instance.

The backfill is limited by the max concurrent DAG run settings, so only the first few days will actually have a DAG being run, but as some of them complete (either successfully or unsuccessfully), new DAG runs will pop-up until we catch-up to the current date.

Each task instance has a tooltip and can be clicked to see/manage the details of the tasks. The most frequent operations here are forcing a re-run of a failed task (and cascade to its upstream/downstream dependencies), or viewing the logs produced by each of the attempts.

Fig 5 — Viewing the log output for each run attempt of our first DAG task

Finally, the DAG’s graph can also be visualized directly, but this is mostly useful during the development phase, not so much during the operations of the DAG, even though it may help understand the global flow of tasks embedded in the DAG.

Fig 6 — Our mini two task DAG viewed as a graph

As can be seen on the tree view, our DAG’s sensor task instances should currently either be undergoing retries, or have already failed. This is logical if you take a look at the SQL code executed in the log view of those tasks: the source table is empty, so no rows are returned.

So let’s add a few lines for those early dates and see what happens. We could do this directly on our mini Data Warehouse database, but we can also do it from Airflow, since the connection is actually registered as a usable resource.

Go to the data-profiling menu, and then open ad-hoc query. Then select our mini-warehouse-db from the menu, and execute those few lines of SQL:

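For instance (the rows and columns follow the placeholder schema defined earlier, with dates covering the first back-filled days):

INSERT INTO clickstream_events (event_timestamp, user_id, event_type) VALUES
  ('2018-10-01 10:00:00', 1, 'page_view'),
  ('2018-10-02 11:30:00', 2, 'ad_reply'),
  ('2018-10-03 09:15:00', 1, 'page_view'),
  ('2018-10-04 18:45:00', 3, 'page_view');

SELECT * FROM clickstream_events;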
Click run, and if all goes well, you should see the result of the last (SELECT) statement displayed in the UI:

Fig 7 — Result of our small transaction after execution from the Airflow Ad-Hoc-Query tool

Obviously, having the ability to run Read/Write queries on your Data Warehouse is something you will want to disallow or disable in your test/production environment, but for the purpose of this article, it illustrates Airflow’s versatility.

Now, if we get back to our DAG’s tree view, we can see that the DAG runs for the dates we’ve just injected in the mock database have most likely already successfully been completed.

Fig 8 — Our backfill tasks are beginning to succeed and so are our DAG runs

We can even check that our target table clickstream_aggregated has been loaded with the aggregates for those days, and only for those days, since the sensors for the following days are still waiting for their data to be available, and should fail in a few hours according to our DAG specifications.

That’s it, we have a working DAG that performs some actual (fake) data processing!

Air Flaws

As much as we love Airflow, there are a couple of drawbacks/shortcomings that you must keep in mind when implementing it:

  • No dynamic execution — The graph built by Airflow is built ahead of the actual execution. Airflow has a dynamic DAG generation system, which can rely on external parameters (configuration, or even Airflow variables) to alter the workflow’s graph; we use this pattern a lot (see the sketch after this list), but it’s not possible to alter the shape of the workflow at runtime (for instance, spawn a variable number of tasks depending on the output of an upstream task).
  • No data lineage/management system — Airflow does not provide any data-management framework. It runs tasks, jobs, DAGs, might persist some state in its database, but it is not aware of the actual input/output of those operations. Things might change in the near future as experimental data-lineage support is beginning to be integrated. Still, at Leboncoin we had to implement our own, and might have to refactor it in the near future to remove some of its limitations and facilitate our GDPR compliance checks
  • There are sometimes puzzling failures in the UI when performing basic operations: some of them might just refuse to work as expected, mostly due to some ORM magic not handling a specific edge case.
  • Scheduler’s priorities are not always obvious — When looking at the UI or CLI, it may be hard to determine which tasks will be picked by the scheduler, or why one specific task is not being triggered. There is little visualization to help deal with an overloaded scheduler, especially if you’re not using the CeleryExecutor.
  • No DAG execution versioning integration: DAGs can be refreshed dynamically, but you’ll always see the latest version. When looking back in time, you have to keep track of your deployment and match it with your VCS to assess which version has been run at a specific date.
  • The execution_date (the core concept defining an Airflow run) is the beginning of the data-processing period. So a daily DAG triggered today has an execution_date set to yesterday. Not that obvious at first, but once you get your mind around it, you deal with it. We still find it quite annoying when producing daily DAGs with a sliding monthly window, but those are fortunately quite rare.
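To make the distinction concrete, here is a minimal sketch of dynamic DAG generation (the DAG id, table list and SQL are purely hypothetical): the shape of the graph is decided when the file is parsed by the scheduler, not while a run executes.

from datetime import datetime

from airflow import DAG
from airflow.operators.postgres_operator import PostgresOperator

# Hypothetical configuration driving the shape of the generated DAG
TABLES_TO_CLEAN = ["ads_staging", "users_staging", "messages_staging"]

dag = DAG(
    dag_id="staging_cleanup",
    start_date=datetime(2018, 10, 1),
    schedule_interval="@daily",
)

for table in TABLES_TO_CLEAN:
    # One task per configured table, created at parse time, not at run time
    PostgresOperator(
        task_id="truncate_{}".format(table),
        postgres_conn_id="mini-warehouse-db",
        sql="TRUNCATE TABLE {}".format(table),
        dag=dag,
    )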

Aerial maneuvers

Like all feature-rich software, Airflow is quite easy to get into, but slow to master. Here are a few pieces of advice/best practices we now consider core to the way we build data pipelines with Airflow:

  • Idempotent and Transitive tasks: We now ensure that every pipeline we develop can be run several times, and in any order, without altering the consistency of the target data store. This might cost a little more effort at the conception stage (and is not always feasible), but this commitment is returned a hundredfold during operations, especially with a tool like Airflow where re-running a job on a patched dataset is just a press of a button away.
  • Understanding the dual-level execution model of Airflow: DAG definition code builds the DAG object and has to be execution-context agnostic; only the Operators actually execute any run-time computations.
  • Use Pools for every external resource (a database, a Spark cluster, an Elasticsearch cluster, an API endpoint), especially if those resources are outside of your responsibilities and do not provide any rate limiting/query queuing capabilities. Remember that the offline world is toxic to the online world, and can have disastrous consequences for other services. Manage this toxicity, since you have the right tools to do it!
  • Centralize every parameter that you might want to change dynamically in Airflow’s Variables system. It can be encrypted, so you can (reasonably) store credentials in it.
  • Use XCOM (Airflow’s system to transfer state from task to task) wisely. You want your tasks to remain as stateless and input-independent as possible.
  • Monitor your scheduler and Airflow instance with external tools. Airflow is not a monitoring/alerting tool; it’s just a way to steer the complexity of your data pipelines, and is not meant to wake you up during the night when something critical fails. Use something like Datadog or Prometheus, route notifications for failures via Slack, or whatever suits your existing processes.
  • Build your own custom atomic Airflow Operators for every type of task rather than relying on the far too tempting PythonOperator (see the sketch after this list). Operators can be developed in a few minutes, and prevent execution logic from ending up in the DAG’s specification itself, keeping DAGs more readable.
  • Use custom macros, as the out-of-the-box set that Airflow provides is likely to fall short of your requirements.
  • Don’t depend on the start time of your tasks. Let the scheduler do its work, and put sensors in place where required. Don’t fine-tune your DAG’s start_time to accommodate some obscure business logic if you can properly wait for a signal (be it a state in a table, an API, a file deposit…)
  • Build tests for everything you can. Sometimes (in fact, very often) when dealing with data pipelines, you will hit a wall due to some resource that you cannot mock during integration testing. Postpone that to the pre-production environment then, but ensure that all the possible unit tests have been implemented.
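As an illustration of the custom operator advice above, here is a hypothetical, minimal operator that purges old rows from a table; the names and SQL are placeholders, not our production code.

from airflow.hooks.postgres_hook import PostgresHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class PostgresCleanupOperator(BaseOperator):
    """Delete rows older than a given cutoff date from a single table."""

    # Let Airflow render Jinja templates (e.g. "{{ ds }}") in this field
    template_fields = ("cutoff_date",)

    @apply_defaults
    def __init__(self, conn_id, table, cutoff_date="{{ ds }}", *args, **kwargs):
        super(PostgresCleanupOperator, self).__init__(*args, **kwargs)
        self.conn_id = conn_id
        self.table = table
        self.cutoff_date = cutoff_date

    def execute(self, context):
        hook = PostgresHook(postgres_conn_id=self.conn_id)
        self.log.info("Purging %s rows older than %s", self.table, self.cutoff_date)
        hook.run(
            "DELETE FROM {} WHERE event_date < %s".format(self.table),
            parameters=(self.cutoff_date,),
        )

Such an operator is then used in a DAG like any built-in one, keeping the SQL and its parameters out of the DAG file itself.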

Keeping the horizon in sight

In the end, we are very satisfied with our current usage of Airflow. But there is still a long way to go. Our future endeavors will most likely focus on:

  • Transfer our knowledge and capabilities on Airflow to empower the Data Scientists of the company, providing better autonomy, automation and reliability for their own pipelines, and not just the production-critical tasks
  • Battle-test Airflow for ML pipelines in the same way we have for data preparation/transformation pipelines
  • Investigate the cost and benefits of a switch to the Kubernetes Executor
  • Rethink our Dataset Management and Lineage system, perhaps using the work in progress made by Airflow on this issue
  • Enforce better definition and implementation of our data delivery quality, using Airflow’s embedded SLA system
  • Open Source Contribution, since we have a (still somewhat company specific) set of operators that could benefit the community
