Airflow : Zero to One

Neha Kumari
Published in Analytics Vidhya · Apr 5, 2020 · 5 min read

In today’s world we process huge volumes of data, and the rate at which it churns grows with every passing day. The data can fall into any of the primary, inherited, captured, exhaust, structured, or unstructured categories (or an intersection of them). We need to run multiple high-performance data processing pipelines at high frequency to gain maximum insight, do predictive analysis, and serve other consumer needs. Orchestrating, scheduling, and monitoring our data pipelines becomes a critical task if the overall data platform and its SLAs are to stay stable and reliable.

Let’s have a look at some of the open-source orchestration systems available to us:

  1. Luigi
  2. Azkaban
  3. Oozie
  4. Airflow

In this blog, we will go into detail about Airflow and how we can work with it to manage our data pipelines.

When workflows are defined as code, they become more maintainable, versionable, testable, and collaborative.

— Airflow documentation

Apache Airflow is a workflow management system to programmatically author, schedule and monitor data pipelines. It has become the de facto standard tool to orchestrate and schedule any kind of job, from machine learning model training to common ETL orchestration.

Airflow Architecture

(Airflow architecture diagram; source: Google)

Modes of Airflow setup

1. Standalone: runs with the sequential executor, which picks up and runs jobs one at a time, so there is no parallelism.

2. Pseudo-distributed: runs with the local executor; local workers pick up and run jobs on the same machine via multiprocessing. This requires a database such as MySQL for the Airflow metadata.

3. Distributed: runs with the celery executor; remote workers pick up and run jobs as scheduled, load-balanced across the worker fleet. The executor for each mode is selected via the executor setting in airflow.cfg, as sketched below.
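
A minimal sketch of the three values under airflow.cfg’s default [core] section:

[core]
# standalone: one task at a time, no parallelism
executor = SequentialExecutor
# pseudo-distributed: parallel tasks on one machine
# executor = LocalExecutor
# distributed: tasks shipped to remote Celery workers
# executor = CeleryExecutor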

Get Airflow running locally

Here we go through the commands that need to be run to bring Airflow up locally (standalone); you can skip a step if that package already exists on your system.

Installing Airflow and its dependencies

# airflow working directory
mkdir /path/to/dir/airflow
cd /path/to/dir/airflow
# install python and virtualenv
brew install python3
pip install virtualenv
# create and activate a virtual environment
python3 -m venv venv
source venv/bin/activate
# force a non-GPL library ('unidecode')
export SLUGIFY_USES_TEXT_UNIDECODE=yes
# install airflow
export AIRFLOW_HOME=/path/to/dir/airflow
pip install apache-airflow
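
To confirm that the installation worked, we can print the installed version (the exact output depends on the release pip resolved):

airflow version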

Once Airflow is installed, the default config is generated inside AIRFLOW_HOME and the folder structure looks like:

airflow/
├── airflow.cfg
└── unittests.cfg

The airflow.cfg file contains the default values of the configs, which can be tweaked to change Airflow’s behaviour. A few important ones that are likely to be updated are:

plugins_folder #path to Airflow plugins
dags_folder #path to dags code
executor #executor which Airflow uses
base_log_folder #path where Airflow logs should be stored
web_server_port #port on which the web server will run
sql_alchemy_conn #connection of metadata database
load_examples #load default example DAGs

The default values of all configs can be found here.
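
Any of these settings can also be overridden without touching airflow.cfg, through environment variables of the form AIRFLOW__{SECTION}__{KEY}. A small sketch (the paths are placeholders):

# equivalent to editing dags_folder and load_examples under [core] in airflow.cfg
export AIRFLOW__CORE__DAGS_FOLDER=/path/to/dir/airflow/dags
export AIRFLOW__CORE__LOAD_EXAMPLES=False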

Preparing the database

airflow initdb #create and initialise the Airflow SQLite database

SQLite is the default database for Airflow and is an adequate solution for local testing and development, but it does not support concurrent access: it is inherently designed for a single writer and a small number of readers. In a production environment we will certainly need a more robust database such as Postgres or MySQL. We can edit the sql_alchemy_conn config to point to a MySQL database with the required params.
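
As a sketch, a MySQL-backed setup (placeholder host and credentials, paired with the LocalExecutor from the pseudo-distributed mode above; a MySQL driver such as mysqlclient must be installed) could look like this in airflow.cfg:

[core]
executor = LocalExecutor
sql_alchemy_conn = mysql://airflow_user:airflow_password@localhost:3306/airflow

After switching the connection, airflow initdb has to be run again so that the schema is created in the new database.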

airflow/
├── airflow.cfg
├── airflow.db (SQLite)
└── unittests.cfg

Running web server locally

To run the web server, execute:

airflow webserver -p 8080

After running this, we will be able to see the Airflow web UI up and running at http://localhost:8080/admin/

The UI shows the warning “Scheduler does not appear to be running”; for Airflow to be fully functional, let’s run the scheduler as well.

Running Scheduler

Run the Airflow scheduler with:

airflow scheduler

Now, if we visit the URL (http://localhost:8080/admin/), we can see our list of DAGs without any error.
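
Running the two processes in the foreground occupies two terminals. Both commands also accept a -D flag to run as background daemons, which is typically what produces the airflow-webserver.* and airflow-scheduler.* files we will see in AIRFLOW_HOME later:

airflow webserver -p 8080 -D
airflow scheduler -D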

Working with DAGs

To fetch the list of DAGs managed by Airflow:

airflow list_dags #lists dags present in dags_folder

Let’s write a sample DAG (saved as sample_dag.py inside the dags_folder) and make it appear in the web server UI.

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator


def print_hello():
    return "Hello world!"


default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": datetime(2020, 3, 30),
    "email": ["abc@xyz.com"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=2),
}

dag = DAG(
    "sample_code",
    description="Sample DAG code",
    schedule_interval="0 12 * * *",
    default_args=default_args,
    catchup=False,
)

t1 = DummyOperator(task_id="dummy_task", retries=3, dag=dag)
t2 = PythonOperator(task_id="hello_task", python_callable=print_hello, dag=dag)

# sets the order of execution of tasks
t1 >> t2
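
The >> bitshift operator is just shorthand for declaring the dependency; the same ordering can be set through the explicit task API:

t1.set_downstream(t2)  # equivalent to t1 >> t2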

We can also test the tasks of the DAG via the command line:

airflow list_tasks sample_code --tree #lists tasks of the DAG
airflow test sample_code hello_task 2020-04-02 #prints Hello world!
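
Note that airflow test runs a single task in isolation without recording any state in the metadata database. To have the scheduler actually run the DAG, it needs to be unpaused (new DAGs are typically created in a paused state), and a run can also be triggered manually; a sketch with the Airflow 1.10 CLI:

airflow unpause sample_code      # let the scheduler pick it up
airflow trigger_dag sample_code  # optionally kick off a run right away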

To reflect the new DAG in the web server UI, we can run either of the following commands:

airflow initdb 
airflow upgradedb

Here we are able to see the changes reflected in the web server UI. Now, if we go back and check the AIRFLOW_HOME folder, it will have a structure like:

airflow/
├── airflow-scheduler.err
├── airflow-scheduler.log
├── airflow-scheduler.out
├── airflow-webserver.err
├── airflow-webserver.log
├── airflow-webserver.out
├── airflow.cfg
├── airflow.db
├── dags
│   ├── __pycache__
│   │   └── sample_dag.cpython-37.pyc
│   └── sample_dag.py
├── logs
│   ├── dag_processor_manager
│   │   └── dag_processor_manager.log
│   └── scheduler
│       ├── 2020-03-29
│       │   └── sample_dag.py.log
│       ├── 2020-03-30
│       │   └── sample_dag.py.log
│       └── latest -> /path/to/dir/airflow/logs/scheduler/2020-03-30
└── unittests.cfg
To debug or inspect the logs of our DAG, we can tail the corresponding log file, e.g.:

tail -f airflow/logs/scheduler/2020-03-30/sample_dag.py.log
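
The scheduler log above only covers the parsing of the DAG file; per-task run logs land under base_log_folder, organised by DAG id, task id and execution date (the path below is illustrative):

tail -f airflow/logs/sample_code/hello_task/2020-04-02T12:00:00+00:00/1.log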

The discussion above covered the basics of Airflow and how we can set it up locally for day-to-day development and testing. Depending on the use case, we can choose any of the orchestration systems listed above to manage our data pipelines and get a clear sense of data lineage and visibility. The concepts of another such orchestrator, Oozie, are discussed in this blog.
