Periodic Snapshot Table Design

Aleksandar Milanovic
8 min readJan 6, 2023

--

If you are working as a Data Engineer, or plan to, you must have heard about Data Warehousing and data modeling concepts that come with it.

When talking about data modeling in Data Warehouse world, we need to distinguish two different types of tables — Dimension & Fact tables.

Dimension tables are managed via SCD (Slowly-Changing Dimension), and they contain detailed information about entities, events etc. On the other hand, Fact tables, contain data about different facts and measures, and depending on type are managed in their own specific way. We have four types of fact tables — Transaction, Periodic Snapshot, Accumulating Snapshot and Factless tables.

Fact tables are connected to dimension tables and they represent the central table in Star or Snowflake schema, and their main use cases are reporting and analytics.

If you wanna learn more or just refresh your knowledge about these concepts, I strongly recommend this Udemy course: https://www.udemy.com/course/data-warehouse-fundamentals-for-beginners/

This course will teach you about all types of tables in Data Warehousing, as well as how to design your ETL pipeline, important things like natural and surrogate keys and so much more.

What are Periodic Snapshot Fact Tables?

As the name suggests, periodic snapshot fact tables are used to store information about measurements over some time period like day, week, month etc.

Procedure would go something like this:

  1. We fill Dimension tables first
  2. We get periodic data from some system(s)
  3. We put that periodic data trough our ETL pipeline and insert new data into Periodic Snapshot fact table

In this article we will focus mostly on how to process this new data and save it to our Data Warehouse.

Example Data Warehouse

In this example we track customers activity on some web portal. Full Data Warehouse schema is given on the image below:

Data Warehouse schema

We have two dimension tables — CUSTOMER_DIM and WEEK_DIM, and our Periodic snapshot fact table CUSTOMER_USAGE_EOW_SNAPSHOT (EOW = End of week).

Let’s say that our customers get maximum of 600 minutes (10 hours) to use portal every week (maybe customers are students who must finish their exercises in given time? I’ll let you decide). We want to track the remaining time for every customer at the end of the week, to see how much our portal is being used.

Customer_Usage_Activity_EOW_Minutes_Remaining column in fact table represents number of remaining minutes customer gets to spend on portal during that week.

This type of measure is called semi-additive measure, because it cannot it’s not additive — we cannot just sum up all remaining minutes from every week and get total, because this value resets to 600 at the beginning of every new week. However, we can perform other numeric operations, calculate things like average remaining minutes per customer across weeks, average remaining minutes for every week or total remaining minutes for specific week. This is why this type of table has “Snapshot” in its name.

You can already see how handy these snapshots can be — for example, we can compare customer’s activity for different weeks, or compare total activity between two weeks and get some insights from it! Pretty cool stuff, right?

You can learn more about these topics from that Udemy course I linked above!

Note: Codebase for this will be available in the link at the end of this article, so don’t worry if you don’t get where some variables and functions come from! ❤

We need to create these tables. We will use Python and Sqlite for this example:

import sqlite3
from constants import DB_LOCATION

wh_connection = sqlite3.connect(DB_LOCATION)

cursor = wh_connection.cursor()

cursor.execute("DROP TABLE IF EXISTS CUSTOMER_DIM")
cursor.execute("DROP TABLE IF EXISTS WEEK_DIM")
cursor.execute("DROP TABLE IF EXISTS CUSTOMER_USAGE_EOW_SNAPSHOT")
cursor.execute("DROP TABLE IF EXISTS STAGING")

customer_dim = """ CREATE TABLE CUSTOMER_DIM (
Customer_Key INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
Customer_Name VARCHAR(30) NOT NULL,
Customer_Email VARCHAR(30) NOT NULL UNIQUE,
Customer_Phone VARCHAR(15) NOT NULL UNIQUE
);"""


# service_dim = """ CREATE TABLE SERVICE_DIM (
# Service_Key INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
# Service_Name VARCHAR(20) NOT NULL UNIQUE,
# Service_Hourly_Price SMALLINT NOT NULL
# );"""


