How To Write Your First Pipeline in Airflow
Automating your first workflow with Python
This tutorial discusses the basic steps it takes to create your first workflow using Apache Airflow.
Before getting started, you need to set up an Airflow environment to be able to follow along with each of the steps discussed in this article. If you haven’t already done that, we find this article to be one of our personal favorites.
You might be asking: why use Airflow at all? Airflow helps solve a lot of problems by automating workflows and taking over tedious, repetitive manual tasks.
By definition, Apache Airflow is a platform to programmatically author, schedule, and monitor workflows, which are expressed as DAGs (see GitHub).
You can use Airflow to write ETLs, machine learning pipelines, and general job scheduling (e.g., as a cron replacement), including database backups and the scheduling of code/config deployments.
We discussed some of the benefits of using Airflow in our comparison of Airflow and Luigi.
Understanding Airflow Pipelines
An Airflow pipeline is essentially a set of parameters written in Python that define an Airflow Directed Acyclic Graph (DAG) object. Various tasks within a workflow form a graph, which is Directed because the tasks are ordered. To avoid getting stuck in an infinite loop, this graph does not have any cycles, hence Acyclic.
For example, if we had three tasks named Foo, Bar, and FooBar, it might be the case that Foo runs first and that Bar and FooBar both depend on Foo finishing. This would create a basic graph like the one below. As you can see, there's a clear path. Now imagine this with tens or hundreds of tasks. Having a clear structure for how those tasks are run, and in what order, is important.
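As a rough sketch, using the hypothetical task names above, the graph looks something like this:

        ┌──> Bar
Foo ────┤
        └──> FooBar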
With that basic explanation out of the way, let’s create your first DAG.
If you followed the link above for setting up Airflow, then you should have set the AIRFLOW_HOME variable to point to a folder. By default, this is a folder called airflow. In that folder, you will need to create a dags folder. You want to create your first DAG file in the dags folder, as below.
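Assuming the default AIRFLOW_HOME of ~/airflow, the layout might look like this (my_first_dag.py is a hypothetical file name used for the examples below):

airflow/
└── dags/
    └── my_first_dag.py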
Breaking this down, we will need to set up a Python dictionary containing the default arguments applied to all the tasks in your workflow. If you take a look at the code below, there are some basic arguments, including owner (basically just the name of the DAG owner) and start_date (which determines the execution date of the first DAG task instance).
Airflow is built to handle both incremental and historical runs. Sometimes you don't want to schedule the workflow at all and just want to run the task for today. You may also want to start running tasks from a specific day in the past (e.g., one day ago), which is what is set up in the first code snippet below.
In this case, the start_date is one day ago: your first DAG run will process yesterday's data, and it will keep running for each day after that.
Here are some other key parameters.
end_date determines the last execution date. Specifying an end date stops Airflow from scheduling runs beyond that date. If you don't set an end date, Airflow will just keep scheduling runs indefinitely.
depends_on_past is a Boolean value. If you set it to True, the currently running task instance will rely on the previous task run's status. For example, suppose you set this argument to True in a daily workflow: if yesterday's task run failed, then today's task will not be triggered, because it depends on the status of the previous date.
email_on_failure is used to define whether you want to receive a notification if a failure happens.
email_on_retry is used to define whether you want to receive an email every time a retry happens.
retries dictates the number of times Airflow will attempt to retry a failed task.
retry_delay is the duration between consecutive retries.
In the example below, Airflow will retry once, five minutes after the failure.
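Here is a minimal sketch of such a dictionary (the owner name is an illustrative placeholder, and the days_ago helper assumes Airflow 1.x):

from datetime import timedelta
from airflow.utils.dates import days_ago

default_args = {
    'owner': 'airflow',                   # name of the DAG owner
    'start_date': days_ago(1),            # one day ago
    'depends_on_past': False,             # don't wait on previous runs
    'email_on_failure': False,            # email if a task fails?
    'email_on_retry': False,              # email on every retry?
    'retries': 1,                         # retry a failed task once...
    'retry_delay': timedelta(minutes=5),  # ...five minutes after the failure
}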
A quality workflow should be able to alert or report on failures, and this is one of the key things we aim to achieve in this step. Airflow is specifically designed to simplify coding in this area. This is where emailing on failures can be helpful.
Configure DAG Schedule
This step is about instantiating a DAG: give it a name and pass in the default arguments you defined above. Then set the schedule_interval to specify how often the DAG should be triggered and executed. In this case, it is just once per day. Below is one way you can set up your DAG.
If you want to run your schedule daily, then use the following parameter: schedule_interval='@daily'. Or you can use a cron expression instead, like this: schedule_interval='0 0 * * *'.
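As a sketch, assuming the default_args dictionary from the previous step and a hypothetical dag_id of my_first_dag:

from airflow import DAG

dag = DAG(
    'my_first_dag',
    default_args=default_args,
    schedule_interval='@daily',  # or the cron equivalent: '0 0 * * *'
)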
Lay Out All the Tasks
In the example below, we have three tasks using the DummyOperator, the PythonOperator, and the BashOperator.
These tasks are all pretty straightforward. What you will notice is that each has a different function and requires different parameters.
DummyOperator is just a blank operator you can use to create a step that doesn’t really do anything except signify the pipeline is done.
PythonOperator allows you to call a Python function and even pass it parameters.
BashOperator allows you to call bash commands.
Below we will just be writing the tasks. These will not run until you add all the pieces together.
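Here is a minimal sketch of the three tasks (the task_ids, the bash command, and the print_hello callable are all illustrative; the import paths assume Airflow 1.x; the dag object comes from the previous step):

from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

def print_hello():
    # Hypothetical callable for the PythonOperator.
    print('Hello from your first DAG!')

# Runs a shell command.
bash_task = BashOperator(
    task_id='bash_task',
    bash_command='echo "Running the bash task"',
    dag=dag,
)

# Calls the Python function above.
python_task = PythonOperator(
    task_id='python_task',
    python_callable=print_hello,
    dag=dag,
)

# A blank step that signifies the pipeline is done.
dummy_task = DummyOperator(
    task_id='dummy_task',
    dag=dag,
)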
Using these basic tasks, you can now start to define dependencies, the order in which the tasks should be executed.
There are two ways to define the dependencies among the tasks.
The first way is to use set_upstream. In this case, you can use set_upstream to make the python_task depend on the bash task, or do the same with the downstream version, set_downstream.
Using this basic setup, if the bash task is successful, then the Python task will run. Similarly, the dummy_task is dependent on the bash task finishing.
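A minimal sketch using the method form (task names are the hypothetical ones defined above):

# python_task runs only after bash_task succeeds.
python_task.set_upstream(bash_task)

# The same dependency, expressed with the downstream version:
# bash_task.set_downstream(python_task)

# dummy_task also waits for bash_task to finish.
dummy_task.set_upstream(bash_task)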
The second way you can define a dependency is by using the bit shift operator. For those unfamiliar with the bit shift operator, it looks like >> or <<.
For example, if you would like the Python task to depend on the bash task, you could write it as bash_task >> python_task.
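For instance, again with the hypothetical task names defined earlier:

# bash_task runs first; python_task runs once bash_task succeeds.
bash_task >> python_task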
Now what if you have a few tasks dependent on one?
Then you can put them in a list. In this case, the python_task and dummy_task both depend on the bash task and are executed in parallel following its completion. You can use either the set_downstream method or the bit shift operator, as shown below.
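As a sketch, with the hypothetical task names from above:

# python_task and dummy_task run in parallel once bash_task completes.
bash_task >> [python_task, dummy_task]

# The same, using the method form:
# bash_task.set_downstream([python_task, dummy_task])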
Your First Airflow Pipeline
Now that we have gone over each of the different pieces, we can put it all together. Below is your first basic Airflow pipeline.
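Here is a minimal, self-contained sketch that consolidates the snippets above. The file name, dag_id, task_ids, bash command, and print_hello callable are all illustrative, and the import paths assume Airflow 1.x:

# ~/airflow/dags/my_first_dag.py
from datetime import timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago

# Default arguments applied to every task in the DAG.
default_args = {
    'owner': 'airflow',
    'start_date': days_ago(1),
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Instantiate the DAG: once per day.
dag = DAG(
    'my_first_dag',
    default_args=default_args,
    schedule_interval='@daily',
)

def print_hello():
    # Hypothetical callable for the PythonOperator.
    print('Hello from your first DAG!')

bash_task = BashOperator(
    task_id='bash_task',
    bash_command='echo "Running the bash task"',
    dag=dag,
)

python_task = PythonOperator(
    task_id='python_task',
    python_callable=print_hello,
    dag=dag,
)

dummy_task = DummyOperator(
    task_id='dummy_task',
    dag=dag,
)

# python_task and dummy_task run in parallel after bash_task succeeds.
bash_task >> [python_task, dummy_task]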
Adding the DAG to the Airflow Scheduler
Assuming you have already initialized your Airflow database, you can start the webserver and scheduler so that Airflow picks up your new DAG. Run the following commands to get your pipeline going.
> airflow webserver
> airflow scheduler
The end result will appear on your Airflow dashboard.
Thanks for reading and good luck creating your future pipelines!