Apache Airflow: create an ETL pipeline like a boss.

Narongsak Keawmanee
6 min read · Oct 23, 2019


In my last article, Introduction of Airflow, a tool for creating ETL pipelines, we covered the basics and features of Apache Airflow, the task-workflow scheduler. In this article we will dive a bit deeper and walk through an example of how to use Airflow to create an ETL pipeline in the real world. So let's get started.

Requirement

Assume we run an e-commerce website. We receive a customer order every minute, and we need to simulate customers ordering with a random GMV (gross merchandise value) amount for each purchase.

What we will do based on the requirement.

We will initialize Apache Airflow and connect it to an RDS database on AWS, using MySQL as the database engine. Then we will use Airflow to create a random, simple sales order in the database every minute.

Connect Apache Airflow with an RDS database on AWS.

I will skip how to create the RDS database; if you want to know more about that, the official AWS documentation is a good place to look. After we have created the RDS database, we will connect it to our Airflow.

We will go to our Airflow config and open it:

# check the current path
pwd
# export AIRFLOW_HOME to the directory returned by pwd
export AIRFLOW_HOME=/Users/KLoGic/Public/Developer/playground/3rd-project/airflow
nano airflow.cfg

After that, look for sql_alchemy_conn. You will see something like:

sqlite:////Users/KLoGic/airflow/airflow.db

Change it to:

mysql://<username>:<password>@<rdshost>:<port>/<dbname>
# example
mysql://admin:123456@airflow.ap-southeast-1.rds.amazonaws.com:3306/airflow
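One gotcha: if your password contains special characters such as @ or /, it must be URL-encoded inside the connection string. A minimal sketch using the Python standard library (the password here is hypothetical):

from urllib.parse import quote_plus

# URL-encode the password before building the connection string
password = quote_plus("p@ssw0rd!")  # hypothetical password
print("mysql://admin:{}@airflow.ap-southeast-1.rds.amazonaws.com:3306/airflow"
      .format(password))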

Then run airflow initdb and start the Airflow webserver on port 8080:

airflow initdb
airflow webserver -p 8080

Go to http://localhost:8080 and you will see the Airflow pages.

Airflow admin page

Also check your RDS database. If you see the auto-created Airflow tables like this, you are good to go.

tables that Airflow autogenerated

Write code to insert sales orders into the sales_order table.

Now we will write some code to connect to the database and create the sales_order table. We will connect to the database with the Python package SQLAlchemy, and use the Variables feature in Airflow by going to Admin > Variables > Create and adding these variables:

host: airflow.ap-southeast-1.rds.amazonaws.com
username: admin
password: 123456
db_name: airflow

After that, we will create the connection in a file called connection.py with this code:

from airflow.models import Variable
from sqlalchemy import create_engine

host = Variable.get("host")
db_name = Variable.get("db_name")
username = Variable.get("username")
password = Variable.get("password")

connection = create_engine(
    'mysql://{username}:{password}@{url}/{db_name}?charset=utf8'.format(
        username=username, password=password, url=host, db_name=db_name),
    echo=False)
conn = connection.connect()
print(conn)
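One caveat: Variable.get raises an error if a variable has not been created yet. If you want the script to survive that, Airflow's Variable.get accepts a default_var fallback; a minimal sketch (the localhost default is just an example):

from airflow.models import Variable

# fall back to a default when the Variable has not been set yet
host = Variable.get("host", default_var="localhost")
print(host)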

Then we will run this file:

python3 connection.py

The result should look something like this:

INFO - settings.configure_orm(): Using pool settings. pool_size=5, max_overflow=10, pool_recycle=1800, pid=90523
<sqlalchemy.engine.base.Connection object at 0x11176c358>

This means we can connect to our database successfully, so we will go on to the next step: creating the sales_order table. Create sales_order_init.py, then put the following code in it.

Import the connection from the previous file:

from connection import conn

Declare the session and prepare the variables:

from sqlalchemy import MetaData
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship, sessionmaker

Base = declarative_base()
meta = MetaData(bind=conn)
meta.reflect()
dwhConnection = conn.connect()
SessionDwh = sessionmaker(bind=dwhConnection)
sessionDwh = SessionDwh()

Declare the table:

from sqlalchemy import DECIMAL, Column, DateTime, Integer

class BaseTable(Base):
    __tablename__ = 'sales_order'
    entity_id = Column(Integer, autoincrement=True, primary_key=True)
    amount = Column(DECIMAL(14, 4), nullable=False)
    created_at = Column(DateTime(), nullable=False)

A function that checks whether the sales_order table exists and creates it if not, plus a call to this function at the end:

def initSalesTable():
    isRun = False
    if not conn.dialect.has_table(conn, 'sales_order'):
        Base.metadata.create_all(bind=conn)
        sessionDwh.commit()
        isRun = True
    return isRun

initSalesTable()

The complete code will look like this:

from datetime import datetime
from random import randrange
from sqlalchemy import DECIMAL, Column, DateTime, Integer, MetaData, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship, sessionmaker
from connection import conn

Base = declarative_base()
meta = MetaData(bind=conn)
meta.reflect()
dwhConnection = conn.connect()
SessionDwh = sessionmaker(bind=dwhConnection)
sessionDwh = SessionDwh()


class BaseTable(Base):
    __tablename__ = 'sales_order'
    entity_id = Column(Integer, autoincrement=True, primary_key=True)
    amount = Column(DECIMAL(14, 4), nullable=False)
    created_at = Column(DateTime(), nullable=False)


def initSalesTable():
    isRun = False
    if not conn.dialect.has_table(conn, 'sales_order'):
        Base.metadata.create_all(bind=conn)
        sessionDwh.commit()
        isRun = True
    return isRun


initSalesTable()
sessionDwh.close()
dwhConnection.close()

Then run the command to create the sales_order table:

python3 sales_order_init.py

If we check the database and see the sales_order table, the code ran successfully.
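If you would rather check from Python than from a MySQL client, here is a minimal sketch that reuses the conn object from connection.py and the same has_table call as above:

from connection import conn

# quick sanity check: prints True once the table exists
print(conn.dialect.has_table(conn, 'sales_order'))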

Create a file to insert random sales orders into the table.

We will open sales_order_init.py again, add a new function called insertRandomSalesOrder, and update the bottom of the file with the following code.

def insertRandomSalesOrder():
    prepareData = []
    now = datetime.now()
    Base.metadata.create_all(bind=conn)
    prepareData.append(BaseTable(
        amount=randrange(1, 1000),
        created_at=now
    ))
    sessionDwh.add_all(prepareData)
    sessionDwh.commit()
    return True

initSalesTable()
insertRandomSalesOrder()
sessionDwh.close()
dwhConnection.close()

This will generate a random amount and insert it into the sales_order table after we run:

python3 sales_order_init.py
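To verify the insert without leaving Python, a small sketch that reads back the newest rows (again reusing conn from connection.py; plain-string execute works on the SQLAlchemy 1.x series this code targets):

from connection import conn

# read back the most recent orders
rows = conn.execute(
    "SELECT entity_id, amount, created_at FROM sales_order "
    "ORDER BY entity_id DESC LIMIT 5")
for row in rows:
    print(row)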

Create a DAG to insert a random sales_order every minute.

Create a dags directory and create a sales_order.py file inside it:

mkdir dags
touch dags/sales_order.py

First we will import everything at the top of the file:

import sys
from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python_operator import PythonOperator

sys.path.append(
    '/Users/KLoGic/Public/Developer/playground/3rd-project/airflow')

The path appended to sys.path is the Airflow home directory; it tells Python where the root directory is so our modules can be imported from DAG files.
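As an aside, you could avoid hardcoding that path by deriving it from the DAG file's own location; a small sketch, assuming the DAG file lives in <AIRFLOW_HOME>/dags:

import os
import sys

# derive the Airflow home from this DAG file's location
AIRFLOW_HOME = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(AIRFLOW_HOME)

Either way, with the path in place we can import our function: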

from sales_order_init import insertRandomSalesOrder

We import the insert-random function from sales_order_init.py so we can call it every minute. Then declare the DAG:

default_args = {
    'owner': 'klogic',
    'depends_on_past': False,
    'start_date': datetime(2019, 8, 1),
    'email': ['klogic@hotmail.co.th'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'dag_insert_sales_order',
    default_args=default_args,
    schedule_interval="*/1 * * * *")

process_dag = PythonOperator(
    task_id='dag_insert_sales_order',
    python_callable=insertRandomSalesOrder,
    dag=dag)

And the complete file should look like this:

import sys
from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python_operator import PythonOperator

sys.path.append(
    '/Users/KLoGic/Public/Developer/playground/3rd-project/airflow')

from sales_order_init import insertRandomSalesOrder

default_args = {
    'owner': 'klogic',
    'depends_on_past': False,
    'start_date': datetime(2019, 8, 1),
    'email': ['klogic@hotmail.co.th'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'dag_insert_sales_order',
    default_args=default_args,
    schedule_interval="*/1 * * * *")

process_dag = PythonOperator(
    task_id='dag_insert_sales_order',
    python_callable=insertRandomSalesOrder,
    dag=dag)

The default settings are declared in default_args, and we create the DAG with a schedule_interval cron expression ("*/1 * * * *") that runs every minute.
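As a side note, schedule_interval also accepts a datetime.timedelta; a minimal sketch of an equivalent declaration, as a drop-in variant of the dag = DAG(...) block above:

from datetime import timedelta

dag = DAG(
    'dag_insert_sales_order',
    default_args=default_args,
    schedule_interval=timedelta(minutes=1))

After that, we will run this command: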

# airflow run <dag_id> <task_id> <execution_date>
airflow run dag_insert_sales_order dag_insert_sales_order 2019-10-23

The result is that the task runs and inserts a row into the sales_order table.

UI of the DAG. Running the schedule.

If we go to http://localhost:8080 we will see the new DAG that we created on the dashboard. (If you don't see it, wait around 5 minutes, or change the config value dag_dir_list_interval from 300 to 30.)

dashboard
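For reference, dag_dir_list_interval lives in airflow.cfg; a sketch of the relevant lines (in Airflow 1.10 it sits under the [scheduler] section):

[scheduler]
# how often (in seconds) to scan the dags directory for new files
dag_dir_list_interval = 30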

We just change the DAG's setting from Off to On and run this command:

airflow scheduler

Then Airflow will run the task every minute, based on our setting in the DAG file.

Summary.

Airflow is a convenient way to set up schedules and keep everything under control. We can create any job we want and schedule it. It is a powerful tool that you should try at least once, and you can apply it to any task that you need to be sure is running, so you can sleep all night long.

I appreciate every cup of coffee; you can donate one at the link below.

https://www.buymeacoffee.com/klogic

The complete code is available here.

Also check out my profile and connect with me here.
