Mastering ETL: Integrating CSV Data Into an OLAP Database with Pandas

I Putu Julianta
Data Engineering Indonesia
7 min readOct 23, 2023
Photo by Mike Benna on Unsplash

ETL (Extract, Transform, and Load) is the process that allows businesses to extract data from various sources, clean and transform it to suit their specific requirements, and subsequently load it into a database for in-depth analysis.

In this article, we will delve into a step-by-step guide on how you can create an effective ETL process using Pandas. We will focus on extracting data from CSV files, a common data storage format, and loading it into an OLAP (Online Analytical Processing) database for enhanced business analysis.

This is a step-by-step tutorial. If you do not wish to follow the tutorial, you can download the file from my GitHub repository.

Let’s begin by gaining a deeper understanding of ETL and exploring how Pandas can assist you in achieving these objectives.

1. Extract

Before we embark on the ETL process, the crucial first step is to prepare the data to be used. Here is the dataset that will be used in this article. You can download it from the link below.

After downloading the file, extract it into a CSV file. Following that, you can perform the extraction process using Pandas. Here is the code used to convert the CSV into a Pandas DataFrame.

Extract from CSV to DataFrame

In the code above, first, extract the CSV file into a DataFrame, and then display a summary of the DataFrame.

2. Transform

During the transformation stage, here are the typical steps I usually perform:

1) Standardize Column Names

Standardizing column names enhances data consistency, simplifies data management, and improves data analysis by ensuring a uniform naming convention across datasets. To change column names in Pandas using df.rename(), here is an example of the code:

Standardize Column Names

2) Removing Duplicate Data

Removing data duplicates improves data accuracy, reduces storage space, and enhances analysis results by eliminating redundant information. To remove duplicate data in Pandas using df.drop_duplicates(), here is an example code:

df = df.drop_duplicates()

3) Dealing with Missing Values

In the ETL process, there are several approaches that can be used to handle missing values, including imputation and deleting data containing missing values. Before handling missing values, let’s check the columns that have missing values.

Handle Missing Value

Columns customer_id have missing values, the reason “customer_id” has missing values is because those transactions were canceled. Therefore, for this case, I have decided to delete the rows where “customer_id” is null. To drop DataFrame using df.drop()

4) Dealing with Unmatched Data

When dealing with incongruent or unmatched data, whether to delete columns or rows depends on the specific situation and your data-cleaning objectives. In this case, it is not related to columns because all columns in this table are important for fact and dimension tables. The focus here is on handling rows due to the presence of canceled transactions and negative order quantity values.

Handle Unmatched Data

5) Data Type Casting

Data Type Casting is necessary to ensure that the data loaded into a data warehouse or database aligns with the expected format and can be effectively used for analysis.

To change the data type in pandas using df.astype() and special case for datetime using pd.to_datetime()

6) Simple Analytics

Simple analytics can be applied during the “Transform” phase to gain insights and perform basic data analysis. For example, do prices fluctuate?

Simple Analytics

Yes, prices fluctuate. With this approach, we can understand how different price values will be stored in the product table.

7) Data Aggregation

One of the primary objectives of data aggregation in the ETL process is to reduce redundant data. In this case, I will sum the quantity if identical transactions occur.

Summing the Data

As in the picture above, you can see the transaction_id 579171 with product_id 22897 occur 2 at the same time. In this case, I group all data by transaction_id, date, product_id, name, price, customer_id, and country using the df.groupby() and then applying the sum method to aggregate the quantities.

As you can see, before the aggregation, I have two data points with quantities of 4 and 2, while after aggregation, I have one data point with a quantity of 6.

8) Enriching Data

And the way to enrich data is by adding new data from existing data. The purpose is to enhance the information obtained and to make better decisions. Commonly the ETL process extracts data from date columns to get deeper insights, such as year, quarter, month, week, date, and even day. Here’s how to create a new column using existing data.

Create New Columns Using Existing Data

3. Load

After the “Transformation” process, the next step is “Load”. The initial step in loading data into the database is to establish a connection. To create a connection, you can use SQLAlchemy. Here’s how to create a connection using SQLAlchemy.

from sqlalchemy import create_engine, text

engine = create_engine("postgresql://postgres:postgres@localhost:5432/etl")

The database used in this article is PostgreSQL, with the username and password set as “postgres,” and the database name is “etl”. For more information on create_engine() and the URI formatting, see the examples below and the SQLAlchemy documentation.

The following is an ERD (Entity-Relationship Diagram) image for the star schema that will be used.

Star Schema

The steps to create a star schema include the following:

1) Create DataFrame for Dimensional Table

The code below explains how to create a new data frame from an existing data frame, remove duplicate data, and create a primary key using the data frame index.

# Create new dataframe customer
df_customer = df[["customer_id"]].drop_duplicates().sort_values(by="customer_id")
df_customer = df_customer.reset_index(drop=True)
df_customer["customer_key"] = df_customer.index

# Create new dataframe trasaction
df_transaction = df[["transaction_id"]].drop_duplicates().sort_values(by="transaction_id")
df_transaction = df_transaction.reset_index(drop=True)
df_transaction["transaction_key"] = df_transaction.index

