How To Write Your First Pipeline in Airflow

Automating your first workflow with Python

Dec 2 · 6 min read

This tutorial discusses the basic steps it takes to create your first workflow using Apache Airflow.

Before getting started, you need to set up an Airflow environment so you can follow along with each of the steps in this article. If you haven’t already done that, this article is one of our personal favorites for getting set up.

Why Airflow?

You might be asking: why use Airflow at all? Airflow solves a lot of problems by automating workflows and taking over boring, repetitive manual tasks.

By definition, Apache Airflow is a platform to programmatically author, schedule, and monitor workflows (see GitHub). In Airflow, those workflows are expressed as DAGs.

You can use Airflow to write ETLs and machine learning pipelines, and to handle general job scheduling of the kind you might otherwise give to cron, including database backups and scheduled code/config deployments.

We discussed some of the benefits of using Airflow in our comparison of Airflow and Luigi.

Understanding Airflow Pipelines

An Airflow pipeline is essentially a set of parameters written in Python that define an Airflow Directed Acyclic Graph (DAG) object. Various tasks within a workflow form a graph, which is Directed because the tasks are ordered. To avoid getting stuck in an infinite loop, this graph does not have any cycles, hence Acyclic.

For example, if we had three tasks named Foo, Bar, and FooBar, it might be the case that Foo runs first and Bar and FooBar depend on Foo finishing.

This creates a basic graph with a clear path: Foo first, then Bar and FooBar. Now imagine this with tens or hundreds of tasks. Having a clear structure for how those tasks run, and in what order, is important.
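The same graph can be sketched in plain Python. This is only an illustration of the idea, not Airflow code: it assumes Python 3.9+ for the standard-library graphlib module, and the dictionary layout is our own choice for the example.

```python
from graphlib import TopologicalSorter

# Map each task to the set of tasks it depends on.
# Bar and FooBar both wait for Foo; Foo depends on nothing.
dependencies = {
    "Foo": set(),
    "Bar": {"Foo"},
    "FooBar": {"Foo"},
}

# static_order() yields the tasks in an order that respects every dependency.
# It raises graphlib.CycleError if the graph contains a cycle.
run_order = list(TopologicalSorter(dependencies).static_order())

# Foo always comes first; Bar and FooBar may appear in either order.
print(run_order)
```

Because the graph has no cycles, a valid run order always exists, which is exactly the property Airflow relies on when it schedules tasks.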

With that basic explanation out of the way, let’s create your first DAG.

If you followed the link above for setting up Airflow, then you should have set the AIRFLOW_HOME environment variable to point to a folder. By default, this is a folder called airflow in your home directory. In that folder, you will need to create a dags folder. This is where you will create the Python file for your first DAG.

Set default_args

Breaking this down, we will need to set up a Python dictionary containing the arguments applied to all the tasks in your workflow. Some basic arguments include owner (basically just the name of the DAG owner) and start_date (which determines the execution day of the first DAG task instance).

Airflow is built to handle both incremental and historical runs. Sometimes you don’t want to schedule the workflow at all and just want to run the task for today. You may also want to start running tasks from a specific day in the past (e.g., one day ago), which is what this tutorial’s example does.

In this case, the start_date is one day ago. Your first DAG will run yesterday’s data, then any day after that.

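Two other key parameters are retries (how many times a failed task is retried) and retry_delay (how long to wait before retrying).

In the example, Airflow will retry a failed task once, after waiting five minutes.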

A quality workflow should be able to alert/report on failures, and this is one of the key things we aim to achieve in this step. Airflow is specially designed to simplify coding in this area. This is where emailing on failures can be helpful.
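A minimal sketch of such a dictionary might look like the following. The owner name and email address are placeholders we made up for illustration, and the start date is computed as midnight yesterday to match the "one day ago" setup described above. Note that Airflow’s own documentation generally recommends a fixed start_date over a dynamic one, so treat the computed date as a convenience for this tutorial only.

```python
from datetime import datetime, timedelta

# Midnight yesterday, so the first run covers yesterday's data.
yesterday = datetime.combine(datetime.today() - timedelta(days=1), datetime.min.time())

default_args = {
    "owner": "airflow",                   # placeholder owner name
    "start_date": yesterday,              # first execution day
    "retries": 1,                         # retry a failed task once...
    "retry_delay": timedelta(minutes=5),  # ...after waiting five minutes
    "email": ["you@example.com"],         # hypothetical address
    "email_on_failure": True,             # send an email when a task fails
}
```

Every key here is applied to each task in the DAG unless a task overrides it. Sending the failure email also requires SMTP settings in your Airflow configuration.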

Configure DAG Schedule

This step is about instantiating a DAG by giving it a name and passing in the default arguments: default_args=default_args.

Then set the schedule interval to specify how often the DAG should be triggered and executed. In this case, it is just once per day.

Below is one way you can set up your DAG.

If you want to run your DAG daily, then use the following parameter: schedule_interval='@daily'. Or you can use a cron expression instead, like this: schedule_interval='0 0 * * *'. Both run once a day at midnight.

Lay Out All the Tasks

In the example below, we have three tasks using the PythonOperator, DummyOperator, and BashOperator.

These tasks are all pretty straightforward. What you will notice is that each has a different function and requires different parameters.

The DummyOperator is just a blank operator you can use to create a step that doesn’t really do anything except signify the pipeline is done.

The PythonOperator allows you to call a Python function and even pass it parameters.

The BashOperator allows you to call bash commands.

For now, we will just write the tasks. Nothing will run until you add all the pieces together.

Using these basic tasks, you can now start to define dependencies, that is, the order in which the tasks should be executed.

Define Dependencies

There are two ways to define the dependencies among the tasks.

The first way is to use set_downstream and set_upstream. In this case, you can call set_upstream on python_task to make it depend on the Bash task, or express the same relationship from the other side by calling set_downstream on the Bash task.

Using this basic setup, if the Bash task is successful, then the Python task will run. Similarly, the dummy_task is dependent on the Bash task finishing.

The second way you can define a dependency is by using the bit shift operator. For those unfamiliar with the bit shift operator, it looks like >> or <<.

For example, to express that the Python task depends on the Bash task, you could write bashtask >> python_task.

Now what if you have a few tasks dependent on one?

Then you can pass them as a list. In this case, the Python task and dummy_task both depend on the Bash task and are executed in parallel once the Bash task completes. You can use either the set_downstream method or the bit shift operator.

bashtask.set_downstream([python_task, dummy_task])

Your First Airflow Pipeline

Now that we have gone over each of the different pieces, we can put it all together. Below is your first basic Airflow pipeline.

Adding the DAG to the Airflow Scheduler

Assuming you have already initialized your Airflow database, you can start the webserver and the scheduler to add your new DAG. The scheduler picks up the DAG file from your dags folder, and the webserver displays it. Use the following commands, each in its own terminal.

> airflow webserver
> airflow scheduler

The end result will appear on your Airflow dashboard, where you can switch the DAG on and watch its runs.

Thanks for reading and good luck creating your future pipelines!

Better Programming

Advice for programmers.


Written by

#Data #Engineer, Strategy Development Consultant and All Around Data Guy #deeplearning #machinelearning #datascience #tech #management
