How to UPSERT data into a relational database using Apache Spark: Part 2 (Python Version)

Thomas Thomas
4 min read · Feb 15, 2022

In my opinion, a discussion of database UPSERTs isn’t complete without covering MERGE functionality. In this blog, we will walk through how to UPSERT using the MERGE command.

The traditional way of handling a data UPSERT is to SELECT from a database table and then INSERT or UPDATE based on whether the row was found. With the MERGE statement, you can combine those statements into a single one and improve the overall performance of the transaction.
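
To make that concrete, here is a minimal sketch of a MERGE; the school and school_updates tables and their columns are hypothetical, and the exact syntax varies slightly between databases:

-- 'school' and 'school_updates' are hypothetical tables keyed on school_id
MERGE INTO school AS t
USING school_updates AS s
    ON t.school_id = s.school_id
WHEN MATCHED THEN
    UPDATE SET school_name = s.school_name, city = s.city
WHEN NOT MATCHED THEN
    INSERT (school_id, school_name, city)
    VALUES (s.school_id, s.school_name, s.city);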

Most relational databases support the MERGE command. In the Part 1 example, I used MySQL, but MERGE is not supported by MySQL. However, there is a different way of achieving the same result in MySQL:

INSERT…ON DUPLICATE KEY UPDATE

If you specify the ON DUPLICATE KEY UPDATE option in the INSERT statement and the new row would cause a duplicate value in a UNIQUE or PRIMARY KEY index, MySQL updates the old row with the new values instead.
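
For example, here is a minimal sketch against a hypothetical school table whose primary key is school_id:

-- 'school' and its columns are hypothetical; school_id is the primary key
INSERT INTO school (school_id, school_name, city)
VALUES (1, 'University of Akron', 'Akron')
ON DUPLICATE KEY UPDATE
    school_name = VALUES(school_name),
    city = VALUES(city);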

In this example, I’m going to use the PostgreSQL database and its INSERT ON CONFLICT statement.

In its simplest form, the INSERT ON CONFLICT statement looks like this:

INSERT INTO table_name(column_list)
VALUES (value_list)
ON CONFLICT (conflict_column)
DO
  UPDATE SET column = value;
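
Filled in for the same hypothetical school table keyed on school_id, it could look like this (EXCLUDED refers to the row that was proposed for insertion):

-- school_id is the conflict target; EXCLUDED holds the proposed new row
INSERT INTO school (school_id, school_name, city)
VALUES (1, 'University of Akron', 'Akron')
ON CONFLICT (school_id)
DO UPDATE SET school_name = EXCLUDED.school_name,
              city = EXCLUDED.city;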

Let’s re-write Step 4.4 from the Part 1 example using the INSERT ON CONFLICT command.

4. Update database table records using Spark.

For our example, let’s say the school city “Abilene” is changed to “NewYork” and “University of Akron, University of Alabama” is changed to “Albany State University”.

Let's see how these changes are applied to the school table.

Step 4.1: Read the data from the CSV file that will be used to update the school table.
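
A minimal sketch; the file path, header, and schema-inference options are assumptions:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("school-upsert").getOrCreate()

# Read the updated school records from CSV; the path is a placeholder
school_updates_df = spark.read.csv("school_updates.csv", header=True, inferSchema=True)
school_updates_df.show(5)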

Step 4.2: Set up the JDBC connection parameters. Please note that I’ve modified the connection parameters to suit the PostgreSQL database.
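
Since the row-level functions below use psycopg2 (an assumption on my part), the parameters can be kept in a plain dictionary; all the values here are placeholders:

# PostgreSQL connection parameters (values are placeholders)
connection_params = {
    "host": "localhost",
    "port": 5432,
    "dbname": "school_db",
    "user": "postgres",
    "password": "postgres",
}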

Step 4.3: Broadcast JDBC connection parameters to cluster nodes.
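
A one-line sketch, assuming the connection_params dictionary from the previous step:

# Ship the connection parameters to every executor once
brconnect = spark.sparkContext.broadcast(connection_params)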

Step 4.4: Create a function process_row to process each row in the partition. The function executes an INSERT ON CONFLICT statement, using the key column value to check whether the Spark dataframe row already exists in the database. If the row exists it is updated, and if it does not exist it is inserted.

Function: process_row
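
Here is a minimal sketch of such a function, assuming psycopg2 and a school table with columns school_id, school_name, and city, where school_id is the conflict key:

def process_row(row, cursor):
    # Upsert a single row; the school table and its columns
    # (school_id, school_name, city) are assumed for illustration.
    sql = """
        INSERT INTO school (school_id, school_name, city)
        VALUES (%s, %s, %s)
        ON CONFLICT (school_id)
        DO UPDATE SET school_name = EXCLUDED.school_name,
                      city = EXCLUDED.city
    """
    cursor.execute(sql, (row.school_id, row.school_name, row.city))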

Step 4.5: Create a function process_partition to process each partition of the dataframe. The function establishes a PostgreSQL database connection, processes all rows in the current partition using the process_row function, and commits all changes in that partition.

Function process_partition
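
A minimal sketch, assuming the brconnect broadcast variable and the process_row function defined above:

import psycopg2

def process_partition(partition):
    # One connection per partition, built from the broadcast parameters
    conn = psycopg2.connect(**brconnect.value)
    cursor = conn.cursor()
    try:
        for row in partition:
            process_row(row, cursor)
        conn.commit()  # commit once for the whole partition
    finally:
        cursor.close()
        conn.close()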

Step 4.6: Iterate through the partitioned data and process each partition using the process_partition function.
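
A minimal sketch; the partition count of 4 is arbitrary:

# Run the upsert logic once per partition on the executors
school_updates_df.coalesce(4).foreachPartition(process_partition)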

In the above example, coalesce is used. You can use repartition as well.

On a side note, here is the difference between coalesce and repartition: coalesce can only reduce the number of partitions and avoids a full shuffle by merging existing partitions, while repartition can increase or decrease the number of partitions but always performs a full shuffle.
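
For instance:

# repartition can increase or decrease partitions, with a full shuffle
df_more = school_updates_df.repartition(8)
# coalesce can only decrease partitions, avoiding a full shuffle
df_fewer = school_updates_df.coalesce(2)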

I didn’t get a chance to benchmark this time. Based on my previous benchmarking exercise, I believe INSERT ON CONFLICT statements should perform better than the traditional SELECT followed by UPDATE/INSERT.

Happy Coding…
