How to perform SCD2 in Databricks using Delta Lake (Python)
What is SCD?
SCD stands for Slowly Changing Dimension. It is a technique for keeping a history of how records change over time in a database, table, file, or cloud store, depending on the requirement.
For example:
Suppose a job runs at 1 am and returns 3 records, which are inserted into a table or another target. At 2 am the job runs again and returns 5 records. These are compared with the existing data, which requires a unique key such as an id or a hash code. Suppose a new record and an existing record share the same id, say 1, but their contents differ. The existing record with id 1 is marked as inactive, and the new record is inserted and marked as active. Records whose id has not been seen before are simply inserted.
SCD Type 2 (SCD2) keeps every version of a record instead of overwriting it, so the full change history remains available for tracking.
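The walk-through above can be sketched in plain Python, independent of Spark. This is only a minimal illustration of the idea, not the notebook's code: the function name `apply_scd2`, the `active` flag, and the sample records are all hypothetical.

```python
def apply_scd2(master, incoming):
    """Return a new history list after applying one batch of incoming rows.

    master   -- list of dicts with keys "id", "name", "active" (hypothetical layout)
    incoming -- list of dicts with keys "id", "name"
    """
    result = [dict(row) for row in master]  # copy so the input is not mutated
    active_by_id = {row["id"]: row for row in result if row["active"]}

    for new in incoming:
        current = active_by_id.get(new["id"])
        if current is not None and current["name"] == new["name"]:
            continue  # unchanged record: nothing to do
        if current is not None:
            current["active"] = False  # changed record: deactivate the old version
        new_row = {"id": new["id"], "name": new["name"], "active": True}
        result.append(new_row)  # changed or brand-new record: insert as active
        active_by_id[new["id"]] = new_row
    return result


# 1 am run returns 3 records; 2 am run returns 5 (one changed, two new).
run_1 = [{"id": 1, "name": "A"}, {"id": 2, "name": "B"}, {"id": 3, "name": "C"}]
run_2 = run_1[1:] + [{"id": 1, "name": "A2"}, {"id": 4, "name": "D"}, {"id": 5, "name": "E"}]

history = apply_scd2(apply_scd2([], run_1), run_2)
```

After both runs, `history` holds six rows: the old version of id 1 is kept but inactive, and the five current records are active.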
Background
My Databricks notebook does the following:
· Reads data from JSON files in Azure Blob Storage.
· Stores the JSON data in a Delta table.
· Performs the SCD2 operation using Python in the notebook and stores the final data in the master Delta table.
Scenario
In this scenario, there are three JSON files in Azure Blob Storage, shown in the pictures below.
File 1: Data.json
File 2: Data1.json
File 3: Data2.json
Code
Step 1: Add the settings below to enable Delta Lake.
spark.sql("set spark.databricks.delta.preview.enabled=true")
spark.sql("set spark.databricks.delta.retentionDurationCheck.enabled=false")
Step 2:
# Add the imports below.
from datetime import datetime, timedelta
from pyspark.sql.functions import col, concat, lit, current_date
from delta.tables import DeltaTable
# Compute yesterday's date; it is written to ValidTo to mark a record as no longer valid.
yesterdaydaye = datetime.today() - timedelta(days=1)
yesterdaydaye = yesterdaydaye.strftime("%Y-%m-%d")
# Create the Delta table.
spark.sql("CREATE TABLE TableName(FieldName Datatypes) USING DELTA LOCATION 'Path of DBFS'")
Step 3:
# Set the storage account key so that files can be read from Azure Blob Storage.
spark.conf.set("fs.azure.account.key.<StorageName>.blob.core.windows.net", "Secret Key")
# Folder path used to read the files from the blob container.
path = "wasbs://<ContainerName>@<StorageName>.blob.core.windows.net/FilePath/"
# List the files in the folder.
filelist = dbutils.fs.ls(path)
Step 4:
# Read each JSON file and prepare its data for the Delta table.
for f in filelist:
    if f.path.endswith('.json'):
        filename = f.name
        data = spark.read.option("multiline", "true").json(f.path)
        # Select the columns for this scenario and add HashCode, the concatenation of Id + Name + Address.
        data = data.select(col("Id"), col("Name"), col("Address"), concat(col("Id"), col("Name"), col("Address")).alias("HashCode"))
        data = data.withColumn('ValidFrom', current_date())
        data = data.withColumn('ValidTo', lit(yesterdaydaye))
        # Drop the table if it exists, then register the data as a temp view.
        spark.sql("DROP TABLE IF EXISTS SCD2TempTable")
        data.createOrReplaceTempView("SCD2TempTable")
        # Check that the data was loaded.
        spark.sql("select * from SCD2TempTable").show()
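The HashCode in Step 4 is a plain concatenation of Id, Name and Address, so two different rows can produce the same string (for example 1 + 23 versus 12 + 3). One common alternative is to join the fields with a separator and hash the result. The sketch below shows the idea in plain Python with `hashlib`; the helper name `row_hash`, the `|` separator, and the sample values are assumptions for illustration, not part of the notebook.

```python
import hashlib


def row_hash(*fields, sep="|"):
    """Join the field values with a separator and return a SHA-256 hex digest."""
    joined = sep.join("" if f is None else str(f) for f in fields)
    return hashlib.sha256(joined.encode("utf-8")).hexdigest()


# Plain concatenation cannot tell these two (hypothetical) rows apart...
plain_a = "1" + "23" + "Pune"
plain_b = "12" + "3" + "Pune"

# ...while the separated hash can.
hash_a = row_hash(1, "23", "Pune")
hash_b = row_hash(12, "3", "Pune")
```

In PySpark the same idea could probably be expressed with `sha2(concat_ws("|", ...), 256)` from `pyspark.sql.functions`, which also avoids `concat` returning null when any input column is null.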
Step 5: Perform the SCD2 operation
# Read the master Delta table (SCD2) and the temp view that holds the new data.
SCD2 = DeltaTable.forPath(spark, "/FileStore/Demp/SCD2")
updatesDF = spark.table("SCD2TempTable")
# Find the incoming rows whose Id already has an active record with a different HashCode. For these, the old record must be closed and the new record inserted.
LatestRecord = updatesDF \
    .alias("NewData") \
    .join(SCD2.toDF().alias("SCD2"), "Id") \
    .where("NewData.HashCode <> SCD2.HashCode AND SCD2.ValidTo = 'false'")
stagedUpdates = (
    LatestRecord
    .selectExpr("NULL as mergeKey", "NewData.*")
    .union(updatesDF.selectExpr("Id as mergeKey", "*"))
)
# In the code above, LatestRecord holds the incoming rows that match an existing Id but differ in HashCode, which is the row-wise combination of all columns. ValidTo is a custom column that identifies whether a record is still active ('false') or has been closed.
SCD2.alias("SCD2").merge(
    stagedUpdates.alias("staged_updates"),
    "SCD2.Id = mergeKey") \
  .whenMatchedUpdate(
    # Only close records that are still active, so already expired versions are left untouched.
    condition = "SCD2.ValidTo = 'false' AND staged_updates.HashCode <> SCD2.HashCode",
    set = {
      "ValidTo": "staged_updates.ValidTo"
    }
  ).whenNotMatchedInsert(
    values = {
      "Id": "staged_updates.Id",
      "Name": "staged_updates.Name",
      "Address": "staged_updates.Address",
      "ValidFrom": "staged_updates.ValidFrom",
      "ValidTo": "'false'",
      "HashCode": "staged_updates.HashCode"
    }
  ).execute()
# In the code above, when a staged row matches an existing record whose HashCode differs, the old record is closed by setting its ValidTo date, and the new version is inserted through the staged row whose mergeKey is NULL. Rows that match nothing are simply inserted into the master table.
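To see why the staged rows carry a mergeKey, it can help to emulate the merge in plain Python. The sketch below illustrates the idea only and is not how Delta Lake implements it: the function names `stage_updates` and `merge_scd2` and the sample rows are hypothetical. A changed record is staged twice, once with `mergeKey = None` (which never matches, so it is inserted as the new active version) and once with `mergeKey = Id` (which matches and closes the old version).

```python
def stage_updates(master, updates):
    """Stage every update keyed by Id, plus a None-keyed copy of each changed one."""
    active = {r["Id"]: r for r in master if r["ValidTo"] == "false"}
    changed = [u for u in updates
               if u["Id"] in active and active[u["Id"]]["HashCode"] != u["HashCode"]]
    staged = [dict(u, mergeKey=None) for u in changed]       # never matches: inserted
    staged += [dict(u, mergeKey=u["Id"]) for u in updates]   # matches by Id, or inserted
    return staged


def merge_scd2(master, staged):
    """Emulate whenMatchedUpdate / whenNotMatchedInsert on lists of dicts."""
    result = [dict(r) for r in master]  # matching is done against the original rows only
    inserts = []
    for s in staged:
        matched = [r for r in result
                   if s["mergeKey"] is not None and r["Id"] == s["mergeKey"]]
        if matched:
            for r in matched:
                # Matched: close the active version if the content changed.
                if r["ValidTo"] == "false" and r["HashCode"] != s["HashCode"]:
                    r["ValidTo"] = s["ValidTo"]
        else:
            # Not matched: insert the row as the active version.
            inserts.append({"Id": s["Id"], "HashCode": s["HashCode"], "ValidTo": "false"})
    return result + inserts


# Hypothetical sample data: Id 1 changed, Id 2 unchanged, Id 3 is new.
master = [{"Id": 1, "HashCode": "1A", "ValidTo": "false"},
          {"Id": 2, "HashCode": "2B", "ValidTo": "false"}]
updates = [{"Id": 1, "HashCode": "1A2", "ValidTo": "2024-01-01"},
           {"Id": 2, "HashCode": "2B", "ValidTo": "2024-01-01"},
           {"Id": 3, "HashCode": "3C", "ValidTo": "2024-01-01"}]

final = merge_scd2(master, stage_updates(master, updates))
```

Without the `None`-keyed copy, the changed row for Id 1 would only close the old version and the new version would never be inserted, because a matched row cannot also trigger an insert.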
# Final step: run the query below to see the data in the master table.
spark.sql("select * from SCD2 order by id").show()
Output:
First Execution: If ValidTo is false, the record is active; otherwise it is inactive.
Second Execution:
Third Execution:
Note: Currently, the merge conditions and values accept only SQL expression strings or column names. If you use a column name, that column must exist in the DataFrame.