Implementing a type 2 slowly changing dimension with Polars and Delta Lake

A simple approach in Python

Jimmy Jensen
7 min read · Feb 4, 2024
A blazingly fast polar bear | Photo by Dana Reale on Unsplash

Introduction

With the rising popularity of Delta Lake, and with the blazingly fast dataframe library Polars releasing its version 1.0.0 soon™, I have been working on a project utilising both.

While the documentation provided for both Delta-RS and Polars for Python is superb, I haven’t found many examples of common ETL tasks. It can be daunting to implement a slowly changing dimension of type 2 (SCD2), and even more so with new tools.

In this post, I’ll show you how it can be achieved with a simplistic approach.

Prerequisites

If you want to follow along, note that each code cell must be executed in order.

In the following example, I’m using Python 3.12; however, Python 3.8+ should be sufficient.

To begin, create a folder, make a virtual environment, and install polars and deltalake. If you’re on macOS with an Apple Silicon chip, you should install polars-lts-cpu instead.

mkdir polars-deltalake-scd2 && cd polars-deltalake-scd2
python3 -m venv venv
. ./venv/bin/activate
pip install polars==0.20.6 deltalake==0.15.1

Getting started

Before I can implement a type 2 slowly changing dimension, I first fill the target table with an initial data load. However, a couple of metadata fields are required in order to track changes:

  • is_current: Whether the record has the currently valid information for the specific product code
  • valid_from: The timestamp from which the record was/is current.
  • valid_to: The timestamp until which the record was/is current.
  • is_deleted: Whether the product code no longer exists in the source data.

In the code below, the products are passed into a dataframe, and the meta columns are added. Then the dataframe is written as a Delta table.

In the product destination table, the same product_code can appear multiple times as the source system changes column values over time. A new field, product_id, is generated as a simple row count. This product_id column is required later when merging changes into the dimension table, and it is the primary key of the dimension.

import polars as pl
from datetime import datetime

# Displays all columns
pl.Config().set_tbl_cols(-1)

data = {
    'product_code': ['0001', '0002', '0003', '0004'],
    'color': ['red', 'green', 'blue', 'yellow'],
    'size': ['small', 'medium', 'large', 'x-large']
}

df = pl.DataFrame(data).with_columns(
    [
        pl.lit(True).alias('is_current'),
        pl.lit(False).alias('is_deleted'),
        pl.lit(datetime(1900, 1, 1, 0, 0, 0, 0)).alias('valid_from'),
        pl.lit(datetime(9999, 12, 31, 0, 0, 0, 0)).alias('valid_to')
    ]
)

df = df.with_row_index(
    name='product_id',
    offset=1
)

df.write_delta(
    target='./deltalake/product'
)

target_df = pl.read_delta(
    source='./deltalake/product'
)

print(target_df)
Initial data load

After the data has been written to the Delta table, it is read into the target_df variable for use in the later code blocks.

Subsequent data loads

With the initial Delta table loaded, I can begin loading data with the SCD2 pattern. The goal is to create a set of dataframes which will be upserted into the target table.

The following groups of records need to be identified:

  • New records
  • Deleted records
  • Updated records

New records

Identifying new records with Polars is simply done with an anti join.

Let’s pretend a new product has been added with product_code 0005. Then I can identify the record with the following join operation.

As the product_code has never existed in the dimension, the meta columns can be set as well.

source_data = {
    'product_code': ['0001', '0002', '0003', '0004', '0005'],
    'color': ['red', 'green', 'blue', 'yellow', 'white'],
    'size': ['small', 'medium', 'large', 'x-large', 'medium']
}

source_df = pl.DataFrame(source_data)

new_records_df = source_df.join(
    other=target_df,
    on='product_code',
    how='anti'
)

new_records_df = new_records_df.with_columns(
    [
        pl.lit(True).alias('is_current'),
        pl.lit(False).alias('is_deleted'),
        pl.lit(datetime(1900, 1, 1, 0, 0, 0, 0)).alias('valid_from'),
        pl.lit(datetime(9999, 12, 31, 0, 0, 0, 0)).alias('valid_to')
    ]
)

print(new_records_df)
Output of new records dataframe

Deleted records

If a record is deleted in the source system, it is a good idea to flag it as such in the SCD2 table.

Identifying them is done almost the same way as with the new records, with an anti join, but with the dataframes in reversed roles.

Additionally, I only want to look at current records which are not already marked as deleted. This doesn’t change the final output, but it makes the merge operation lighter as fewer updates are required.

Let’s say product_code 0001 no longer exists in the source data.

source_data = {
    'product_code': ['0002', '0003', '0004', '0005'],
    'color': ['green', 'blue', 'yellow', 'white'],
    'size': ['medium', 'large', 'x-large', 'medium']
}

source_df = pl.DataFrame(source_data)

deleted_records_df = target_df.filter(
    pl.col('is_current') == True,
    pl.col('is_deleted') == False
).join(
    other=source_df,
    on='product_code',
    how='anti'
)

deleted_records_df = deleted_records_df.with_columns(
    pl.lit(True).alias('is_deleted')
)

print(deleted_records_df)
Output of deleted records dataframe

Updated records

When it comes to the updated records, it gets a little more complex. Records which have changed will be identified, and then two versions of each record will be created: one for closing the existing record (updating the is_current and valid_to columns) and one for opening a new record with all the new column values from the source data.

In this example, the product with product_code 0003 changed color from blue to teal.

This time I use an inner join to identify product_codes which exist in both the source and the target. Once again, I am not interested in comparing against old versions of the records in the target table, so it is filtered on is_current. After the inner join is performed, the resulting dataframe is filtered to only contain records where any of the source and target columns are different — or if the is_deleted column is true. The latter handles the case where a product code has been re-introduced (un-deleted) in the source system with the same values as the current one.

Note: By default Polars suffixes the joined columns with _right, if there are duplicate column names.
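If a different suffix is preferred, the join accepts a suffix parameter. The sketch below is purely illustrative (the _src suffix and the joined_df name are my own choices); the rest of this post sticks with the default _right.

# Purely illustrative: override the default '_right' suffix for duplicate columns
joined_df = target_df.join(
    other=source_df,
    on='product_code',
    how='inner',
    suffix='_src'  # duplicate columns become color_src and size_src
)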

source_data = {
    'product_code': ['0002', '0003', '0004', '0005'],
    'color': ['green', 'teal', 'yellow', 'white'],
    'size': ['medium', 'large', 'x-large', 'medium']
}

source_df = pl.DataFrame(source_data)

updated_records_df = target_df.filter(
    pl.col('is_current') == True,
).join(
    other=source_df,
    on='product_code',
    how='inner'
).filter(
    pl.any_horizontal(
        pl.col('is_deleted') == True,
        pl.col('color') != pl.col('color_right'),
        pl.col('size') != pl.col('size_right')
    )
)

print(updated_records_df)
Output of the updated records dataframe

Closing records

From the updated records I’ll derive the records to be closed. That is, the valid_to timestamp is updated to the current timestamp, and the is_current flag is set to false.

If the source system provides a modified_on timestamp column, that information can be used here; a sketch of this variant follows at the end of this subsection. However, in this example I simply use the current timestamp to set the valid_from and valid_to columns.

Additionally, the _right columns are not needed and are dropped.

modification_timestamp = datetime.utcnow()

closed_records_df = updated_records_df.with_columns(
    pl.lit(False).alias('is_current'),
    pl.lit(modification_timestamp).alias('valid_to')
)

closed_records_df = closed_records_df.drop(
    'color_right',
    'size_right'
)

print(closed_records_df)
Output of the closed records dataframe
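If the source data had included such a modification timestamp, the closing step could look roughly like the sketch below. This is only a sketch: the modified_on column is hypothetical and is not part of this example’s data.

# Hypothetical variant: close records using a modified_on timestamp from the source
# (assumes source_df contains a datetime column named modified_on, which this example's data does not)
closed_records_df = updated_records_df.with_columns(
    pl.lit(False).alias('is_current'),
    pl.col('modified_on').alias('valid_to')  # source-provided change timestamp instead of utcnow()
).drop('color_right', 'size_right', 'modified_on')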

Opened records

For the opened records I can similarly derive them from the updated records dataframe. This time, I take the values from the right dataframe, so the new column values overwrite the left dataframe columns.

After overwriting the left columns, the right columns can be dropped. The product_id column is also dropped, because the record needs a new product_id to not conflict with the existing record — which is closed. The new product_id will be applied in the next step.

opened_records_df = updated_records_df.with_columns(
    pl.col('size_right').alias('size'),
    pl.col('color_right').alias('color'),
    pl.lit(modification_timestamp).alias('valid_from'),
    pl.lit(False).alias('is_deleted')
)

opened_records_df = opened_records_df.drop(
    'product_id',
    'color_right',
    'size_right'
)

print(opened_records_df)
Output of the opened records dataframe

Putting it all together

Now that I have gathered all the various cases that can happen to the source data, the dataframes are collected and merged into the target table.

As new_records_df and opened_records_df are entirely new records for the dimension, they need a product_id column. The deleted_records_df and closed_records_df are records in the target table which will be updated and already have the right product_id.

insert_records_df = pl.concat(
    [
        new_records_df,
        opened_records_df
    ],
    how='vertical'
)

# Find the max product_id in the target
product_id_offset = target_df.select(
    pl.col('product_id')
).max().item() + 1

insert_records_df = insert_records_df.with_row_index(
    name='product_id',
    offset=product_id_offset
)

upsert_records_df = pl.concat(
    [
        insert_records_df,
        deleted_records_df,
        closed_records_df
    ],
    how='vertical_relaxed'
)

print(upsert_records_df)
Output of the upsert records dataframe

To summarise:

  • A new product with product_code 0005 was added with product_id 5.
  • The product with product_code 0001 was flagged deleted for product_id 1.
  • The product with product_code 0003 changed color from blue to teal. Two records were created for this action:
    - The closed record with product_id 3.
    - The opened record with product_id 6.

The above dataframe represents all updates and inserts required to upsert into the Delta table. The last remaining step is to perform the merge operation.

upsert_records_df.write_delta(
    target='./deltalake/product',
    mode='merge',
    delta_merge_options={
        'predicate': 'source.product_id = target.product_id',
        'source_alias': 'source',
        'target_alias': 'target'
    }
) \
.when_matched_update_all() \
.when_not_matched_insert_all() \
.execute()

updated_delta_table = pl.read_delta(
    source='./deltalake/product'
).sort(
    pl.col('product_id')
)

print(updated_delta_table)
The delta table after merge
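With the merge applied, the meta columns make the dimension straightforward to query. Below is a small sketch of two typical lookups; the as_of timestamp is just an illustrative value.

# Current view of the dimension: one row per product_code that still exists in the source
current_products = updated_delta_table.filter(
    pl.col('is_current') & ~pl.col('is_deleted')
)

# Point-in-time view: which version of each product was valid at a given timestamp?
as_of = datetime(2024, 2, 1)
products_as_of = updated_delta_table.filter(
    (pl.col('valid_from') <= as_of) & (pl.col('valid_to') > as_of)
)

print(current_products)
print(products_as_of)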

Conclusion

And there you have it.

While the logic is kept relatively simple, there could be more optimal ways to identify the different groupings of records.

In addition, Polars also provides a Lazy API which, amongst other benefits, performs automatic query plan optimisation and predicate pushdown.
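As an illustration, the new-records anti join could be expressed lazily by scanning the Delta table and only materialising the result at the end. A rough sketch, assuming the Delta table and source_df from the earlier steps:

# Lazy variant of the new-records anti join: the plan is only executed on .collect()
lazy_target = pl.scan_delta('./deltalake/product')

new_records_lf = source_df.lazy().join(
    other=lazy_target,
    on='product_code',
    how='anti'
)

print(new_records_lf.collect())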

However, for educational purposes I felt it was better to keep things in smaller and simpler steps.
