akubazgi Gebremariam
6 min read · Sep 24, 2022

Data Warehouse with Airflow, DBT and Postgres

eight locations of the dataset

Introduction

A city traffic department wants to collect traffic data using swarm UAVs (drones) from a number of locations in the city and use the collected data to improve traffic flow in the city, among other tasks. The objective is to create a scalable data warehouse that will host the vehicle trajectory data extracted by analyzing footage taken by swarm drones and static roadside cameras.

Data Source

The data can be found at pNEUMA | open-traffic (epfl.ch). pNEUMA is an open large-scale dataset of naturalistic trajectories of half a million vehicles, collected by a one-of-a-kind experiment in which a swarm of drones flew over the congested downtown area of Athens, Greece. Each file for a single (area, date, time) is ~87MB of data. The dataset collected from the site is a CSV file containing ten columns and 922 records.

Methodology

This project uses the Extract Load Transform (ELT) approach together with dbt. The ELT framework helps analytics engineers in the city traffic department set up transformation workflows around the required objective. The transformation itself is handled with dbt.

Extract Load Transform (ELT)

ELT is the process of extracting data from one or multiple sources and loading it into a target data warehouse. Instead of transforming the data before it is written, ELT takes advantage of the target system to do the transformation. Generally, the ELT process consists of three steps:

Extraction: This first step involves extracting data from the source as it is or based on predefined rules.

Loading: moving the data through a pipeline from the source into the target system. The target system could be a data warehouse or a data lake.

Transformation: this is done once the data is in the target system. Organizations often transform raw data in different ways for use with different tools or business processes, and transforming inside the target system offers more opportunities to mine it for relevant business intelligence in near real time.
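As a minimal sketch of what transforming in the target system looks like here, assume the raw trajectory records have already been loaded into a Postgres table called trajectory with text columns (the schema used later in this post); a cleaned, typed view can then be built directly inside the warehouse:

-- minimal sketch: derive a typed view on top of the raw loaded table
-- (table and column names follow the schema used later in this post)
CREATE OR REPLACE VIEW trajectory_typed AS
SELECT
    track_id,
    type,
    traveled_dis::numeric AS traveled_dis,
    avg_speed::numeric    AS avg_speed,
    speed::numeric        AS speed
FROM trajectory;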

Tech-Stack Flow Diagram

The workflow implemented in this project involves extracting data, loading it, transforming it, and generating reports or dashboards with the help of the tools. The diagram is given below:

the workflow to build a scalable data warehouse

Apache Airflow

Apache Airflow is a workflow manager that helps schedule tasks and orchestrate and monitor workflows. Directed acyclic graphs (DAGs) are used by Airflow to control workflow orchestration. Airflow can run tasks using Bash, Python, and Postgres operators. Since Postgres is used as the data warehouse, Postgres operators are used in this project to interact with the database. I use Docker to run Airflow, starting by initializing it:

docker-compose up airflow-init

You will see output confirming the initialization after it runs successfully.

Finally, you can bring up the whole stack using docker-compose up. Airflow is then reachable at localhost:8080, where you can sign in with the default username and password, both airflow.

Below is the script of the DAG file in Airflow:

# importing the libraries
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.postgres_operator import PostgresOperator

# default arguments shared by all tasks in the DAG
default_args = {
    'owner': 'ekubay',
    'email': ['axutec14@gmail.com'],
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# instantiating the DAG
dag_psql = DAG(
    dag_id="postgres_Dwh",
    default_args=default_args,
    schedule_interval='@daily',
    # dagrun_timeout=timedelta(minutes=60),
    description='use case of the Postgres operator in Airflow',
    start_date=datetime(2022, 9, 10, 3),
)

# create_table_sql_query = """
# CREATE TABLE IF NOT EXISTS Trajectory(Id serial primary key, Track_ID TEXT NOT NULL,
# Type varchar(400) NOT null,
# Traveled_Dis varchar(400) NOT null, AVg_Speed TEXT DEFAULT NULL, Longuited TEXT NOT NULL,
# Latitude TEXT NOT NULL, Speed TEXT NOT NULL,
# Lon_Acc TEXT DEFAULT NULL, Lat_Acc TEXT DEFAULT NULL, Time TEXT DEFAULT NULL);
# """

# create the target table from sql/create_table.sql
create_Table = PostgresOperator(
    sql="sql/create_table.sql",
    task_id="create_table_task",
    postgres_conn_id="postgres_local",
    dag=dag_psql,
)

# load the trajectory records from sql/insert.sql
populate_data = PostgresOperator(
    dag=dag_psql,
    task_id="populate_data",
    sql="sql/insert.sql",
    postgres_conn_id="postgres_local",
)

# the table must exist before data is inserted
create_Table >> populate_data
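The sql/create_table.sql and sql/insert.sql files referenced by the operators are not shown in the post; presumably create_table.sql mirrors the commented-out query above, along these lines, while insert.sql holds the INSERT statements generated from the downloaded CSV:

-- sketch of sql/create_table.sql, mirroring the commented-out query in the DAG
CREATE TABLE IF NOT EXISTS Trajectory(
    Id serial primary key,
    Track_ID TEXT NOT NULL,
    Type varchar(400) NOT NULL,
    Traveled_Dis varchar(400) NOT NULL,
    AVg_Speed TEXT DEFAULT NULL,
    Longuited TEXT NOT NULL,
    Latitude TEXT NOT NULL,
    Speed TEXT NOT NULL,
    Lon_Acc TEXT DEFAULT NULL,
    Lat_Acc TEXT DEFAULT NULL,
    Time TEXT DEFAULT NULL
);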

The next step is connecting Airflow to the Postgres database (viewed through pgAdmin). The postgres_local connection id used by the operators above can be defined in the Airflow UI under Admin → Connections.

connection between the airflow and postgres database

After running Airflow with docker-compose up and triggering the DAG, the two tasks, create_table_task and populate_data, run successfully.

creating and loading data

Postgres

In this project Postgres is used as the data warehouse. According to Wikipedia, PostgreSQL, also known as Postgres, is a free and open-source relational database management system emphasizing extensibility and SQL compliance. It is used as the primary data store or data warehouse for many web, mobile, geospatial, and analytics applications.

After the data is loaded, the next step is the transformation.

DBT

dbt stands for data build tool and enables analytics engineers to transform data by simply writing select statements; dbt handles turning these select statements into tables and views. dbt does the transformation part of the ELT process, so it can help end users or analysts easily generate various visualizations and insights from each dbt model. I did the following steps to implement dbt. First, in my environment, I installed dbt-core and dbt-postgres:

pip install dbt-core
pip install dbt-postgres

After a successful installation, I used the following to set up dbt.

Initialize dbt with dbt init to generate sample files/folders, including:

  • a ~/.dbt/profiles.yml
  • a new folder called dbt_name
  • directories and sample files necessary to get started with dbt

So the following folders and sample scripts are generated.

Then I created dbt models; some of them are:

-- models/fast_car_dbt_model.sql
-- vehicles whose average speed is above 37
{{ config(materialized='view') }}

with source_data as (
    select * from trajectory where AVg_Speed > 37 order by Type
)
select *
from source_data

-- models/fast_vehicle_byType.sql
-- count of fast vehicles per vehicle type
{{ config(materialized='view') }}

with fast_v as (select * from {{ ref('fast_car_dbt_model') }})

SELECT
    Type as "Vehicle type",
    count(Type) as "vehicle count"
from fast_v
GROUP BY Type ORDER BY "vehicle count" ASC

I used the dbt debug command to check the project and the connection to the local database, and dbt run to build the models.
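For context, dbt run compiles each model by resolving {{ ref(...) }} into the schema-qualified relation and wrapping the select in DDL. For the second model above, the object created in the warehouse is roughly equivalent to the following (the public schema is an assumption here; dbt takes the actual schema from ~/.dbt/profiles.yml):

-- roughly what dbt builds for fast_vehicle_byType (schema name assumed)
CREATE VIEW public.fast_vehicle_byType AS (
    with fast_v as (select * from public.fast_car_dbt_model)
    SELECT
        Type as "Vehicle type",
        count(Type) as "vehicle count"
    from fast_v
    GROUP BY Type ORDER BY "vehicle count" ASC
);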

Redash

Redash helps perform analytics on the transformed data to visualize and identify insights. I created a Redash instance using its docker-compose YAML file and connected it to my Postgres database.

the Redash instance after connecting to the data warehouse

Below are some visualizations

vehicle type vs average speed
vehicle type vs Speed

vehicle distribution
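The exact queries behind these charts are not shown in the post; a query of roughly the following shape, written against the raw trajectory table with its text column cast to numeric, could back the vehicle type vs average speed chart (the query itself is illustrative):

-- illustrative Redash query for "vehicle type vs average speed"
SELECT
    Type AS "Vehicle type",
    AVG(AVg_Speed::numeric) AS "Average speed"
FROM trajectory
GROUP BY Type
ORDER BY "Average speed" DESC;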

Challenges Faced Implementing the Project

Challenges faced in this project include:

  • Configuration: I was using Windows and installed WSL to enable the Docker containers to run on a Linux distribution. There were some difficulties, but I managed to handle them.

Future Plans

This project is particularly interesting for a data engineering career, and it has taught me several interesting things, even though it took me several attempts to manage the communication among the tech-stack tools. I want to repeat all the steps with another data warehouse, such as Snowflake, to understand it thoroughly.

Conclusion

Airflow and dbt are interesting tools that are definitely worth giving a try, as they can simplify the ELT data pipeline. In this week's project, I learned how to set up and use Airflow, dbt, and Postgres with Docker to build a scalable data warehouse.

GitHub Link: click here

Reference

  • ELT: What is Extract, Load, Transform? Free Guide & Examples | Talend
  • Data Pipelines with Apache Airflow (Manning): https://livebook.manning.com/book/data-pipelines-with-apache-airflow/chapter-1/v-6/
  • Snowflake Data Platform (SnapLogic): https://www.snaplogic.com/blog/snowflake-data-platform
  • Traffic Data Pipeline and Warehouse — TurboFuture
  • dbt Tutorial (ealizadeh.com)