Data Pipeline using Apache Airflow to Import Data from a Public API and its Application for Jakarta Citizens

Jakarta Smart City, October 14, 2021

Written by Grace Susanto, Hansen Wiguna, and Ayu Andika

This is a tutorial on writing a data pipeline that imports time-series data from a public API, inserts it into a local database, and is scheduled to run daily.

This tutorial is part of my project as a Data Science Trainee at Jakarta Smart City, where I assist the data analyst team in extracting information from data to help solve the city of Jakarta’s problems. My role was to create a data pipeline to extract, transform, and load public API data into a local database.

For this project, I will be using Python to write the scripts, PostgreSQL as the database, SQLAlchemy as the Python SQL toolkit, and Apache Airflow as the platform to manage my workflow and schedule it daily. Further documentation about Apache Airflow can be found on its official documentation page.

Data Description

The data I will be using is TomTom API data. TomTom is a location technology company that releases location and traffic-related public APIs. The data that I will be importing is an hourly report of the traffic index in Jakarta, Indonesia. The data originates from this URL endpoint: https://api.midway.tomtom.com/ranking/liveHourly/IDN_jakarta.

The data itself is a rolling seven-day time series of hourly data. This means that when I fetch the data on 7 September at 13:30, I will get the data from 1 September at 14:00 to 7 September at 13:00. When I fetch the data the day after, on 8 September at 8:30, I will get data from 2 September at 9:00 to 8 September at 8:00. So I could not just fetch the data and put it in a csv file, because it would overwrite the previous data. I also could not simply append the data, because there would be some overlap.

Overlapped data illustration

Approach

To tackle this overlapping-data problem, I will slice the imported data per date and save it in separate csv files. Each csv file will end up holding only 24 hours’ worth of data. The data from the csv files will then be exported to the database. Since I will be running the script daily, I will only be saving yesterday’s data. Airflow will make sure that my script runs as scheduled.

The potential problem with this approach arises when the script fails to run for a day, which would cost us one day’s worth of data. This can be mitigated with Airflow’s catchup and execution date macro features, where Airflow executes the script for any interval that has not been run since the last execution date. For example, if Airflow’s last execution date is two days ago and it skipped yesterday’s execution, then today Airflow will run the script twice, once with yesterday’s execution date and once with today’s execution date. So with this feature, as long as we do not skip execution more than seven days in a row, Airflow will make sure that we do not miss importing any data.

The workflow of my data pipeline to import this data is as follows:

General workflow of the data pipeline

With this in mind, we are ready to start writing our first data pipeline with Apache Airflow. The full code can be found in my GitHub account: https://github.com/gsusanto/airflow

Our project directory will be as follows:
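
The repository has the authoritative layout; as a rough sketch (config.py and tomtom_ingestion.py are named later in this article, the other file names are just my shorthand here), it looks something like this:

airflow/
├── docker-compose.yml
├── dags/
│   ├── config.py
│   ├── tomtom_ingestion.py
│   ├── tomtom_export_to_db.py
│   ├── data_migration.py
│   ├── data_migration_dag.py
│   └── tomtom_dag.py
└── logs/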

Setting up environment

First, we will need to install Airflow and PostgreSQL. For this project, I will run both Airflow and PostgreSQL in Docker containers. I followed a tutorial written by Ivan Rezic here to set up an Airflow container with PostgreSQL and LocalExecutor using Docker and Docker Compose. He has a detailed explanation of what goes into the docker-compose.yml file and how it works.

Once you have finished following that tutorial, you should have a working Airflow container. Make sure that it runs well by building and starting the container:
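
Assuming the docker-compose.yml from that tutorial sits in the project root, this boils down to something like:

docker-compose build
docker-compose up -d
docker-compose ps    # the webserver and scheduler containers should both be up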

Wait until your Airflow Scheduler and Webserver are running. Then test your Airflow Webserver by opening http://localhost:8080 in your local browser. You should find a login page. If you followed the tutorial, you will be able to log in with user admin and password admin1234.

There are two directories that we will be using: the dags and logs folders. The dags folder is where we will put our Python scripts and our DAGs. The logs folder is where we can check the logs of our DAG runs.

Writing Python Script

First, we will write a configuration file, config.py, to store the API URL and the csv file directory. We can access these variables from another Python file by importing config and calling config.VARIABLE_NAME.
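
A minimal config.py could look like the sketch below; the variable names and the csv directory path are illustrative, only the API URL is fixed:

# config.py -- shared settings used by the ingestion and export scripts
API_URL = "https://api.midway.tomtom.com/ranking/liveHourly/IDN_jakarta"
CSV_DIRECTORY = "/opt/airflow/dags/data"  # assumed location for the daily csv files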

Then, we will write the data ingestion script, which imports the data and puts it into csv files. First, we fetch data from the TomTom API in the import_data() function. Then, in transform_data(), we transform the data: we rename and reposition the headers, convert timestamps to date and time in Jakarta’s timezone, and add a new column with the day name. Next, get_new_data() slices the data to keep only yesterday’s data. Finally, save_new_data_to_csv() saves the data to a csv file named “Tomtom_{date}.csv”.
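
A trimmed-down sketch of that ingestion script is below. The function names follow the description above, but the TomTom JSON field names and the output column names are assumptions, so treat this as an outline rather than the exact code in the repository:

import argparse
from datetime import datetime

import pandas as pd
import requests

import config


def import_data():
    # Fetch the raw hourly traffic index data; the payload is assumed to be a list of records.
    response = requests.get(config.API_URL)
    response.raise_for_status()
    return pd.DataFrame(response.json())


def transform_data(df):
    # Rename and reorder columns (the original field names here are assumptions).
    df = df.rename(columns={"UpdateTime": "update_time", "TrafficIndexLive": "traffic_index"})
    # Convert the timestamp (assumed to be epoch milliseconds) to Jakarta local date and time,
    # and add a column with the day name.
    local_ts = pd.to_datetime(df["update_time"], unit="ms", utc=True).dt.tz_convert("Asia/Jakarta")
    df["date"] = local_ts.dt.date
    df["time"] = local_ts.dt.time
    df["day"] = local_ts.dt.day_name()
    return df[["date", "time", "day", "traffic_index"]]


def get_new_data(df, target_date):
    # Keep only one day's rows; for a daily DAG, Airflow's ds already refers to the previous day.
    return df[df["date"] == target_date]


def save_new_data_to_csv(df, target_date):
    df.to_csv(f"{config.CSV_DIRECTORY}/Tomtom_{target_date}.csv", index=False)


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--date", required=True, help="execution date in YYYY-MM-DD format")
    args = parser.parse_args()
    target_date = datetime.strptime(args.date, "%Y-%m-%d").date()

    df = transform_data(import_data())
    save_new_data_to_csv(get_new_data(df, target_date), target_date)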

Next, we can start writing the script that exports data to the database. The first thing we need to do is create a table named tomtom. We will keep the column headers in the csv files synchronized with the column headers in our database.

Then, we need to build the Connection and Tomtom classes. The Connection class connects our Python code to our PostgreSQL database through SQLAlchemy. The Tomtom class describes the tomtom table in our database with MetaData, which SQLAlchemy uses to open a session against that table.
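
With SQLAlchemy, a sketch of those two classes could look like the following; the column names are assumptions based on the csv headers described above, and the migration step can create the table through Base.metadata.create_all:

from sqlalchemy import Column, Date, Float, Integer, String, Time, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()


class Connection:
    # Wraps the SQLAlchemy engine and session factory for a given connection string.
    def __init__(self, connection_string):
        self.engine = create_engine(connection_string)
        self.Session = sessionmaker(bind=self.engine)

    def create_tables(self):
        # Used by the data migration step to create the tomtom table if it does not exist.
        Base.metadata.create_all(self.engine)

    def session(self):
        return self.Session()


class Tomtom(Base):
    # Describes the tomtom table; column names mirror the csv headers (assumed here).
    __tablename__ = "tomtom"
    id = Column(Integer, primary_key=True)
    date = Column(Date)
    time = Column(Time)
    day = Column(String)
    traffic_index = Column(Float)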

Next, we write a script that exports yesterday’s csv data into our tomtom table. It first deletes any rows in the tomtom table with yesterday’s timestamp to avoid duplicates, then inserts the data from the csv file.
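
Sketched out, with the module name and command-line flags assumed rather than taken from the repository, the export script looks roughly like this:

# export script -- delete-then-insert one day's data so reruns stay idempotent
import argparse
from datetime import datetime

import pandas as pd

import config
from tomtom_db import Connection, Tomtom  # assumed module name for the classes above


def export_to_db(connection_string, target_date):
    session = Connection(connection_string).session()
    # Remove any rows already stored for this date to avoid duplicates on reruns.
    session.query(Tomtom).filter(Tomtom.date == target_date).delete()
    # Insert the rows from that day's csv; headers are kept in sync with the table columns.
    df = pd.read_csv(f"{config.CSV_DIRECTORY}/Tomtom_{target_date}.csv")
    for record in df.to_dict(orient="records"):
        session.add(Tomtom(**record))
    session.commit()
    session.close()


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--connection", required=True)
    parser.add_argument("--date", required=True)
    args = parser.parse_args()
    export_to_db(args.connection, datetime.strptime(args.date, "%Y-%m-%d").date())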

Writing DAGs

Finally, we will write our DAG scripts, which tell the Airflow scheduler and executor to run our scripts at certain intervals. The first DAG we will write runs our data migration script once, which initializes the tomtom table in our database. We use a BashOperator to ask Airflow to run a bash script; our bash script is a one-line command that runs our Python script. We pass the connection string stored in an Airflow Variable through the connection flag. Before we can run this script, we will need to create an Airflow Variable named “data_dev_connection”, which we will do later.
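
A minimal version of that migration DAG could look like this; the script file name and start date are assumptions, while the data_dev_connection Variable and the connection flag follow the setup described above:

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="data_migration",
    start_date=datetime(2021, 9, 14),
    schedule_interval="@once",  # run the table-creation script a single time
    catchup=False,
) as dag:
    create_tomtom_table = BashOperator(
        task_id="create_tomtom_table",
        # The Variable holds the full connection string for the Postgres database.
        bash_command=(
            "python /opt/airflow/dags/data_migration.py "
            "--connection {{ var.value.data_dev_connection }}"
        ),
    )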

Lastly, we will write our main DAG script, which runs our tomtom ingestion script and export-to-db script daily. We specify the interval under schedule_interval, set the start date to 6 days ago so Airflow can import all 7 days of TomTom data, and set the catchup flag to True. Then we define two BashOperators, one for each script. These bash commands pass in the execution date using Airflow’s ds macro and the {{ }} template syntax. Airflow renders this file and replaces the {{ ds }} template with the actual execution date, so the actual bash command will be “python /opt/airflow/dags/tomtom_ingestion.py --date 2021-09-20”.
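
Put together, a sketch of the main DAG looks roughly like this (the DAG id, task ids, and the export script’s name and flags are assumptions):

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.utils.dates import days_ago

with DAG(
    dag_id="tomtom",
    start_date=days_ago(6),       # 6 days back so catchup can backfill the rolling window
    schedule_interval="@daily",
    catchup=True,
) as dag:
    ingest = BashOperator(
        task_id="tomtom_ingestion",
        # {{ ds }} is rendered by Airflow to the run's execution date, e.g. 2021-09-20.
        bash_command="python /opt/airflow/dags/tomtom_ingestion.py --date {{ ds }}",
    )
    export = BashOperator(
        task_id="tomtom_export_to_db",
        bash_command=(
            "python /opt/airflow/dags/tomtom_export_to_db.py "
            "--date {{ ds }} --connection {{ var.value.data_dev_connection }}"
        ),
    )
    ingest >> export  # export a day's csv only after it has been written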

That is all the scripts you need. Now we are ready to go to our Airflow website at localhost:8080. The first thing we need to set up is the Airflow Variable that stores our connection string to the Postgres database. For that, go to Admin > Variables, then add a new variable with key “data_dev_connection” and value “postgresql+psycopg2://airflow:airflow@postgres/airflow”.
The value follows the pattern “[dialect]+[driver]://[username:password]@[host:port]/[database]”.

Then, we can go back to our main DAGs page and start running our DAGs. The first DAG that we will run is the data_migration DAG. We can run it by turning the DAG on and then refreshing the page; it should run successfully once. Running this DAG initializes the tomtom table in the database.

Next, we will turn on our main tomtom DAG. As before, we turn it on and run it. Airflow will run it six times with different execution dates because we set the start date to 6 days ago. This should create our csv files and populate our database with six days’ worth of data.

Dashboard

One direct use of this data is to create a dashboard of traffic congestion in Jakarta. This dashboard can then be used to determine, for example, the best time of day to travel to avoid rush hour. Together with my coworker, Mas Hadhari, we used the imported TomTom data to create:

  • Weekly traffic congestion by time of day in Jakarta
  • Daily average traffic congestion in Jakarta
  • Today’s traffic index by hour in Jakarta
TomTom Dashboard

Conclusion

Besides the dashboard, once we have collected longer-term traffic index data for Jakarta, we can use that data for further analysis. One direct application is measuring the effectiveness of the government-sanctioned public activity restrictions policy. We can do this by comparing the traffic index over time during the public activity restrictions period, and also by comparing it with the traffic index before and after the social distancing period. The data pipeline itself can be further improved by adding a scheduled purge job that deletes old csv files, since that data is already stored in the database.

This article was written by Grace Kartika Susanto (Data Science Trainee), Hansen Wiguna (Business Analyst & Lead Sub-Team), and Ayu Andika (Data Analyst) from the Jakarta Smart City Data and Analytics Team. All opinions expressed in this article are personal and do not represent the views of Jakarta Smart City or the DKI Jakarta Provincial Government.
