Dynamic Partition Upsert — SPARK

Tharun Kumar Sekar
May 17, 2022


If you’re using Spark, you probably know what partitioning is, and you may even have encountered Dynamic Partition Inserts. But if you’re not familiar with Spark partitioning in general, or with Dynamic Partition Inserts, don’t worry, we’ve got it covered.

Partitioning in Spark

Partitioning, in simple terms, means splitting the data based on a column’s value and storing each split in its own partition/folder.
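For example, with the Hive-style layout Spark uses, data partitioned on a date column ends up as one folder per distinct value. As a sketch (the path here is hypothetical and the part file names are generated by Spark):

dbfs:/somePath/date=2022-05-01/part-00000-....parquet
dbfs:/somePath/date=2022-05-02/part-00001-....parquet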

Let’s first look at the usual way we save data with partitions, and then see how Dynamic Partition Inserts can help us.

// toDF on a Seq requires the implicits of the active SparkSession
import spark.implicits._

val historyDF = Seq(
  (8,   "bat",   "1", "2022-05-01"),
  (64,  "mouse", "1", "2022-05-02"),
  (-27, "horse", "1", "2022-05-03"),
  (-28, "mouse", "1", "2022-05-03"),
  (10,  "bat",   "1", "2022-05-04")
).toDF("number", "word", "priority", "date")

Here, we have created a dataframe “historyDF” with 5 records. Let’s look at the data now.
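A quick show() prints the contents we just created (expected output shown as comments):

historyDF.show()

// +------+-----+--------+----------+
// |number| word|priority|      date|
// +------+-----+--------+----------+
// |     8|  bat|       1|2022-05-01|
// |    64|mouse|       1|2022-05-02|
// |   -27|horse|       1|2022-05-03|
// |   -28|mouse|       1|2022-05-03|
// |    10|  bat|       1|2022-05-04|
// +------+-----+--------+----------+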

Now let’s save this dataframe to the folder “dbfs:/dynamicPartitions/” with partitioning based on the column “date”.

historyDF.
  write.
  mode("overwrite").
  partitionBy("date").
  parquet("dbfs:/dynamicPartitions/")

Let’s see how the data is laid out in the file system after the save.

We can see 4 subfolders, one for each distinct date used for partitioning. All of the subfolders were created at the same timestamp, which we can confirm from the “modificationTime” of each folder.
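On Databricks (where the dbfs:/ path lives), a minimal sketch for inspecting those folders; this assumes a runtime where dbutils.fs.ls returns a modificationTime for each entry:

// List the partition folders and their modification times (Databricks only)
dbutils.fs.ls("dbfs:/dynamicPartitions/").
  foreach(f => println(s"${f.path} ${f.modificationTime}"))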

Need for Dynamic Partitioning

In most cases, we need to run a daily load/ETL into HDFS, and that load often carries updated or additional records belonging to previous dates. In technical terms, we need an “UPSERT” (update and insert): update the existing partitions and load the current date’s data into a new partition.

Let’s create some sample data and store it in a dataframe “deltaDF”.

val deltaDF = Seq(
  (64,  "mouse", "2", "2022-05-02"),
  (-29, "mouse", "2", "2022-05-03"),
  (10,  "cat",   "2", "2022-05-05")
).toDF("number", "word", "priority", "date")

Now let’s try to understand the data in “deltaDF”. We have 3 records in total, one for each day. The record for 2022-05-02 matches the existing record in “historyDF” except that the priority has changed. The record for 2022-05-03 updates the field “number” of the existing mouse record, changing its value from -28 to -29. Finally, 2022-05-05 is a new date with one record.
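Again, show() makes the delta concrete:

deltaDF.show()

// +------+-----+--------+----------+
// |number| word|priority|      date|
// +------+-----+--------+----------+
// |    64|mouse|       2|2022-05-02|
// |   -29|mouse|       2|2022-05-03|
// |    10|  cat|       2|2022-05-05|
// +------+-----+--------+----------+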

The expectation now is to have one new partition created for 2022-05-05, and the partitions for 2022-05-02 and 2022-05-03 overwritten with the updated records. The rest of the partitions should remain untouched.

How to Dynamically Overwrite Partitions

In order to perform this, we first need to set Spark’s partition overwrite mode to dynamic (the default, static, deletes every existing partition under the path on an overwrite). This can be done by running the command,

spark.conf.set("spark.sql.sources.partitionOverwriteMode","dynamic")
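If you’d rather not change the session-wide setting, recent Spark versions also accept the same key as a per-write option on the DataFrameWriter, which takes precedence over the session config. A sketch (someDF is a placeholder dataframe):

// Scope dynamic overwrite to a single write instead of the whole session
someDF.
  write.
  option("partitionOverwriteMode", "dynamic").
  mode("overwrite").
  partitionBy("date").
  parquet("dbfs:/dynamicPartitions/")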

Files in HDFS and in object stores (S3, DBFS, etc.) are immutable, so we cannot update individual records in place. What we can do is overwrite the partitions we are interested in. To achieve this, we need the complete data of every partition that’s going to be overwritten held in memory, so that none of the existing records are lost. (If the history is read from the very path being overwritten, materialize it first, e.g. with a cache and an action, before writing.) Confusing, right? Let’s make it simple with code.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

// The dates present in the delta are exactly the partitions to rewrite
val distinctDates = deltaDF.
  select('date).
  distinct.
  map(_.getString(0)).
  collect().
  toList

// Existing records that live in the partitions about to be overwritten
val filteredHistoryDF = historyDF.
  filter('date.isin(distinctDates: _*))

// Union history and delta, keep the highest-priority record per
// (date, word), and write back; only the affected partitions change
filteredHistoryDF.
  union(deltaDF).
  withColumn(
    "rank",
    row_number().over(
      Window.partitionBy('date, 'word).orderBy('priority.desc)
    )
  ).
  filter('rank === 1).
  drop("rank"). // don't persist the helper column
  write.
  mode("overwrite").
  partitionBy("date").
  parquet("dbfs:/dynamicPartitions/")

Here, we first collect the distinct dates that have entries in deltaDF. Then we filter the records belonging to those dates out of historyDF. Next we union the delta with the filtered history and keep, for each (date, word) pair, the record with the highest priority, so the updated records win over the ones that previously existed. Finally, we write the result to the same path; with dynamic partition overwrite enabled, only the partitions present in the output are replaced.
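Reading the path back verifies the merge (expected output shown as comments; when reading partitioned parquet, the partition column date is appended after the data columns):

spark.read.parquet("dbfs:/dynamicPartitions/").
  orderBy("date", "word").
  show()

// +------+-----+--------+----------+
// |number| word|priority|      date|
// +------+-----+--------+----------+
// |     8|  bat|       1|2022-05-01|
// |    64|mouse|       2|2022-05-02|
// |   -27|horse|       1|2022-05-03|
// |   -29|mouse|       2|2022-05-03|
// |    10|  bat|       1|2022-05-04|
// |    10|  cat|       2|2022-05-05|
// +------+-----+--------+----------+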

Let’s see how the data looks in the file system after the update has happened.

We can see that a new partition folder was created for 2022-05-05 and that the partitions for 2022-05-02 and 2022-05-03 were updated; their modificationTime now reflects the second write. The other partitions are untouched and still carry their original creation timestamp.

By doing this, we save execution time by skipping the partitions we are not interested in, and we save a lot of I/O by rewriting only the affected data.

If you liked this article, click the 👏 so other people will see it here on Medium.
