Apache Airflow — Part 1
Every programmer loves automating their work, and learning a new automation tool is always fun. A few months ago, I came across a wonderful open-source project called Apache Airflow. Since then I have been exploring it and using it in my existing codebase. This blog series is a summary of my recent learning about the tool. In this first part of the series, we will discuss workflows, Airflow installation, DAGs, and implementing DAGs in Airflow. So let’s start from the very beginning.
What is a workflow?
Before starting any discussion about Apache Airflow, let’s try to understand what a workflow is with a simple illustration.
We often encounter use cases where we need to run a set of steps to produce an expected output. Such a set of steps that accomplishes a given data engineering task is called a workflow. There are many workflow tools we can use to schedule these kinds of tasks. Cron jobs are one solution, but managing several cron jobs quickly becomes tedious. That is why many teams are opting for dedicated workflow tools like Apache Airflow, Luigi, SSIS, etc. We will be focusing on Apache Airflow.
What is Airflow?
Airflow is a platform to program workflows, including:
- Creation
- Scheduling
- Monitoring
We can implement programs in any language, but in Apache Airflow the workflows themselves are written in Python. Airflow implements workflows as DAGs, i.e. Directed Acyclic Graphs, which can be accessed via the command line or the web interface.
What is a DAG?
DAG stands for Directed Acyclic Graph, which we can easily understand from the illustration below.
- Directed: there is an inherent flow representing dependencies between components.
- Acyclic: it does not loop / cycle / repeat.
In Airflow, a DAG represents the set of tasks that makes up our workflow. We can define a simple DAG using the following code structure.
from datetime import datetime
from airflow.models import DAG

dag = DAG(
    dag_id='first_dag',
    default_args={"start_date": datetime(2020, 7, 25)}
)
Within Airflow, DAGs are written in Python, but they can use components (typically tasks) written in any other language. We can use the airflow list_dags command to list all DAGs present in Airflow.
Now, we have understood the basics of Airflow. Let’s try to install it into our virtual environment for further understanding and use.
A quick installation guide
We will be using a Python virtual environment to install apache-airflow on our local system.
- Create a directory named airflow_sandbox, and inside it create a directory named backend.
- Open your terminal inside the backend directory.
- In the backend directory, create a Python virtual environment and activate it for further use.
- Run `export AIRFLOW_HOME=$(pwd)/airflow_home` so Airflow knows where to keep its files.
- Run `pip install apache-airflow`
- Initialize the database with `airflow initdb`
- Start the web server (the default port is 8080) with `airflow webserver -p 8080`
- Start the scheduler with `airflow scheduler`
If you get stuck, please refer to the official documentation of apache-airflow. Once you are done with the installation, create a directory named dags inside the airflow_home folder. DAGs are always created inside this dags folder.
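To check that everything is wired up, we can drop a minimal DAG file into this dags folder. The following is just a sketch; the file name hello_airflow.py and the dag_id hello_airflow are arbitrary placeholders, not part of any standard setup.

# airflow_home/dags/hello_airflow.py (hypothetical file name)
from datetime import datetime
from airflow.models import DAG

# A bare DAG with no tasks is enough for Airflow to pick it up.
dag = DAG(
    dag_id='hello_airflow',
    default_args={"start_date": datetime(2020, 7, 25)}
)

If the installation went well, this DAG should appear in the output of airflow list_dags and in the web UI at http://localhost:8080.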
Implementing Airflow DAGs
Airflow Operator
While DAGs define how to run a workflow, Operators determine what actually gets done by a task. An Operator represents a single, idempotent task, and operators are usually (but not always) atomic. They can stand on their own and don’t need to share resources with any other operators. The DAG will make sure that operators run in the correct order; other than those dependencies, operators generally run independently. In fact, they may run on two completely different machines.
Summarizing the above discussion, an Airflow Operator:
- represents a single task in a workflow,
- runs independently (usually),
- and generally doesn’t share information with other operators.
Airflow provides operators for many common tasks; some of the most used operators are as follows.
- PythonOperator: Calls a Python function
- BashOperator: Executes a bash command or calls a bash script
- SimpleHttpOperator: Sends an HTTP request
- EmailOperator: Sends an email
Airflow provides many other operators, which we can check out in its documentation.
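BashOperator and PythonOperator are explored with examples next; for the other two operators in the list above, here is a minimal, hedged sketch. The task ids, the http_default connection, the status endpoint, and the email address are assumptions made for illustration, and EmailOperator additionally needs SMTP settings configured in airflow.cfg.

from airflow.operators.http_operator import SimpleHttpOperator
from airflow.operators.email_operator import EmailOperator

# Assumes an HTTP connection named 'http_default' exists in Airflow.
fetch_status = SimpleHttpOperator(
    task_id='fetch_status',
    http_conn_id='http_default',
    endpoint='status',  # path appended to the connection's base URL
    method='GET',
    dag=dag
)

# Assumes SMTP is configured in airflow.cfg; the address is a placeholder.
notify = EmailOperator(
    task_id='notify',
    to='someone@example.com',
    subject='Airflow notification',
    html_content='The workflow has finished.',
    dag=dag
)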
Let’s try to explore BashOperator and PythonOperator with a simple example.
A simple BashOperator to generate a random number.
from airflow.operators.bash_operator import BashOperator

task_1 = BashOperator(
    task_id='generate_random_number_by_bash',
    bash_command='echo $RANDOM',
    dag=dag
)
A simple PythonOperator to generate a random number.
import random
from airflow.operators.python_operator import PythonOperator

def generate_random_number():
    return random.randint(0, 100)

task_2 = PythonOperator(
    task_id='generate_random_number_by_python',
    python_callable=generate_random_number,
    dag=dag
)
Tasks
Tasks are:
- Instances of an operator.
- Assigned to a Python variable (usually).
- Referred to by their task_id in Airflow.
If we look at the examples above, the Python variables task_1 and task_2 are two examples of tasks.
Task dependencies
- Define a given order of task completion.
- In Airflow version 1.8 or above, dependencies are defined with the bitshift operators:
* >> : the task on the left runs before the task on the right (the left task is upstream).
* << : the task on the left runs after the task on the right (the left task is downstream).
If we define task_1 >> task_2 in Airflow, then task_1 will be executed before task_2. If we define task_1 >> task_3 << task_2, then both task_1 and task_2 will be executed before task_3. A complete example combining the tasks defined above is sketched below.
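Putting the pieces together, here is a sketch of a complete DAG file that reuses the DAG and the two tasks defined earlier and wires them together with the bitshift operator so that task_1 always runs before task_2.

from datetime import datetime
import random

from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

dag = DAG(
    dag_id='first_dag',
    default_args={"start_date": datetime(2020, 7, 25)}
)

# Generate a random number with bash.
task_1 = BashOperator(
    task_id='generate_random_number_by_bash',
    bash_command='echo $RANDOM',
    dag=dag
)

def generate_random_number():
    return random.randint(0, 100)

# Generate a random number with Python.
task_2 = PythonOperator(
    task_id='generate_random_number_by_python',
    python_callable=generate_random_number,
    dag=dag
)

# task_1 must finish before task_2 starts.
task_1 >> task_2

Saved inside the dags folder, this file gives Airflow a single workflow in which the bash task always runs before the Python task.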
Airflow Scheduling
DAG runs
- A specific instance of a workflow at a point in time.
- It can be run manually or by a schedule_interval.
- A DAG run can be in one of several states, like running, failed, success, etc.
When scheduling a DAG, these are the attributes that come in handy.
- start_date: The date/time from which to initially schedule the DAG run.
- end_date: When to stop creating new DAG runs (optional argument).
- max_tries: The maximum number of attempts to make before stopping (optional argument).
- schedule_interval: How often to schedule the DAG. You can define it with any cron syntax, like */5 * * * * for every 5 minutes; an online cron expression generator can help you write correct cron syntax.
schedule_interval gotcha: when scheduling a DAG, Airflow will always schedule the first run at start_date + schedule_interval.
For example, consider a DAG defined with the following schedule (schedule_interval is set on the DAG itself, alongside its start_date):
dag = DAG(
    dag_id='first_dag',
    start_date=datetime(2020, 7, 28),
    schedule_interval='0 0 * * *'
)
This means that the earliest time this DAG will actually start running is midnight (00:00) on 29 July 2020.
Kudos! We have installed Apache Airflow and discussed some of its introductory concepts. Let’s meet in the next part of this series and explore more of this amazing open-source project. Till then, goodbye and keep learning.