Published in Analytics Vidhya

Getting started with Airflow

How-to guide on setting up Airflow on Linux machine and creating a basic workflow using BashOperator, PythonOperator and MySqlOperator

Photo by Alvaro Reyes on Unsplash

Airflow is a platform created by the community to programmatically author, schedule and monitor workflows.

Airflow is a powerful tool when it comes to deploying and monitoring workflows. It uses standard Python features to create workflows, including date time formats for scheduling and loops to dynamically generate tasks. This makes it easy to maintain full flexibility when building workflows.

It also comes with a really easy-to-use UI and provides many plug-and-play operators that are ready to execute tasks on SQL Servers, GCP, AWS, and many other third-party services.

This article walks through installing and using Airflow on a Linux machine. I have used Ubuntu 20.04 via WSL on a Windows 10 laptop, although the steps will work on any Linux-based machine. Airflow 2.1.3 and Python 3.8.10 were used for this setup.

Checking Python installation

Ensure you have Python installed on the machine. Run the command below to see the version of Python.

python3 --version

You can refer to the documentation here to install Python: https://docs.python-guide.org/starting/install3/linux/

Install pip

Check if you have pip installed by running the command below:

pip --version

If it is not installed, install using:

sudo apt install python3-pip

Installing Airflow

We will be using Airflow version 2.1.3 for this setup. Support for version 1.10 ended in June 2021, so it is recommended to use version 2.

Note: This guide installs Airflow directly into the base Python environment. If you want to install it in a virtual environment instead, create one using venv and activate it before running the commands below.

Install Airflow

Airflow requires a home directory; the default is ~/airflow, but you can set it to any directory of your choice. You can also add the line below to your .bashrc file so it is set every time you open a terminal.

export AIRFLOW_HOME=~/airflow

Set variables for the Airflow and Python versions. These are then used in the pip command to ensure all corresponding dependencies are correctly installed. If you directly run pip install apache-airflow==2.1.3, it may cause issues with a few package versions such as SQLAlchemy.

AIRFLOW_VERSION=2.1.3
PYTHON_VERSION="$(python3 --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"

In our case, the variable PYTHON_VERSION will be set to 3.8.
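To see what the cut pipeline is doing, here is the same trimming in plain Python (an illustration only, not part of the setup):

```python
# Mirrors the shell pipeline above: take the output of
# `python3 --version` and keep only the major.minor part.
version_output = "Python 3.8.10"
version = version_output.split(" ")[1]          # cut -d " " -f 2
major_minor = ".".join(version.split(".")[:2])  # cut -d "." -f 1-2
print(major_minor)  # 3.8
```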

Now, run the installation command, which uses the variables set above to install Airflow 2.1.3 for Python 3.8.

pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"

Initialize database for Airflow

airflow db init

Create Users

Create a username and password for logging in to the Airflow WebUI using the commands below. You can add multiple users with varying privileges if you want multiple people to use it.

Admin

Run the command below and set a password when prompted.

airflow users create \
--username admin \
--firstname Sherlock \
--lastname Holmes \
--role Admin \
--email sherlock@221-b-bakerstreet.org

User

airflow users create \
--username jw \
--firstname James \
--lastname Watson \
--role User \
--email watson@221-b-bakerstreet.org

Note: Users with the ‘User’ role have limited access to some Airflow functionalities. Please proceed with the ‘Admin’ role for the steps below.

Start the server

Use the command below in a terminal to start the web server. Update the port value as required.

airflow webserver --port 8080

This terminal has to stay open for the webserver to keep running.

Starting webserver

Navigate to localhost:8080 to access the WebUI. Log in using the username and password we created earlier.

WebUI Login page

You’ll see the warning below:

Scheduler warning

Start Scheduler

We also have to start an Airflow scheduler in a separate terminal; it keeps track of DAG schedules and runs them on time.

airflow scheduler

Updating configuration

Airflow configurations are saved in airflow.cfg file located in the Airflow home directory. Some of these configurations can be updated to make a few things easier.

Navigate to the airflow home directory:

cd $AIRFLOW_HOME
vim airflow.cfg

dag_dir_list_interval

# How often (in seconds) to scan the DAGs directory for new files. Default to 5 minutes.
dag_dir_list_interval = 300

This value determines how often newly added DAG files are scanned and updated on the WebUI. The default is 5 minutes, which means that after adding a new DAG you may have to wait up to 5 minutes for it to show up on the WebUI. You can set this to a lower value if required.

min_file_process_interval

# Number of seconds after which a DAG file is parsed. The DAG file is parsed every
# ``min_file_process_interval`` number of seconds. Updates to DAGs are reflected after
# this interval. Keeping this number low will increase CPU usage.
min_file_process_interval = 30

This value determines how often updates to existing DAG files are scanned and reflected on the WebUI. The default is 30 seconds, which means that after updating DAG code you may have to wait up to 30 seconds for the changes to show up on the WebUI. You can set this to a lower value if required.

Creating DAGs

Example DAGs

Airflow comes with various sample DAGs that you can see on the WebUI when you log in. These can be used as references for the different kinds of tasks that can be created and orchestrated via Airflow.

Unpause a DAG by clicking the toggle on the left of the DAG name to enable it. Click on the DAG name to see various details across the tabs present.

Unpause a DAG
DAG details- Graph View

Note: The Airflow scheduler sometimes throws errors when the example DAGs are unpaused. Please pause them in case you face any issues.

DAG bag

We can create our own DAGs by writing Python scripts that define them and saving the scripts in the $AIRFLOW_HOME/dags directory. The location of this directory is set in the airflow.cfg configuration file under dags_folder and can be updated if required.

mkdir $AIRFLOW_HOME/dags
cd $AIRFLOW_HOME/dags

Operators

We will cover the following three operators and create a DAG that uses them:

  1. BashOperator
  2. PythonOperator
  3. MySqlOperator

BashOperator

This operator executes commands in a Bash shell. Below is an example of the syntax.

bash1 = BashOperator(
    task_id='bash1',
    bash_command='echo "BashOperator on Airflow" > bash1_op.txt'
)

We can also execute shell scripts by calling a .sh file.

bash2 = BashOperator(
    task_id='bash2',
    bash_command='/home/vijayp/test.sh'
)

This can be used to execute Python files as well.

bash_python = BashOperator(
    task_id='bash_python',
    bash_command='python3 /home/vijayp/test.py'
)

PythonOperator

This operator can be used to call Python functions defined in the DAG file or in another Python script.

def hello_world():
    print('Hello World')
    return 'This gets printed in the logs'

python1 = PythonOperator(
    task_id='python1',
    python_callable=hello_world
)

If you have the function defined in another script, import it in the DAG file and then pass it to PythonOperator.

from my_script import my_func

python2 = PythonOperator(
    task_id='python2',
    python_callable=my_func
)
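PythonOperator can also pass arguments to the callable via its op_kwargs parameter, which Airflow unpacks into the function call. Below is a plain-Python sketch of that call pattern (the greet function is a made-up example, and no Airflow is needed to follow along):

```python
def greet(name, greeting="Hello"):
    # A stand-in callable; in a DAG it would be wired up roughly as
    # PythonOperator(task_id='python3', python_callable=greet,
    #                op_kwargs={'name': 'Watson'})
    return f"{greeting}, {name}!"

# Airflow effectively calls python_callable(**op_kwargs):
op_kwargs = {'name': 'Watson'}
result = greet(**op_kwargs)
print(result)  # Hello, Watson!
```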

MySqlOperator

To connect to a MySQL database via Airflow and use this operator, we need to install the packages below. Restart Airflow after installation.

sudo apt install libmysqlclient-dev
pip install apache-airflow-providers-mysql

We also need to have a MySQL server running to be able to connect to it. I have used https://www.freesqldatabase.com/ to quickly create a free database and used the connection details in Airflow.
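Conceptually, MySqlOperator just executes its sql argument against the configured database connection. Here is a rough sketch of that behaviour, with Python's built-in sqlite3 standing in for MySQL (illustration only; the table name is made up):

```python
import sqlite3

# sqlite3 stands in for the MySQL connection that Airflow builds
# from mysql_conn_id; the operator executes the `sql` string
# against that connection.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE airflow_tb (val INTEGER)")
conn.execute("INSERT INTO airflow_tb VALUES (5)")
rows = conn.execute("SELECT val FROM airflow_tb").fetchall()
print(rows)  # [(5,)]
```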

Adding SQL connection to Airflow

After installing the above packages, we need to save the MySQL server's connection details in Airflow.

On the WebUI, go to Admin > Connections

Adding connections

Add a new connection with the settings below. Fill in your database credentials.

mySQL connection

DAG File

We will now create a DAG that uses all three of the above operators and schedule it to run every hour. We need to add the following to the code:

Required Imports

We will import all three operators we have seen above.

# importing the required libraries
from datetime import timedelta, datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.mysql.operators.mysql import MySqlOperator

Defining DAG properties

default_args = {
    'owner': 'sherlock',
    'depends_on_past': False,
    'start_date': datetime(2021, 9, 1),
    'email': ['sherlock@221-b-bakerstreet.org'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1)
}

# define the DAG
dag = DAG(
    'airflow_get_started',
    default_args=default_args,
    description='Getting Started with Airflow',
    schedule_interval=timedelta(hours=1),  # run every hour
    catchup=False  # do not perform a backfill of missing runs
)
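Since schedule_interval here is just a timedelta, the run times it produces can be worked out with plain datetime arithmetic. A small sketch (illustration only; Airflow actually triggers each interval at the end of its period):

```python
from datetime import datetime, timedelta

start_date = datetime(2021, 9, 1)
schedule = timedelta(hours=1)

# The first few scheduled interval starts after start_date.
# With catchup=False, past intervals are skipped and only the most
# recent one runs; with catchup=True every one would be backfilled.
interval_starts = [start_date + i * schedule for i in range(3)]
print(interval_starts[1])  # 2021-09-01 01:00:00
```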

Python function for PythonOperator

def hello_world():
    print('Hello World')
    return 'This gets printed in the logs'

Defining steps

# define steps
python1 = PythonOperator(
    task_id='python1',
    python_callable=hello_world,
    dag=dag
)

bash1 = BashOperator(
    task_id='bash1',
    bash_command='echo "BashOperator on Airflow" > bash1_op.txt',
    dag=dag
)

mysql1 = MySqlOperator(
    task_id='mysql1',
    mysql_conn_id='mysql_conn',  # name of the connection we defined in Admin > Connections
    sql="""INSERT INTO sql4438199.airflow_tb values (5);""",  # SQL command; can also be a path to a SQL script
    dag=dag
)

Creating the graph flow of the steps

python1 >> bash1 >> mysql1
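The >> syntax works because Airflow operators overload Python's __rshift__. A toy sketch of how the chaining composes (a hypothetical Task class for illustration, not Airflow code):

```python
class Task:
    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def __rshift__(self, other):
        # a >> b records b as downstream of a and returns b,
        # which is what makes a >> b >> c chain left to right
        self.downstream.append(other)
        return other

python1, bash1, mysql1 = Task("python1"), Task("bash1"), Task("mysql1")
python1 >> bash1 >> mysql1
print(python1.downstream[0].task_id)  # bash1
```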

Adding DAG

Save all the above snippets to a .py file in the $AIRFLOW_HOME/dags directory.

Add DAG file

This will show up on the WebUI after some time, depending on the intervals set in airflow.cfg. Click on the toggle to unpause the DAG.

Unpause the DAG

It should immediately start its first run and then it will run every hour.

First Run

You can explore the different views to get details about the DAG and its runs. Click on any step and then on Log to see its logs.

Outputs

BashOperator created the bash1_op.txt file.

PythonOperator output can be seen in the logs.

MySQLOperator inserted rows into the table airflow_tb.

The DAG code file is present here for reference:

Thank you for reading this article. Please reach out to me via the comments if you have any questions or inputs.

You can find python/pyspark related reference material on my git repo here.
