Airflow for Beginners
Introduced by Airbnb, Airflow is a platform to schedule and monitor data pipelines, especially complex pipelines consisting of many task executions and the relations among them. At WorkGenius we use Airflow for a variety of purposes, such as extracting freelancers' performance and skills, as well as pre-processing data for the recommendation model.
In this article, we first go over the basic setup to run Airflow, including an example use case. We then address how it can be optimized through different configuration parameters and the internal tools it provides.
Key concepts of Airflow, such as the DAG, are skipped here as they are well explained in the official documentation. If you are already familiar with the installation process, you can skip ahead to the “Airflow Tips” section.
Installation
For the demo we use Airflow 1.10.3 via pip on Ubuntu.
Before install
Airflow requires a directory to store its config and other files and directories (logs, database, etc.). This directory is defined by the environment variable AIRFLOW_HOME, which defaults to ~/airflow. If you want to change it to another directory, export this variable pointing to your desired location:
export AIRFLOW_HOME=/home/user/Documents/airflow/home
Installing Airflow
pip install apache-airflow==1.10.3
After installing Airflow, the first task is to initialize the database. This is done with the following command:
airflow initdb
When the initialization is finished, we can check whether the installation was successful by running:
airflow version
This should print the installed Airflow version, 1.10.3.
Running Server/Scheduler
Airflow provides two main processes. One is the webserver, which provides a web UI to monitor and manage the DAGs. The other is the scheduler, which is responsible for managing, scheduling, and running the DAGs. They can be started separately with:
airflow webserver
airflow scheduler
After install
By default, all of Airflow's configuration is defined in the file airflow.cfg under the AIRFLOW_HOME directory. For example, Airflow initially loads some sample DAGs; this is controlled by the load_examples variable, and one way to remove them is to set its value to False. For changes like this that involve a modification of the database, you need to reset the database first:
airflow resetdb
Warning: resetting the database will delete all the data regarding your DAGs.
To make sure your changes are applied properly, it's best to stop both the scheduler and the webserver before changing the config, and start them again afterwards.
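For reference, disabling the example DAGs is a one-line change to the load_examples entry, which (as far as this setup goes) lives in the [core] section of airflow.cfg:

[core]
load_examples = False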
Define the first DAG
Imagine you want to provide a report for your online store. The report should be generated at the beginning of each month and contain the sum of the prices of the purchases made in the previous month. Now a year has passed and you want the report to be automated.
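Below is a minimal sketch of such a DAG; the DAG name monthly_sales_report and the body of get_sum_of_prices are placeholders for your own report logic:

import logging
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator


def get_sum_of_prices(**context):
    # The window this run is responsible for: from the execution date
    # up to the next scheduled execution date.
    start = context["execution_date"]
    end = context["next_execution_date"]
    logging.info("GETTING DATA FROM %s TO %s", start.date(), end.date())
    # Placeholder: query the purchases made in [start, end) and sum their prices.


dag = DAG(
    "monthly_sales_report",
    schedule_interval="0 4 1 * *",    # 1st day of every month at 04:00
    start_date=datetime(2018, 1, 1),
    catchup=True,                     # run for every interval since the start date
)

dummy_operator = DummyOperator(task_id="dummy_task", dag=dag)

etl_operator = PythonOperator(
    task_id="get_prices",
    python_callable=get_sum_of_prices,
    provide_context=True,             # pass the DAG run context to the callable
    dag=dag,
)

dummy_operator >> etl_operator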
In the code snippet above, the DAG constructor defines the DAG itself. The first parameter is the DAG's title. The schedule makes it run on the 1st day of every month at 4:00 a.m. The DAG's start date is set to 2018.01.01. The catchup flag defines whether the DAG should run for every interval from its start time until the current time; in this case we want it to, so it is set to True.
There are two tasks defined for this DAG. The first one is a dummy task using the DummyOperator. The dummy operator takes no callable, but it can be used to connect different operators and act as a checkpoint to manage your connections.
The main task is a PythonOperator that takes a Python callable. The key point here is to provide the context of the DAG run to the Python method. The context is a dictionary containing information about the DAG run, such as the execution time, which defines the date and time the DAG is running for. The Python callable of this example is the get_sum_of_prices method, which takes the context as input.
Finally, the order of the tasks is defined with the >> operator, which states that the first task (dummy_operator) has to finish before the next task (etl_operator) starts.
When you run the webserver and scheduler, you'll see the new DAG in the list. Once you turn it on, the scheduler starts running the DAG for every interval from the start date until the current time; for example, you should see a successful run for 2019.05.01.
If you open the get_prices task and view its logs, you should see a line containing the information below (the dates will differ depending on the selected DAG run):
... INFO - GETTING DATA FROM 2019-05-01 TO 2019-06-01
Airflow Tips
Depending on the use case, some challenges may arise while designing the pipeline. Below is an overview of some of those challenges and possible solutions for them.
Reduce CPU Load
If you are hosting Airflow on cloud services, especially in a “pay as you go” mode as in Amazon Web Services, one important config parameter is min_file_process_interval. This value controls how often the scheduler re-parses the DAG definition files for changes, and by default it is set to zero (seconds). This means the scheduler checks for changes continuously, which can lead to high CPU load on the host machine where the scheduler is running. Raising it to a higher value, such as 60 seconds, should resolve this issue.
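The corresponding entry sits in the [scheduler] section of airflow.cfg; a minimal change looks like this:

[scheduler]
min_file_process_interval = 60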
Run DAG (auto/manual/time of last run…)
If you are new to Airflow, you might get confused by the order of your DAG runs, mainly when you run the same DAG in both manual and auto mode, because the DAG-run timestamp behaves differently in the two. In auto mode, the run timestamp is set to the start of the schedule interval the run covers, not to the moment it actually runs. For example, if your DAG is scheduled to run daily at 15:00, the run that starts on 2019.05.15 at 15:00 gets the timestamp 2019.05.14 15:00.
The other mode is the manual run, in which the run timestamp is set to the exact time at which it is triggered.
You may notice the effect in the “Last Run” timestamp of your DAG. For the example above, if you trigger your DAG manually on 2019.05.15 at 10:00 and it also runs in auto mode as every day, you'll see the manual run as the “Last Run”, since it carries its real timestamp, while the auto run's timestamp is set to one day earlier.
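For instance, a manual run can be started from the CLI, here using the monthly_sales_report DAG id from the earlier sketch:

airflow trigger_dag monthly_sales_report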
External Trigger vs. Sequential Executor
Airflow offers several executors to run the tasks in your DAGs, such as the sequential executor or the local executor. As its name suggests, the sequential executor runs tasks one at a time, without any parallel execution. This type of executor should be handled carefully when a DAG depends on an external trigger, meaning a task in your DAG waits for a task in another DAG to finish. In that case, if the waiting DAG starts before the other one, it occupies the single execution slot while waiting; since only one task can run at a time, the task it is waiting for never gets a chance to run, and your DAG is stuck in the waiting state.
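A minimal sketch of such a cross-DAG dependency using the ExternalTaskSensor (the DAG and task names are placeholders); under the sequential executor, wait_for_upstream would block the only execution slot while it polls:

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.sensors.external_task_sensor import ExternalTaskSensor

dag = DAG(
    "downstream_dag",
    schedule_interval="@daily",
    start_date=datetime(2019, 1, 1),
    catchup=False,
)

# Waits for the task "produce_data" in "upstream_dag" (placeholder names)
# with the same execution date to reach the success state.
wait_for_upstream = ExternalTaskSensor(
    task_id="wait_for_upstream",
    external_dag_id="upstream_dag",
    external_task_id="produce_data",
    dag=dag,
)

process = DummyOperator(task_id="process", dag=dag)

wait_for_upstream >> process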
Get Last Run
What if your DAG runs depend on the timestamp of the previous DAG run? In that case, you need to access the last (successful) run of a DAG and read its properties, like the execution time. In general, Airflow provides two ways of accessing this information. One is the context parameter that can be passed to an operator; the context is a dictionary about the current task that includes the previous execution timestamp. The other is the provide_session decorator, which passes a SQLAlchemy session into your method.
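A sketch of both approaches, reusing the monthly_sales_report DAG id from the earlier example and a hypothetical helper get_last_successful_run:

import logging

from airflow.models import DagRun
from airflow.utils.db import provide_session
from airflow.utils.state import State


@provide_session
def get_last_successful_run(dag_id, session=None):
    # Query the metadata database for the most recent successful run of this DAG.
    return (
        session.query(DagRun)
        .filter(DagRun.dag_id == dag_id, DagRun.state == State.SUCCESS)
        .order_by(DagRun.execution_date.desc())
        .first()
    )


def my_task(**context):
    # Option 1: the context already carries the previous execution timestamp.
    prev_ts = context.get("prev_execution_date")

    # Option 2: look up the last successful run via the provided session.
    last_run = get_last_successful_run("monthly_sales_report")
    if last_run is not None:
        logging.info("Last successful run: %s", last_run.execution_date)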
Max Active Runs
As the documentation says, this parameter defines how many instances of the same DAG can run concurrently. When your ETL runs in a continuous mode, meaning that only newer data is processed in each run, you might want a safeguard against duplicated data, since the DAG runs are otherwise independent of each other. One solution is to set this parameter to 1, so that multiple instances never run at the same time.
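In code, this is just an extra argument on the DAG constructor; for example, applied to a hypothetical hourly ETL DAG:

from datetime import datetime

from airflow import DAG

dag = DAG(
    "incremental_etl",               # placeholder DAG name
    schedule_interval="@hourly",
    start_date=datetime(2019, 1, 1),
    max_active_runs=1,               # never run two instances of this DAG at once
)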
Run DAG in Local time
In case your Airflow host is not in the same timezone as your applications, or you have applications in different regions, it might be necessary to run your processes when the load on your servers and data sources is low. Airflow supports this through the tzinfo parameter of your DAG's start_date, which defines the timezone in which it should be running. The code snippet below shows a DAG that is set to run every morning at 3:00 a.m. Central European Time.
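A minimal sketch, using pendulum (which Airflow 1.10 uses for timezone-aware dates) and Europe/Berlin as one possible CET zone; the DAG and task names are placeholders:

from datetime import datetime

import pendulum

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

local_tz = pendulum.timezone("Europe/Berlin")

dag = DAG(
    "cet_morning_report",
    schedule_interval="0 3 * * *",                      # every day at 03:00 local time
    start_date=datetime(2019, 1, 1, tzinfo=local_tz),   # timezone-aware start date
    catchup=False,
)

report_task = DummyOperator(task_id="generate_report", dag=dag)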
In this article we gave a brief overview of Apache Airflow along with a simple real-life example, and showed how Airflow makes it easy to set up and run such a process. We also discussed some of the challenges you may face while using it, from configuration parameters to the tools Airflow provides to developers.