Building a Data Pipeline with Airflow

Today there are many tools for processing data. Some handle very large volumes of data easily but cannot respond quickly, while others respond quickly but cannot easily process large amounts of data. Because of these trade-offs, we usually have to use several data tools in a single data-processing workflow.
Airflow is the orchestra conductor that controls all of these different data processing tools under one roof. It has strong monitoring, controlling and troubleshooting instruments that can touch any level of the data flow. We author workflows as Directed Acyclic Graphs (DAGs) of tasks, and Airflow schedules and executes those tasks on an array of workers while following the specified dependencies. Rich command line utilities make performing complex surgeries on DAGs a snap, and the rich user interface makes it easy to visualize pipelines running in production, monitor progress, and troubleshoot issues when needed.
Introduction
Basically, an Airflow pipeline consists of two main parts: a DAG, which defines all pipeline steps, their relationships, dependencies and parallelization, and operators, which execute the dedicated tasks. In addition, Airflow offers several features that make life easier for the developer. The most important ones are:
- Connections to define authentication for any external database, FTP server, etc.,
- Variables to store and retrieve arbitrary content or settings as simple key/value pairs,
- XCom to share keys/values between independent tasks,
- Pools to limit the execution parallelism on arbitrary sets of tasks,
- Hooks to reach external platforms and databases,
- Executors to run tasks locally or on a worker.

DAG (Directed Acyclic Graph)
An Airflow DAG is a collection of all the tasks you want to run, organized in a way that shows their relationships and dependencies. DAGs are defined in Python files placed in the DAG_FOLDER, which is set in the Airflow configuration file (airflow.cfg) located in the airflow_home directory.
Each DAG and each of its tasks has an ID used to reach its objects. There is no limit on the number of DAG files; we can create as many as we want, each with an arbitrary number of tasks.
Putting multiple workflows into one DAG is not forbidden, but it is not recommended, because it makes management and troubleshooting harder. For instance, if a DAG makes tea, do not control the garage door in that same DAG.
A DAG can also be used as a SubDAG, which is defined inside a Python function. SubDAGs are executed with the SubDagOperator from their parent DAG and cannot be called without one.
Parent DAGs have to be defined globally, and only globally defined parent DAG objects and their SubDAGs are callable. If we define a SubDAG but never call it from a globally defined parent DAG, there is no way to reach that SubDAG, because Airflow executes only globally defined DAGs and their SubDAGs.
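A minimal sketch of this pattern, assuming Airflow 1.x imports and purely hypothetical DAG and task names:

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator

default_args = {'owner': 'airflow', 'start_date': datetime(2017, 11, 22)}


def my_subdag(parent_dag_id, child_dag_id, args):
    # the SubDAG's dag_id must be '<parent_dag_id>.<child_task_id>'
    subdag = DAG(dag_id='%s.%s' % (parent_dag_id, child_dag_id),
                 default_args=args, schedule_interval='@daily')
    DummyOperator(task_id='subdag_task', dag=subdag)
    return subdag


# the parent DAG is defined at module (global) level so the scheduler can pick it up
parent_dag = DAG(dag_id='my_parent_dag', default_args=default_args,
                 schedule_interval='@daily')

run_subdag = SubDagOperator(
    task_id='my_subdag_section',
    subdag=my_subdag('my_parent_dag', 'my_subdag_section', default_args),
    dag=parent_dag,
)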
Operators
An operator in Airflow is a dedicated task. Operators generally implement a single assignment and do not need to share resources with other operators; we only have to call them in the correct order on the DAG. They generally run independently, and within a DAG they may even run on different machines.
In general, if two operators need to share information we should combine them into a single operator. If that absolutely can't be avoided, Airflow has a feature for operator cross-communication called XCom, which is described later in this document.
Airflow provides operators for many common tasks, including:
- BashOperator - executes a bash command
- PythonOperator - calls an arbitrary Python function
- EmailOperator - sends an email
- HTTPOperator - sends an HTTP request
- SqlOperator - executes a SQL command
- Sensor - a specific type of operator that keeps running until a certain criterion is met
On the other hand, if we process some tasks multiple times and there is no operator that does the job, we can write our own operator. See the article "How to Write an Airflow Operator" for details.
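As a rough illustration of the idea (the class, its parameter and its logic are hypothetical, and the imports assume Airflow 1.x):

import logging

from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class MyCleanupOperator(BaseOperator):
    """Hypothetical operator that runs one dedicated cleanup step."""

    @apply_defaults
    def __init__(self, target_table, *args, **kwargs):
        super(MyCleanupOperator, self).__init__(*args, **kwargs)
        self.target_table = target_table

    def execute(self, context):
        # execute() is what Airflow calls when the task instance runs;
        # the real cleanup logic would go here
        logging.info('Cleaning table %s for %s', self.target_table, context['ds'])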
Data Pipeline
In this example we are going to build a data pipeline for the big data processing technique that we described in the "A Big Data Processing Technique" article.
Data Flow Decision
We have 3 steps to process our data.
- Step1: Moving delimited text data into Hive
- Step2: Cleaning the Hive table with UDF functions
- Step3: Moving the clean data to MySQL
As you can see, each step in the flow needs the output of the previous step.
In addition to these 3 steps, we want to add the following to make the workflow more robust:
- Addition-Step1: we don't want to start Step1 if the delimited text data does not exist on the Hadoop cluster.
- Addition-Step2: we want to create an ID in a separate task and use it in Step2.
- Addition-Step3: we don't want to trigger Step2 if the Hive table created in Step1 does not contain more than 100 rows.
- Addition-Step4: finally, we want to notify developers with an email.
Building the DAG
As you might guess, the first step in the DAG will be Addition-Step1, since there is no point in running the other steps without data. Addition-Step2 and Addition-Step3 have to be done before Step2. Addition-Step3 makes a decision with a BranchPythonOperator: if the number of rows is less than 100, it moves to the stop_flow task, which stops the process at that point; if the table contains more than 100 rows, the execution criterion for Step2 is met. After Step2 is completed, Step3 and Addition-Step4 are executed in turn.

As you can see from the process order, Step1 and Addition-Step2 run in parallel. We should write tasks to run as much in parallel as possible without breaking dependencies. We discuss parallel runs in more detail later in this article.
Please see the DAG for the complete data flow; the dependencies and parallelization of the whole flow can be tracked easily there.

The code behind the DAG
In the DAG:
default_args: defines the default arguments we need to manage the DAG. start_date is one of the important arguments to understand: it is the starting date of the data flow. If we set an old date and our scheduler is scheduled to run the flow every day, Airflow will catch up and run the flow for all the missing days from that old date up to today. Starting with Airflow 1.8, the catchup parameter can be set to false. See https://airflow.apache.org/scheduler.html
schedule_interval: the run frequency. It can be defined with a Python datetime.timedelta or in cron format, e.g. schedule_interval='0 12,13 * * *' (every day at 12:00 and 13:00).
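A minimal sketch of how these arguments might fit together (the dag_id matches the one used in the test commands below, but the dates, e-mail address and retry settings are placeholder assumptions):

from datetime import datetime, timedelta

from airflow import DAG

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2017, 11, 22),  # flow start date; older dates get backfilled unless catchup is off
    'email': ['dev-team@example.com'],     # placeholder address for notifications
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    dag_id='my-bigdata-dag',
    default_args=default_args,
    schedule_interval='0 12,13 * * *',  # cron format; a timedelta such as timedelta(days=1) also works
    catchup=False,                      # available from Airflow 1.8: do not run the missed intervals
)

The operator sketches below reuse this dag object.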
HdfsSensor: an operator that checks for a file on HDFS. If the file does not exist on the defined path, the sensor keeps checking at the defined interval and does not let the flow pass to the next step until the file exists. This is the general behaviour of all sensor operators in Airflow, such as the S3 sensor.
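For instance, the sensor step guarding Step1 might look roughly like this (the HDFS path and connection id are assumptions; HdfsSensor lives in airflow.operators.sensors in the 1.x series):

from airflow.operators.sensors import HdfsSensor

wait_for_data = HdfsSensor(
    task_id='wait_for_raw_data',
    filepath='/data/raw/mydata/{{ ds }}/',  # templated HDFS path to watch (placeholder)
    hdfs_conn_id='hdfs_default',            # HDFS connection defined in the Airflow connections
    poke_interval=60,                       # re-check every 60 seconds
    timeout=3 * 60 * 60,                    # fail after 3 hours of waiting
    dag=dag,
)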
HiveOperator: executes Hive queries. To use this operator, the hive_cli_default connection must be defined in the Airflow connections.
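A sketch of a HiveOperator task; the query here is a placeholder, the real HQL lives in the project's hql.py:

from airflow.operators.hive_operator import HiveOperator

create_hive_db = HiveOperator(
    task_id='create_hive_db',
    hql='CREATE DATABASE IF NOT EXISTS mydb;',  # placeholder query
    hive_cli_conn_id='hive_cli_default',        # must exist in the Airflow connections
    dag=dag,
)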
PythonOperator: executes a Python function. To send parameters to that function we use templates_dict, which is one of the operator's template fields. The function itself is passed with python_callable and has to accept the DAG's default/context arguments. The function can return a value to be used in other tasks; to catch those values we use the cross-communication mechanism called XCom.
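A sketch of a PythonOperator wired to its callable through python_callable and templates_dict (the function body is illustrative, not the project's real logic):

from airflow.operators.python_operator import PythonOperator


def create_source_id(**kwargs):
    # templates_dict is rendered by Jinja before the task runs
    run_date = kwargs['templates_dict']['run_date']
    source_id = 'src_%s' % run_date.replace('-', '')
    return source_id  # the return value is automatically pushed to XCom


create_source_id_task = PythonOperator(
    task_id='create_source_id',
    python_callable=create_source_id,
    templates_dict={'run_date': '{{ ds }}'},
    provide_context=True,  # required in Airflow 1.x to receive the context kwargs
    dag=dag,
)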
BranchPythonOperator: executes a Python function that returns the task_id(s) of the immediately downstream task(s) to run. The number of tasks can be 1 to N. We use this operator to decide which task to pass to next.
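The decision step of our flow could be wired roughly like this; the count_data_rows callable itself is sketched in the functions section below, and stop_flow is the dead-end DummyOperator:

from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import BranchPythonOperator

count_data_rows_task = BranchPythonOperator(
    task_id='count_data_rows',
    python_callable=count_data_rows,  # returns either 'clean_data' or 'stop_flow'
    provide_context=True,
    dag=dag,
)

stop_flow = DummyOperator(task_id='stop_flow', dag=dag)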
set_upstream, set_downstream: this is how we define dependencies between tasks. As you might guess, set_upstream points at the tasks a task depends on, and set_downstream points at the tasks that depend on it. We can define multiple dependencies with a list of tasks. For instance, if we do not want to start task C before tasks A and B have finished: C.set_upstream([A, B]). Another example is the list of task_ids a BranchPythonOperator function can return: all of those task_ids have to be set as the branch operator's downstream tasks.
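A small self-contained illustration of both directions (the task_a/task_b/task_c names are only for demonstration and reuse the dag object from the earlier sketch):

from airflow.operators.dummy_operator import DummyOperator

task_a = DummyOperator(task_id='task_a', dag=dag)
task_b = DummyOperator(task_id='task_b', dag=dag)
task_c = DummyOperator(task_id='task_c', dag=dag)

# task_c will not start before both task_a and task_b have finished
task_c.set_upstream([task_a, task_b])

# equivalently, the same dependency written from the other direction:
#   task_a.set_downstream(task_c)
#   task_b.set_downstream(task_c)

# in our flow the branch task is wired the same way, e.g.
#   count_data_rows_task.set_downstream([clean_data, stop_flow])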
EmailOperator: a simple operator that sends an email whenever we call it.
{{ ds }}: Airflow uses Jinja templating to expose its predefined variables. ds is the execution date of the run; when we run an old date with airflow test or backfill, ds will be that old date.
xcom_pull: to catch a value from a different task we use xcom_pull. For instance, task_instance.xcom_pull(task_ids='create_source_id') catches the return value of the create_source_id task's function. If we want to pull a value inside a SubDAG, we also need to define the DAG ID: {{ task_instance.xcom_pull(dag_id='your_dag_id', task_ids='your_task_id') }}.
To push a key/value pair to XCom we use xcom_push: kwargs['ti'].xcom_push(key='your_key', value=your_value). kwargs['ti'] is how we reach the task_instance inside a task.
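A short sketch of pushing and pulling values inside a Python callable (the key name and the function itself are illustrative only):

def push_and_pull_example(**kwargs):
    ti = kwargs['ti']                                      # the running task instance
    ti.xcom_push(key='your_key', value='your_value')       # explicit push under a custom key
    source_id = ti.xcom_pull(task_ids='create_source_id')  # value returned by another task
    print('working with source id %s' % source_id)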
The Functions behind PythonOperators
In the tasks:
create_source_id function: returns a source_id; we use XCom to catch that value in the clean_data task.
count_data_rows function: the function behind the count_data_rows (BranchPythonOperator) task. It counts the mydata table and returns a task_id based on the number of rows. If the number of rows is less than 100, it returns stop_flow (a DummyOperator that has no downstream task); if the number of rows is greater than 100, it returns clean_data, which is part of our normal flow. To get the number of rows we use HiveServer2Hook, which executes a Hive query on the schema we pass as a parameter and returns a dict that includes the table header and the rows. There is no schema parameter for HiveServer2Hook in Airflow versions before 1.8; if you use an earlier version, you must define the schema in the WebUI for the hive_cli_default connection.
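A sketch of what this branching function might look like (the query, schema and use of the default HiveServer2 connection are assumptions; the real implementation is in the project repository):

from airflow.hooks.hive_hooks import HiveServer2Hook


def count_data_rows(**kwargs):
    hook = HiveServer2Hook()  # uses the default HiveServer2 connection
    results = hook.get_results('SELECT COUNT(*) FROM mydata',  # table created in Step1
                               schema='mydb')                  # schema parameter exists from Airflow 1.8
    row_count = results['data'][0][0]
    if row_count < 100:
        return 'stop_flow'   # dead-end DummyOperator, the flow stops here
    return 'clean_data'      # continue with the normal flow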
move_data_mssql function: the last step of data processing. It creates a text export of the clean_mydata table via HiveCliHook, which executes Hive queries on the selected schema, and then moves the data to the MsSQL mydata destination table via Apache Sqoop.
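A very rough sketch of this last step; the export query, HDFS directory and Sqoop arguments are purely illustrative assumptions:

import subprocess

from airflow.hooks.hive_hooks import HiveCliHook


def move_data_mssql(**kwargs):
    # export the cleaned table as text files on HDFS
    hook = HiveCliHook()
    hook.run_cli("INSERT OVERWRITE DIRECTORY '/data/export/mydata' "
                 "SELECT * FROM clean_mydata;", schema='mydb')

    # ship the exported files to the destination table with Apache Sqoop
    subprocess.check_call([
        'sqoop', 'export',
        '--connect', 'jdbc:sqlserver://mssql-host:1433;databaseName=mydb',
        '--table', 'mydata',
        '--export-dir', '/data/export/mydata',
    ])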
Test
To test a task we use airflow test:
$ airflow test dag_id task_id ds
$ airflow test my-bigdata-dag create_hive_db 2017-11-22
To test a task on a SubDAG:
$ airflow test dag_id.subdag_id task_id ds
Run
Running an Airflow task works the same way as testing it:
$ airflow run dag_id task_id ds
$ airflow run my-bigdata-dag create_hive_db 2017-11-22
# to run a task on a SubDAG
$ airflow run dag_id.subdag_id task_id ds
When we use airflow run we can track tasks on the WebUI, but this is not possible with airflow test.
Backfill
Airflow backfill runs the DAG between two dates.
-s: start date
-e: end date
$ airflow backfill -s 2017-11-21 -e 2017-11-22 dag_id
Scheduler
To start the scheduler, use airflow scheduler:
$ airflow scheduler
[2017-11-26 09:36:23,220] {__init__.py:36} INFO - Using executor CeleryExecutor
____________ _____________
____ |__( )_________ __/__ /________ __
____ /| |_ /__ ___/_ /_ __ /_ __ \_ | /| / /
___ ___ | / _ / _ __/ _ / / /_/ /_ |/ |/ /
_/_/ |_/_/ /_/ /_/ /_/ \____/____/|__/
[2017-11-26 09:36:23,578] {jobs.py:680} INFO - Starting the scheduler
We should see the output above if everything is OK.
When we start the scheduler, Airflow will immediately start all the DAGs whose start date is earlier than now.
Parallel Runs
Airflow runs tasks in parallel whenever there are no dependencies between them. If a task requires another task to finish, that limits parallel processing. See the DAG below:

Once the DAG starts, after the start task it runs all Section-1 tasks in parallel. When all tasks in Section-1 are completed, some-other-task starts, and when some-other-task ends, all Section-2 tasks run in parallel. Finally, after the Section-2 tasks are completed, they trigger the end task.
As you can see from the DAG, because of the dependencies Airflow does not run all the tasks in parallel. Parallel tasks have to be independent of the other tasks.
The maximum number of parallel tasks is defined in the airflow.cfg file with the parallelism variable.
Airflow can work across multiple machines via the CeleryExecutor. If we run Airflow on more than one machine, parallel tasks can run on any of them during processing.
Airflow also has a SequentialExecutor and a LocalExecutor:
SequentialExecutor runs only one task instance at a time and cannot process tasks in parallel.
LocalExecutor runs on the local machine and is able to process tasks in parallel.
The executor is defined in the airflow.cfg file with the executor variable.
Tracking Tasks and Troubleshooting on WebUI
When we start the scheduler, backfill a DAG or run a single task, we can track the tasks on the WebUI. Every task has detailed information, including a run log that is reachable from the WebUI. See the image below for details.

Tasks are highlighted in different colors so we can see their status:

Whenever a task fails, we can stop the scheduler, fix the problem and restart it; Airflow will find and restart all remaining tasks.
Since some tasks logically depend on other tasks that may have completed before the failure, we have to be careful when we restart the scheduler or a backfill. For instance, if a task failed because of missing information coming from one of the previous tasks, we have to re-run that task and all of its dependencies.
Configuration
Airflow has a configuration file under the airflow_home directory. It can also be viewed at http://your_host:8080/admin/configurationview. You should set all your variables and databases carefully.
Installation
See https://airflow.apache.org/installation.html
Project Code
The complete project code can be found in reference [2].
In addition to dag.py and tasks.py you will find:
- hql.py with all the Hive queries used in the project,
- my_udf_function.py for the UDF function,
- create_data.py, a Python random data generator for your tests,
- create_table.sql, the destination MSSQL table model.
References
[1] Airflow http://pythonhosted.org/airflow/
[2] https://github.com/vergili/bigdata_tutorial