SCD 2 Warehousing Design Pattern on Databricks SQL Serverless

Databricks SQL SME
DBSQL SME Engineering
11 min read · Oct 15, 2023

How and when to implement the SCD2 Design Pattern on the Lakehouse in DBSQL

Author: Cody Austin Davis

Intro:

Many companies running an enterprise data warehouse have more sensitive requirements that can increase complexity in their data model. One of the most common requirements is the need for full historical audit capabilities over changing data. If an update to a record happens, many businesses are legally required to be able to track who changed the record, what the data was before and after the change, and when it was changed. This is a common requirement in regulated industries such as healthcare, finance, and cybersecurity. The most popular warehousing design pattern for this is the SCD-2 (slowly changing dimensions, type 2) design pattern. If you do not know the specific terminology, it simply means that instead of updating records in place when data changes, we “expire” the old record, track when it was created and expired, insert its replacement, and finally mark the new record as the “active record” for that given row.
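As a quick (hypothetical) illustration, a single logical row that is updated once ends up stored as two physical rows, with tracking columns like the ones we build later in this article:

user_id | num_steps | _start_timestamp    | _end_timestamp      | _is_current
1       | 7500      | 2023-10-01 00:00:00 | 2023-10-10 19:30:00 | false
1       | 9200      | 2023-10-10 19:30:00 | NULL                | true

The first row is the expired historical version; the second is its replacement, which downstream queries treat as the active record.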

In Databricks SQL, we can incrementally read in changes from files or streams and continually insert them into what we call “Bronze tables” in the Lakehouse Medallion architecture. The Lakehouse Medallion architecture is broadly a three-stage architecture with stages called “Bronze”, “Silver”, and “Gold”.

  • Bronze tables generally hold data appended straight from its source into Delta tables without much cleaning. They serve the purpose of keeping data in its source form and staging the data for downstream cleaning.
  • Silver tables generally house the core data models of clean data. They are the “single source of truth” for an enterprise, business unit, or use case. Here, updates have been processed, cleaning rules have been applied, and downstream applications have an expectation of the data quality and availability.
  • Gold tables are generally output tables for specific use cases, typically at the end of the line. They can be business tracking aggregations and metrics, use-case-specific information, and really anything that end users/apps would interact with.

In this article, we will show an end-to-end example of how to incrementally read changes (new inserts or updates to existing rows) from a data source, process those changes, properly structure your downstream “silver” layer, and finally create gold-level snapshots for auditing purposes.

The code for this article can be pulled and used in the Lakehouse Tacklebox here.

Let's get started!

Step 1 — Incrementally Ingest Data: At this stage, we are going to run a COPY INTO statement from files to create a Bronze append-only table of data updates. Since downstream changes are all tracked, we can either let this table retain ALL changes, or we can save on storage and delete data from this table as we successfully process batches from it. The latter more closely resembles classical staging design patterns. The Raw → Bronze layer looks something like this:

CREATE OR REPLACE TABLE iot_dashboard.bronze_sensors_scd_2
(
Id BIGINT GENERATED BY DEFAULT AS IDENTITY,
device_id INT,
user_id INT,
calories_burnt DECIMAL(10,2),
miles_walked DECIMAL(10,2),
num_steps DECIMAL(10,2),
timestamp TIMESTAMP,
value STRING,
ingest_timestamp TIMESTAMP
)
USING DELTA
-- LOCATION "s3://bucket-name/data_lakehouse/tables/data/bronze/bronze_sensors/"
;

Next, we run the COPY INTO statement to read new files incrementally from an S3/ADLS/GCS bucket. In our case, we are going to read from the default dataset that comes with all Databricks workspaces. This dataset contains some IoT data with device and user information. The COPY INTO statement looks like this:

COPY INTO iot_dashboard.bronze_sensors_scd_2
FROM (SELECT
id::bigint AS Id,
device_id::integer AS device_id,
user_id::integer AS user_id,
calories_burnt::decimal(10,2) AS calories_burnt,
miles_walked::decimal(10,2) AS miles_walked,
num_steps::decimal(10,2) AS num_steps,
timestamp::timestamp AS timestamp,
value AS value, -- This is a JSON object,
now() AS ingest_timestamp
FROM "/databricks-datasets/iot-stream/data-device/")
FILEFORMAT = json -- csv, xml, txt, parquet, binary, etc.
COPY_OPTIONS('force'='false') -- 'true' always loads all files it sees; 'false' keeps the load incremental

--Other Helpful copy options:
/*
PATTERN('[A-Za-z0-9].json')
FORMAT_OPTIONS ('ignoreCorruptFiles' = 'true') -- skips bad files for more robust incremental loads
COPY_OPTIONS ('mergeSchema' = 'true')
'ignoreChanges' = 'true' - ENSURE DOWNSTREAM PIPELINE CAN HANDLE DUPLICATE ALREADY PROCESSED RECORDS WITH MERGE/INSERT WHERE NOT EXISTS/Etc.
'ignoreDeletes' = 'true'
*/;

A few things are going on here. First, we specify the file format and the path we want to incrementally ingest from; here we can even use glob path filters. COPY INTO has a very comprehensive list of options that allow you to really customize your ingestion pipelines, but that will be covered in a separate article. COPY INTO documentation can be found here. Next, you'll notice that you can write SQL directly in your incremental ingestion pipeline. In this example, we embed some data type casting and quality expectations directly in the Raw → Bronze layer. This is not as typical but can be great depending on your use case and data. In this case, it simplifies the MERGE command downstream.

The most important thing to note is the addition of the column “ingest_timestamp”. This is what we will use to help implement our downstream SCD2 pipeline logic, but an ingest timestamp is a highly recommended practice for all pipelines.

Step 2 — Create your target Silver table: Now we will define the DDL for our target silver table, adding some key columns for this design pattern.

CREATE TABLE IF NOT EXISTS iot_dashboard.silver_sensors_scd_2
(
Id BIGINT GENERATED BY DEFAULT AS IDENTITY,
device_id INT,
user_id INT,
calories_burnt DECIMAL(10,2),
miles_walked DECIMAL(10,2),
num_steps DECIMAL(10,2),
timestamp TIMESTAMP,
value STRING,
ingest_timestamp TIMESTAMP,
-- Processing Columns
_start_timestamp TIMESTAMP,
_end_timestamp TIMESTAMP,
_batch_run_id STRING,
_is_current BOOLEAN
)
USING DELTA
PARTITIONED BY (_is_current, user_id)
TBLPROPERTIES('delta.feature.allowColumnDefaults' = 'supported', 'delta.columnMapping.mode' = 'name')

Here we are adding 4 columns at the end.

_is_current: This column specifies whether or not the given record is the most up-to-date version of that row for a given primary key of the table. True means the record is active; False means the record has expired and is a terminated historical version. We are partitioning by this column so that downstream applications never need to read historical files, for optimal operational performance. More on that later.

_start_timestamp: This is the timestamp at which the given record started being “active” in the silver table. It is the timestamp at which it was inserted as the new record or the newly updated record.

_end_timestamp: This is the timestamp that the given record was terminated and “expired”. This will be null for current records.

_batch_run_id: This is not absolutely required, but is highly recommended. In warehouses, especially highly regulated ones, you generally want to track each load to know who loaded the batch, when it happened, and which records were included in a given batch. For this, we add the _batch_run_id column to associate each record with a unique batch id for tracking purposes.
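Put together, these columns make audit questions easy to answer. For example, a sketch query like the one below (the key values are illustrative) returns the full change history of a single logical row, oldest version first, once the table is populated:

SELECT device_id, user_id, value, _start_timestamp, _end_timestamp, _is_current, _batch_run_id
FROM iot_dashboard.silver_sensors_scd_2
WHERE device_id = 5 AND user_id = 1 -- illustrative key values
ORDER BY _start_timestamp ASC;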

Now that we have our table, let's populate it!

Step 3 — Load your staging data with a MERGE: First we can check our staging bronze table for incoming data to load. It should look something like this:

Author screenshot
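To check the staged batch directly in SQL, a minimal query like the following (a sketch) shows the most recently staged records and the total row count sitting in the staging table:

SELECT *
FROM iot_dashboard.bronze_sensors_scd_2
ORDER BY ingest_timestamp DESC
LIMIT 10;

SELECT COUNT(*) AS staged_records
FROM iot_dashboard.bronze_sensors_scd_2;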

Now we can write a MERGE statement to perform the SCD2 update. The MERGE statement looks like the following:


-- Step 1 - get state of the active batch

DECLARE OR REPLACE VARIABLE var_batch_id STRING = uuid();

-- Optional intra-batch pre-insert/merge de-dup
CREATE OR REPLACE TABLE iot_dashboard.temp_batch_to_insert
AS
WITH de_dup AS (
SELECT Id::bigint AS Id,
device_id::integer AS device_id,
user_id::integer AS user_id,
calories_burnt::decimal(10,2) AS calories_burnt,
miles_walked::decimal(10,2) AS miles_walked,
num_steps::decimal(10,2) AS num_steps,
timestamp::timestamp AS timestamp,
value::string AS value,
ingest_timestamp,
ROW_NUMBER() OVER(PARTITION BY device_id, user_id, timestamp ORDER BY ingest_timestamp DESC) AS DupRank
FROM iot_dashboard.bronze_sensors_scd_2
)

SELECT Id, device_id, user_id, calories_burnt, miles_walked, num_steps, timestamp, value, ingest_timestamp,
now() AS _start_timestamp,
true AS _is_current,
var_batch_id AS _batch_run_id -- example batch run id
FROM de_dup
WHERE DupRank = 1
;

-- Primary MERGE STATEMENT
-- One set of records simply finds and expires existing records
-- The other set of records is actually INSERTed (a NULL merge key will never match)

-- NOTES:
-- 1. You can be more efficient and filter out records whose data hasn't actually changed, but the WHEN MATCHED condition and the WHERE clause must match or the logic will expire records incorrectly
-- 2. You can have any set of merge keys
MERGE INTO iot_dashboard.silver_sensors_scd_2 AS target
USING (

SELECT updates.Id AS merge_key_id,
updates.user_id AS merge_key_user_id,
updates.device_id AS merge_key_device_id,
updates.* --merge key can be built in whatever way makes sense to get unique rows
FROM iot_dashboard.temp_batch_to_insert AS updates

UNION ALL

-- These rows are the updated versions of existing records
-- Setting the merge_key to NULL forces these rows to NOT MATCH and be INSERTed as the new active versions.
SELECT
NULL AS merge_key_id,
NULL AS merge_key_user_id,
NULL AS merge_key_device_id,
updates.*
FROM iot_dashboard.temp_batch_to_insert AS updates
INNER JOIN iot_dashboard.silver_sensors_scd_2 as target_table
ON updates.Id = target_table.Id
AND updates.user_id = target_table.user_id
AND updates.device_id = target_table.device_id -- What makes the key unique
-- This needs to be accounted for when deciding to expire existing rows
WHERE updates.value <> target_table.value -- Only update if any of the data has changed

) AS source

ON target.Id = source.merge_key_id
AND target.user_id = source.merge_key_user_id
AND target.device_id = source.merge_key_device_id

WHEN MATCHED AND (target._is_current = true AND target.value <> source.value) THEN
UPDATE SET
target._end_timestamp = source._start_timestamp, -- start of new record is end of old record
target._is_current = false

WHEN NOT MATCHED THEN
INSERT (id, device_id, user_id, calories_burnt, miles_walked, num_steps, value, timestamp, ingest_timestamp, _start_timestamp, _end_timestamp, _is_current, _batch_run_id)
VALUES (
source.id, source.device_id, source.user_id, source.calories_burnt, source.miles_walked, source.num_steps, source.value, source.timestamp,
source.ingest_timestamp,
source._start_timestamp, -- start timestamp -- new records
NULL, -- end_timestamp
source._is_current, -- is current record
source._batch_run_id --example batch run id
)
;

-- This calculates table stats for all columns to ensure the optimizer can build the best plan
-- THIS IS NOT INCREMENTAL
ANALYZE TABLE iot_dashboard.silver_sensors_scd_2 COMPUTE STATISTICS FOR ALL COLUMNS;

-- THIS IS INCREMENTAL
OPTIMIZE iot_dashboard.silver_sensors_scd_2 ZORDER BY (timestamp, device_id);

-- Clear the intra-batch staging table once the batch has successfully loaded
-- (the bronze staging table can optionally be cleaned up here as well)
TRUNCATE TABLE iot_dashboard.temp_batch_to_insert;

Let's break this down. First, we read data from our bronze staging table and de-dup the staging batch. This step is optional, but common, so we included it to show how to implement it. We then add the _start_timestamp, _is_current, and _batch_run_id columns that show that these records are the new rows on the block (pun intended). We generate a random batch id in this example and store it in a SQL variable.

Next, we perform a MERGE. The MERGE itself only finds and expires existing records that will no longer be the “active” records. We merge on whatever the primary key or definition of a “row” is, plus the _is_current = true constraint. Importantly, we can ensure that the MERGE statement only expires records whose data has actually changed with the additional INNER JOIN and WHERE clause in the source table (i.e. updates.value <> target_table.value). This is more performant, but you must ensure that this logic is mirrored in the WHEN MATCHED AND condition so that the correct existing records are expired. If the MERGE finds a match, it updates the existing record with the _is_current = false flag and sets its _end_timestamp to the _start_timestamp of its replacement record.

Now all that is left to do is insert all the new records. This happens in the WHEN NOT MATCHED branch of the same MERGE: brand-new rows and the updated versions of existing rows (the ones whose merge keys we set to NULL) fall through and get INSERTed with _is_current = true.

Finally, we OPTIMIZE and ZORDER our tables for excellent downstream read performance and file pruning. This is all the more important when dealing with SCD2-style tables, since they tend to have more records than tables that simply update in place. In our example, we partitioned the table by _is_current and then ZORDER by the attributes that downstream users filter on. This ensures downstream users do not even notice performance hits from the additional historical records still existing in the table. Lastly, we truncate our staging table upon successful batch completion.

Now if we run this pipeline again (to do this, you can set 'force' = 'true' in the COPY INTO statement to reload the already processed records), we can see that the MERGE statement expired the first batch and inserted the second. This enables us to see some interesting metrics like below:

Author screenshot
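The exact numbers will depend on your batches, but a sketch query like the following, using the tracking columns defined earlier, surfaces similar metrics: how many records each batch wrote and how many of them are still active versus expired:

SELECT _batch_run_id,
       COUNT(*) AS total_records,
       SUM(CASE WHEN _is_current THEN 1 ELSE 0 END) AS active_records,
       SUM(CASE WHEN NOT _is_current THEN 1 ELSE 0 END) AS expired_records
FROM iot_dashboard.silver_sensors_scd_2
GROUP BY _batch_run_id
ORDER BY MIN(_start_timestamp);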

Step 4 — Create Operational Views for Users and Historical Snapshots for Audits: We did all this extra work to build our SCD2 pipeline; now we can implement the important stuff: operational views and historical snapshots, all without disrupting operations.

First, we can create a “current view” of our IoT data for operational users who don't need the historical data, like so:

CREATE OR REPLACE VIEW iot_dashboard.silver_sensors_current
AS
SELECT * FROM iot_dashboard.silver_sensors_scd_2
WHERE _is_current = true

This will ensure our users always prune the “expired” files, so they won't even notice they are there.
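If you want to confirm the pruning behavior, you can inspect the query plan for a typical dashboard query against the view (a sketch; the filter value is illustrative); the plan should show the _is_current = true predicate from the view definition being applied as a partition filter:

EXPLAIN
SELECT user_id, SUM(num_steps) AS total_steps
FROM iot_dashboard.silver_sensors_current
WHERE user_id = 1
GROUP BY user_id;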

Next, we can create a historical snapshot of our data. This can be done for end-of-month, end-of-quarter, or end-of-year reporting, and really anything required for auditing. We simply create a snapshot view that selects the most recent version of each record on or before a certain snapshot timestamp, like so:

CREATE OR REPLACE VIEW iot_dashboard.silver_sensors_snapshot_as_of_2023_10_10_19_30_00
AS
-- Get the most recent version of each record as of a specific point in time
WITH de_dup AS (
SELECT Id::bigint AS Id,
device_id::integer AS device_id,
user_id::integer AS user_id,
calories_burnt::decimal(10,2) AS calories_burnt,
miles_walked::decimal(10,2) AS miles_walked,
num_steps::decimal(10,2) AS num_steps,
timestamp::timestamp AS timestamp,
value::string AS value,
ingest_timestamp,
_start_timestamp,
_is_current,
_end_timestamp,
ROW_NUMBER() OVER(PARTITION BY id ORDER BY _start_timestamp DESC) AS DupRank -- Get most recent record as of a specific point in time
FROM iot_dashboard.silver_sensors_scd_2
-- Point in time snapshot timestamp such as end of month
WHERE _start_timestamp <= '2023-10-10T19:30:00'::timestamp
)

SELECT *
FROM de_dup
WHERE DupRank = 1
;

This gives us the traceability we need with the performance we want on Delta and Databricks SQL. This is especially powerful on Databricks SQL Serverless, because these snapshots can be built on a schedule and the warehouse can shut down immediately once complete. This makes for extremely price/performant auditing, especially when operational dashboards only use a subset of data and the snapshots need to process a large amount of data. Because of this design, operational dashboards can use much smaller warehouses, and the historical snapshots can be built on larger warehouses that are only on during the snapshot process.
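As a usage example (a sketch; the aggregation is illustrative), an auditor can compare a frozen snapshot against the current operational view to see how the data has changed since the snapshot was taken:

SELECT 'snapshot_2023_10_10' AS source, COUNT(*) AS records, SUM(num_steps) AS total_steps
FROM iot_dashboard.silver_sensors_snapshot_as_of_2023_10_10_19_30_00
UNION ALL
SELECT 'current' AS source, COUNT(*) AS records, SUM(num_steps) AS total_steps
FROM iot_dashboard.silver_sensors_current;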

Finally, now that we have all the steps in place, we can use Databricks Workflows to tie them together into a fully observable and serverless ETL pipeline.

To create our workflow, we can choose between saving our SQL steps to SQL files or saving them as Query objects in DBSQL. The process is pretty much the same for both, but in production it is generally better to run jobs from files in a Git repository. We will go to the Workflows tab and select “Create Job” in the top right corner. Then we will create a task for each of the steps above, which are:

Workflow Steps:

  1. Create DDL (Bronze/Silver tables)
  2. Load Raw to Bronze Table
  3. Load Silver SCD2 Tables, Optimize, and Clean Batch
  4. Create Operational View
  5. Create Historical Snapshots

To create each task, we select the “SQL” task type and choose a Databricks SQL Serverless Warehouse to run it on, like so:

Author Screenshot: Create a Task

We can then create tasks just like this for all steps to create the full end-to-end SCD2 Pipeline. It should look like this:

Author Screenshot: End to End DAG

Now we have a completely serverless and observable data pipeline in a matter of 10 minutes! This really begins to show the power of unifying the capabilities of building a Lakehouse on Databricks.

As a bonus, if we are building tables on Unity Catalog, we can choose the trigger type for the workflow we just built. If we receive files for this pipeline inconsistently or infrequently, then adding a schedule for this job would be unnecessary and inefficient. Instead, we can create an event-driven trigger for this workflow. To do that, simply navigate to the right-hand side of the job and select the “Add Trigger” button. In this dropdown, we can easily provide a data source to monitor along with some rate-limiting options to ensure that the job is not triggered too often.

Author Screenshot: Setting up event-driven trigger in Databricks Workflows

Thank you for reading, we hope you found this article helpful. Be on the lookout for more how-to articles and reach out if there is something specific you would like to see from the DBSQL SME team.
