Improve performance of your Spark CDC jobs with Merge Statements

Abhishek Trehan
2 min read · Jul 17, 2024


When working with large volumes of data, it is very important to consider the performance of your ETL jobs. CDC (Change Data Capture) is one of the key concepts when building a warehouse, and it becomes even more important when you are dealing with large volumes of data.

The data set I used for testing here has about 200 million records, and the total table population is about 2 billion records. There are three scenarios to handle in your incoming source data when capturing the change:

  1. Insert the new records
  2. Update the existing records
  3. Expire the deleted records

Consider that your source data arrives in snapshot mode, which means the source sends you new data plus updated data every time there is a change. In this case, if a record is not received from the source, it is considered deleted, so you need to expire that record on your end. The essence is to capture the data change in the most efficient way without losing history. We will use the SCD Type 2 (Slowly Changing Dimensions) concept here.
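
For context, a minimal shape for the SCD-2 target could look like the sketch below. The column names Key, col1, START_Dt, and END_Dt mirror the ones used in the MERGE further down; the Delta format and the data types are assumptions for illustration only.

# Hedged sketch of the SCD-2 target table (assumed schema, not from the post)
spark.sql("""
CREATE TABLE IF NOT EXISTS Target (
  Key      STRING,     -- business key from the source
  col1     STRING,     -- tracked attribute
  START_Dt TIMESTAMP,  -- when this version became effective
  END_Dt   TIMESTAMP   -- open record: NULL or '9999-12-31'
) USING DELTA
""")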

The first step is to stage the incoming data into a table using the statement below:

# Stage the incoming snapshot so it can be merged into the target
(df
 .write.mode("overwrite")
 .option("mergeSchema", "true")
 .saveAsTable("source_stg"))
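
Here, df is assumed to be the DataFrame holding the latest source snapshot; a rough sketch of how it might be produced (the path and file format below are placeholders, not from the original post):

# Hypothetical read of the incoming snapshot; adjust format and path to your source
df = (spark.read
      .format("parquet")
      .load("/landing/source_snapshot/"))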

Next, we apply the MERGE between the source and the target:

spark.sql(f"""MERGE INTO Target AS T 
USING Source AS S
ON T.Key= S.Key
-- If Keymatched then close the existing record in prep for new record
WHEN MATCHED and T.col1 = S.col1 and coalesce(T.END_Dt, '9999-12-31') = '9999-12-31'
THEN UPDATE SET T.END_Dt =S.START_Dt
-- If Key DOES matched then close the existing record as source deleted it
WHEN NOT MATCHED by SOURCE and T.Key is not null
THEN UPDATE SET T. END_Dt = '{current_time}'
""")

The conditions above apply the updates to existing data and expire the records that were deleted from your source. Additionally, we use a plain INSERT to append the new data to your target table. Separating the MERGE (updates and expiries) from the INSERT (new records) speeds up execution and delivers the performance gain.

spark.sql(f"INSERT INTO Target TABLE Source_stg")

Overall, with this approach you can significantly reduce your execution time. The execution time it took for me to run this on a table with 2 billion records (with an in-scope update population of about 200 million) was about 132 seconds. A lot also depends on your compute sizes; refer to my other post here for compute details and the Delta load.

Benefits of improved performance:

  1. Lower operating cost
  2. Near real-time data availability
  3. Optimal use of compute resources