# Create new dataframe date
df_date = df[["date", "year", "quarter", "month", "week", "day", "day_name"]].drop_duplicates().sort_values(by="date")
df_date = df_date.reset_index(drop=True)
df_date["date_key"] = df_date.index

# Create new dataframe country
df_country = df[["country", "country_id"] ].drop_duplicates().sort_values(by="country_id")
df_country = df_country.reset_index(drop=True)
df_country["country_key"] = df_country.index

# Create new dataframe product
df_product = df[['product_id', "name", "price"]]
unique_product = df_product.drop_duplicates().sort_values(by=["name", "price"])
unique_product = unique_product.reset_index(drop=True)
unique_product["product_key"] = unique_product.index
df_unique_product = pd.merge(df_product, unique_product, on=["product_id", "name", "price"]).sort_values(by=["name", "price"]).drop_duplicates()

2) Create DataFrame for Fact Table

To create a data frame for a fact table, you need to join all the dimensional tables with the original table. Here’s how to join all tables using pandas.

result = pd.merge(df, df_customer, on="customer_id")
result = pd.merge(result, df_transaction, on="transaction_id")
result = pd.merge(result, df_date, on=["date", "year", "quarter", "month", "week", "day", "day_name"])
result = pd.merge(result,df_country, on=["country_id", "country"])
result = pd.merge(result, df_unique_product, on=["product_id", "name", "price"])

3) Create a Dimension Table in the Database

After successfully creating data frames for Dimensional Tables and the Fact Table, the next step is to load the data frames into the database. Here’s how to load data frames into the database.

# Insert customer dataframe to database
df_customer_dim = df_customer.set_index("customer_key")
df_customer_dim.to_sql("customer_dim", con=engine, if_exists="replace")
with engine.connect() as conn:
conn.execute(text("ALTER TABLE customer_dim ADD PRIMARY KEY (customer_key);"))
df_customer_dim.shape

# Insert transaction dataframe to database
df_transaction_dim = df_transaction.set_index("transaction_key")
df_transaction_dim.to_sql("transaction_dim", con=engine, if_exists="replace")
with engine.connect() as conn:
conn.execute(text("ALTER TABLE transaction_dim ADD PRIMARY KEY (transaction_key);"))

# Insert date dataframe to database
df_date_dim = df_date.set_index("date_key")
df_date_dim['date'] = df_date_dim['date'].dt.date
df_date_dim.to_sql("date_dim", con=engine, if_exists="replace")
with engine.connect() as conn:
conn.execute(text("ALTER TABLE date_dim ADD PRIMARY KEY (date_key);"))

# Insert product dataframe to database
df_product_dim = df_unique_product.set_index("product_key")
df_product_dim.to_sql("product_dim", con=engine, if_exists="replace")
with engine.connect() as conn:
conn.execute(text("ALTER TABLE product_dim ADD PRIMARY KEY (product_key);"))

# Insert country dataframe to database
df_country_dim = df_country.set_index("country_key")
df_country_dim.to_sql("country_dim", con=engine, if_exists="replace")
with engine.connect() as conn:
conn.execute(text("ALTER TABLE country_dim ADD PRIMARY KEY (country_key);"))

4) Create a Fact Table in the Database

To create a fact table for a sales fact table only take the necessary columns. Here’s how to create a sales fact table in the database.

df_sales_fact = result[["customer_key", "transaction_key", "date_key", "product_key", "country_key", "quantity"]].sort_values(by="date_key")
df_sales_fact.to_sql("sales_fact", con=engine, if_exists="replace", index=False)

with engine.connect() as conn:
conn.execute(text("""
ALTER TABLE sales_fact
ADD PRIMARY KEY (customer_key, transaction_key, date_key, product_key, country_key);
"""))
conn.execute(text("""
ALTER TABLE sales_fact
ADD CONSTRAINT fk_customer_dim_sales_fact
FOREIGN KEY (customer_key)
REFERENCES customer_dim (customer_key);
"""))
conn.execute(text("""
ALTER TABLE sales_fact
ADD CONSTRAINT fk_transaction_dim_sales_fact
FOREIGN KEY (transaction_key)
REFERENCES transaction_dim (transaction_key);
"""))
conn.execute(text("""
ALTER TABLE sales_fact
ADD CONSTRAINT fk_date_dim_sales_fact
FOREIGN KEY (date_key)
REFERENCES date_dim (date_key);
"""))
conn.execute(text("""
ALTER TABLE sales_fact
ADD CONSTRAINT fk_product_dim_sales_fact
FOREIGN KEY (product_key)
REFERENCES product_dim (product_key);
"""))
conn.execute(text("""
ALTER TABLE sales_fact
ADD CONSTRAINT fk_country_dim_sales_fact
FOREIGN KEY (country_key)
REFERENCES country_dim (country_key);
"""))

Finally, the ETL process is complete. Data has been successfully extracted, transformed, and loaded into the database. To check for its success, you can use PostgreSQL clients such as pgAdmin, DBeaver, or others. Here’s an example of the ERD display after successfully following these steps.

ER in DBeaver

Conclusion

ETL (Extract, Transform, Load) is a process that involves extracting, transforming, and loading data. Typically, the ETL process is used to collect data from various sources and load it into an OLAP database. To perform the ETL process, there are many tools available in Python, and one of them is Pandas.

--

--