Merge/Upsert In Delta Lake

Gunanithi CS
3 min read · Feb 21, 2024

In this article, we demystify Merge/Upsert operations in Delta Lake on Databricks: a single MERGE statement updates the records that match a join condition and inserts the ones that don’t, so you can fold incoming changes into an existing table in one step.

I have a CSV file located in my demo directory. Please make sure you have sample data for this project, or download it from my repository.

We’ll begin by setting up the database and tables to kickstart our project. We need two copies of the data: we’ll load the same file into two tables, then delete the records with ‘Basic’ Subscription_Type from the copy and update its Country values to upper case. Later, we’ll merge the original data back into this modified table with a MERGE operation and analyze the outcome.

%sql
CREATE DATABASE IF NOT EXISTS DELTA_DEMO
LOCATION '/mnt/formula1dlg/deltademo'

df = spark.read \
    .option('inferSchema', True) \
    .option('header', 'true') \
    .csv('/mnt/formula1dlg/dqf/raw/Netflix_Userbase_20240203.csv')

# Write the same data to two Delta tables: the original and the copy we will modify
df.write.format('delta').mode('overwrite').option("overwriteSchema", "true") \
    .saveAsTable('DELTA_DEMO.user_org')

df.write.format('delta').mode('overwrite').option("overwriteSchema", "true") \
    .saveAsTable('DELTA_DEMO.user_update')
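
Before changing anything, it’s worth confirming that both Delta tables exist and start with identical contents. A minimal sanity check in PySpark, assuming the table names created above:

# Both tables were written from the same DataFrame, so the counts should match
user_org_df = spark.table("DELTA_DEMO.user_org")
user_update_df = spark.table("DELTA_DEMO.user_update")

print(f"user_org rows:    {user_org_df.count()}")
print(f"user_update rows: {user_update_df.count()}")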

Next, we delete the records whose Subscription_Type is ‘Basic’, update Country to upper case, and add an updatedDate column to track the changes. We’ll then merge this modified table with the original data.

%sql
DELETE FROM DELTA_DEMO.user_update WHERE Subscription_Type = 'Basic';
UPDATE DELTA_DEMO.user_update SET Country = upper(Country);

ALTER TABLE DELTA_DEMO.user_update ADD COLUMNS (updatedDate DATE)

Before-merge result:
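
You can reproduce that before-merge view with a quick query on user_update: after the statements above, the ‘Basic’ rows are gone and every Country value is upper case. A minimal sketch in PySpark:

# State of user_update before the merge: no 'Basic' subscriptions,
# Country in upper case, updatedDate still NULL
before_df = spark.sql("""
    SELECT DISTINCT Country, Subscription_Type, updatedDate
    FROM DELTA_DEMO.user_update
    ORDER BY Country
""")
before_df.show(truncate=False)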

We’ll use the User_ID column to join the update and original tables. When a record matches, its Country value is overwritten with the value from the original table (undoing the upper-case change) and updatedDate is stamped. Records with no match (the ‘Basic’ rows we deleted earlier) are inserted back into the user_update table.

SQL

%sql

MERGE INTO DELTA_DEMO.user_update tgt
USING DELTA_DEMO.user_org upd
ON tgt.User_ID = upd.User_ID
WHEN MATCHED THEN
  UPDATE SET tgt.Country = upd.Country,
             tgt.updatedDate = current_timestamp
WHEN NOT MATCHED THEN
  INSERT (User_ID, Subscription_Type, Monthly_Revenue, Join_Date, Last_Payment_Date,
          Country, Age, Gender, Device, Plan_Duration)
  VALUES (upd.User_ID, upd.Subscription_Type, upd.Monthly_Revenue, upd.Join_Date,
          upd.Last_Payment_Date, upd.Country, upd.Age, upd.Gender, upd.Device,
          upd.Plan_Duration)

Python

from delta.tables import DeltaTable

# Target: the modified Delta table; source: the original table as a DataFrame
deltaTable = DeltaTable.forPath(spark, "/mnt/formula1dlg/deltademo/user_update")
user_org = spark.table("DELTA_DEMO.user_org")

deltaTable.alias("tgt").merge(
        user_org.alias("org"),
        "tgt.User_ID = org.User_ID") \
    .whenMatchedUpdate(set = {
        "Country": "org.Country",
        "updatedDate": "current_timestamp()"
    }) \
    .whenNotMatchedInsert(values = {
        "User_ID": "org.User_ID",
        "Subscription_Type": "org.Subscription_Type",
        "Monthly_Revenue": "org.Monthly_Revenue",
        "Join_Date": "org.Join_Date",
        "Last_Payment_Date": "org.Last_Payment_Date",
        "Country": "org.Country",
        "Age": "org.Age",
        "Gender": "org.Gender",
        "Device": "org.Device",
        "Plan_Duration": "org.Plan_Duration"
    }) \
    .execute()

After the merge, 15 records in the user_update table were matched, so their Country values were restored from the original data and updatedDate was populated. In addition, 6 new records (the previously deleted ‘Basic’ subscriptions) were inserted into the user_update table.

%sql
select distinct Country,Subscription_Type,updatedDate from DELTA_DEMO.user_update
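
To double-check those matched and inserted counts, you can also read the operation metrics that Delta records for the MERGE in the table history, such as numTargetRowsUpdated and numTargetRowsInserted (exact metric names may vary slightly by Delta/Databricks runtime version). A minimal sketch in PySpark, assuming the same table:

from pyspark.sql.functions import col

# The latest MERGE entry in the Delta history carries its operation metrics
history_df = spark.sql("DESCRIBE HISTORY DELTA_DEMO.user_update")
(history_df
    .filter(col("operation") == "MERGE")
    .orderBy(col("version").desc())
    .select("version", "operation", "operationMetrics")
    .show(1, truncate=False))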

Project-related files are uploaded to the GitHub repository named delta_lake_exmaple.
