How to perform SCD2 in Databricks using Delta Lake (Python)

Mayur Panchal
Published in Analytics Vidhya · Dec 27, 2019

What is SCD?

SCD stands for Slowly Changing Dimension. It means maintaining a history of all the records in a database table, file, or cloud store, depending on the requirement.

For example, suppose an execution runs at 1 AM and returns 3 records, which are inserted into a table (or any other target). At 2 AM another execution runs and returns 5 records. These are compared with the old data; the comparison needs something unique, such as an id or a hash code. Now suppose a new record and an old record share the same id, say 1. The old record with id 1 is marked inactive, and the new record is inserted and marked active. Completely fresh records are simply inserted.

SCD Type 2 (SCD2), in short, keeps every version of a record for tracking purposes, maintaining a full history log.
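To make this concrete, here is a sketch (with hypothetical names and dates) of what the table might look like after the 2 AM run above, where the ValidTo column marks whether a version is still active:

Id | Name | ValidFrom  | ValidTo
 1 | XYZ  | 2019-12-26 | 2019-12-26   <- old version, deactivated
 1 | PQR  | 2019-12-27 | false        <- new version, active
 4 | JKL  | 2019-12-27 | false        <- fresh record, inserted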

BACKGROUND

My Databricks notebook does the following:

· Reads data from a JSON file in Azure Blob Storage.

· Stores the JSON data in a Delta table.

· Performs the SCD2 operation using Python in a notebook and stores the final data in the master Delta table.

Scenario

In this scenario, there are a total of 3 JSON files in Azure Blob Storage, described below, with a sketch of their possible contents after the list.

File 1: Data.json

On the first execution, these records are inserted into the Delta table.

File 2: Data1.json

On the second execution, the old record with Id 1 is disabled and the new record with Id 1 is enabled.

File 3: Data2.json

By the third execution, you can guess what is going to happen (the Output section below confirms it).
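The original screenshots of the files are not reproduced here, so here is a hypothetical sketch of their contents, consistent with the output described later (the Ids and names XYZ/ABC come from this article; the other values are made up):

Data.json
[
  {"Id": 1, "Name": "XYZ", "Address": "Street 1"},
  {"Id": 2, "Name": "ABC", "Address": "Street 2"},
  {"Id": 3, "Name": "DEF", "Address": "Street 3"}
]

Data1.json
[
  {"Id": 1, "Name": "PQR", "Address": "Street 9"}
]

Data2.json
[
  {"Id": 2, "Name": "ABC", "Address": "New Street 2"},
  {"Id": 4, "Name": "JKL", "Address": "Street 4"}
]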

Code

Step 1: Set the below Spark configurations to enable Delta Lake.

spark.sql("set spark.databricks.delta.preview.enabled=true")
spark.sql("set spark.databricks.delta.retentionDurationCheck.enabled=false")

Step 2:

# Add the below imports

from datetime import datetime, timedelta
from pyspark.sql.functions import col, concat, lit, current_date

# Compute yesterday's date, used to mark an old record as invalid

yesterday = datetime.today() - timedelta(days=1)
yesterday = yesterday.strftime("%Y-%m-%d")

# Create a Delta table

spark.sql("CREATE TABLE TableName(FieldName DataType) USING DELTA LOCATION '<Path on DBFS>'")
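For this walkthrough, a concrete version of that statement might look like the below. Treat it as a sketch: the table name SCD2 and the DBFS path are taken from Step 5, while the column types are assumptions based on how the data is used later in the notebook.

spark.sql("""
CREATE TABLE IF NOT EXISTS SCD2 (
  Id BIGINT,
  Name STRING,
  Address STRING,
  HashCode STRING,
  ValidFrom DATE,
  ValidTo STRING
) USING DELTA LOCATION '/FileStore/Demp/SCD2'
""")

ValidTo is a STRING because it holds either a date string (for deactivated records) or the literal 'false' (for active records).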

Step 3:

# Create an Azure Blob Storage connection to read files from blob storage

spark.conf.set("fs.azure.account.key.<StorageName>.blob.core.windows.net", "<Secret Key>")

# Folder path used to read the files from the blob container
path = "wasbs://<ContainerName>@<StorageName>.blob.core.windows.net/FilePath/"

# List of files
filelist = dbutils.fs.ls(path)
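As a side note, instead of hard-coding the account key in the notebook, you can read it from a Databricks secret scope. The scope and key names below are hypothetical:

# Read the storage account key from a secret scope (names are placeholders)
storage_key = dbutils.secrets.get(scope="my-scope", key="storage-account-key")
spark.conf.set("fs.azure.account.key.<StorageName>.blob.core.windows.net", storage_key)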

Step 4:

# Perform the below operations to stage the data for the Delta table.

for f in filelist:
    if f.path.endswith('.json'):
        filename = f.name
        data = spark.read.option("multiline", "true").json(f.path)

        # Add new columns to the DataFrame; HashCode is the concatenation of Id + Name + Address
        data = data.select(
            col("Id"), col("Name"), col("Address"),
            concat(col("Id"), col("Name"), col("Address")).alias("HashCode")
        )
        data = data.withColumn("ValidFrom", lit(current_date()))
        data = data.withColumn("ValidTo", lit(yesterday))

        # Drop the table if it exists and save the data to a temp view
        spark.sql("DROP TABLE IF EXISTS SCD2TempTable")
        data.createOrReplaceTempView("SCD2TempTable")

# Check that the data was staged
spark.sql("select * from SCD2TempTable").show()
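On the first run, with the hypothetical Data.json sketched earlier, the staged view would look roughly like this (the dates depend on the day you run it):

+---+----+--------+------------+----------+----------+
| Id|Name| Address|    HashCode| ValidFrom|   ValidTo|
+---+----+--------+------------+----------+----------+
|  1| XYZ|Street 1|1XYZStreet 1|2019-12-25|2019-12-24|
|  2| ABC|Street 2|2ABCStreet 2|2019-12-25|2019-12-24|
|  3| DEF|Street 3|3DEFStreet 3|2019-12-25|2019-12-24|
+---+----+--------+------------+----------+----------+

Note that ValidTo is staged as yesterday's date: the merge in Step 5 only uses that value when deactivating an old record, and writes 'false' for newly inserted active records.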

Step 5: Perform the SCD2 operation

# Read data from the master Delta table and the temp view, where SCD2 is the master Delta table

from delta.tables import DeltaTable

SCD2 = DeltaTable.forPath(spark, "/FileStore/Demp/SCD2")
updatesDF = spark.table("SCD2TempTable")

# Find the rows whose old record is going to be disabled and whose new version is going to be inserted

LatestRecord = updatesDF \
    .alias("NewData") \
    .join(SCD2.toDF().alias("SCD2"), "Id") \
    .where("NewData.HashCode <> SCD2.HashCode AND SCD2.ValidTo = 'false'")

stagedUpdates = (
    LatestRecord
    .selectExpr("NULL as mergeKey", "NewData.*")
    .union(updatesDF.selectExpr("Id as mergeKey", "*"))
)

# In the above code, LatestRecord contains the incoming rows whose Id already exists in the master table but whose HashCode (the row-wise combination of Id, Name, and Address) has changed. Those rows are staged with a NULL mergeKey so the merge below cannot match them and therefore inserts them as new active records, while the union with updatesDF (keyed by Id) lets the merge find and deactivate the matching old records.

SCD2.alias("SCD2").merge(
    stagedUpdates.alias("staged_updates"),
    "SCD2.Id = mergeKey") \
.whenMatchedUpdate(
    # Only touch the currently active record; historical rows keep their ValidTo date
    condition = "staged_updates.HashCode <> SCD2.HashCode AND SCD2.ValidTo = 'false'",
    set = {
        # Stamp the old record with yesterday's date, deactivating it
        "ValidTo": "staged_updates.ValidTo"
    }
).whenNotMatchedInsert(
    values = {
        "Id": "staged_updates.Id",
        "Name": "staged_updates.Name",
        "Address": "staged_updates.Address",
        "ValidFrom": "staged_updates.ValidFrom",
        # The string literal 'false' marks the new record as active
        "ValidTo": "'false'",
        "HashCode": "staged_updates.HashCode"
    }
).execute()

# In the above code: if a staged row matches an existing record on Id and the HashCode differs, the old record is disabled by setting its ValidTo date; the new version of that row (staged with a NULL mergeKey) does not match, so it is inserted as active. Rows with brand-new Ids do not match anything and are simply inserted into the master table.

# Final step: execute the below query to see the data in the master table

spark.sql("select * from SCD2 order by Id").show()

Output:

First Execution: If ValidTo is 'false', the record is active; otherwise it is inactive.

In this execution, the data is simply inserted into the master Delta table.
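Continuing the hypothetical data from the scenario sketch, the master table after the first run would look roughly like this:

+---+----+--------+------------+----------+-------+
| Id|Name| Address|    HashCode| ValidFrom|ValidTo|
+---+----+--------+------------+----------+-------+
|  1| XYZ|Street 1|1XYZStreet 1|2019-12-25|  false|
|  2| ABC|Street 2|2ABCStreet 2|2019-12-25|  false|
|  3| DEF|Street 3|3DEFStreet 3|2019-12-25|  false|
+---+----+--------+------------+----------+-------+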

Second Execution:

In this execution, the old record with Id 1 and name XYZ is disabled, and the new record with Id 1 is enabled.

Third Execution:

In this execution, the record with Id 2 and name ABC is disabled, its new version is inserted as active, and a fresh record with Id 4 is also inserted.
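With the same hypothetical data, the master table after the third run would hold the full history, roughly:

+---+----+------------+----------------+----------+----------+
| Id|Name|     Address|        HashCode| ValidFrom|   ValidTo|
+---+----+------------+----------------+----------+----------+
|  1| XYZ|    Street 1|    1XYZStreet 1|2019-12-25|2019-12-25|
|  1| PQR|    Street 9|    1PQRStreet 9|2019-12-26|     false|
|  2| ABC|    Street 2|    2ABCStreet 2|2019-12-25|2019-12-26|
|  2| ABC|New Street 2|2ABCNew Street 2|2019-12-27|     false|
|  3| DEF|    Street 3|    3DEFStreet 3|2019-12-25|     false|
|  4| JKL|    Street 4|    4JKLStreet 4|2019-12-27|     false|
+---+----+------------+----------------+----------+----------+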

Note: Right now the merge query only supports string expressions or column names in its condition and in the set/values maps. If you reference a column name, that column must exist in the DataFrame.
