Part Two: Data Engineering: Data Warehouse Tech Stack with PostgreSQL, DBT, and Airflow

Ermias Asmare
12 min read · May 4, 2024


As discussed in part one of this series, the data, provided in CSV format, is traffic data from pNEUMA. pNEUMA is an open large-scale dataset of naturalistic trajectories of half a million vehicles, collected in a one-of-a-kind experiment by a swarm of drones flying over the congested downtown area of Athens, Greece. Each file for a single (area, date, time) combination is roughly 87 MB of data.

Let's download the data: go to the pNEUMA website HERE and download it.

Data Description:

HOW IS THE PNEUMA.CSV FILE ORGANIZED?
For each .csv file the following apply:

  • Each row represents the data of a single vehicle.
  • The first 10 columns in the 1st row include the columns' names.
  • The first 4 columns include information about the trajectory, like the unique track_id, the type of vehicle, the distance traveled in meters, and the average speed of the vehicle in km/h.
  • The last 6 columns are then repeated every 6 columns based on the time frequency. For example, column_5 contains the latitude of the vehicle at time column_10, and column_11 contains the latitude of the vehicle at time column_16.
  • Speed is in km/h, longitudinal and lateral acceleration in m/sec², and time in seconds.

How many types of vehicles are available? There are 6 types of vehicles. These are: Car, Taxi, Bus, Medium Vehicle, Heavy Vehicle, Motorcycle.

WHAT IS THE SIZE OF EACH TYPE OF VEHICLE?
The dimensions of the different types of vehicles are given below in meters:

  • Car and Taxi: 5 x 2
  • Medium Vehicle: 5.83 x 2.67
  • Heavy Vehicle: 12.5 x 3.33
  • Bus: 12.5 x 4
  • Motorcycle: 2.5 x 1

For more details about the data, you can follow the reference link: HERE.

Okay, from the data description we can understand that the first four columns provide general information about the trajectory (unique track ID, vehicle type, distance traveled, and average speed), while the remaining fields are machine readings of the vehicle's location, speed, and acceleration at specific times.

There are 6 fields for each timestamp: latitude, longitude, speed, lateral acceleration, longitudinal acceleration, and the time of measurement.
If a vehicle was observed over n timestamps, there will be 6*n such fields, so rows have different lengths: one vehicle might have been observed over 10 timestamps (60 fields) and another over 100 (600 fields).

You can handle this as follows:

  • Keep the first 4 fields separate, since they describe the vehicle rather than individual readings.
  • Organize the remaining fields (remember, they should be 6*n fields for some value of n) into a table with 6 columns and n rows.
  • Don't forget to attach the track_id to each row so you can keep track of which vehicle the readings belong to.

import pandas as pd

def split_csv_to_dataframes(data_file_path, track_file_path, trajectory_file_path):
    """
    Reads a CSV file, splits it into two DataFrames, and writes them to separate CSV files.

    Args:
        data_file_path (str): Path to the CSV file.
        track_file_path (str): Path to save the vehicle data DataFrame (track_info.csv).
        trajectory_file_path (str): Path to save the trajectory data DataFrame (trajectory_info.csv).
    """
    with open(data_file_path, 'r') as file:
        lines = file.readlines()

    # The header row holds the column names; values are semicolon-separated
    cols = lines[0].strip('\n').strip().strip(';').split(';')
    track_cols = cols[:4]
    trajectory_cols = ['track_id'] + cols[4:]

    track_info = []
    trajectory_info = []

    for row in lines[1:]:
        row_values = row.strip('\n').strip().strip(';').split(';')
        track_id = row_values[0]

        # Track data: the first four fields describe the whole trajectory
        track_info.append(row_values[:4])

        # Trajectory data: the remaining fields come in groups of six per timestamp
        remaining_values = row_values[4:]
        trajectory_matrix = [
            [track_id] + remaining_values[i:i + 6]
            for i in range(0, len(remaining_values), 6)
        ]
        trajectory_info.extend(trajectory_matrix)

    df_track = pd.DataFrame(data=track_info, columns=track_cols)
    df_trajectory = pd.DataFrame(data=trajectory_info, columns=trajectory_cols)

    df_track.to_csv(track_file_path, index=False)
    df_trajectory.to_csv(trajectory_file_path, index=False)

You can run the function by following the usage below:

from src.load_clean_data.load_traffic_data import split_csv_to_dataframes

# Example usage
data_file_path = "../../data/data1.csv"
track_file_path = "../../data/track_info.csv"
trajectory_file_path = "../../data/trajectory_info.csv"

split_csv_to_dataframes(data_file_path, track_file_path, trajectory_file_path)

print(f"DataFrames written to: \n - {track_file_path} \n - {trajectory_file_path}")

Output:

Okay, now that we have the dataset the way we want it, we can start the ETL process with dbt. The first task in the ETL process is data modeling.

Data Modeling

We have to define the data model for the traffic data warehouse. This includes identifying the dimensions, facts, and relationships between the different entities in the data. We will focus on normalizing the data to ensure efficient storage and retrieval.

Let's get started by creating SQL scripts to define the tables in the data warehouse. We'll have two tables: one for track information and another for trajectory information.

models/track_info.sql

-- Create track_info table
CREATE TABLE track_info (
    track_id VARCHAR(255) PRIMARY KEY,
    type VARCHAR(50),
    traveled_distance FLOAT,
    average_speed FLOAT
);

This SQL script creates a table named track_info with columns for track_id, type of vehicle, traveled_distance, and average_speed. The track_id serves as the primary key to uniquely identify each track.

models/trajectory_info.sql

-- Create trajectory_info table
CREATE TABLE trajectory_info (
    track_id VARCHAR(255) REFERENCES track_info(track_id),
    lat FLOAT,
    lon FLOAT,
    speed FLOAT,
    lon_acc FLOAT,
    lat_acc FLOAT,
    time FLOAT
);

The above SQL script creates a table named trajectory_info with columns for track_id (a foreign key referencing the track_info table), lat, lon, speed, lon_acc, lat_acc, and time. This table stores one row of detailed trajectory information per timestamp for each track.
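
Since trajectory_info will hold many rows per vehicle (one per timestamp), it can also help to index the foreign key so joins back to track_info and per-vehicle lookups stay fast. This is an optional sketch, not part of the original schema:

-- Optional: index the foreign key for faster joins and per-vehicle lookups
CREATE INDEX idx_trajectory_info_track_id ON trajectory_info (track_id);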

Before running the scripts and loading the data into the database, let's clean and transform the data.

Let's create two functions in the load_traffic_data file that load the data from "track_info.csv" and "trajectory_info.csv":

import pandas as pd

def load_traffic_info_data():
    df_track_info = pd.read_csv("../../data/track_info.csv")
    return df_track_info

def load_trajectory_info_data():
    df_trajectory_info = pd.read_csv("../../data/trajectory_info.csv")
    return df_trajectory_info

Let's go to "view_data.ipynb" to take a look at the data from the CSV files.

from src.load_clean_data.load_traffic_data import load_traffic_info_data, load_trajectory_info_data

df_track_info = load_traffic_info_data()
df_track_info.head()
df_trajectory_info = load_trajectory_info_data()
df_trajectory_info.head()

Okay, now let's create a new file called clean_data.py inside the src/load_clean_data folder, where we write functions that clean the track_info and trajectory_info data.

src/load_clean_data/clean_data.py

import pandas as pd


def clean_track_info(track_info_df):
    # Check for duplicated rows based on all columns
    duplicated_rows = track_info_df[track_info_df.duplicated()]

    # If there are any duplicated rows, display them
    if not duplicated_rows.empty:
        print("Duplicated Rows:")
        print(duplicated_rows)
    else:
        print("No duplicated rows found.")

    # Remove duplicates
    cleaned_track_info_df = track_info_df.drop_duplicates()

    # Display the cleaned DataFrame without duplicates
    print("DataFrame after removing duplicates:")
    print(cleaned_track_info_df)

    # Handle missing values (assignment avoids chained-assignment warnings)
    cleaned_track_info_df['traveled_d'] = cleaned_track_info_df['traveled_d'].fillna(0)
    cleaned_track_info_df['avg_speed'] = cleaned_track_info_df['avg_speed'].fillna(0)

    return cleaned_track_info_df


def clean_trajectory_info(trajectory_info_df):
    # Handle missing values and filter out irrelevant data
    cleaned_trajectory_info_df = trajectory_info_df.dropna(subset=['lat', 'lon'])

    return cleaned_trajectory_info_df

Okay, you can test these functions by running them either in your notebooks or by writing unit tests for them.
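
For example, a minimal pytest-style unit test for clean_track_info could look like the sketch below. The test file name and the sample values are hypothetical, and it assumes pytest is installed and that the imports resolve from your project root:

# tests/test_clean_data.py (hypothetical file name)
import pandas as pd

from src.load_clean_data.clean_data import clean_track_info


def test_clean_track_info_removes_duplicates_and_fills_missing_values():
    # Two identical rows plus one row with missing numeric values
    raw = pd.DataFrame({
        'track_id': ['1', '1', '2'],
        'type': ['Car', 'Car', 'Bus'],
        'traveled_d': [120.5, 120.5, None],
        'avg_speed': [32.1, 32.1, None],
    })

    cleaned = clean_track_info(raw)

    # The duplicated row should be dropped
    assert len(cleaned) == 2
    # Missing values should be replaced with 0
    assert cleaned['traveled_d'].isna().sum() == 0
    assert cleaned['avg_speed'].isna().sum() == 0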

Next, we are going to work on dependency management.

Dependency Management: this defines the dependencies between dbt models by specifying the order in which models should be executed, so that upstream transformations are applied before downstream ones.

# File: models/schema.yml

version: 2

models:
  - name: track_info_model
    description: Model for track information
    columns:
      - name: track_id
        description: Unique identifier for the track
        tests:
          - unique
      - name: type
        description: Type of vehicle
      - name: traveled_distance
        description: Distance traveled by the vehicle
      - name: average_speed
        description: Average speed of the vehicle

  - name: trajectory_info_model
    description: Model for trajectory information
    columns:
      - name: track_id
        description: Identifier of the track each reading belongs to (repeats once per timestamp, so it is tested for not_null rather than uniqueness)
        tests:
          - not_null
      - name: lat
        description: Latitude of the vehicle
      - name: lon
        description: Longitude of the vehicle
      - name: speed
        description: Speed of the vehicle
      - name: longitudinal_acceleration
        description: Longitudinal acceleration of the vehicle
      - name: lateral_acceleration
        description: Lateral acceleration of the vehicle
      - name: time
        description: Time of measurement

  • Each model is described along with its columns and any necessary tests. These steps define the dbt models and manage their dependencies to ensure a structured and orderly transformation process.
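
In dbt, the execution order itself comes from how models reference each other with ref(). As a hedged illustration (the model name speed_by_vehicle_type is hypothetical and not part of this project), a downstream model that depends on track_info_model might look like this, and dbt would then always build track_info_model first:

-- models/speed_by_vehicle_type.sql (hypothetical downstream model)
-- The ref() call tells dbt this model depends on track_info_model,
-- so track_info_model is always built before this one.
SELECT
    type,
    AVG(average_speed) AS avg_speed,
    COUNT(*)           AS vehicle_count
FROM {{ ref('track_info_model') }}
GROUP BY type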

Next, we are going to set up our dbt project. Let's start by creating a directory for our dbt project. We can place this directory alongside the other project directories like dags, data, etc. In our case, the project folder structure should look something like this (so far):

project_root/
├── dags/
├── data/
│   ├── data1.csv
│   ├── track_info.csv
│   └── trajectory_info.csv
├── redash_config.json
├── models/
│   ├── schema.yml
│   ├── track_info.sql
│   └── trajectory_info.sql
├── src/
│   ├── load_clean_data/
│   │   ├── clean_data.py
│   │   └── load_traffic_data.py
│   └── notebooks/
│       ├── read_data.ipynb
│       └── view_data.ipynb
├── .gitignore
├── dbt_project.yml
├── airflow.cfg
├── docker-compose.yml
├── profiles.yml
├── README.md
└── requirements.txt

Initialize the dbt project by using the command below:

docker-compose run dbt init traffic_dbt_project

Output:

Okay, now let's configure the connection details for our PostgreSQL database in profiles.yml. This file typically resides in the ~/.dbt/ directory, but since we're using Docker, we'll keep it in the project directory.

profiles.yml

traffic_dbt_project:
  target: dev
  outputs:
    dev:
      type: postgres
      host: postgres
      port: 5432
      user: username
      password: password
      dbname: database_name
      schema: public
      threads: 4
      keepalives_idle: 0
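
To sanity-check the connection before moving on, you can run dbt's built-in connection test. Assuming the dbt service in docker-compose.yml runs from the project directory (an assumption about your setup), something like this should report a successful connection:

docker-compose run dbt debug --profiles-dir . --project-dir .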

Finally, edit the dbt_project.yml file to configure settings for your dbt project, such as the target profile and the models directory.

name: 'traffic_dbt_project'
version: '1.0.0'
config-version: 2

profile: 'traffic_dbt_project'

model-paths: ["models"]  # 'source-paths' was renamed to 'model-paths' in dbt 1.0; keep 'source-paths' on older dbt versions

That's it! You've set up your dbt project. You can now write SQL queries in your model files to transform the raw data into structured tables.
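
If you want to try the models outside of Airflow while developing, a manual run along the same lines should work (again assuming the docker-compose dbt service runs from the project directory):

docker-compose run dbt run --profiles-dir . --project-dir .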

Airflow DAG Setup

What is Airflow?

Airflow is an open-source platform used to orchestrate complex workflows and data pipelines. It allows users to define, schedule, and monitor workflows as Directed Acyclic Graphs (DAGs). With Airflow, users can define tasks, dependencies between tasks, and schedule when tasks should be executed.

What is a DAG (Directed Acyclic Graph)?

A DAG is a collection of tasks with directed edges between them, where the direction signifies the flow of data or dependencies.

  • DAGs ensure that tasks are executed in a specific order, with each task depending on the successful completion of its prerequisite tasks.
  • DAGs in Airflow are defined using Python code, typically within a Python script or module.

Real-world scenario example: Let’s consider a typical data processing workflow for an e-commerce platform:

Scenario: The e-commerce platform needs to ingest, process, and analyze sales data daily.

Airflow DAG:

  • DAG Definition: An Airflow DAG named 'sales_data_processing' is created to manage the workflow.
  • Tasks: The DAG consists of tasks like 'ingest_data', 'clean_data', 'aggregate_data', and 'generate_reports'.
  • Dependencies: The tasks are connected with dependencies, ensuring that 'clean_data' runs only after 'ingest_data', 'aggregate_data' runs after 'clean_data', and so on.
  • Schedule: The DAG is scheduled to run daily (schedule_interval='@daily'), ensuring that the entire workflow is executed once every day (a minimal sketch of such a DAG is shown below).
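
As a hedged illustration only (the task names and Python callables here are hypothetical placeholders, not part of this project), the scenario above could be wired up roughly like this:

from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator


def ingest_data():
    ...  # pull yesterday's sales data from the source systems

def clean_data():
    ...  # validate and deduplicate the raw records

def aggregate_data():
    ...  # roll sales up by product, region, and day

def generate_reports():
    ...  # write the daily report tables or files

with DAG(
    dag_id='sales_data_processing',
    start_date=datetime(2024, 5, 1),
    schedule_interval='@daily',  # run the whole workflow once per day
    catchup=False,
) as dag:
    ingest = PythonOperator(task_id='ingest_data', python_callable=ingest_data)
    clean = PythonOperator(task_id='clean_data', python_callable=clean_data)
    aggregate = PythonOperator(task_id='aggregate_data', python_callable=aggregate_data)
    reports = PythonOperator(task_id='generate_reports', python_callable=generate_reports)

    # Each task runs only after its upstream task has succeeded
    ingest >> clean >> aggregate >> reports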

Okay, now that we have a basic understanding of what Airflow and DAGs are and of their purpose in the data engineering process, let's get started on the task at hand.

Create a DAG file:

  • Create a Python file traffic_data_ingestion.py within our dags directory.
  • Install the necessary libraries:

pip install apache-airflow

  • Import the necessary libraries:

from airflow import DAG
from airflow.exceptions import AirflowException
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
import os

  • Define the DAG with appropriate default arguments:

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2024, 5, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}
  • Create a new DAG named 'traffic_data_ingestion', specifying the start date and schedule interval for the DAG:

def check_data_exists(**kwargs):
    data_file_path = '../data/data_1.csv'
    if os.path.exists(data_file_path):
        return True
    else:
        raise AirflowException(f"Data file not found at: {data_file_path}. Please check the path and ensure the data is available.")

with DAG(
    dag_id='traffic_data_ingestion',
    default_args=default_args,
    description='Orchestrate dbt workflow',
    schedule_interval='@daily',  # Schedule the DAG to run daily
) as dag:

    # Task to check if the data file exists
    check_data_exists_task = PythonOperator(
        task_id='check_data_exists',
        python_callable=check_data_exists,
        provide_context=True,
    )

    # Task to load data into PostgreSQL
    load_data_task = BashOperator(
        task_id='load_data',
        bash_command='psql -h postgres -U postgres -d traffic_data_warehouse -c "COPY traffic_data FROM \'/data/data_1.csv\' WITH (FORMAT csv, HEADER TRUE, DELIMITER \',\');"',
        # Adding retries and a retry delay in case of failure
        retries=3,
        retry_delay=timedelta(seconds=30),
    )

    # Set task dependencies
    check_data_exists_task >> load_data_task

Explanation:

  • In the snippet above, we created a function named check_data_exists that checks whether our downloaded CSV file (in my case data_1.csv) exists in the directory we specified.
  • Then we created a DAG task that executes the check_data_exists function.
  • Finally, we created a DAG task that loads the data into our PostgreSQL database, which in my case is named traffic_data_warehouse.
  • Then we specified the task dependency, meaning the sequence in which the tasks are executed: check_data_exists_task must run (and succeed) before load_data_task runs. I think the why is clear on this.
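
Before relying on the daily schedule, it can be worth exercising the DAG once by hand. Assuming Airflow 2.x, the following command runs a single ad-hoc test of the whole DAG for a given date without involving the scheduler:

airflow dags test traffic_data_ingestion 2024-05-01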

Let's create another Airflow DAG, this time to run the dbt transformation; let's name the file "traffic_data_pipeline.py".

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
import os

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2024, 5, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'traffic_data_pipeline',
    default_args=default_args,
    description='Orchestrate dbt workflow',
    schedule_interval='@daily',  # Schedule the DAG to run daily
)

# Function to execute the dbt transformation
def run_dbt_transformation(**kwargs):
    command = 'dbt run --profiles-dir ./ --project-dir ./'
    os.system(command)

# Define a PythonOperator to execute the dbt transformation
run_dbt_task = PythonOperator(
    task_id='run_dbt_transformation',
    python_callable=run_dbt_transformation,
    provide_context=True,
    dag=dag,
)

# This DAG has a single task, so no task dependencies need to be set

We've defined an Airflow DAG named traffic_data_pipeline with a daily schedule. Inside the DAG, there's a single PythonOperator, run_dbt_task, that executes the dbt transformation through the Python function run_dbt_transformation.
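
One caveat with this approach: os.system does not raise an error when dbt fails, so the Airflow task can show up green even if the models did not build. A hedged alternative sketch (not part of the original pipeline) is to run dbt through a BashOperator, which fails the task on a non-zero exit code:

from airflow.operators.bash_operator import BashOperator

# Alternative: let Airflow surface dbt failures directly.
# The BashOperator marks the task as failed if the command exits non-zero,
# unlike os.system, whose return code is silently ignored above.
run_dbt_task = BashOperator(
    task_id='run_dbt_transformation',
    bash_command='dbt run --profiles-dir ./ --project-dir ./',  # adjust paths to your deployment
    dag=dag,
)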

Let's work on scheduling for the DAGs; we need to make sure that they are scheduled to run at regular intervals. Open the Airflow configuration file, airflow.cfg, and make sure the relevant sections look something like this:

[core]
# The home folder for airflow, default is ~/airflow
airflow_home = /usr/local/airflow

# The executor class that airflow should use. Choices include SequentialExecutor, LocalExecutor, CeleryExecutor
executor = LocalExecutor

# Secret key to save connection passwords in the db
fernet_key = generate_your_own_fernet_key

# Whether to load the examples that ship with Airflow. It's good to turn this off in production.
load_examples = False

# The maximum number of parallel task instances that should run per DAG.
parallelism = 32

# The number of task instances allowed to run concurrently by the scheduler
dag_concurrency = 16

# The maximum number of active DAG runs per DAG
max_active_runs_per_dag = 16

# The folder where your Airflow plugins are stored
plugins_folder = /usr/local/airflow/plugins

# The folder where your Airflow DAG definitions are stored
dags_folder = /usr/local/airflow/dags

# The class to use for running task instances in a subprocess
task_runner = StandardTaskRunner

# The Secret Key to save connection passwords in the db
secret_key = c165e55bbcf7fee630cd8d95621b2d29b6e54c2ac8dff214f9f7d8caa66a

# Whether to disable the web server's protection against Cross-Site Request Forgery (CSRF).
disable_csrf_protection = True

# Whether to use the default Airflow login and authentication.
authenticate = True

# How long before timing out a python file import while filling the DagBag.
dagbag_import_timeout = 30

# The amount of time to wait between polling for new task instances to schedule.
scheduler_health_check_threshold = 30

# The SQLAlchemy connection string to the metadata database.
sql_alchemy_conn = postgresql://postgres:password@postgres/database_name

[webserver]
# The ip specified when starting the web server
web_server_host = 0.0.0.0

# The port on which to run the web server
web_server_port = 8080

# The secret key used to run your flask app
secret_key = your_secret_key

# Number of worker processes for the Gunicorn web server
workers = 4

# Whether to use the Gunicorn web server
worker_class = gthread

# The number of threads to use to process requests in the Gunicorn web server
worker_threads = 4

# The maximum number of simultaneous requests that can be handled by the Gunicorn web server
worker_connections = 1000

# The timeout for waiting for requests in the Gunicorn web server, in seconds
timeout = 120

[scheduler]
# The scheduler used to trigger tasks and manage the task lifecycle.
scheduler_job_heartbeat_sec = 5

# The frequency at which the scheduler should wake up and attempt to schedule jobs.
scheduler_heartbeat_sec = 10

# The class used to run the scheduler process.
job_heartbeat_sec = 1

# Defines the number of seconds after the scheduler considers a task as failed.
scheduler_zombie_task_threshold = 300

# The class used to send heartbeats from the scheduler to the metastore.
scheduler_zombie_slayer_cycle = 1

# Whether the scheduler should try to trigger the scheduler_loop when there are no tasks scheduled.
scheduler_triggerer_task_wait_threshold = 2

# The number of seconds to wait before shutting down the scheduler after receiving a SIGTERM signal.
scheduler_shutdown_wait_sec = 2

# How often (in seconds) to scan the DAGs folder for new files.
dag_dir_list_interval = 300

Okay, now let's work on the integration with Redash.

Redash Integration

Ensure your Redash container is up and running. You can verify this by running docker ps in your terminal. Access the Redash UI at http://localhost:5000 in your web browser.

Database Connection:

  • Log in to Redash using the default credentials (email: admin@redash.io, password: admin).
  • Navigate to the “Data Sources” page.
  • Click on “New Data Source” and select “PostgreSQL” as the data source type.
  • Enter the following details to configure the connection:
  • Name: Enter a name for the connection (e.g., Traffic Data Warehouse).
  • Host: postgres
  • Port: 5432
  • User: postgres
  • Password: 8040
  • Database Name: traffic_data_warehouse
  • Leave other fields as default or adjust according to your setup.
  • Click “Save” to save the data source configuration.

Create Dashboards:

  • Once the data source is configured, you can start creating dashboards and visualizations.
  • Click on “Dashboards” in the navigation menu and then click “New Dashboard” to create a new dashboard.
  • Add visualizations to the dashboard by clicking on the “+” icon and selecting the desired visualization type (e.g., line chart, bar chart, table).
  • Configure each visualization to query the necessary data from your data warehouse.
  • Use SQL queries or the query builder provided by Redash to create custom queries that retrieve data from your structured tables (see the example query after this list).
  • Customize the visualizations by adjusting settings such as axes labels, colors, and legends.
  • Organize the visualizations on the dashboard to present the data effectively.
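
As a hedged example of the kind of query you might plug into a Redash visualization (assuming the column names from the track_info table defined earlier), average speed per vehicle type could look like this:

-- Average speed and vehicle count per vehicle type,
-- suitable for a bar chart in Redash
SELECT
    type,
    AVG(average_speed) AS avg_speed_kmh,
    COUNT(*)           AS vehicle_count
FROM track_info
GROUP BY type
ORDER BY avg_speed_kmh DESC;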
