Airflow Part 2: Lessons learned

At SnapTravel we use Apache Airflow to orchestrate our batch processes. It is one of the key systems we depend on for keeping track of business metrics, building/testing natural language models, and mining for product insights. After working with Airflow for almost 2 years now, we wanted to consolidate our lessons and share the nuances with other developers.

For an overview of how Airflow was first implemented at SnapTravel, see our prior blog post. There we discussed the benefits of Airflow, the major components of its architecture, and a number of resources for those just getting started. In this post we assume the reader is familiar with concepts like Operators (such as PythonOperator), the Scheduler, DAGs, and Tasks. If not, give the Airflow Tutorial a quick read!

This post focuses on the Develop part of building a data pipeline with Airflow. We’ll explore quirks and unexpected behaviours that each cost us more than a couple of hours of debugging while writing DAGs, along with some best practices picked up along the way. Airflow is incubating under the Apache umbrella and is being actively improved, so our hope is that some of these problems will no longer be relevant in upcoming releases!

Python Version for my project — py3 or py2?

Airflow is written for Python 3 compatibility, so it is a much smoother ride if you can write your business logic in Python 3 rather than Python 2.x. Under Python 2 there are unexpected runtime behaviours that are hard to anticipate. For example, after you `import airflow` in your code, some Python 2 functions are overridden with their Python 3 counterparts, as described in the Python Future Library Docs. The file in the Airflow codebase where this happens is airflow/ Below is a code example to demonstrate this:

Python 2.7.12 Shell
>>> import math
>>> type(math.ceil(1.0))
<type 'float'>
>>> from airflow.models import DAG
>>> type(math.ceil(1.0))
<type 'int'>
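One way to avoid being bitten by this is to never rely on the return type of such functions. The snippet below is a minimal sketch, not something taken from the Airflow codebase; `ceil_as_int` is a hypothetical helper name made up for illustration.

```python
import math


def ceil_as_int(value):
    """Round up and always return an int, on Python 2 and Python 3 alike.

    Hypothetical helper: wrapping math.ceil in int() gives the same result
    type regardless of whether math.ceil itself returns a float or an int.
    """
    return int(math.ceil(value))


# Example: number of batches needed to process 10 rows, 4 rows at a time.
batches = ceil_as_int(10 / 4.0)
print(batches)
```

Code written this way behaves the same before and after `airflow` is imported, which makes the switch from Python 2 to Python 3 less surprising.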

How to create different patterns found in workflows

There are various types of DAGs that are common when authoring data workflows. Here are some code snippets to make writing them easier. We took inspiration from the Slides for developing elegant workflows with Apache Airflow @ EuroPython 2017.

Sequential source to destination pattern

In this pattern, you have a sequence of tasks executed in order. For example, `source` is a file on FTP, `a` is some transform or join with data from the database, and `b` is the target destination.

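from airflow.operators.dummy_operator import DummyOperator

# `dag` is assumed to be a DAG object defined earlier in the file.
source = DummyOperator(task_id='source', dag=dag)
a_task = DummyOperator(task_id='a', dag=dag)
b_task = DummyOperator(task_id='b', dag=dag)
source >> a_task >> b_task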

Note: the `>>` and `<<` bitshift operators are a shorthand for specifying task dependencies and can be used in Airflow 1.8+. (Quick reference)

Tributaries pattern

This is a pattern where multiple data streams/sources are combined. `d` will only execute after `a`, `b` and `c` have all executed successfully. For example, task `a` generates hotel catalogue data from Expedia, `b` generates hotel catalogue data from Priceline, `c` generates hotel and review data from Tripadvisor, and `d` combines the data from these three source tables to generate a new, all-encompassing hotel catalogue table.

a_task = DummyOperator(task_id='a', dag=dag)
b_task = DummyOperator(task_id='b', dag=dag)
c_task = DummyOperator(task_id='c', dag=dag)
d_task = DummyOperator(task_id='d', dag=dag)
e_task = DummyOperator(task_id='e', dag=dag)
d_task.set_upstream([a_task, b_task, c_task])

Distributaries pattern

This pattern is the opposite of the one above: it is used to push data to multiple branches from a single source. Continuing the example above, `b` is the finalised hotel catalogue data, `e` can generate an SEO-related index, `g` can generate summary stats of our catalogue, and `c` can generate an Elasticsearch index from the new data for autocomplete features.

a_task = DummyOperator(task_id='a', dag=dag)
b_task = DummyOperator(task_id='b', dag=dag)
c_task = DummyOperator(task_id='c', dag=dag)
d_task = DummyOperator(task_id='d', dag=dag)
e_task = DummyOperator(task_id='e', dag=dag)
f_task = DummyOperator(task_id='f', dag=dag)
g_task = DummyOperator(task_id='g', dag=dag)
h_task = DummyOperator(task_id='h', dag=dag)
b_task.set_downstream([c_task, e_task, g_task])

Use Hooks for all data sources!

Our first version of DAGs used `BashOperator` and `PythonOperator` everywhere. That meant implementing functions ourselves to transfer files, check for new data, etc. across various data storage/server systems. Airflow had already done the heavy lifting for us by providing Hooks. Hooks are meant as an interface to external systems like S3, Hive, SFTP, databases, etc. The most important functions are already implemented for us, such as get_conn(), retrieve_files() and run(), which are responsible for establishing a connection to the external system, fetching files/data to the local system, and/or running queries.

from airflow.hooks.S3_hook import S3Hook
s3_hook = S3Hook(aws_conn_id='S3_test')

For some examples of how to write hooks and what is available out there:

How to automatically trigger my DAG when the data arrives on the server?

Another very useful concept to adopt is Sensors. Sensors are a special kind of Airflow operator that keep running until a certain criterion is met. For example, you know a file will arrive in your S3 bucket during a certain time period, but the exact arrival time is inconsistent. In this case, you can use `S3KeySensor` to wait for the key to be present in an S3 bucket.
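Conceptually, a sensor is just a polling loop: it runs a check (Airflow calls it `poke`) every `poke_interval` seconds until the check passes or a `timeout` is reached. The sketch below illustrates that idea in plain Python. It is not Airflow’s implementation, and `wait_until` and `file_has_arrived` are hypothetical names.

```python
import time


def wait_until(poke, poke_interval=1.0, timeout=10.0):
    """Call `poke()` repeatedly until it returns True or `timeout` seconds pass.

    Returns True if the condition was met and False if we gave up.
    """
    deadline = time.time() + timeout
    while True:
        if poke():
            return True
        # Give up if the next poke would land after the deadline.
        if time.time() + poke_interval > deadline:
            return False
        time.sleep(poke_interval)


# Hypothetical check: pretend the file shows up on the third poke.
attempts = {"count": 0}


def file_has_arrived():
    attempts["count"] += 1
    return attempts["count"] >= 3


arrived = wait_until(file_has_arrived, poke_interval=0.01, timeout=1.0)
print(arrived, attempts["count"])
```

With `S3KeySensor`, the check is "does this key exist in the bucket?", and the polling and timeout handling are taken care of for you.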

Don’t Repeat Yourself!!

A DAG using `S3KeySensor` as described above is limited because you need to hard-code the S3 key. Because S3 is so versatile, you will end up writing multiple sensors waiting for data on S3. However, if you dig into the implementation of `S3KeySensor`, you will find that two fields are templated: `template_fields = ('bucket_key', 'bucket_name')`. Here you can use powerful Jinja templates! Let us give you an example:
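What follows is a minimal sketch of a templated sensor rather than a production DAG. The DAG id, bucket name, key layout and the `file_suffix` param are all hypothetical, and the import path assumes an Airflow 1.10-style layout (it differs in other versions).

```python
from datetime import datetime, timedelta

from airflow import DAG
# Assumption: Airflow 1.10-style import path; adjust for your version.
from airflow.sensors.s3_key_sensor import S3KeySensor

dag = DAG(
    dag_id="wait_for_daily_export",  # hypothetical DAG id
    start_date=datetime(2018, 1, 1),
    schedule_interval=timedelta(days=1),
)

wait_for_file = S3KeySensor(
    task_id="wait_for_file",
    bucket_name="example-bucket",  # hypothetical bucket
    # bucket_key is a templated field, so Jinja expressions are rendered
    # per run: ds_nodash is the execution date formatted as YYYYMMDD.
    bucket_key="exports/{{ ds_nodash }}/{{ params.file_suffix }}",
    params={"file_suffix": "hotels.csv"},  # hypothetical param
    aws_conn_id="S3_test",
    poke_interval=60,   # check once a minute
    timeout=60 * 60,    # give up after an hour
    dag=dag,
)
```

The same sensor definition can then be reused for different files by changing only `params`, instead of writing one hard-coded sensor per key.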

The templates, i.e. file_suffix in the above example, get rendered by the Airflow engine sometime between the `__init__` and `execute` of the task. Airflow also has some useful macros built in; refer to the Macros Documentation.

Should start date be dynamic or static?

We started creating DAGs with static dates as prescribed by Updates to Airflow. Over time this led to nonsensical start dates in our DAGs, like `start_date: datetime(2016, 3, 20)`, but reading more examples that use days_ago made us realise the benefits and how start dates work. The main concept to note is that a DAG starts executing at `start_date + schedule_interval`. We decided to follow a few conventions in our team for DAG configuration.

1. The start date is important for DAGs that depend on previous runs.

2. For DAGs that affect the entire dataset and do not depend on history, the start date should reflect the date of the latest deployment.

3. Avoid changing the start date, name, or execution time/schedule interval of an active DAG. It is better to create a new version of the DAG with the changes.

Two relevant excerpts from the Apache Airflow wiki are:

- Using a start_date of `` can lead to unpredictable behavior, and your DAG never starting. It’s recommended to subtract a timespan to force the scheduler to recognize the start_date.
- When setting a schedule, align the start date with the schedule. If a schedule is to run at 2am UTC, the start-date should also be at 2am UTC
Airflow Wiki: Common Pitfalls
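To make the `start_date + schedule_interval` rule concrete, here is a small sketch in plain Python (no Airflow involved). `first_run` is a hypothetical helper, the dates are made up, and it assumes the start date is already aligned with the schedule.

```python
from datetime import datetime, timedelta


def first_run(start_date, schedule_interval):
    """Return (execution_date, trigger_time) for the first run of a DAG.

    The first run covers the interval that begins at start_date, so it is
    only triggered once that interval has ended.
    """
    execution_date = start_date
    trigger_time = start_date + schedule_interval
    return execution_date, trigger_time


# A daily DAG scheduled for 2am UTC, with the start date aligned to 2am.
start_date = datetime(2018, 1, 1, 2, 0)
execution_date, trigger_time = first_run(start_date, timedelta(days=1))
print(execution_date, trigger_time)
```

In other words, a daily DAG with a start date of January 1st at 2am does not run until January 2nd at 2am, and that run is labelled with the January 1st execution date.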

Are DAGs configuration only or can they have logic?

Airflow is not a highly opinionated framework, in my opinion. One of the questions we had to ask ourselves was: is it okay to have DAGs with business logic in them, or should they only contain code that configures the pipeline? DAGs are supposed to be “configuration as code”. If you increase the scope of a DAG beyond that, you can run into timeout issues when the scheduler creates the DagBag object.

Another caveat to look out for is accessing/querying external systems for config/information in a DAG file. For example, if you query Consul for a configuration value and the query times out, your DAG can time out as well and never be scheduled.

DAGs are hard to test. Because of this, we decided to keep business logic out of the Airflow DAG configuration, which should just import Python modules and execute them based on the defined dependency graph. This allows you to have high test coverage on your business code.
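As a sketch of what this separation can look like: the function below is ordinary Python with no Airflow imports, so it can be unit tested directly, and the DAG file would only reference it (for example as the `python_callable` of a `PythonOperator`). The function name and data shape are hypothetical.

```python
def merge_hotel_catalogues(*catalogues):
    """Merge hotel rows from several sources, keeping one row per hotel id.

    Each catalogue is a list of dicts with at least an "id" key. When the
    same id appears more than once, the first occurrence wins.
    """
    merged = {}
    for catalogue in catalogues:
        for hotel in catalogue:
            merged.setdefault(hotel["id"], hotel)
    return sorted(merged.values(), key=lambda hotel: hotel["id"])


# A plain unit test, runnable without a scheduler or a DagBag.
expedia = [{"id": 1, "name": "Hotel A"}, {"id": 2, "name": "Hotel B"}]
priceline = [{"id": 2, "name": "Hotel B (dup)"}, {"id": 3, "name": "Hotel C"}]
result = merge_hotel_catalogues(expedia, priceline)
assert [hotel["id"] for hotel in result] == [1, 2, 3]
assert result[1]["name"] == "Hotel B"
```

Because nothing here touches Airflow, the test runs in milliseconds and the DAG file stays a thin layer of configuration.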

DAGs: larger or smaller?

It’s recommended to keep DAGs as small as possible. The idea is that a DAG should only contain tasks that truly depend on each other. Tasks that are similar in nature but don’t depend on each other should be kept in separate DAGs, which keeps blocks of work small and easily extensible. Linking unrelated tasks is technical debt that can be painful to pay off later on.

And there you have it: these are some of the main hurdles we had to clear when working with Airflow at SnapTravel over the last 2 years. We love digging into Airflow’s code, as it is important to have a decent understanding of the inner workings of the Airflow metadata database, scheduler and workers. This comes in handy when you are trying to debug issues like the scheduler hanging or jobs not triggering. The CLI is also very helpful for testing new DAGs and for backfilling historical data when you refactor/change the business logic of a DAG. Next, we will be coming out with a blog post on testing data and data pipelines.

Thank you to our interns Shang Gao and Tomi Sami for your help bringing this blog to life! 🙏

Have any questions or thoughts? Let us know in the comments below! 👇