week_dim = """ CREATE TABLE WEEK_DIM (
Week_Key INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
Week_Number TINYINT NOT NULL,
Year SMALLINT NOT NULL
);"""


customer_usage_eow_snapshot = """ CREATE TABLE CUSTOMER_USAGE_EOW_SNAPSHOT (
Customer_Key INTEGER NOT NULL,
Week_Key INTEGER NOT NULL,
Customer_Usage_EOW_Minutes_Remaining SMALLINT NOT NULL,
PRIMARY KEY(Customer_Key, Week_Key),
FOREIGN KEY(Customer_Key) REFERENCES CUSTOMER_DIM(Customer_Key),
FOREIGN KEY(Week_Key) REFERENCES WEEK_DIM(Week_Key)
);"""


staging_table = """ CREATE TABLE STAGING (
Staging_Key INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
Customer_Email VARCHAR(30) NOT NULL,
Week_Number TINYINT NOT NULL,
Week_Activity_Minutes_Remaining SMALLINT NOT NULL
);"""

cursor.execute(customer_dim)
cursor.execute(week_dim)
cursor.execute(customer_usage_eow_snapshot)
cursor.execute(staging_table)

wh_connection.close()

Notices another table called STAGING here — we will come back to this later on.

Our dimensions tables contain some randomly generated or fake data. For customers it would be something like this:

INSERT INTO CUSTOMER_DIM (Customer_Name, Customer_Email, Customer_Phone)
VALUES
("Dennis Gorch", "dennis.gorch88gmail.com", "192837465"),
("Michael Rune", "michael.runes@gmail.com", "987654321"),
("Paul James", "paul.james@joyodev.com", "123456789"),
("Jenny Millton", "jennymillton@yahoo.com", "911234568"),
("Josh Mart", "josh.mart@gmail.com", "411264565");

And for weeks, let’s insert data for all weeks from 2021 and 2022. We can insert all this data with this script to our Data Warehouse:

import sqlite3
from constants import DB_LOCATION


def insert_weeks(cursor):
years = [2021, 2022]
weeks = [*range(1, 54, 1)] # weeks 1-53

sql = ''' INSERT INTO WEEK_DIM (Week_Number, Year) VALUES (?, ?);'''
week_rows = []

for year in years:
for week in weeks:
week_rows.append((week, year))

# insert all weeks for years 2021 and 2022
cursor.executemany(sql, week_rows)


def insert_customers(cursor):
fd = open('sql/customers.sql', 'r')
sql = fd.read()
fd.close()

cursor.executescript(sql)


def insert_dimension_data():
wh_connection = sqlite3.connect(DB_LOCATION)
cursor = wh_connection.cursor()

insert_weeks(cursor)
insert_customers(cursor)

wh_connection.commit()
wh_connection.close()


# insert data for dimension tables
insert_dimension_data()

New data & pipeline

Let’s say that every week we get daily activity for all customers reported via csv file. It would look somethig like this:

Customer_Email,Activity_Minutes,Date
dennis.gorch88gmail.com,40,2022-01-31
dennis.gorch88gmail.com,67,2022-02-01
dennis.gorch88gmail.com,45,2022-02-02
dennis.gorch88gmail.com,10,2022-02-03
dennis.gorch88gmail.com,33,2022-02-04
dennis.gorch88gmail.com,38,2022-02-05
dennis.gorch88gmail.com,35,2022-02-06
michael.runes@gmail.com,31,2022-01-31
michael.runes@gmail.com,71,2022-02-01
michael.runes@gmail.com,55,2022-02-02
michael.runes@gmail.com,22,2022-02-03
michael.runes@gmail.com,44,2022-02-04
michael.runes@gmail.com,22,2022-02-05
michael.runes@gmail.com,56,2022-02-06
paul.james@joyodev.com,19,2022-01-31
paul.james@joyodev.com,9,2022-02-01
paul.james@joyodev.com,25,2022-02-02
paul.james@joyodev.com,16,2022-02-03
paul.james@joyodev.com,49,2022-02-04
paul.james@joyodev.com,39,2022-02-05
paul.james@joyodev.com,25,2022-02-06
jennymillton@yahoo.com,22,2022-01-31
jennymillton@yahoo.com,31,2022-02-01
jennymillton@yahoo.com,33,2022-02-02
jennymillton@yahoo.com,12,2022-02-03
jennymillton@yahoo.com,42,2022-02-04
jennymillton@yahoo.com,20,2022-02-05
jennymillton@yahoo.com,45,2022-02-06

