Spark Dynamic Partition Inserts — Part 1
By: Roi Teveth and Itai Yaffe
At Nielsen Identity Engine, we use Spark to process tens of TBs of raw data from Kafka and AWS S3.
Currently, all our Spark applications run on top of AWS EMR, and we launch thousands of nodes per day.
For a more detailed overview of how we use Spark, check out our Spark+AI Summit 2019 Europe session.
In this blog series, we will deep dive into Spark partitioning, Dynamic Partition Inserts, and their pitfalls when using S3.
If you're already using Spark, you probably know what partitioning is, and perhaps you have even encountered Dynamic Partition Inserts.
But even if you are not familiar with Spark partitioning in general or Dynamic Partition Inserts, don't worry; we'll explain these terms in the next paragraphs.
What is partitioning in Spark?
“Partitioning is simply defined as dividing into parts, in a distributed system. Partitioning means, the division of the large dataset” (from Apache Spark Partitioning and Spark Partition).
Consider the following example:
We developed a data pipeline to analyze marketing campaigns’ efficiency on a daily basis.
The first stage of this pipeline is a Spark application that reads all the data of the previous day from our data lake, does some transformations, and finally writes the output to an S3 bucket, partitioned by campaign and date.
Here’s a code snippet:
val bigDataFrame = spark.read.parquet("s3://data-lake/date=2020-02-29")
val transformedDataFrame = bigDataFrame.map(…).filter(…)

transformedDataFrame.write
  .partitionBy("campaign", "date")
  .mode(SaveMode.Append)
  .parquet("s3://spark-output")
The output directory will have a structure similar to this:
spark-output/
├── campaign=1/
│   └── date=2020-02-29/
│       ├── part-0001.snappy.parquet
│       └── …
├── campaign=2/
│   └── date=2020-02-29/
│       ├── part-0001.snappy.parquet
│       └── …
├── campaign=3/
│   └── date=2020-02-29/
│       └── …
├── campaign=…
A more realistic scenario (or: if you build it — it will fail!)
This application is scheduled on a daily basis, and adds new records (i.e. records that were ingested into our data lake in the past day) to the relevant folder, based on the record's campaign and date.
But in the real world, applications tend to fail, so what happens if the application fails mid-execution?
In that case, some output files might have already been created in the destination folder, while others might not.
To make sure we process all the data, we need to re-run the application, but then we end up with duplicate records, i.e. records that were already added during the first execution are added again during the second execution (see below). This means the application is not idempotent.
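To make this concrete, here is a minimal sketch (not from the original pipeline) of how such duplicates could be detected after a re-run; it assumes the re-processed rows are identical to the rows written by the first execution:

import org.apache.spark.sql.functions.col

// Read back one of the partition directories that both executions appended to
val written = spark.read.parquet("s3://spark-output/campaign=1/date=2020-02-29")

// Any row appearing more than once was written by both runs
val duplicateRows = written
  .groupBy(written.columns.map(col): _*)
  .count()
  .filter(col("count") > 1)

duplicateRows.show()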
To mitigate this issue, the “trivial” solution in Spark would be to use SaveMode.Overwrite, so Spark will overwrite the existing data in the partitioned folder with the data processed in the second execution:
val bigDataFrame = spark.read.parquet("s3://data-lake/date=2020-02-29")
val transformedDataFrame = bigDataFrame.map(…).filter(…)

transformedDataFrame.write
  .partitionBy("campaign", "date")
  //.mode(SaveMode.Append)
  .mode(SaveMode.Overwrite)
  .parquet("s3://spark-output")
The problem with this approach is that it overwrites the entire root folder (s3://spark-output in our example), i.e. all existing partitions (hereinafter "full-overwrite").
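To illustrate the behaviour (a hedged sketch, not from the original post): after the Overwrite above completes with the default ("static") partitionOverwriteMode, listing the partitions of the output shows only the data written by the latest run, and partitions from previous days are gone:

// List the (campaign, date) partitions that still exist under the output root.
// Only the partitions written by the latest execution remain; e.g. any
// date=2020-02-28 folders that existed before the run have been removed.
spark.read.parquet("s3://spark-output")
  .select("campaign", "date")
  .distinct()
  .orderBy("campaign", "date")
  .show(false)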
Implementing a “naive” solution
We actually wrote some custom code to overcome this limitation:
// NOTE: dataFramesMap is of type Map[campaignCode, DataFrame], where each
// entry represents a campaign
dataFramesMap.foreach { case (campaignCode, campaignDf) =>
  val outputPath = "s3://spark-output/" +
    "campaign=" + campaignCode + "/date=" + date
  campaignDf.write.mode(SaveMode.Overwrite).parquet(outputPath)
}
This custom code:
- Created a map of DataFrames (one per campaign); one possible construction is sketched below.
- Iterated through the map and, for each entry (i.e. each campaign), wrote all the records to the specific partitioned directory (e.g. "campaign=1/date=2020-02-29/").
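Here is a hedged sketch of one way such a per-campaign map could be built (the exact construction in the original application isn't shown, and the campaign column name and its string type are assumptions):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Collect the distinct campaign codes present in today's data
// (assumes a string "campaign" column on transformedDataFrame)
val campaignCodes: Seq[String] = transformedDataFrame
  .select("campaign")
  .distinct()
  .collect()
  .map(_.getString(0))
  .toSeq

// One DataFrame per campaign, keyed by its code
val dataFramesMap: Map[String, DataFrame] = campaignCodes
  .map(code => code -> transformedDataFrame.filter(col("campaign") === code))
  .toMap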
This solved the “full-overwrite” problem, and we used it in production for some time.
But, after a while, things naturally changed (our scale grew, we added more features to this application, etc.) and at that point, we started to examine the application’s performance:
This snapshot (taken from Ganglia Cluster CPU graph on one of our EMR clusters) shows that the cluster is well-utilized during the first 30–40 minutes of the application’s execution, and after that, the utilization drops significantly to around 5% for the next several hours.
Digging deeper, we discovered this idle time is spent writing the output to S3.
This means the “naive” solution caused the application to be very inefficient in terms of resource utilization, resulting in very long execution time (as seen above).
So we couldn’t rely on Spark SaveMode.Overwrite, and we also couldn’t rely on our custom code… Dilemma…
Luckily for us, around that time, Spark 2.3.0 was released, introducing Dynamic Partition Inserts.
What is Spark Dynamic Partition Inserts? Why would you use it?
Starting from version 2.3.0, Spark lets you overwrite only the relevant partitions when writing to a partitioned data source table.
In other words, “Spark now writes data partitioned just as Hive would — which means only the partitions that are touched by the INSERT query get overwritten and the others are not touched” (Writing Into Dynamic Partitions Using Spark).
Going back to our example, using this feature, we were able to add new records to the destination folder partitioned by campaign and date, or overwrite only the relevant partitions (i.e. a combination of campaign and date) in case we had to re-run the application after a failure:
// NOTE: Setting the following is required, since the default is "static"
sparkSession.conf
  .set("spark.sql.sources.partitionOverwriteMode", "dynamic")

val bigDataFrame = spark.read.parquet("s3://data-lake/date=2020-02-29")
val transformedDataFrame = bigDataFrame.map(…).filter(…)

transformedDataFrame.write
  .partitionBy("campaign", "date")
  .mode(SaveMode.Overwrite)
  .parquet("s3://spark-output")
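As a side note, the same flag can also be set when the SparkSession is created (a sketch; the application name below is made up):

import org.apache.spark.sql.SparkSession

// Equivalent session-level configuration, applied at session creation time
val sparkSession = SparkSession.builder()
  .appName("daily-campaign-aggregation") // hypothetical app name
  .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
  .getOrCreate()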
So now our application:
- Is idempotent.
- Efficiently utilizes the cluster resources.
- No longer relies on custom code to overwrite only relevant partitions.
But…
What happens when you run this application outside EMR (e.g on Kubernetes)?
Well… A job that used to take ~50 minutes to execute on EMR, took 90 minutes on Kubernetes — almost 2X the time!!!
Why? We’ll explain in our next post, so stay tuned!
P.S — a special thanks to our colleague, Etti Gur! This post is largely based on her work, and if you’d like to learn more about this use-case and about optimizing Spark-based data pipelines in general, check out the slides and video recording from our recent meetup.
[March 18th, 2020] update: part 2 was just published, read it here!