Apache Airflow for data pipeline orchestration

Parag Bhosale
Algologic.ai

--

At Algologic.ai, we collect trading data from exchanges such as Bitfinex, HitBTC and Binance for algorithmic trading. Collecting this data requires multiple stages: extract, transform and load, or simply ETL. Each stage can depend on one or more other stages, and some stages also depend on time, i.e., they need to be executed at a particular moment (scheduling). Since there are multiple exchanges to collect data from, we were looking not only for a better monitoring and alerting framework, but also for an efficient and programmer-friendly framework for orchestrating complex data pipelines. In this quest, we came across Apache Airflow.

Apache Airflow is an incubation project of the Apache Software Foundation, defined as:

Airflow is a platform to programmatically author, schedule and monitor workflows.

The main parts of Airflow are the Python package, the web server, the scheduler and the backend database. There are other important parts as well, but in this article we will cover only the ones above.

Python package

As a Python programmer, the most convenient thing I found about Airflow is that workflows are written as plain Python code: we can code our data pipelines as Python scripts. An Airflow script consists of two main components: a directed acyclic graph (DAG) and tasks. In simple terms, a DAG is a directed graph consisting of one or more tasks.

We need to import a few packages for our workflow:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

Now we create a DAG that will run at 00:15 every day. The important arguments are dag_id, schedule_interval and start_date. start_date should be static. Note that the first DAG run is triggered after the start_date, at the end of the first schedule_interval. Since the schedule interval below is daily, the first DAG run will be triggered on 2018-09-28 at 00:15. Refer to this for more details.

default_args = {
    'owner': 'airflow',
    'depends_on_past': True,
    'start_date': datetime(2018, 9, 27),
    'retries': 1,
    'retry_delay': timedelta(seconds=10),
}
dag = DAG(dag_id='data_flow', default_args=default_args, max_active_runs=1,
          concurrency=1, schedule_interval='15 0 * * *')

It is time to add the extract task shown in the snippet below to the DAG. We use BashOperator from the airflow package to run a bash command, which can be anything from ls to running a Python script. (Note: the dag_id must be unique across all DAGs, and each task_id must be unique within its DAG.) We have used on_failure_callback to run a method if the current task fails; this callback sends a notification to a Slack channel so we can monitor when a task crashes.

extract = BashOperator(task_id='taskid_extract',
                       depends_on_past=True,
                       wait_for_downstream=True,
                       on_failure_callback=slack_failed_task,
                       bash_command="bash command",  # placeholder for the actual extract command
                       dag=dag)
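The slack_failed_task callback referenced above is not shown in this article. Below is a minimal sketch of what such a callback could look like; the webhook URL, the message format and the use of the requests library are assumptions for illustration, not our actual implementation.

import requests

# Hypothetical Slack incoming-webhook URL; replace with your own.
SLACK_WEBHOOK_URL = "https://hooks.slack.com/services/XXX/YYY/ZZZ"

def slack_failed_task(context):
    # Airflow calls this with the task context when the task fails.
    message = "Task {task} in DAG {dag} failed on {date}".format(
        task=context['task_instance'].task_id,
        dag=context['task_instance'].dag_id,
        date=context['execution_date'],
    )
    # Post the alert to a Slack channel via an incoming webhook.
    requests.post(SLACK_WEBHOOK_URL, json={'text': message})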

Similarly, we add two more tasks, transform and load, to the DAG; these correspond to the stages we discussed earlier (a sketch of their definitions is given after the dependency lines below). Since the extract task needs to complete first, followed by transform and then load, we set dependencies among these tasks as follows.

extract.set_downstream(transform)
transform.set_downstream(load)
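For completeness, the transform and load tasks are defined just like extract; the snippet below is a sketch with placeholder bash commands rather than our actual ones.

transform = BashOperator(task_id='taskid_transform',
                         depends_on_past=True,
                         wait_for_downstream=True,
                         on_failure_callback=slack_failed_task,
                         bash_command="bash command",  # placeholder for the transform step
                         dag=dag)

load = BashOperator(task_id='taskid_load',
                    depends_on_past=True,
                    wait_for_downstream=True,
                    on_failure_callback=slack_failed_task,
                    bash_command="bash command",  # placeholder for the load step
                    dag=dag)

The same dependencies can also be written with Airflow's bitshift syntax: extract >> transform >> load.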

Web server

The Airflow web server gives users an easy graphical interface. Users can access information about DAGs and tasks, such as the status of a DAG or task, execution times, logs, recent runs, etc. We can also trigger a DAG manually from the web server. I highly recommend that beginners go through the details regarding the web server here.

Scheduler

The scheduler is the core of Airflow. It keeps track of the pending and incoming tasks of all DAGs and periodically checks whether a task needs to be executed according to its schedule. I would not recommend that beginners dive into the internals of the Airflow scheduler.

Database

Apache Airflow needs a backend database where it keeps records about the status of DAGs, tasks, users, connections, etc. By default, Airflow uses a SQLite database. For serious use, a more capable database such as PostgreSQL is highly recommended; it is configured via the sql_alchemy_conn setting in airflow.cfg.

Using the above approach, it is easy to create similar DAGs for different exchanges, as long as all DAGs and their tasks have unique ids. If Airflow is running on a machine with few CPU cores and the tasks are computationally intensive, it is better to give the different DAGs different (staggered) scheduling intervals.
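As an illustration of this per-exchange approach, the sketch below builds one DAG per exchange in a loop and staggers their schedules by a few minutes. The exchange list, the build_dag helper and the staggered minutes are hypothetical and only show the pattern, not our production code.

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

def build_dag(exchange, minute):
    # Build a small ETL DAG for one exchange, scheduled at a staggered minute past midnight.
    default_args = {
        'owner': 'airflow',
        'depends_on_past': True,
        'start_date': datetime(2018, 9, 27),
        'retries': 1,
        'retry_delay': timedelta(seconds=10),
    }
    dag = DAG(dag_id='data_flow_{}'.format(exchange), default_args=default_args,
              max_active_runs=1, concurrency=1,
              schedule_interval='{} 0 * * *'.format(minute))
    extract = BashOperator(task_id='extract_{}'.format(exchange),
                           bash_command="bash command",  # placeholder
                           dag=dag)
    transform = BashOperator(task_id='transform_{}'.format(exchange),
                             bash_command="bash command",  # placeholder
                             dag=dag)
    load = BashOperator(task_id='load_{}'.format(exchange),
                        bash_command="bash command",  # placeholder
                        dag=dag)
    extract.set_downstream(transform)
    transform.set_downstream(load)
    return dag

# One DAG per exchange; the scheduler discovers DAG objects placed in the module globals.
for i, exchange in enumerate(['bitfinex', 'hitbtc', 'binance']):
    globals()['data_flow_{}'.format(exchange)] = build_dag(exchange, minute=15 + 5 * i)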

I hope this article gives you a brief overview of Airflow's functionality.
