Airflow Part 2: Lessons learned

Nehil Jain
Jun 17, 2018 · 7 min read

At SnapTravel we use Apache Airflow to orchestrate our batch processes. It is one of the key systems we depend on for keeping track of business metrics, building/testing natural language models, and mining for product insights. After working with Airflow for almost 2 years now, we wanted to consolidate our lessons and share the nuances with other developers.

For an overview of how Airflow was implemented at SnapTravel, see our prior blog post, where we discussed the benefits of Airflow, the major components of its architecture, and a bunch of resources for those just getting started. In this post we assume the reader is familiar with concepts like Operators (e.g. PythonOperator), the Scheduler, DAGs, and Tasks. If not, give the Airflow Tutorial a quick read!

This post focuses on the development of data pipelines using Airflow. We’ll explore some learnings that each consumed more than a couple of hours during our time writing DAGs, along with some best practices picked up along the way. These are quirks and unexpected behaviours of Airflow that might cost you a couple of hours of debugging. Airflow is incubating under the Apache umbrella and is being actively improved; our hope is that some of these problems will not be relevant in upcoming releases!

Python Version for my project — py3 or py2?

Airflow is written to be Python 3 compatible, and it is a much smoother ride if you can write your business logic in Python 3 rather than Python 2.x. Under Python 2 there are unexpected behaviours at runtime which are hard to anticipate. For example, after you `import airflow` in your code, some Python 2 built-in functions are overwritten with their Python 3 counterparts, as described in the Python future library docs. The file in the Airflow codebase where this happens is airflow/. Below is a code example to demonstrate this:

Python 2.7.12 Shell
>>> import math
>>> type(math.ceil(1.0))
<type 'float'>
>>> from airflow.models import DAG
>>> type(math.ceil(1.0))
<type 'int'>
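For comparison, Python 3’s own `math.ceil` already returns an `int` — the same behaviour that importing airflow retrofits onto Python 2 via the future library:

```python
import math

# On Python 3, math.ceil returns an int natively, so code that
# relied on the Python 2 float return type needs a second look.
result = math.ceil(1.0)
print(type(result))  # <class 'int'>
```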

How to create different patterns found in workflows

Sequential source to destination pattern

In this pattern, you have a sequence of tasks executed in order. For example, `source` is a file on FTP, `a` is some transform or join with data from the database, and `b` is the target destination.

source = DummyOperator(task_id='source', dag=dag)
a_task = DummyOperator(task_id='a', dag=dag)
b_task = DummyOperator(task_id='b', dag=dag)
source >> a_task >> b_task

Note: the `>>` and `<<` bitshift operators are shorthand for specifying task dependencies, available in Airflow 1.8+. (Quick reference)
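Putting it together, the snippets in this post assume a `dag` object already exists. A minimal, complete DAG file for this pattern might look like the following sketch (the dag id, schedule, and start date are illustrative):

```python
from datetime import datetime

from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator

dag = DAG(
    dag_id='sequential_example',
    start_date=datetime(2018, 6, 1),
    schedule_interval='@daily',
)

source = DummyOperator(task_id='source', dag=dag)
a_task = DummyOperator(task_id='a', dag=dag)
b_task = DummyOperator(task_id='b', dag=dag)

# Bitshift shorthand (Airflow 1.8+); equivalent to chained
# source.set_downstream(a_task); a_task.set_downstream(b_task)
source >> a_task >> b_task
```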

Tributaries pattern

This is a pattern where multiple data streams/sources are combined together. `d` will only execute after `a`, `b`, and `c` have executed successfully. For example, `a` generates hotel catalogue data from Expedia, `b` generates hotel catalogue data from Priceline, `c` generates hotel and review data from TripAdvisor, and `d` combines these three source tables into a new, all-encompassing hotel catalogue table.

a_task = DummyOperator(task_id='a', dag=dag)
b_task = DummyOperator(task_id='b', dag=dag)
c_task = DummyOperator(task_id='c', dag=dag)
d_task = DummyOperator(task_id='d', dag=dag)
d_task.set_upstream([a_task, b_task, c_task])

Distributaries pattern

This pattern is the opposite of the one above: it pushes data from a single source to multiple branches. Continuing the example above, `b` is the finalised hotel catalogue data; `c` can generate an Elasticsearch index from the new data for autocomplete features; `e` can generate an SEO-related index; and `g` can generate summary stats of our catalogue.

b_task = DummyOperator(task_id='b', dag=dag)
c_task = DummyOperator(task_id='c', dag=dag)
e_task = DummyOperator(task_id='e', dag=dag)
g_task = DummyOperator(task_id='g', dag=dag)
b_task.set_downstream([c_task, e_task, g_task])

Use Hooks for all data sources!

Hooks are Airflow’s abstraction for talking to external systems (S3, databases, FTP, HTTP APIs). They resolve credentials from Airflow Connections, so secrets stay out of your DAG code. For example:

s3_hook = S3Hook(aws_conn_id='S3_test')
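As a fuller sketch of the idea, a hook can be used inside a task callable; the connection id `S3_test`, the bucket, key, and the `copy_report` helper below are all placeholders for illustration (import paths match Airflow 1.x):

```python
from airflow.hooks.S3_hook import S3Hook
from airflow.operators.python_operator import PythonOperator

def copy_report(**context):
    # The hook resolves credentials from the 'S3_test' connection
    # configured in the Airflow UI, so no secrets live in the DAG file.
    s3_hook = S3Hook(aws_conn_id='S3_test')
    if s3_hook.check_for_key('reports/latest.csv', bucket_name='my-bucket'):
        data = s3_hook.read_key('reports/latest.csv', bucket_name='my-bucket')
        # ... transform and load `data` downstream ...

fetch_task = PythonOperator(
    task_id='fetch_report',
    python_callable=copy_report,
    provide_context=True,
    dag=dag,  # assumes a DAG object defined elsewhere in the file
)
```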

For examples of how to write hooks and what is available out of the box, browse the hooks shipped in the Airflow codebase and its contrib package.

How to automatically trigger my DAG when the data arrives on the server?

Another very useful concept to adopt is Sensors. Sensors are a special kind of Airflow operator that keeps running until a certain criterion is met. For example, you know a file will arrive in your S3 bucket during a certain time window, but the exact arrival time is inconsistent. In this case, you can use `S3KeySensor` to wait for the key to be present in an S3 bucket.
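A sketch of such a sensor (the bucket, key, and timings are placeholders; parameter names vary slightly across Airflow 1.x releases):

```python
from airflow.operators.sensors import S3KeySensor

wait_for_file = S3KeySensor(
    task_id='wait_for_source_file',
    bucket_name='my-bucket',
    bucket_key='incoming/data.csv',
    aws_conn_id='S3_test',
    poke_interval=60,      # re-check every 60 seconds
    timeout=60 * 60 * 6,   # fail the task after 6 hours of waiting
    dag=dag,               # assumes a DAG object defined elsewhere
)

# Downstream work starts only once the key exists:
# wait_for_file >> process_task
```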

Don’t Repeat Yourself!!

The previous DAG using `S3KeySensor` is limited because you need to hard-code the S3 key into the code. Because S3 is so versatile, you will end up writing multiple sensors waiting for data on S3. However, if you dig into the implementation of `S3KeySensor`, you will find that two fields are templated: `template_fields = ('bucket_key', 'bucket_name')`. Here you can use a powerful Jinja template!! Let us give you an example:
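For instance, the `bucket_key` can carry Jinja that is rendered at runtime. In this sketch, the key layout and the `file_suffix` parameter are illustrative, and `{{ ds }}` is Airflow’s built-in macro for the execution date:

```python
from airflow.operators.sensors import S3KeySensor

# 'bucket_key' is in the sensor's template_fields, so the Jinja below
# is rendered per run; 'file_suffix' comes from the operator's params.
wait_for_daily_file = S3KeySensor(
    task_id='wait_for_daily_file',
    bucket_name='my-bucket',
    bucket_key='incoming/{{ ds }}_{{ params.file_suffix }}.csv',
    params={'file_suffix': 'hotels'},
    aws_conn_id='S3_test',
    dag=dag,  # assumes a DAG object defined elsewhere
)
```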

The templated fields, i.e. `file_suffix` in the above example, are rendered by the Airflow engine between the operator’s `__init__` and `execute`. Airflow has some useful macros built in; refer to the Macros documentation.

Should start date be dynamic or static?

1. The start date is important for DAGs that depend on previous runs.

2. For DAGs that affect the entire dataset and do not depend on history, the start date should reflect the date of the latest deployment.

3. Avoid changing the start date, name, or schedule interval of an active DAG. It is better to create a new version of the DAG with the changes.

Two relevant excerpts from Apache Airflow wiki are:

- Using a dynamic start_date can lead to unpredictable behavior, and your DAG never starting. It’s recommended to subtract a timespan to force the scheduler to recognize the start_date.

- When setting a schedule, align the start date with the schedule. If a schedule is to run at 2am UTC, the start_date should also be at 2am UTC.

Airflow Wiki: Common Pitfalls
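In code, the difference between a static and a dynamic start_date looks like this (the dates and the subtracted timespan are illustrative):

```python
from datetime import datetime, timedelta

# Static start date (preferred): the same value on every scheduler parse.
static_start = datetime(2018, 6, 1)

# Dynamic start date (avoid): moves every time the DAG file is parsed,
# so the scheduler may decide the DAG is never ready to run.
dynamic_start = datetime.now() - timedelta(days=1)
```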

Are DAGs configuration only or can they have logic?

DAGs are hard to test. Because of this, we decided to keep business logic out of the DAG definition files, which should only import Python modules and execute them based on the defined dependency graph. This lets you maintain high test coverage on your business code.

Another caveat to look out for is accessing or querying external systems for configuration inside a DAG file. For example, if you query Consul for a configuration and the call times out, parsing the DAG file can time out too and the DAG will never be scheduled.


And there you have it: these are some of the main hurdles we cleared while working with Airflow at SnapTravel over the last 2 years. We love digging into Airflow’s code, as it is important to have a decent understanding of the inner workings of the Airflow metadata database, scheduler, and workers. That understanding comes in handy when you are debugging issues like scheduler hangs or jobs not triggering. The CLI is also very helpful for testing new DAGs and for backfilling historical data when you refactor or change a DAG’s business logic. Next, we will be coming out with a blog post on testing data and data pipelines.

Thank you to our interns Shang Gao and Tomi Sami for your help bringing this blog to life! 🙏

Have any questions or thoughts? Let us know in the comments below! 👇


Content from the SnapTravel team.
