A Data Flow POC using Apache Airflow

Denny Chen
Published in Analytics Vidhya · Aug 20, 2020

A beautiful tool for your data ETL adventures.


What is Airflow?

Airflow is a workflow management platform created by Airbnb in 2014. It started as a solution to Airbnb's increasingly complex workflows. Airflow is written in Python, and workflows are created via Python scripts. Airflow follows the principle of “Configuration as Code” and uses Directed Acyclic Graphs (DAGs) to manage workflow orchestration.

Since Airflow's own documentation covers installation well, I won't illustrate the installation process here.

Once you have installed Airflow successfully, just run the commands below to start it.

$ airflow initdb
$ airflow webserver
$ airflow scheduler
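
(Side note: these are Airflow 1.10.x commands, which is the version used in this post. On Airflow 2.x, airflow initdb has been replaced by airflow db init, while the webserver and scheduler commands stay the same.)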

Purpose

My goal is to crawl a company's stock price (using ASUS as the target) from Yahoo Finance and insert the result into MongoDB every 10 minutes.

We can see the job in graph view or tree view via the Airflow web interface, as the pictures below show.

Graph View
Tree View

The tree view tells us when the job started and the status of each task. The first run starts at "2020-08-13T16:00:00+00:00". The default timezone in Airflow is UTC, so the actual time I told this DAG to start running is "2020-08-14T00:00:00" Taipei time.

You can trigger the DAG from the command line or from the web interface.

$ airflow trigger_dag <dag_id>
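
For the DAG defined later in this post (dag_id 'my_first_dags'), that would be:

$ airflow trigger_dag my_first_dags

(On Airflow 2.x the equivalent is airflow dags trigger my_first_dags.)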

Before starting your DAG, you can test each task separately.

$ airflow test <dag_id> <task_id> <execution_date>
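
For example, to test the scraping task defined below against a specific execution date:

$ airflow test my_first_dags get_stock_price 2020-08-14

(On Airflow 2.x this becomes airflow tasks test my_first_dags get_stock_price 2020-08-14.)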

Besides that, we can tell that the tasks "get_stock_price" and "insert_to_mongo" have a dependency relationship. I will talk about this later.

Each color stands for one task status; the color-to-status legend is shown below.

Configuration as Code

In this section, I talk about Airflow DAG configuration. Before getting into that, you need to create a folder called dags inside your Airflow home directory. This folder stores all the DAG .py files.
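
Assuming the default Airflow home of ~/airflow, the layout looks roughly like this (stock_dag.py is just an example file name):

~/airflow/
├── airflow.cfg
├── airflow.db
└── dags/
    └── stock_dag.py    # the DAG code shown in this post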

The default DAG configuration is shown below. These are the basic settings.

'provide_context' is used to pass a set of keyword arguments that can be used in your function.

'provide_context' normally belongs in the PythonOperator, but here I place it inside default_args.

Airflow uses XCom (cross-communication) to exchange messages between tasks, and XComs can be "pushed" or "pulled".
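
As a minimal sketch of the mechanism (the task id 'push' below is just illustrative): a value returned from a PythonOperator callable is pushed to XCom automatically under the key 'return_value', and another task can pull it through the task instance ('ti') in its context.

def push(**context):
    # the return value is pushed to XCom automatically (key 'return_value')
    return "hello from push"

def pull(**context):
    # pull the value that the 'push' task pushed
    value = context['ti'].xcom_pull(task_ids='push')
    print(value)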

from datetime import datetime, timedelta

from airflow import DAG

default_args = {
    'owner': 'DennyChen',
    'start_date': datetime(2020, 8, 14, 0, 0),
    'depends_on_past': True,
    # with this set to True, a task won't run if its previous scheduled run failed
    'email': [''],
    'email_on_failure': True,
    # upon failure this pipeline will send an email to the address set above
    'email_on_retry': False,
    'retries': 5,
    'retry_delay': timedelta(seconds=5),
    'provide_context': True,
}

dag = DAG(
    'my_first_dags',
    default_args=default_args,
    description='A simple DAG',
    schedule_interval='*/10 * * * *',
)
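
For reference, schedule_interval='*/10 * * * *' is a standard cron expression that runs the DAG every 10 minutes, and with depends_on_past set to True a task instance only runs if its previous scheduled run succeeded.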

After this basic configuration, we can create our own jobs. First, a simple Python function that just prints a greeting and the current time.

Because 'provide_context' is placed inside default_args, I need to add **context to my function signature in order to receive those keyword arguments.

def test(**context):
    print("Hi, I'm Denny")
    time = str(datetime.today())
    print("Time: " + time)

Next, we write a small function to get the ASUS stock price from Yahoo Finance. At the end of the function, I return three values.

import requests
from bs4 import BeautifulSoup

def get_asus_stock_price(**context):
    r = requests.get('https://finance.yahoo.com/quote/2357.TW%3FP%3DASUS/')
    soup = BeautifulSoup(r.text, 'lxml')
    # company name
    org = soup.find('h1', 'D(ib) Fz(18px)').text
    print(org)
    # current price
    price = soup.find('span', 'Trsdu(0.3s) Fw(b) Fz(36px) Mb(-4px) D(ib)').text
    print(price)
    # up_or_down = soup.find('span', 'Trsdu(0.3s) Fw(500) Pstart(10px) Fz(24px)').text
    # up_or_down = soup.find('span', {'data-reactid': '33'})
    # print(up_or_down)
    # quote timestamp shown on the page
    date = soup.find('span', {'data-reactid': '35'}).text
    print(date)
    return org, price, date
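
These class names and data-reactid attributes come straight from Yahoo Finance's rendered HTML and can change without notice, so it's worth checking the scraper outside Airflow first. A minimal sketch, assuming the function lives in the same file:

if __name__ == "__main__":
    # quick manual check of the scraper before wiring it into the DAG
    print(get_asus_stock_price())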

After fetching the data from Yahoo Finance, I need to insert it into a database. Here, I choose MongoDB.

import pymongo

def insert_to_mongo(**context):
    # MongoDB connection string is left blank here; supply your own URI
    client = pymongo.MongoClient("")

    db = client["stock"]
    collection = db["stock_price"]
    dblist = client.list_database_names()
    # dblist = myclient.database_names()
    # print(dblist)
    if "stock" in dblist:
        print("Exist")
    # pull the three values returned by get_asus_stock_price()
    org, price, date = context['ti'].xcom_pull(task_ids='get_stock_price')
    print(org)
    print(price)
    print(date)
    time = str(datetime.today())
    doc = {"Company": org, "Price": price, "Date": date, "Insert_Time": time}
    collection.insert_one(doc)
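
The connection string is left blank in the snippet above; for a local MongoDB instance it would typically look like "mongodb://localhost:27017/" (adjust to your own setup). After a few runs you can sanity-check the inserts with a quick query; a sketch using the same database and collection names:

client = pymongo.MongoClient("mongodb://localhost:27017/")  # example local URI, adjust as needed
collection = client["stock"]["stock_price"]
# print the most recently inserted document
print(collection.find_one(sort=[("Insert_Time", -1)]))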

Here I use xcom_pull to get the data returned from get_asus_stock_price().

Finally, I need to set up each task. python_callable links to the function I just wrote above.

from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

# allows calling bash commands
bashtask = BashOperator(
    task_id='bash_task',
    bash_command='date',
    dag=dag,
)

# allows calling python functions
python_task = PythonOperator(
    task_id='python_task',
    python_callable=test,
    dag=dag,
)

dummy_task = DummyOperator(
    task_id='dummy_task',
    # retries=3,
    dag=dag,
)

get_stock_price = PythonOperator(
    task_id='get_stock_price',
    python_callable=get_asus_stock_price,
    provide_context=True,
    dag=dag,
)

# note: this name shadows the insert_to_mongo() function, but python_callable
# is evaluated before the assignment, so it still points at the function
insert_to_mongo = PythonOperator(
    task_id='insert_to_mongo',
    python_callable=insert_to_mongo,
    dag=dag,
)

I mentioned above that get_stock_price and insert_to_mongo have a dependency relationship. In Airflow, you can set this with downstream and upstream.

get_stock_price >> insert_to_mongo

Easy, right? Or you can use another syntax.

get_stock_price.set_downstream(insert_to_mongo)

You can use set_downstream or set_upstream to build any dependency relationship you want. In Airflow, these are called relationship builders.
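
For illustration, the same builders work on the other tasks in this file; the chain below is just an example, not something the original DAG needs:

# run the bash task, then the python task, then the dummy task
bashtask >> python_task >> dummy_task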

Conclusion

This is a small POC of using Airflow to schedule my crawler and to control and monitor its whole data flow. Unlike crontab, Airflow can manage and schedule offline jobs and is easy to modify. When you need to manage lots of crontab entries, you will love Airflow and similar tools.

In the next article, I'm going to show how to manage Hadoop jobs with other similar tools, such as Apache Oozie or Genie (provided by Netflix). See you next time. If you have any opinions or questions, feel free to leave comments below. Thank you.
