Apache Airflow in 10 minutes

A quick introduction to Apache Airflow (a beginner's guide)

Ashish Kumar
The Startup
10 min read · Jul 4, 2020


Introduction

Apache Airflow is an open-source tool for orchestrating complex workflows and data processing pipelines. It is a platform to programmatically author, schedule, and monitor workflows.

Apache Airflow makes your workflows simple, well organized, and systematic: they can be easily authored and scheduled based on your requirements.

Let’s start with the basics.

What do we mean by workflow?
A workflow can be a simple calculation, creating infrastructure, running a database query, a Bash command, a Python script, MySQL queries, Hive queries, and so on.
A workflow is divided into one or more tasks, which relate to each other and together form a DAG (Directed Acyclic Graph).

What is DAG?
In simple terms, a DAG is a collection of small tasks that join together to perform a bigger task.

Still confused? Let's take a better example.

Let's understand this through the phases of a compiler.

For people who are unfamiliar with compiler phases: think of them as the process your compiler follows to convert a high-level language into a low-level language (which your machine understands).

This is what happens when a piece of code is compiled: the source is read as a character stream, the lexical analyzer converts it into tokens, the syntax analyzer builds a syntax tree, and then come semantic analysis, intermediate code generation, code optimization, and target (assembly) code generation. Each step is crucial and depends heavily on the previous steps.
An error in any one of the steps means the code will not compile successfully.

How would these steps look as a DAG?

[Diagram: an example DAG]

If a task depends on another task, you can set dependencies based on your requirements. We will discuss this briefly in getting started with Apache Airflow.
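To jump slightly ahead, here is a minimal, purely illustrative sketch of how those compiler phases could be chained as Airflow tasks (the DAG name, task ids, and use of DummyOperator are assumptions for illustration; the syntax is explained in the hands-on section):

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

# Each compiler phase becomes a task; the names are illustrative.
with DAG('compiler_phases', start_date=datetime(2020, 7, 1), schedule_interval=None) as dag:
    lexical = DummyOperator(task_id='lexical_analysis')
    syntax = DummyOperator(task_id='syntax_analysis')
    semantic = DummyOperator(task_id='semantic_analysis')
    codegen = DummyOperator(task_id='code_generation')

    # Each phase depends on the previous one, just like in the compiler.
    lexical >> syntax >> semantic >> codegen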

What are scheduled jobs?

Suppose your workflow must run every Sunday; you can schedule it so that it is triggered only on Sundays. How cool is that?
In simple terms, you can automate your workflow. The best part of automation is that you avoid future human errors.
Your DAG can be easily monitored, controlled, and triggered.
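For example, here is a minimal sketch of scheduling a DAG to run only on Sundays, using a cron expression (the DAG id and time are illustrative):

from datetime import datetime

from airflow import DAG

# '0 16 * * 0' is cron syntax for 16:00 every Sunday.
dag = DAG(
    dag_id='weekly_sunday_job',        # hypothetical name
    start_date=datetime(2020, 7, 1),
    schedule_interval='0 16 * * 0',
)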

What if your workflow fails? What if it completes successfully? What if it takes longer than expected to complete?

Keeping all of this in mind, Apache Airflow provides features for exactly these cases: if your workflow fails, you can have it send an email alert or a Slack notification to the relevant person or team.
You can also have it send an email when the DAG runs successfully.

What if your task ran successfully but took longer than expected? (In real-world scenarios this is a problem.) For this, you can set an SLA. Suppose your SLA is 600 seconds but your task took 900 seconds; isn't something wrong? If this happens, Airflow alerts the relevant team.
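A rough sketch of how such alerting and an SLA might be configured through a task's default arguments (the email address and timings are placeholders):

from datetime import timedelta

# Placeholder values; adjust the address and timings to your team's needs.
default_args = {
    'email': ['data-team@example.com'],
    'email_on_failure': True,              # mail the team when a task fails
    'sla': timedelta(seconds=600),         # flag any task run that exceeds 600 seconds
}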

Advantages of Apache Airflow

Apache Airflow has quite a few advantages that make it a better tool compared to others in the market.
First, we'll discuss its advantages and then a few benefits of using Airflow over other similar tools.

  • Open Source / Python — Airflow is developed in Python, and it is very easy to design your workflows. You can contribute your own plugins to the open-source project and use other plugins based on your requirements.
  • Monitoring — You can easily monitor the status of your tasks while they run. Airflow provides all the necessary details for each task, such as execution time, landing time, logs, and so on.
  • Scalable — Most data-driven companies prefer Airflow, and the complexity of their workflows grows over time; Airflow holds an advantage over other tools here because it is more scalable.
  • Smart Scheduling — You can schedule your tasks however you want. For example, if you want a task to run every Sunday at 4:00 PM, you can do that.
  • Managing Dependencies — One of the features that makes Airflow better than other tools is proper dependency management. If a task t2 depends on task t1, which in turn depends on task t0, you can set dependencies across all of them.
  • Resilience — All workflow tools sometimes behave unexpectedly, whether because of network issues, human error, tasks taking longer than expected, and so on. Airflow has several features to cope with this, such as retries: if your task fails to execute, Airflow retries it after a configurable delay (if the failure was due to a transient problem such as a network issue, the retry may succeed). It can also alert the team via email or Slack.
  • Alerting — This is one of the coolest features Airflow has: if you want your team to be notified when any of your tasks fail, it can notify them via email or Slack. Even when a task succeeds, you may want to be notified, and Airflow supports that too.
  • Service Level Agreement (SLA) Timeout — This may be one of the most crucial features (at least for some companies). If your task normally takes 500 seconds to finish but on a certain day it took 1,500 seconds, don't you think something unexpected happened? Wouldn't you like to be notified? You may also have your own reasons to store these records, perhaps for analytics or other research. Example: suppose your company collects user registration data at the end of each day, and you notice that every Saturday the task takes much longer; that suggests more people register on Saturdays.
  • User Interface — Airflow has a good UI, which makes it more approachable. You can view tasks in a well-formatted tree structure and see log details, the Airflow database, task durations, landing times, and a rich graph view.
  • Plugins and hooks — Airflow comes with various pre-defined plugins, and user-defined plugins can also be written to make your tasks easier.
  • Custom plugins, hooks, sensors — I will come back to keywords such as plugins, hooks, and sensors later. In short, we can write our own operators in Airflow and shape them to behave however we like.
  • Cloud services — Cloud platforms such as Google Cloud support Apache Airflow. Cloud Composer is a Google service that creates an Apache Airflow environment on Google Cloud. You can do almost everything there and use cloud services such as Google BigQuery, Dataproc, Dataflow, and so on.

How about some hands-on practice?

I will be using Cloud Composer (a GCP-based managed service) to create an Airflow environment. We can also create a local environment.
As soon as you create a Cloud Composer environment, it automatically creates a bucket in Cloud Storage, which is mounted into your Composer environment. You will have the same directory structure when you install Airflow in a local environment.

In the DAG folder, you upload your Python scripts (DAGs); they get loaded by the Airflow server and shown in the UI, and then you can trigger them manually, or, if scheduled, they will trigger automatically.

Below is the sample code!
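Here is a minimal sketch of such a DAG, along the lines walked through below (the DAG id and the directory path are placeholders):

from datetime import timedelta

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago

# Default arguments applied to every task in this DAG.
default_args = {
    'owner': 'airflow',
    'start_date': days_ago(1),            # "yesterday": can run as soon as it is loaded
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'directory_example',                  # hypothetical DAG id
    default_args=default_args,
    schedule_interval=None,               # no schedule; trigger it manually
)

# t1 creates a directory, t2 deletes it (the path is a placeholder).
t1 = BashOperator(
    task_id='make_directory',
    bash_command='mkdir -p /tmp/airflow_demo',
    dag=dag,
)

t2 = BashOperator(
    task_id='delete_directory',
    bash_command='rm -r /tmp/airflow_demo',
    dag=dag,
)

# t1 must run before t2.
t1 >> t2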

Let us understand this code line by line.

These are the import statements for the facilities we use in our DAG. Since we are using the BashOperator, we need to import BashOperator from the Airflow library.

These are the default arguments that are applied to every task; they are passed to each task's constructor. We define a dictionary of default parameters that is reused when creating tasks.

start_date set to yesterday means the DAG can start running as soon as it is loaded into the server. We can set it to any time.

email_on_failure is set to False; if it were True, an email would be sent to the specified person/team whenever a task fails.

retries set to 1 is the number of retries after a task fails.

retry_delay set to 5 minutes means that after a task fails, Airflow waits 5 minutes before starting a retry.

email_on_retry is set to False; if it were True, an email would be sent to the specified person/team on every retry after a task fails.

In a DAG, everything works as an operator.
Example: t1 = SomeOperator(arguments)

We are using the BashOperator in this example.

t1 is a variable created by instantiating the BashOperator class with all the required arguments.
Every task has a task_id that uniquely identifies it, plus other required arguments depending on which operator you are using.

In our DAG, we run two different tasks, t1 and t2: one creates a directory and the other deletes it.
Obviously, t1 has to run before t2, so we set a dependency stating that t1 must run before task t2.

There are two ways to set this dependency:

  • t1 >> t2 (this means t1 will run before t2)
  • t1.set_downstream(t2) (this means the same as above)

To express the reverse, the syntax would be:

  • t1 << t2
  • t1.set_upstream(t2)

Finally, we need to place our .py file into the DAG folder, and then it will get loaded into the server automatically.

You can see the task graph view as:

The arrow denotes that delete_directory depends on make_directory.

Well, this was a very simple example of how we create tasks and run the workflow.

Operators

While DAGs describe how to run a workflow, operators determine what actually gets done.

An operator describes a single task in a workflow. Operators are usually (but not always) atomic, meaning they can stand on their own and don’t need to share resources with any other operators.

Note: If two operators need to share information, like a filename or a small amount of data, you should consider combining them into a single operator. If it absolutely can’t be avoided, Airflow does have a feature for operator cross-communication called XCom.

Airflow provides many operators; some of them include:

  • BashOperator (Which we just saw)
  • PythonOperator — Used to call any Python function in a DAG (see the sketch after this list)
  • EmailOperator — Sends email
  • SimpleHttpOperator — Sends an HTTP request
  • MySqlOperator, SqliteOperator, PostgresOperator, OracleOperator, JdbcOperator — execute SQL queries
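As referenced in the list, here is a minimal, hypothetical sketch of a PythonOperator calling a plain Python function (it assumes a dag object defined as in the sample code above):

from airflow.operators.python_operator import PythonOperator

def greet():
    # Any plain Python function can be called from a task.
    print('Hello from Airflow!')

say_hello = PythonOperator(
    task_id='say_hello',       # hypothetical task id
    python_callable=greet,
    dag=dag,
)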

Sensors

Sensors are a special type of operator that keeps running until a certain condition is met. A sensor class is created by extending BaseSensorOperator.

It has a poke method, which is executed over and over, every poke_interval seconds, until it returns True; if it returns False, it will be called again.

Example: a sensor that checks whether a file is present in a specified directory. Every poke_interval, the sensor's poke method is executed; if the file is not present it returns False, and once the file appears in the directory it returns True.
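A rough sketch of such a sensor, written by extending BaseSensorOperator (the class name and file path are made up; Airflow also ships ready-made sensors for common cases like this):

import os

from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults

class FilePresentSensor(BaseSensorOperator):
    """Illustrative sensor: succeeds once a file exists at the given path."""

    @apply_defaults
    def __init__(self, filepath, *args, **kwargs):
        super(FilePresentSensor, self).__init__(*args, **kwargs)
        self.filepath = filepath

    def poke(self, context):
        # Called every poke_interval seconds until it returns True.
        return os.path.exists(self.filepath)

It would then be used like any other operator, for example FilePresentSensor(task_id='wait_for_file', filepath='/tmp/data.csv', poke_interval=60, dag=dag).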

Trigger Rules

All operators have a trigger_rule argument that defines the rule by which the generated task gets triggered. The default value of trigger_rule is all_success, which can be described as "trigger this task when all directly upstream tasks have succeeded". All other rules described here are based on direct parent tasks and are values that can be passed to any operator while creating tasks:

  • all_success: (default) all parents have succeeded
  • all_failed: all parents are in a failed or upstream_failed state
  • all_done: all parents are done with their execution
  • one_failed: fires as soon as at least one parent has failed, it does not wait for all parents to be done
  • one_success: fires as soon as at least one parent succeeds, it does not wait for all parents to be done
  • none_failed: all parents have not failed (failed or upstream_failed) i.e. all parents have succeeded or been skipped
  • dummy: dependencies are just for show, trigger at will
    See more about Trigger rules
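Passing a trigger_rule while creating a task looks like this (a minimal sketch; the task id and command are illustrative, and a dag object like the one above is assumed):

cleanup = BashOperator(
    task_id='cleanup',
    bash_command='echo "cleaning up"',
    trigger_rule='all_done',   # run once all parents finish, whether they succeeded or failed
    dag=dag,
)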

Are you curious to know how two operators communicate with each other?

Apache Airflow provides a feature called XCom.

XCom (cross-communication):

Communication between two operators. If an operator returns a value, it gets stored in XCom; Airflow provides a mechanism to pull an XCom value using xcom_pull() and use it in another task, and to push a value using xcom_push().
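A minimal sketch of pushing and pulling an XCom value between two PythonOperator tasks (the task ids and key are illustrative, and a dag object like the one above is assumed):

from airflow.operators.python_operator import PythonOperator

def push_value(**context):
    # Values returned by a callable are stored in XCom automatically;
    # xcom_push() can also be called explicitly, as done here.
    context['ti'].xcom_push(key='filename', value='report.csv')

def pull_value(**context):
    filename = context['ti'].xcom_pull(task_ids='push_task', key='filename')
    print('Received from XCom:', filename)

push_task = PythonOperator(
    task_id='push_task',
    python_callable=push_value,
    provide_context=True,      # Airflow 1.x: pass the context as keyword arguments
    dag=dag,
)

pull_task = PythonOperator(
    task_id='pull_task',
    python_callable=pull_value,
    provide_context=True,
    dag=dag,
)

push_task >> pull_task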

Example: View code in GitHub Xcom_example.py

Let's end this article by listing some alternative tools in the market.

  • Cron
  • Apache Oozie
  • Luigi
  • Azkaban

Now that you have understood what a DAG is, here's a suggestion for you.

Picture credit: Kaxil Naik

Airflow Documentation
Apache-Airflow GitHub
To see some example code visit my GitHub
