Published in Analytics Vidhya
Apache Airflow — Part 1

Every programmer loves automating their work, and learning a new automation tool is fun. A few months ago, I came across a wonderful open-source project called Apache Airflow, and I have been exploring it and using it in my existing codebase. This blog series is a summary of my recent learning about this tool. In the first part of the series, we will discuss workflows, Airflow installation, DAGs, and implementing DAGs in Airflow. So let’s start from the very beginning.

What is a workflow?

Before starting any discussion about Apache Airflow, let’s try to understand what a workflow is with a simple illustration.

We often encounter use cases where we need to run a set of steps to produce an expected output. Such a set of steps that accomplishes a given data engineering task is called a workflow. There are many tools we can use for scheduling these kinds of tasks. Using cron jobs is one solution, but managing several cron jobs quickly becomes tedious. Many teams are therefore opting for dedicated workflow tools such as Apache Airflow, Luigi, and SSIS. We will be focusing on Apache Airflow.

What is Airflow?
Airflow is a platform to program workflows, including:

  • Creation
  • Scheduling
  • Monitoring

We can implement programs in any language, but in Apache Airflow the workflows themselves are written in Python. Airflow implements workflows as DAGs, i.e. Directed Acyclic Graphs, which can be accessed via the command line or the web interface.

What is a DAG?
DAG stands for Directed Acyclic Graph, which we can easily understand from the below illustration.

  • Directed, there is an inherent flow representing dependencies between components.
  • Acyclic, does not loop / cycle / repeat.

In Airflow, this represents the sets of tasks that make our workflows. We can define a simple DAG using the following code structure.

from datetime import datetime
from airflow.models import DAG

dag = DAG(
    dag_id='first_dag',
    default_args={"start_date": datetime(2020, 7, 25)}
)

Within Airflow, DAGs are written in Python, but they can use components (typically tasks) written in any other language. We can use the airflow list_dags command to list all DAGs present in Airflow.

Now, we have understood the basics of Airflow. Let’s try to install it into our virtual environment for further understanding and use.

A quick installation guide

We will be using a python virtual environment to install apache-airflow in our local system.

  • Create a directory named airflow_sandbox and inside it create a directory named backend.
  • Open your terminal inside the backend directory.
  • In the backend directory, create a Python virtual environment and activate it.
  • Run export AIRFLOW_HOME=$(pwd)/airflow_home
  • Run pip install apache-airflow
  • Initialize the database with airflow initdb
  • Start the web server on the default port 8080: airflow webserver -p 8080
  • Start the scheduler with airflow scheduler

If you get stuck, please refer to the official Apache Airflow documentation. Once you are done with the installation, create a directory named dags inside the airflow_home folder. DAGs are always created inside this dags folder.
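The installation steps above can be sketched as a single shell session. This is a minimal sketch that combines the directory setup into one layout; the commands that need a virtual environment, network access, or a long-running process are left as comments to run interactively:

```shell
# Create the project layout described above: airflow_sandbox/backend
# with an airflow_home (and its dags folder) inside it
mkdir -p airflow_sandbox/backend/airflow_home/dags

# Point Airflow at the local home directory
export AIRFLOW_HOME="$(pwd)/airflow_sandbox/backend/airflow_home"

# Remaining steps, to be run interactively:
# python -m venv venv && source venv/bin/activate
# pip install apache-airflow
# airflow initdb
# airflow webserver -p 8080
# airflow scheduler

echo "AIRFLOW_HOME is set to: $AIRFLOW_HOME"
```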

Implementing Airflow DAGs

Airflow Operator

While DAGs define how to run a workflow, Operators determine what actually gets done by a task. An Operator represents a single, idempotent task and they are usually (but not always) atomic. They can stand on their own and don’t need to share resources with any other operators. The DAG will make sure that operators run in the correct order; other than those dependencies, operators generally run independently. In fact, they may run on two completely different machines.
Summarizing the above discussion on Airflow Operator:

  • represents a single task in a workflow.
  • run independently (usually)
  • generally don’t share information

Airflow provides operators for many common tasks and some of the most used operators are as follows.

  • PythonOperator: Calls a Python function
  • BashOperator: Executes a bash command or calls a bash script
  • SimpleHttpOperator: Sends an HTTP request
  • EmailOperator: Sends an email

Airflow provides many other operators, which we can find in its documentation.

Let’s explore the BashOperator and the PythonOperator with a simple example.

A simple BashOperator to generate a random number.

from airflow.operators.bash_operator import BashOperator

task_1 = BashOperator(
    task_id='generate_random_number_by_bash',
    bash_command='echo $RANDOM',
    dag=dag
)

A simple PythonOperator to generate a random number.

import random
from airflow.operators.python_operator import PythonOperator

def generate_random_number():
    return random.randint(0, 100)

task_2 = PythonOperator(
    task_id='generate_random_number_by_python',
    python_callable=generate_random_number,
    dag=dag
)

Tasks

Tasks are:

  • Instances of an operator
  • Usually assigned to a Python variable
  • Referred to by task_id in Airflow

If we look at the above examples, the Python variables task_1 and task_2 are two examples of tasks.

Task dependencies

  • Define a given order of task completion.
  • In Airflow version 1.8 or above, dependencies are defined with bitshift operators.
    * >>, the downstream operator: the task on the left runs before the task on the right.
    * <<, the upstream operator: the task on the left runs after the task on the right.

If we define task_1 >> task_2 in Airflow, then task_1 will be executed before task_2.

If we define task_1 >> task_3 << task_2 then it means that task_1 and task_2 will be executed before task_3.
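To see how the bitshift syntax can encode ordering, here is a small pure-Python sketch (not Airflow code, just an illustration) where a toy Task class overloads >> and << to record dependencies, similar in spirit to what Airflow’s operators do:

```python
class Task:
    """Toy stand-in for an Airflow operator, used only to illustrate
    how >> and << can record dependencies between tasks."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.upstream = []    # tasks that must run before this one
        self.downstream = []  # tasks that run after this one

    def __rshift__(self, other):
        # self >> other: self runs before other
        self.downstream.append(other)
        other.upstream.append(self)
        return other  # returning `other` allows chaining: a >> b >> c

    def __lshift__(self, other):
        # self << other: other runs before self
        other.__rshift__(self)
        return other

task_1 = Task('task_1')
task_2 = Task('task_2')
task_3 = Task('task_3')

task_1 >> task_3 << task_2  # task_1 and task_2 both run before task_3

print([t.task_id for t in task_3.upstream])  # → ['task_1', 'task_2']
```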

Airflow Scheduling

DAG runs

  • A DAG run is a specific instance of a workflow at a point in time.
  • It can be triggered manually or by schedule_interval.
  • A run can be in one of several states: running, failed, success, etc.

When scheduling a DAG, these attributes come in handy.

  • start_date : The date/time of the first scheduled DAG run.
  • end_date : When to stop running new DAG instances (optional argument).
  • max_tries : Maximum number of attempts to make before stopping (optional argument).
  • schedule_interval : How often to schedule the DAG. It can be defined with any cron expression, e.g. */5 * * * * for every 5 minutes.

schedule_interval gotcha:
When scheduling a DAG, Airflow will always schedule the first run at
start_date + schedule_interval.

For example

Note that start_date and schedule_interval are DAG-level arguments, so they belong on the DAG itself rather than on an individual operator:

from datetime import datetime
from airflow.models import DAG

dag = DAG(
    dag_id='first_dag',
    start_date=datetime(2020, 7, 28),
    schedule_interval='0 0 * * *'
)

This means that the earliest time the first DAG run can start is 29 July at midnight (00:00), i.e. start_date plus one schedule interval.
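The gotcha can be verified with plain datetime arithmetic (a sketch of the scheduling rule, not Airflow internals):

```python
from datetime import datetime, timedelta

start_date = datetime(2020, 7, 28)     # midnight, 28 July 2020
schedule_interval = timedelta(days=1)  # '0 0 * * *' means daily

# Airflow triggers the first DAG run at start_date + schedule_interval
first_run = start_date + schedule_interval
print(first_run)  # → 2020-07-29 00:00:00
```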

Kudos!!! We have installed Apache Airflow and discussed some of its introductory concepts. Let’s meet in the next part of this series and discover more about this amazing open-source project. Till then, goodbye and keep learning.

Ankur Ranjan