Apache Airflow: Create an ETL Pipeline Like a Boss.
In my last article, an introduction to Airflow as a tool for creating ETL pipelines, we covered the basics and features of Apache Airflow and its workflow scheduler. In this article we will dive a bit deeper and walk through a real-world example of using Airflow to create an ETL pipeline. So let's get started.
Requirement
Assume we run an ecommerce website. We receive customer orders every minute, and we need to simulate those orders, each with a random GMV (the amount the customer spends).
What we will do based on this requirement:
We will initialize Apache Airflow and connect it to an RDS database on AWS, using MySQL as the database. Then we will use Airflow to create a random, simple sales order in the database every minute.
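As a minimal sketch of what "a random sales order" means here (the function name and dict shape are my own, not from any library), each simulated order is just a random amount plus a timestamp:

```python
from datetime import datetime
from random import randrange

# Hypothetical helper illustrating the order we will simulate:
# a random GMV amount between 1 and 999, plus a creation timestamp.
def make_random_order():
    return {
        "amount": randrange(1, 1000),
        "created_at": datetime.now(),
    }

order = make_random_order()
print(order)
```

Later in the article, the same two fields become columns of the sales_order table.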
Connect Apache Airflow with an RDS database on AWS.
I will skip how to create the RDS database; if you want to learn about that, the official AWS documentation is a good place to look. After we have created the RDS database, we will connect it to our Airflow.
First, open the Airflow config:
# check path
pwd
# export AIRFLOW_HOME to the directory printed by pwd
export AIRFLOW_HOME=/Users/KLoGic/Public/Developer/playground/3rd-project/airflow
nano airflow.cfg
After that, look for sql_alchemy_conn. You will see something like:
sqlite:////Users/KLoGic/airflow/airflow.db
Change it to
mysql://<username>:<password>@<rdshost>:<port>/<dbname>
# example
mysql://admin:123456@airflow.ap-southeast-1.rds.amazonaws.com:3306/airflow
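One caveat (my own note, not something from the Airflow config itself): if your password contains special characters such as @, :, or /, they must be URL-encoded, or SQLAlchemy will parse the connection URI incorrectly. A quick sketch with Python's standard library, using hypothetical credentials:

```python
from urllib.parse import quote_plus

# Hypothetical credentials -- replace with your real RDS values.
username = "admin"
password = "p@ss:word/123"  # contains characters that break a raw URI
host = "airflow.ap-southeast-1.rds.amazonaws.com"
db_name = "airflow"

uri = "mysql://{}:{}@{}:3306/{}".format(
    username, quote_plus(password), host, db_name)
print(uri)
# quote_plus turns "p@ss:word/123" into "p%40ss%3Aword%2F123"
```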
Then run airflow initdb and start Airflow on port 8080:
airflow initdb
airflow webserver -p 8080
Go to http://localhost:8080 and you will see the Airflow pages.
Also check your RDS database: if you see the auto-created Airflow tables like this, you are good to go.
Write code to insert sales orders into the sales_order table.
Now we will write some code to connect to the database and create the sales_order table. We will connect to the database with the Python package sqlalchemy, and use Airflow's Variables feature by going to Admin > Variables > Create and adding these variables:
host: airflow.ap-southeast-1.rds.amazonaws.com
username: admin
password: 123456
db_name: airflow
After that, we will create the connection with this code:
from airflow.models import Variable
from sqlalchemy import create_engine

host = Variable.get("host")
db_name = Variable.get("db_name")
username = Variable.get("username")
password = Variable.get("password")

connection = create_engine('mysql://{username}:{password}@{url}/{db_name}?charset=utf8'
                           .format(username=username, password=password,
                                   url=host, db_name=db_name), echo=False)
conn = connection.connect()
print(conn)
Then we will run this file
python3 connection.py
The result should look something like this:
INFO - settings.configure_orm(): Using pool settings. pool_size=5, max_overflow=10, pool_recycle=1800, pid=90523
<sqlalchemy.engine.base.Connection object at 0x11176c358>
This means we can connect to our database successfully, so we will move on to the next step: creating the sales_order table. Create sales_order_init.py and put in the following code.
Import the connection from the previous file:
from connection import conn
Declare the session and prepare variables:
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship, sessionmaker

Base = declarative_base()
meta = MetaData(conn).reflect()
dwhConnection = conn.connect()
SessionDwh = sessionmaker(bind=dwhConnection)
sessionDwh = SessionDwh()
Declare the table:
class BaseTable(Base):
    __tablename__ = 'sales_order'
    entity_id = Column(Integer, autoincrement=True, primary_key=True)
    increment_id = Column(
        String(20, collation='utf8_general_ci'), unique=True, nullable=False)
    amount = Column(DECIMAL(14, 4), nullable=False)
    created_at = Column(DateTime(), nullable=False)
Add a function that checks whether the sales_order table exists and creates it if it does not, then call this function at the end:
def initSalesTable():
    isRun = False
    if not conn.dialect.has_table(conn, 'sales_order'):
        Base.metadata.create_all(bind=conn)
        sessionDwh.commit()
        isRun = True
    return isRun


initSalesTable()
The complete code will look like this.
from datetime import datetime
from random import randrange

from sqlalchemy import DECIMAL, Column, DateTime, Integer, MetaData, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship, sessionmaker

from connection import conn

Base = declarative_base()
meta = MetaData(conn).reflect()
dwhConnection = conn.connect()
SessionDwh = sessionmaker(bind=dwhConnection)
sessionDwh = SessionDwh()


class BaseTable(Base):
    __tablename__ = 'sales_order'
    entity_id = Column(Integer, autoincrement=True, primary_key=True)
    amount = Column(DECIMAL(14, 4), nullable=False)
    created_at = Column(DateTime(), nullable=False)


def initSalesTable():
    isRun = False
    if not conn.dialect.has_table(conn, 'sales_order'):
        Base.metadata.create_all(bind=conn)
        sessionDwh.commit()
        isRun = True
    return isRun


initSalesTable()
sessionDwh.close()
dwhConnection.close()
Then run this command to create the sales_order table:
python3 sales_order_init.py
If we check the database and see the sales_order table, the code ran successfully.
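If you prefer checking from Python instead of a database client, SQLAlchemy's inspector can list the tables on an engine. A self-contained sketch using an in-memory SQLite engine as a stand-in (with the RDS engine from connection.py, the same call would list your MySQL tables):

```python
from sqlalchemy import Column, Integer, MetaData, Table, create_engine, inspect

# In-memory SQLite stands in for the RDS engine in this demo.
engine = create_engine("sqlite://")
meta = MetaData()
Table("sales_order", meta, Column("entity_id", Integer, primary_key=True))
meta.create_all(engine)

# List all tables the engine can see.
tables = inspect(engine).get_table_names()
print(tables)
```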
Create a file to insert random sales orders into the table.
We will open sales_order_init.py, add a new function called insertRandomSalesOrder, and add the following code.
def insertRandomSalesOrder():
    prepareData = []
    now = datetime.now()
    Base.metadata.create_all(bind=conn)
    prepareData.append(BaseTable(
        amount=randrange(1, 1000),
        created_at=now
    ))
    sessionDwh.add_all(prepareData)
    sessionDwh.commit()
    return True


initSalesTable()
insertRandomSalesOrder()
sessionDwh.close()
dwhConnection.close()
This will generate a random amount and insert it into the sales_order table when we run:
python3 sales_order_init.py
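To see the whole insert pattern end to end without touching RDS, here is a self-contained sketch of the same logic against an in-memory SQLite database (the model and session names are mine, and I use the newer sqlalchemy.orm import for declarative_base):

```python
from datetime import datetime
from random import randrange

from sqlalchemy import DECIMAL, Column, DateTime, Integer, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()

# Same columns as the article's sales_order table (minus increment_id).
class SalesOrder(Base):
    __tablename__ = "sales_order"
    entity_id = Column(Integer, autoincrement=True, primary_key=True)
    amount = Column(DECIMAL(14, 4), nullable=False)
    created_at = Column(DateTime(), nullable=False)

# In-memory SQLite instead of the RDS MySQL database.
engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()

# Same pattern as insertRandomSalesOrder: random amount plus timestamp.
session.add(SalesOrder(amount=randrange(1, 1000), created_at=datetime.now()))
session.commit()

rows = session.query(SalesOrder).count()
print(rows)
```

Querying the row count back is a quick way to confirm the commit worked.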
Create a DAG to insert a random sales_order every minute.
Create a dags directory and a sales_order.py file inside it:
mkdir dags
touch sales_order.py
First, we import everything at the top of the file:
import sys
from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python_operator import PythonOperator

sys.path.append(
    '/Users/KLoGic/Public/Developer/playground/3rd-project/airflow')
The path passed to sys.path.append is the Airflow home directory; it tells Python where the root directory is, so our files can be imported.
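To make the sys.path trick concrete, here is a tiny self-contained demonstration (using a temporary directory and a hypothetical module name of my own):

```python
import os
import sys
import tempfile

# Write a throwaway module into a temp directory...
d = tempfile.mkdtemp()
with open(os.path.join(d, "mymod.py"), "w") as f:
    f.write("VALUE = 42\n")

# ...which only becomes importable after its directory is on sys.path.
sys.path.append(d)
import mymod

print(mymod.VALUE)
```

This is exactly why the DAG file can do `from sales_order_init import ...` after appending the Airflow home directory.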
from sales_order_init import insertRandomSalesOrder
We import the random-insert function from sales_order_init.py so we can call it every minute. Then we declare the DAG:
default_args = {
    'owner': 'klogic',
    'depends_on_past': False,
    'start_date': datetime(2019, 8, 1),
    'email': ['klogic@hotmail.co.th'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'dag_insert_sales_order', default_args=default_args,
    schedule_interval="*/1 * * * *")

process_dag = PythonOperator(
    task_id='dag_insert_sales_order',
    python_callable=insertRandomSalesOrder,
    dag=dag)
The complete file should look like this:
import sys
from datetime import datetime, timedelta

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python_operator import PythonOperator

sys.path.append(
    '/Users/KLoGic/Public/Developer/playground/3rd-project/airflow')

from sales_order_init import insertRandomSalesOrder

default_args = {
    'owner': 'klogic',
    'depends_on_past': False,
    'start_date': datetime(2019, 8, 1),
    'email': ['klogic@hotmail.co.th'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'dag_insert_sales_order', default_args=default_args,
    schedule_interval="*/1 * * * *")

process_dag = PythonOperator(
    task_id='dag_insert_sales_order',
    python_callable=insertRandomSalesOrder,
    dag=dag)
The default settings are declared in default_args, and we create the DAG with a schedule_interval that runs every minute. After that, we run this command:
# airflow run <name_of_dag_id> <task_id> <date_start_running>
airflow run dag_insert_sales_order dag_insert_sales_order 2019-10-23
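For reference, schedule_interval accepts standard five-field cron expressions, and Airflow also understands presets like "@hourly" and "@daily", or a datetime.timedelta. A few illustrative examples, with the one from this article first:

```python
# Mapping of cron expressions to their meaning; purely illustrative.
cron_examples = {
    "*/1 * * * *": "every minute (used in this article)",
    "0 * * * *": "at minute 0 of every hour",
    "0 0 * * *": "daily at midnight",
}

for expr, meaning in cron_examples.items():
    print(expr, "->", meaning)
```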
As a result, a row will be inserted into the sales_order table every minute.
The DAG UI. Running the schedule.
If we go to http://localhost:8080, we will see the new DAG we created on the dashboard. (If you don't see it, wait around 5 minutes, or change the dag_dir_list_interval config value from 300 to 30.)
Switch the DAG's toggle from Off to On and run this command:
airflow scheduler
Then Airflow will run it every minute, based on the settings in our DAG file.
Summary.
Airflow is a convenient way to set up schedules and control everything. We can create any job we want and schedule it. It is a powerful tool that you should try at least once, and it can be applied to any task you need to be sure keeps running, so you can sleep all night long.
I appreciate every cup of coffee; you can donate one at the button below.
The complete code is available here.
Also check out my profile and connect with me here.