Why did I choose this particular week (January 31st-February 6th)? I don’t know really, it was quite random.

This data needs to be processed and transformed into some form of weekly report in order to be of any significance to us.

We will use PySpark library in order to shape this data for out needs and use it to create rows for periodic snapshot fact table. We start by grouping new data by user (email) and week, thus summing up every daily activity for each user during that specific week. Then we need to subtract that sum from 600 to get remaining minutes for the week for every customer.

After that we write this data to our STAGING table before inserting anything into fact table.

Finally, we need to play with SQL a little bit and get appropriate foreign keys that reference correct data in dimension tables. There is one trick here tho! If you look at our data from csv, you’ll see that we have no data for customer Josh Mart! What now? Well, that’s alright, it only means that Josh was not active that week (probably vacation), but we still need to insert correct data for him to — 600 minutes in this case. In order to do this correctly, we will use CUSTOMER_DIM table as our main driver for this SQL query and LEFT OUTER JOIN on WEEK_DIM table, as well as STAGING table. Join happens on natural keys, email and week number, and we then SELECT the correct surrogate keys from our result — Customer_Key and Week_key. But, (there is always but) we also need to make sure that no duplicates are ever inserted, for this we use WHERE NOT EXISTS clause to make sure no row in fact table has the same Customer_Key and Week_Key combination as this one. Of course, we need to use COALESCE since data in STAGING table will be NULL if user was not present/active during the past week.

In the end, we close our Spark session and delete eveything from STAGING table (data in it is temporary — only used during ETL process).

Full pipeline code:

import findspark
from datetime import datetime
from warehouse_handler import DataWarehouseHandler
from constants import DB_LOCATION, JAR_PATH

findspark.init()

from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.functions import sum as _sum
from pyspark.sql.types import IntegerType


# function that gets week number from date (string)
def getWeekNumber(date):
date_obj = datetime.strptime(date, '%Y-%m-%d')
week_num = date_obj.isocalendar().week
return week_num


weekUDF = udf(lambda x: getWeekNumber(x), IntegerType())

# 600 is the maximum minutes that user can spend per week, we subtract total minutes spent from 600 with this UDF
remainingWeekMinutesUDF = udf(lambda x: 600 - x, IntegerType())

# Pipeline main code --------------------------------------------------------------------------------------------------

dw_handler = DataWarehouseHandler(DB_LOCATION)

conf = SparkConf() # create the configuration
conf.set('spark.jars', JAR_PATH)
# set spark.jars

# init spark session
spark = SparkSession.builder \
.config(conf=conf) \
.appName('Customer activity analysis') \
.getOrCreate()

# load daily batch of data that represents user activity on specific day
df = spark.read.options(inferSchema='True', delimiter=',', header='True') \
.csv('./new_data.csv')

df = df.withColumn('Week_Number', weekUDF(col('Date')))

# data should be grouped by customer email and week
group_cols = ['Customer_Email', 'Week_Number']

# sum up all activity for every customer by week
df = df.groupBy(group_cols) \
.agg(_sum('Activity_Minutes').alias('Week_Activity_Minutes')) \
.orderBy('Week_Number')

