Apache Airflow: A Real-life Use Case

In this post, I will guide you through writing an Airflow scheduler for a practical use case.

Bào Bùi
Analytics Vidhya
5 min read · Aug 30, 2019


Overview

Airflow is a tool for programmatically scheduling and monitoring workflows. When dealing with complicated pipelines, in which many parts depend on each other, Airflow lets us write a clean scheduler in Python and provides a web UI to visualize pipelines, monitor progress, and troubleshoot issues when needed.

Real-life Example

The best way to understand the power of Airflow is to write a simple pipeline scheduler. A common use case for Airflow is to periodically check file directories and run bash jobs based on what is found there. In this post, I will write an Airflow scheduler that checks HDFS directories and runs simple bash jobs according to the existing HDFS files. The high-level pipeline can be illustrated as below:

Pipeline Overview

As you can see, we first check today's dir1 and dir2. If one of them does not exist (due to failed jobs, corrupted data, and so on), we fall back to yesterday's directory. We also have a rule for job2 and job3: they depend on job1, so if job1 fails, the expected outcome is that both job2 and job3 also fail. This is a common pipeline pattern that is easy to implement with Airflow.

Setting Up

There are a lot of good sources for Airflow installation and troubleshooting. Here, I will just briefly show you how to set up Airflow on your local machine.

Install Airflow using pip:

pip install apache-airflow

Initialize the Airflow database (on Airflow 2.x the equivalent command is airflow db init):

airflow initdb

Start the webserver:

airflow webserver -p 8080

Run the scheduler:

airflow scheduler

If everything runs successfully, you can open the Airflow UI at http://localhost:8080/

OK, let’s write it!

First we need to define a set of default parameters that our pipeline will use. Since our pipeline needs to check directory 1 and directory 2, we also need to specify those as variables. Fortunately, Airflow supports this. As its documentation puts it:

Airflow leverages the power of Jinja Templating and provides the pipeline author with a set of built-in parameters and macros. Airflow also provides hooks for the pipeline author to define their own parameters, macros and templates.

This is how we define it:

Define default parameters
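A minimal sketch of what such a default-parameter dictionary might look like. The keys follow the list of parameters described in this post; every value (owner name, start date, email address) and the stub build_params function are illustrative assumptions, not the author's actual settings.

```python
from datetime import datetime


def build_params():
    """Stand-in for the yml loader discussed in this post (hypothetical values)."""
    return {
        "dir1_today": "/data/dir1/2019-08-30",
        "dir1_yesterday": "/data/dir1/2019-08-29",
    }


default_args = {
    "owner": "airflow",                # shown in the web UI
    "depends_on_past": False,          # do not wait on the previous run's tasks
    "start_date": datetime(2019, 8, 1),
    "email": ["airflow@example.com"],  # placeholder address
    "email_on_failure": False,         # boolean switch, not an address
    "email_on_retry": False,
    "retries": 1,
    "catchup_by_default": False,       # key name as listed in this post
    "params": build_params(),          # exposed to templates as {{ params.<key> }}
}
```

Using a fixed start_date rather than the current time keeps the schedule predictable between runs of the script.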

Here is a brief description of each parameter:

  • owner: Owner of the pipeline; this is shown in the web UI.
  • depends_on_past: Whether a task depends on its own previous run. If it is enabled and the task failed in the previous pipeline run, the task will not be triggered in the current run.
  • start_date: The start date of your pipeline.
  • email: The email address (or list of addresses) to notify.
  • email_on_failure: Whether to send an email when a task fails.
  • email_on_retry: Whether to send an email when a task is retried.
  • retries: Number of times a failed task is retried.
  • catchup_by_default: Whether to run all previously scheduled pipeline runs when your start date is in the past. (In Airflow itself this name is a scheduler setting in airflow.cfg; the per-DAG equivalent is the catchup argument of the DAG.)
  • params: User-defined parameters for this pipeline; they can be accessed in Jinja templates as {{ params.my_param }}.

The build_params function simply loads the user-defined variables from a yml file. Keeping variables in a yml file is good practice because it separates configuration from pipeline code:

Read user-defined variables from yml file
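A minimal sketch of such a loader, assuming the third-party PyYAML package and a file name (params.yml) chosen here purely for illustration. The function name build_params comes from this post; its signature and body are assumptions.

```python
import yaml  # PyYAML, a third-party package (pip install pyyaml)


def build_params(path="params.yml"):
    """Load user-defined variables from a yml file into a plain dict."""
    with open(path) as f:
        params = yaml.safe_load(f)
    # An empty file parses to None; normalise that to an empty dict.
    return params or {}
```

yaml.safe_load is used instead of yaml.load so that the file can only produce plain data types such as strings, numbers, lists and dicts.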

Since we need to decide whether to use today's directory or yesterday's, we need to specify two variables for each directory (one for today, one for yesterday). The yml file that the function loads is simple:

yml file
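To make the two-variables-per-directory layout concrete, here is a sketch that builds the kind of flat mapping such a file might hold once loaded. The key names, base paths and date-stamped directory layout are all hypothetical; in the yml file itself each entry would simply be one "key: value" line.

```python
from datetime import date, timedelta


def directory_variables(name, base_path, day):
    """Return the today/yesterday pair of variables for one directory."""
    yesterday = day - timedelta(days=1)
    return {
        f"{name}_today": f"{base_path}/{day:%Y-%m-%d}",
        f"{name}_yesterday": f"{base_path}/{yesterday:%Y-%m-%d}",
    }


# Two directories, two variables each (illustrative paths and date).
params = {}
for name, base_path in [("dir1", "/data/dir1"), ("dir2", "/data/dir2")]:
    params.update(directory_variables(name, base_path, date(2019, 8, 30)))
```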

After specifying the default parameters, we create a pipeline instance to schedule our tasks. In Airflow terminology, this is called a DAG:

A DAG – or a Directed Acyclic Graph – is a collection of all the tasks you want to run, organized in a way that reflects their relationships and dependencies.

This is how we define it:

Define DAG

Here we schedule the DAG to run daily by using the schedule_interval parameter.

Next we write how each job will be executed. Specifically, we want 2 bash jobs to check the HDFS directories and 3 bash jobs to run job1, job2 and job3. Here the bash jobs are just simple commands, but they can be made as complicated as needed:

Bash job functions
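One way to write such functions is to have each return a bash command string that Airflow later renders as a Jinja template. The sketch below assumes params keys named dir1_today, dir1_yesterday and so on (hypothetical names) and uses hdfs dfs -test -d to check whether a directory exists; the job commands are placeholder echo statements.

```python
def check_dir_command(today_key, yesterday_key):
    """Bash command that echoes today's directory if it exists in HDFS,
    and yesterday's directory otherwise."""
    today = "{{ params." + today_key + " }}"
    yesterday = "{{ params." + yesterday_key + " }}"
    return (
        f'if hdfs dfs -test -d "{today}"; '
        f'then echo "{today}"; '
        f'else echo "{yesterday}"; fi'
    )


def check_dir1():
    return check_dir_command("dir1_today", "dir1_yesterday")


def check_dir2():
    return check_dir_command("dir2_today", "dir2_yesterday")


def job_command(name):
    """Placeholder job: a real pipeline would call its own script here."""
    return f"echo 'running {name}'"
```

Because each check command prints exactly one directory as its last line of output, that line is what a downstream job can later read.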

Since we want to pass the checked directories to job1, we need some way for operators to communicate with each other. Luckily, Airflow provides a feature for operator cross-communication, called XCom:

XComs let tasks exchange messages, allowing more nuanced forms of control and shared state. The name is an abbreviation of “cross-communication”. XComs are principally defined by a key, value, and timestamp, but also track attributes like the task/DAG that created the XCom and when it should become visible. Any object that can be pickled can be used as an XCom value, so users should make sure to use objects of appropriate size.

Here, the check_dir1 and check_dir2 functions echo the directories for job1. The BashOperator can push the last line a command writes to stdout as an XCom, so job1 can retrieve those directories with this Jinja syntax:

{{ ti.xcom_pull(task_ids='Your task ID here') }}
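As a sketch of how this might be used, the function below builds a bash command for job1 that pulls the value pushed by each upstream task. The task ids check_dir1 and check_dir2 follow the function names in this post; the command itself is a placeholder echo, and the template is only rendered when Airflow runs the task.

```python
def job1_command(upstream_task_ids):
    """Bash command for job1 that reads each upstream task's XCom value."""
    pulls = " ".join(
        "{{ ti.xcom_pull(task_ids='%s') }}" % task_id
        for task_id in upstream_task_ids
    )
    return f'echo "job1 input directories: {pulls}"'


command = job1_command(["check_dir1", "check_dir2"])
```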

The last thing we need to do is instantiate the Airflow jobs and specify the order and dependencies of each job:

The syntax [A, B] >> C means that C waits for both A and B to finish before running. Another way to express a dependency is the set_downstream function: A.set_downstream(B) means that A needs to finish before B can run.

Each bash job instance has a trigger rule, which specifies the condition required for the job to run. In this code we use 2 types of trigger rule:

  • all_done: All upstream tasks have finished executing, whether they succeeded or not.
  • all_success: All upstream tasks have finished successfully.
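The difference between the two rules can be shown with a small stand-alone model. This is a simplified illustration of the idea, not Airflow's implementation; the state names mirror the ones Airflow shows in its UI.

```python
# States in which an upstream task is considered finished.
FINISHED_STATES = {"success", "failed", "upstream_failed", "skipped"}


def rule_satisfied(trigger_rule, upstream_states):
    """Return True if a task with this trigger rule may run."""
    if trigger_rule == "all_success":
        return all(state == "success" for state in upstream_states)
    if trigger_rule == "all_done":
        return all(state in FINISHED_STATES for state in upstream_states)
    raise ValueError(f"unsupported trigger rule: {trigger_rule}")


# One upstream task failed, the other succeeded.
states = ["success", "failed"]
runs_with_all_done = rule_satisfied("all_done", states)
runs_with_all_success = rule_satisfied("all_success", states)
```

With one failed upstream task, the all_done rule lets the task run while all_success does not, which is why job2 and job3 do not run when job1 fails.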

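After you have created the whole pipeline, place the script in the DAG directory, where the running Airflow scheduler picks it up. Running it once with Python is a quick check that the DAG definition loads without errors: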

python scheduler_demo.py

Note: The default DAG directory is ~/airflow/dags/, so all of your code should be in this folder.

Wait a couple of minutes, then open http://localhost:8080/ to see your scheduler pipeline:

Airflow UI

You can manually trigger the DAG by clicking the play icon. You can also monitor your scheduler process by clicking on one of the circles in the DAG Runs section:

Click DAG Runs section

After clicking on a process in DAG Runs, the pipeline process will appear:

Scheduler process

This indicates that the whole pipeline has run successfully.

This is how you can create a simple Airflow pipeline scheduler. The whole script can be found in this repo. Thank you for reading to the end. This is my first post on Medium, so any feedback is welcome!

Bào Bùi
Analytics Vidhya

Machine Learning Engineer at Meta. Writes about AI and recommendation. LinkedIn profile: https://www.linkedin.com/in/bao-bui-164b94106/