Delta Live Tables with Slowly Changing Dimension Type 2 (SCD2)

Anuj
4 min read · Jul 26, 2023


As the industry grows, the volume of data is increasing rapidly. Much real-time data changes over periodic intervals, and to keep that history Databricks provides Delta Live Tables (DLT) with SCD Type 2 support. I recommend reading the DLT Quickstart article to get started with DLT.

(Diagram: end-to-end flow of the data.)

Before we start, let's summarize what SCD2 is and why we need it.

Example: John Doe changes his zip code. Instead of overwriting the old row, a new row is added and the previous one is marked as no longer current. To maintain this kind of history we need SCD2.
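To make that concrete, here is a minimal sketch of how the rows would look in an SCD2 table after the change. __START_AT and __END_AT are the columns DLT adds for SCD type 2 tables; the name, zip code, and dates are made up for illustration:

Name       ZipCode   __START_AT    __END_AT
John Doe   10001     2023-01-01    2023-06-01
John Doe   94105     2023-06-01    null

The row with __END_AT = null is the current version; the earlier row is preserved as history.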

Use case — We receive data from a message queue into a storage account. The data arrives continuously and needs to be processed with some transformations using DLT with SCD2.

Step 1 — Understand the schema of the data by reading a sample file into a DataFrame.

Materialservices = spark.read.option("multiline", "true").json('/mnt/databases/ApplicationName/MaterialUpdateService_8f77ad0e-7313-481e-b9d7-baadeed344fd.json')
Materialservices.schema

Output -
Materialservices_schema =
StructType([StructField('Timestamp', StringType(), True),
StructField('VersionNumber', StructType([StructField('Major', LongType(), True), StructField('Minor', LongType(), True)]), True),
StructField('HardIndividualProductId', StringType(), True),
StructField('NodeTemplates', ArrayType(StructType([StructField('Template', StringType(), True)]), True), True),
StructField('Operations', ArrayType(StructType([StructField('PartNumber', StringType(), True)]), True), True),
StructField('Parameters', ArrayType(StructType([StructField('DisplayValue', StringType(), True), StructField('ParameterCode', StringType(), True), StructField('ParameterPart', StringType(), True), StructField('ParameterType', LongType(), True)]), True), True),
StructField('ProductId', StructType([StructField('ChassisNumber', StringType(), True), StructField('ChassisSeries', StringType(), True)]), True),
StructField('SoftwareStructureComponents', ArrayType(StructType([StructField('ArchitectureType', LongType(), True), StructField('PartNumber', StringType(), True), StructField('Type', LongType(), True)]), True), True)])

Looking at the output gives us an understanding of each field and its data type.
The next step adds some metadata columns and hash values.
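If you prefer not to rely on schema inference inside the pipeline, the captured schema can be declared explicitly and passed to the stream reader later. A trimmed sketch of the fields printed above (extend it to the full payload):

from pyspark.sql.types import StructType, StructField, StringType, LongType

Materialservices_schema = StructType([
    StructField("Timestamp", StringType(), True),
    StructField("VersionNumber", StructType([
        StructField("Major", LongType(), True),
        StructField("Minor", LongType(), True),
    ]), True),
    StructField("HardIndividualProductId", StringType(), True),
    # ... remaining fields exactly as shown in the output above
])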

Step 2 — Read the data as a DLT stream.

import dlt
from pyspark.sql.functions import col, hash, struct

json_path = "/mnt/fulldatabases/bombsource/2023/06/01/*.json"
schema_checkpoint_path = "/mnt/fulldatabases/xyz/chkbomb/schema_checkpoint"


@dlt.table(comment="The raw json data")
@dlt.expect("valid_key", "HardIndividualProductId IS NOT NULL")  # Example of a data quality check
def gsp_bronze():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.schemaLocation", schema_checkpoint_path)
        # pathGlobFilter helps us pick only files ending with .json.
        .option("pathGlobFilter", "*.json")
        .option("cloudFiles.inferColumnTypes", "true")  # Schema inference; replace with an explicit schema if desired
        # .schema(Materialservices_schema)  # Explicit schema instead of inference
        .load(json_path)
        .withColumn("FileName", col("_metadata.file_path"))
        # hashed_name will be used for SCD type 2 change tracking
        .withColumn("hashed_name", hash("BOMTimestamp", "BOMVersionNumber", "HardIndividualProductId", "Parameters", "ProductId", "SoftwareStructureComponents"))
        # Struct column containing the columns we want to sequence by
        .withColumn("sequence_struct", struct(col("BOMTimestamp"), col("BOMVersionNumber.Major"), col("BOMVersionNumber.Minor")))
    )
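As a quick sanity check outside the pipeline, hash() is the built-in Spark function that folds the listed columns into a single integer, so any change in those columns produces a new value. A minimal sketch with a made-up two-row DataFrame (the column names here are illustrative):

from pyspark.sql.functions import hash

demo = spark.createDataFrame(
    [("HIP-1", "10001"), ("HIP-1", "94105")],
    ["HardIndividualProductId", "ZipCode"],
)
# Same key, different attribute value -> different hash, which apply_changes will treat as a change.
demo.withColumn("hashed_name", hash("HardIndividualProductId", "ZipCode")).show()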

Step 3 — The previous steps built the Bronze layer. Now we use Delta Live Tables' apply_changes to manage SCD2 and track changes.

import dlt

# Create a streaming table called bom, the target table with the same
# structure as the source stream gsp_bronze.
dlt.create_streaming_table(name="bom")

# track_history_column_list: any change in the hash for a given key is treated as a new history record.
# This table can be treated as the Silver or Gold layer if no transformation beyond SCD2 is required.
# Bronze --> Silver --> Gold can all be written in one notebook to get proper lineage.
dlt.apply_changes(
    target = "bom",
    source = "gsp_bronze",
    keys = ["HardIndividualProductId"],
    sequence_by = "sequence_struct",
    stored_as_scd_type = 2,
    track_history_column_list = ["hashed_name"]
    # except_column_list = ["sequence_struct", "_rescued_data", "FileName"]
)

Step 4 — apply_changes takes a few important parameters. sequence_by decides the ordering of events: if the same key arrives more than once, rows are ordered by the timestamp and version in sequence_struct, and __START_AT / __END_AT are set accordingly. track_history_column_list tells DLT which columns' changes should create a new history row. keys decides which column(s) identify the record whose history we maintain.
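Once the pipeline has run, you can see this in the target table: DLT adds __START_AT and __END_AT, and the current version of each key has __END_AT set to null. A sketch of a downstream read (the table name depends on the target schema configured for your pipeline):

# Adjust the table name to match your pipeline's target schema.
bom = spark.read.table("bom")

current = bom.where("__END_AT IS NULL")       # latest version of each HardIndividualProductId
history = bom.where("__END_AT IS NOT NULL")   # superseded versions kept as history

current.select("HardIndividualProductId", "hashed_name", "__START_AT", "__END_AT").show()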

Step 5 — Scheduling a Delta Live Tables pipeline is easy, as compute resources are picked automatically by DLT based on load. For the storage location you can either provide an external path where all the pipeline event logs will land, or leave it blank and DLT will store everything in a DBFS-managed location. It is very important to add the configuration that enables track history, as it is responsible for tracking historical data.
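As a sketch, the relevant parts of the pipeline settings might look like the following, shown here as a Python dict mirroring the JSON in the pipeline UI. The pipelines.enableTrackHistory key is the track-history configuration referred to above (see the CDC docs linked in the references); the name and paths are illustrative assumptions:

# Illustrative pipeline settings; set these in the DLT UI or pipeline JSON.
pipeline_settings = {
    "name": "bom_scd2_pipeline",
    "continuous": False,  # or True to process files continuously as they arrive
    "configuration": {
        # Needed for track_history_column_list to take effect
        "pipelines.enableTrackHistory": "true",
    },
    # Leave storage unset to let DLT default to a DBFS-managed location
    "storage": "/mnt/fulldatabases/xyz/dlt_storage",
}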

(Screenshots: metastore location, pipeline configuration, and pipeline run status.)

Conclusion — DLT is an excellent fit for semi-structured data like CSV, JSON or XML, as these tend to be append-only and are processed as they arrive. In this case the data can be processed continuously or on demand via storage events.

I believe we are at the point where we can make data engineering less complex by leveraging the orchestration functionality available in Databricks. It then becomes a question of how far we turn these processes into a framework that can be applied, in a streamlined way, to a larger number of source entities.

Disclaimer: this post represents my personal point of view. Any technical advice or instructions are based on my own knowledge and experience. You can reach me on LinkedIn in case of any doubts.

References — https://docs.databricks.com/delta-live-tables/cdc.html