df = df.withColumn('Week_Activity_Minutes_Remaining', remainingWeekMinutesUDF(col('Week_Activity_Minutes')))
df = df.drop(col('Week_Activity_Minutes')) # this column is not needed anymore after we get the remaining minutes

# df.show()

# write data into staging table
df.write.mode('append').format('jdbc'). \
options(url=f'jdbc:sqlite:{DB_LOCATION}',
driver='org.sqlite.JDBC', dbtable='STAGING', overwrite=True).save()

current_year = datetime.now().year # current year to restrict JOIN on week's natural key
past_week = 58 # 5th week of the year 2022 so csv data is past week's data

# here CUSTOMER_DIM is our main driver for the query, since we want to insert row even if there was no activity
# we want to do LEFT OUTER JOIN on STAGING and WEEK_DIM tables to get proper foreign keys for our fact row
# WHERE NOT EXISTS clause makes sure that we don't insert any duplicates
# only assumption is that no new user is in the new data, that should be covered via SCD before this
sql_query = f"""
INSERT INTO CUSTOMER_USAGE_EOW_SNAPSHOT (Customer_Key, Week_Key, Customer_Usage_EOW_Minutes_Remaining)
SELECT c.Customer_Key,
COALESCE(w.Week_Key, {past_week}),
COALESCE(s.Week_Activity_Minutes_Remaining, 600)
FROM CUSTOMER_DIM c
LEFT OUTER JOIN STAGING s ON
c.Customer_Email = s.Customer_Email
LEFT OUTER JOIN WEEK_DIM w
ON s.Week_Number = w.Week_Number
AND w.Year = {current_year}
WHERE NOT EXISTS (
SELECT 1 FROM CUSTOMER_USAGE_EOW_SNAPSHOT f
WHERE f.Customer_Key = c.Customer_Key
AND f.Week_Key = COALESCE(w.Week_Key, {past_week})
)
;"""

# insert fact rows and delete data from staging table
dw_handler.insert_fact(sql_query)
dw_handler.cleanup()

Bonus: Scheduling pipeline with Apache Airflow

If you want to be even fancier, you can trigger this pipeline as a task in Airflow DAG.

If you are wondering how to use Airflow and Spark together, check this nice article: https://anant.us/blog/modern-business/airflow-and-spark-running-spark-jobs-in-apache-airflow/

Note: Spark package for Airflow needs to be installed separately.

DAG code:

from airflow import DAG
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.operators.python import PythonOperator
from constants import APPLICATION_PATH, JAR_PATH

from datetime import timedelta
import pendulum


def intro():
print('Preparing to insert weekly data into periodic snapshot table...')


default_args = {
'owner': 'Aleksandar Milanovic',
'depends_on_past': False,
'email': ['milanovicalex77@gmail.com'],
}

with DAG(
dag_id='periodic_snapshot_dag',
default_args=default_args,
description='A simple periodic snapshot table example',
schedule_interval=timedelta(days=7),
start_date=pendulum.datetime(2022, 2, 7, tz='UTC'),
catchup=False,
tags=['AleX77NP, periodic snapshot table example'],
) as dag:
intro_job = PythonOperator(
task_id='intro_job',
python_callable=intro,
)
spark_periodic_snapshot_job = SparkSubmitOperator(
task_id='spark_periodic_snapshot_job',
application=f'{APPLICATION_PATH}/pipeline.py',
name='periodic_snapshot',
executor_cores=1,
total_executor_cores=1, # due to nature of sqlite
conn_id='spark_local',
jars=JAR_PATH,
driver_class_path=JAR_PATH
)

intro_job >> spark_periodic_snapshot_job

Conclusion

Big thanks to anyone who reads and finds this article useful! It’s my first, and I would like to hear feedbacks and suggestions.

Last, but not least, here is the full working code as promised: https://github.com/AleX77NP/Periodic-snapshot-table-design

Reach out to me via Linkedin if you have any questions or just want to chat: https://www.linkedin.com/in/aleksandar-milanovi%C4%87-2285bb202/

--

--