Building an End-to-End Data Warehouse Pipeline

Taimoor Muhammad
10 min read · Jan 5, 2024


In the era of big data, the ability to transform raw information into actionable insights is the foundation for informed decision-making. Organizations rely on robust data infrastructure to harness the power of their data, and a key player in this process is the data warehouse. In this blog, we embark on a journey to construct a comprehensive end-to-end data warehouse, unraveling the intricacies from importing raw data to visualizing insights using tools like Python, MongoDB, PostgreSQL, and Tableau.

Source: https://datascientest.com/en/data-warehouse-2

The objective is to construct a data warehouse utilizing Python, MongoDB, and PostgreSQL, incorporating data on complaints against financial institutions and demographic information. This project aims to create a comprehensive repository for analyzing and managing data related to financial complaints and associated demographics.

Data

Complaint Dataset

This dataset is from data.gov. It is a CSV-format dataset with roughly 4M rows; its size and complexity will also give us a chance to do some performance tuning on our code.
Link: https://catalog.data.gov/dataset/consumer-complaint-database

Demographic Data

This dataset is taken from Kaggle. It contains information regarding the demographics of different states. This is a JSON format dataset.
Link: https://www.kaggle.com/datasets/bitrook/us-county-historical-demographics?select=us_county_demographics.json

Project Flow

Before we start, please review the attached project flow. It illustrates the steps we’ll take and how each piece of work contributes to the end result.

Project Flow

The structure of this blog follows the steps and order of the project flow, so we will start by loading our JSON data into MongoDB.

MongoDB

In an effort to keep this simple, we will not go over the detailed documentation for connecting MongoDB to Python. In short, we will use a combination of MongoDB Compass (to upload data directly from our local machine) and MongoDB Atlas (to get the credentials for establishing a connection from our local Python environment).

Once the data is uploaded via MongoDB Compass, it will look like the image below. Due to storage limitations, only 25.6K of the 50K records were imported.

MongoDB Compass

Python

We’ll be using Python to load and transform the data, applying various transformations such as pivot operations and filtering for a specific time range to ensure accurate results. After this process, the data will seamlessly transfer to PostgreSQL, automatically creating the necessary tables.

Stage 1: Importing Data

import pandas as pd
from pymongo import MongoClient
# CSV
# Import complaints data in csv format
df = pd.read_csv('complaints-2.csv')
# JSON
# Replace 'your_connection_string_here' with your actual MongoDB Atlas connection string
# mongodb+srv://<username>:<password>@<cluster-address>/<database-name>?retryWrites=true&w=majority
connection_string = "mongodb+srv://your_username:your_password@your_cluster_address/your_database_name?retryWrites=true&w=majority"
client = MongoClient(connection_string)
# Replace 'your_database' and 'your_collection' with your actual database and collection names
db = client['your_database'] # 'Project'
collection = db['your_collection'] # 'demographics'

From the code above, here are a few things that need to be changed:

  1. Replace the following placeholders:
  • <username>: Your MongoDB username.
  • <password>: Your MongoDB password.
  • <cluster-address>: The address of your MongoDB Atlas cluster.
  • <database-name>: The name of your MongoDB database.

2. Database and Collection: After establishing the connection, the code specifies the MongoDB database and collection to be used. In this example:

  • db = client['Project'] selects the database named 'Project'. Make sure to replace 'your_database' with your actual database name.
  • collection = db['demographics'] selects the collection named 'demographics' within the 'Project' database. Replace 'your_collection' with your actual collection name.

Once this is done, we will select only the fields that are required. This is an important step, as it improves performance by not reading and importing the entire dataset. The code below builds the list of required fields.
An example of how a nested field is addressed: unemployment -> employed -> 2011.

from pandas import json_normalize

# Example template to show how fields are selected
projection_template = {
    "state": 1,
    "county": 1,
    "_id": 0,
}
fields = [
    "unemployment.employed",
    "unemployment.unemployed",
    "population_by_age.total.18_over",
    "population_by_age.total.65_over",
]
# Years for which we want the data
years = range(2011, 2020)
formatted_fields = []

# Append the required fields along with the years to get the final names
for field in fields:
    for year in years:
        formatted_fields.append(f"{field}.{year}")

# Append the formatted fields to the projection_template
for formatted_field in formatted_fields:
    projection_template[formatted_field] = 1

# Perform the query with the specified projection
cursor = collection.find({}, projection_template)

# Convert the cursor to a list of dictionaries
queried_data = list(cursor)

# Convert the data into a dataframe
df_demo = pd.DataFrame(queried_data)
flat_df = json_normalize(df_demo.to_dict(orient='records'))

# Rough copy so that, in case of an error, we do not have to import the entire dataset again
df_demo = flat_df.copy()

The final shapes of our data after being imported are:

  1. Complaints: 4310354 rows × 18 columns
  2. Demographics: 25580 rows × 38 columns

Stage 2: Data Transformation

The Exploratory Data Analysis (EDA) process is omitted here for simplicity. However, by adhering to the outlined project flow, you can access a comprehensive and detailed EDA, complete with comments, on my GitHub repository.
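
That said, to give a flavor of the transformations mentioned earlier, the sketch below pivots the year-suffixed demographic columns into one row per state, county, and year, and filters the complaints to a 2011-2019 window. The column names, date range, and resulting dataframe names are assumptions for illustration; the actual EDA in the repository may differ.

import pandas as pd

# --- Demographics: pivot the year-suffixed columns into one row per (state, county, year) ---
# After json_normalize, columns look like "unemployment.employed.2011", so we melt them into
# long form, split the year back out, and pivot each metric into its own column.
id_cols = ["state", "county"]
value_cols = [c for c in flat_df.columns if c not in id_cols]

long_df = flat_df.melt(id_vars=id_cols, value_vars=value_cols,
                       var_name="metric_year", value_name="value")
long_df["metric"] = long_df["metric_year"].str.rsplit(".", n=1).str[0]
long_df["year"] = long_df["metric_year"].str.rsplit(".", n=1).str[1].astype(int)

df_demo_eda = (long_df
               .pivot_table(index=["state", "county", "year"],
                            columns="metric", values="value", aggfunc="first")
               .reset_index())

# --- Complaints: keep only the 2011-2019 window that matches the demographics ---
df["Date received"] = pd.to_datetime(df["Date received"], errors="coerce")
df_eda = df[df["Date received"].between("2011-01-01", "2019-12-31")]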

Stage 3: Transferring Data

The following code illustrates how to transfer data from Python to PostgreSQL. It’s designed to simplify the process, eliminating the need to create tables beforehand, as the code will handle this automatically. The SQLAlchemy engine is configured with the provided connection parameters, and the script attempts to connect to the database.

import psycopg2 as pg
from sqlalchemy import create_engine
from sqlalchemy.exc import OperationalError
from urllib.parse import quote

# PostgreSQL connection parameters
username = ''
password = ''
host = ''
port = ''
database = ''

# URL encode the password
encoded_password = quote(password)

# Create the connection string
DATABASE_URI = f'postgresql://{username}:{encoded_password}@{host}:{port}/{database}'

# Create an SQLAlchemy engine
engine = create_engine(DATABASE_URI, pool_size=10, max_overflow=20)

try:
    # Attempt to connect to the database
    connection = engine.connect()
    print("Connected to the database.")
    connection.close()
except OperationalError as e:
    print(f"Failed to connect to the database. Error: {e}")

# Use the Pandas to_sql function to create the tables in the database
df_eda.to_sql('complaints', engine, index=False, if_exists='replace')
df_demo_eda.to_sql('demographics', engine, index=False, if_exists='replace')

# Print a message indicating success
print('Data has been transferred successfully.')

For the PostgreSQL connection, you need to fill in the following parameters:

  • username: Your PostgreSQL username.
  • password: Your PostgreSQL password.
  • host: The address where your PostgreSQL database is hosted.
  • port: The port number for the PostgreSQL database.
  • database: The name of the specific database you want to connect to.

df_eda and df_demo_eda are the names of the final dataframes, and ‘complaints’ and ‘demographics’ are the names of the tables that will be created in PostgreSQL (pgAdmin).
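
One optional tweak: since the complaints dataframe has over four million rows, writing it to PostgreSQL in batches can keep memory usage in check. The chunksize and method values below are illustrative choices, not the settings used in the repository.

# Write the large complaints dataframe in batches rather than one giant insert
df_eda.to_sql(
    'complaints',
    engine,
    index=False,
    if_exists='replace',
    chunksize=50_000,  # rows per batch; tune to available memory
    method='multi',    # pack multiple rows into each INSERT statement
)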

PostgreSQL

Since our data is already normalized, we will move directly towards creating tables that will follow the structure of our constellation schema, attached below.

Constellation Schema

Before we create the tables, we will implement indexes on most columns in the staging tables ‘complaints’ and ‘demographics’, as this improves performance when loading data into the dimension and fact tables. Similarly, as we create our dimension and fact tables, we will implement indexes on those too. While this does put pressure on the buffer cache, in our case it ultimately leads to improved performance.
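
For illustration, indexes on the staging columns that the later joins and lookups rely on might look like the snippet below. The index names are made up for this sketch, and the column names are taken from the queries later in this post; the full set of indexes lives in the GitHub repository.

-- Illustrative only: index the staging columns used by the dimension and fact loads
CREATE INDEX IF NOT EXISTS idx_complaints_complaint_id ON complaints ("Complaint ID");
CREATE INDEX IF NOT EXISTS idx_complaints_state ON complaints ("State");
CREATE INDEX IF NOT EXISTS idx_demographics_state_year ON demographics (state, year);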

The code to create the tables, implement the indexes, and implement the triggers for SCD 2 and SCD 3 can be found on GitHub.

To go into a little more detail, here we will discuss how the SCD 2, SCD 3, and delete-prevention triggers are implemented. To try a different approach, we will implement SCD 2 and SCD 3 using trigger functions as well. Go through the following code for reference:

--Delete Prevention Trigger
-- Creating a trigger function
CREATE OR REPLACE FUNCTION prevent_delete_issue()
RETURNS TRIGGER AS $$
BEGIN
    RAISE EXCEPTION 'Deleting records from issue_dimension is not allowed.';
    RETURN NULL;
END;
$$ LANGUAGE plpgsql;
-- Creating a trigger on issue dimension
CREATE TRIGGER prevent_delete_issue_trigger
BEFORE DELETE
ON issue_dimension
FOR EACH ROW
EXECUTE FUNCTION prevent_delete_issue();
-- Create trigger on resolution dimension
CREATE TRIGGER prevent_delete_resolution_trigger
BEFORE DELETE
ON resolution_dimension
FOR EACH ROW
EXECUTE FUNCTION prevent_delete_issue();

----------------

--Resolution Dimension--SCD 2--
CREATE OR REPLACE function scd2_issue()
returns trigger
AS $BODY$
BEGIN
    -- Close out any existing record for this complaint before the new row is inserted
    update resolution_dimension
    set end_date = current_date, active = false
    where complain_number = new.complain_number;
    return new;
END;
$BODY$
LANGUAGE plpgsql;
-- Create trigger on resolution dimension
CREATE TRIGGER scd2_resolution_trigger
before INSERT ON resolution_dimension
FOR EACH ROW
EXECUTE PROCEDURE scd2_issue();

----------------

-- Issue Dimension--SCD 3--
CREATE OR REPLACE FUNCTION public.scd3_issue_dimension()
RETURNS TRIGGER
AS $BODY$
BEGIN
    -- Update the existing record's effective date, preserve the old consent value,
    -- and overwrite it with the incoming value when the two differ
    -- (compare against the stored row: OLD is not available in an INSERT trigger)
    UPDATE issue_dimension
    SET effective_date = current_date::DATE,
        consumer_consent_old = consumer_consent,
        consumer_consent = NEW.consumer_consent
    WHERE complain_number = NEW.complain_number
      AND consumer_consent IS DISTINCT FROM NEW.consumer_consent;
    RETURN NEW;
END;
$BODY$
LANGUAGE plpgsql;
-- Creating a trigger on issue dimension
CREATE TRIGGER scd3_trigger_issue_dimension
BEFORE INSERT ON issue_dimension
FOR EACH ROW
EXECUTE PROCEDURE scd3_issue_dimension();

1. Delete Prevention Trigger for ‘issue_dimension’:
This trigger function, ‘prevent_delete_issue()’, is designed to prevent the deletion of records from the ‘issue_dimension’ and ‘resolution_dimension’ tables. When someone attempts to delete a record, the trigger raises an exception, signaling that deleting records from these tables is not allowed. This safeguard ensures data integrity by enforcing a restriction on record deletion.

2. SCD 2 Trigger for ‘resolution_dimension’:
When a new record is inserted, this trigger updates the existing records in the dimension table, setting their end_date to the current date and marking them as inactive (‘active=false’). This approach helps maintain historical changes to resolution data, allowing for a comprehensive historical view.

3. SCD 3 Trigger for ‘issue_dimension’:
This trigger is executed before an insert operation. When the incoming ‘consumer_consent’ value differs from the stored one, it updates the existing record’s effective date, moves the current value into ‘consumer_consent_old’, and overwrites ‘consumer_consent’ with the new value. This preserves a limited history of changes, providing insight into how the ‘consumer_consent’ attribute evolves over time in the ‘issue_dimension’ table.
How does this prevent the new record from being inserted? On its own, it doesn’t; for that we will use a constraint while adding data. In case of a clash, the ON CONFLICT clause skips the new record, but by then the trigger has already updated the existing record with the new data.

Now that our tables have been created, the indexes and triggers are implemented, and the SCD conditions are met, we will move on to adding the data. The complete code can be found on GitHub in the file inserting-data; in the section below, we’ll demonstrate SCD 0, 1, 2, and 3, along with inserting data into our fact tables.

SCD 0
This piece of code will prevent the new record from being added in case of a clash.

--2.Date Dimension--
-- Generate and insert data into date_dimension table
INSERT INTO date_dimension (year, month, day)
SELECT
EXTRACT(YEAR FROM d)::INTEGER,
EXTRACT(MONTH FROM d)::INTEGER,
EXTRACT(DAY FROM d)::INTEGER
FROM generate_series('2000-01-01'::date, '2050-12-31'::date, '1 day'::interval) d
ON CONFLICT (year, month, day) DO NOTHING;

SCD 1
This piece of code will replace the existing record with the new record in case of a clash.

--1. Location Dimension--
-- Inserting initial data with conflict resolution
INSERT INTO location_dimension (state)
SELECT DISTINCT
sq.state
FROM
(SELECT DISTINCT state FROM demographics dd
UNION
SELECT DISTINCT "State" FROM complaints) sq
ON CONFLICT (state) DO UPDATE
SET state = EXCLUDED.state;

SCD 2
In combination with the trigger implemented earlier: if there is a clash, the trigger closes out the previous record (setting its end_date and marking it inactive), and the new record is then inserted as-is.

INSERT INTO resolution_dimension (complain_number, public_response, response_to_consumer,
start_date, end_date, active)
SELECT
cc."Complaint ID",
COALESCE("Company public response", ''),
COALESCE("Company response to consumer", ''),
current_date, '2099-12-31', true
FROM temp_record cc;

SCD 3
As mentioned above in the SCD 3 description, the trigger updates the existing record with the information from the new record, and the ON CONFLICT clause in the code below prevents the duplicate record from being added.

INSERT INTO issue_dimension (complain_number, consumer_complaint_narrative, consumer_consent, submitted_via, effective_date)
SELECT
cc."Complaint ID",
COALESCE("Consumer complaint narrative", ''),
COALESCE("Consumer consent provided?", ''),
COALESCE("Submitted via", ''),
current_date::DATE
FROM temp_record cc
ON CONFLICT (complain_number) DO nothing;

Population Fact

This code inserts data into the population_fact table, which is connected to three dimensional tables. It captures population statistics, joining data from the demographics, year_dimension, and location_dimension tables.

--Inserting data
INSERT INTO population_fact (year_id, location_id, population_over_18, population_over_65, employed_population, unemployed_population)
SELECT
yd.year_id,
ld.location_id,
COALESCE(d."age.total.18_over", 0) as population_over_18, -- Assuming a default value of 0 for NULL
COALESCE(d."age.total.65_over", 0) as population_over_65,
COALESCE(d."unemployment.employed", 0) as employed_population,
COALESCE(d."unemployment.unemployed", 0) as unemployed_population
FROM
demographics d
JOIN
year_dimension yd ON d."year" = yd.year
JOIN
location_dimension ld ON d."state" = ld.state;

Complaint Fact

This code adds data to the complaint_fact table, linking it with different tables. It pulls details from the complaints table and matches them with the date_dimension, company_dimension, category_dimension, and location_dimension.

We use the COALESCE function to handle potential missing values, so rows with NULLs can still match during the joins. The joins might seem a bit tangled because we're aligning product and sub-product details, dealing with variations in the state information, and correctly associating issues and resolutions.

The condition r.active=true filters the resolution dimension to the currently active SCD 2 version of each record, so the fact table links only to the latest resolution rather than its historical versions.

-- Add Data
INSERT INTO complaint_fact (issue_id, resolution_id,date_id_sent,date_id_received ,category_id, company_id, location_id,
timely_response, consumer_disputed)
SELECT
i.issue_id,
r.resolution_id,
dd.date_id,
ddd.date_id,
cd.category_id,
cod.company_id,
ld.location_id,
cc."Timely response?"::int,
cc."Consumer disputed?"::int
FROM
complaints cc --replace later with the actual table name
JOIN date_dimension dd ON dd.year = EXTRACT(YEAR FROM TO_DATE(cc."Date sent to company", 'YYYY-MM-DD'))
AND dd.month = EXTRACT(MONTH FROM TO_DATE(cc."Date sent to company", 'YYYY-MM-DD'))
AND dd.day = EXTRACT(DAY FROM TO_DATE(cc."Date sent to company", 'YYYY-MM-DD'))
JOIN date_dimension ddd ON ddd.year = EXTRACT(YEAR FROM cc."Date received")
AND ddd.month = EXTRACT(MONTH FROM cc."Date received")
AND ddd.day = EXTRACT(DAY FROM cc."Date received")
JOIN company_dimension cod ON COALESCE(cc."Company", '') = COALESCE(cod.company, '')
JOIN category_dimension cd ON COALESCE(cc."Product", '') = COALESCE(cd.product, '')
AND COALESCE(cc."Sub-product", '') = COALESCE(cd.sub_product, '')
AND COALESCE(cc."Issue", '') = COALESCE(cd.issue, '')
AND COALESCE(cc."Sub-issue", '') = COALESCE(cd.sub_issue, '')
JOIN location_dimension ld ON COALESCE(cc."State", '') = COALESCE(ld.state, '')
JOIN issue_dimension i ON cc."Complaint ID"=i.complain_number
join resolution_dimension r on cc."Complaint ID"=r.complain_number
where r.active=true;

Our data warehouse is up and running! Check out the Entity Relationship Diagram (ERD) below: it mirrors our constellation schema. The warehouse is now ready for all sorts of analysis.

Entity Relationship Diagram (ERD)

With the data warehouse set up, we can now move on and connect this with our visualization platform, Tableau.

The visualization steps are not covered here; however, to learn how to connect PostgreSQL and Tableau, follow the steps here:

https://help.tableau.com/current/pro/desktop/en-us/examples_postgresql.htm

The complete Python and SQL code is available on my GitHub. Feel free to follow along step-by-step and share any insights, recommendations, or suggestions for code improvement.
