How to lower job costs on EC2 instances and improve performance when using Spark’s persist method

Brad Caffey
6 min read · Mar 27, 2024


image courtesy of pixabay.com

In this blog, we'll examine Spark's persist method and explain how to use it to improve performance and lower job costs.

Persist basics

Before we dive in, we should cover some basics about persist. This method tells Spark to store your dataset within your executors so that the dataset can be referenced again without being recalculated.

Here’s how to flag a dataset for persisting.

val df = sparkSession.sql("<some sql>")
df.persist()

Actions and Transformations

Spark's documentation for the Dataset object categorizes its methods as Actions, Typed Transformations, and Untyped Transformations. Transformations are lazily evaluated, which means that if you call a transformation on a dataset flagged for persisting, nothing will happen. It's only when the first action on the dataset is reached that all transformations, including the persist, are executed.

Actions force persisting of data

Here are some methods that are considered actions: count(), collect(), min(), max(), first(), write(), saveAsObjectFile(), and saveAsTextFile(). And here's how you should flag a dataset with persist and then trigger an action to load that data into your executors.

val dataset = sparkSession.sql("<some sql>")
dataset.persist() //dataset is flagged for persisting but does not trigger the persisting
println(dataset.count()) //data is persisted as count is calculated

Beware of partial actions

You need to be careful of some methods that are listed in Spark’s documentation as actions but are actually partial actions which will not persist your entire dataset. For example…

printSchema()

In order for printSchema to derive the schema for your dataset, Spark only processes a single partition’s worth of data. As a result, only one partition is persisted in your executors. The rest of the dataset is not persisted.

show()/take()

show/take is similar to printSchema in that Spark only processes enough partitions to return the number of rows requested by the show/take. Therefore, unless you show the entire dataset (which is not recommended), Spark will not fully persist your dataset.
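For example, here's a minimal sketch (using the same placeholder SQL as above) of why a full action is still needed after a show:

val df = sparkSession.sql("<some sql>")
df.persist()
df.show(5)  // materializes only enough partitions to return five rows
df.count()  // scans every partition, so the full dataset ends up persisted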

Methods commonly confused as actions

Finally, there are some Spark methods that are transformations but are sometimes mistaken for actions by junior engineers.

withColumn(), join(), and createOrReplaceTempView(): these methods are just transformations added to the query plan for your dataset and therefore will not trigger the calculation of your dataset.

Persist in action

Now let’s look at a code example to see when the persist is triggered during execution.

val df = sparkSession.sql("<some sql>")
df.persist() //Adds persisting to the query plan for this dataset
df.createOrReplaceTempView("tempView") //adds the temp view name to the query plan for this dataset
df.printSchema() //retrieves one partition of data to materialize schema
df.count() //count executes query plan and persists full dataset

When should you persist a dataset?

A persist should only be used when two or more actions are performed on the same dataset. Otherwise, the performance hit you take when persisting the dataset will be for nothing.
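As a rough illustration (the SQL and output path are placeholders), persisting pays off here because the same dataset feeds two actions:

val df = sparkSession.sql("<some sql>")
df.persist()                    // worthwhile: df is used by two actions below
df.count()                      // first action computes the query and persists the result
df.write.parquet("<some path>") // second action reuses the persisted data instead of re-running the SQL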

How to check if a dataset is persisted or not

While a job is running, the Storage tab in the Spark UI shows which datasets are being persisted. If you do not see an RDD appear on this tab during your job's run, then the dataset is not being persisted or has not been persisted yet.

The Storage tab in the Spark UI lists datasets that are being persisted
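If you prefer to check in code rather than in the UI, here's a minimal sketch (using the same placeholder SQL) based on the Dataset's storageLevel property:

import org.apache.spark.storage.StorageLevel

val df = sparkSession.sql("<some sql>")
println(df.storageLevel == StorageLevel.NONE) // true: not yet flagged for persisting
df.persist()
println(df.storageLevel)                      // MEMORY_AND_DISK, the default level
df.count()                                    // the data itself is only materialized by an action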

Importance of unpersist

When persisting datasets, it's important to unpersist them when you are done using them so that you improve performance and lower job costs. When you unpersist, the Spark engine marks the dataset for deletion, which causes the RDD to disappear immediately from the Storage tab.

For example, when you use a persisted dataset to form new datasets, you should unpersist the original dataset after the new datasets have been persisted. See the following code example…

val dfMain = sparkSession.sql("<some sql>")
dfMain.persist().count() //count triggers persisting

val dfA = sparkSession.sql("<more sql>")
val joinedDfA = dfMain.join(dfA)

val dfB = sparkSession.sql("<different sql>")
val joinedDfB = dfMain.join(dfB)

joinedDfA.persist().count() //persist new dataset A
joinedDfB.persist().count() //persist new dataset B

dfMain.unpersist() //flag main dataset for removal from storage

Note: While unpersisted data is not immediately removed from your executors, Spark will reclaim that space if/when another dataset is calculated and/or persisted.
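If you need the space back before continuing, unpersist also accepts a blocking flag. As a sketch, the unpersist call in the example above could instead be written as:

dfMain.unpersist(blocking = true) // waits until the executors have actually dropped the cached blocks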

Using persist in a loop

When using persist in a loop, it’s critical that you unpersist your datasets at the end of your loop. Consider the following code…

// assumes start_date and end_date are java.time.LocalDate values defined earlier
while (!start_date.isAfter(end_date)) {
  val df = sparkSession.read.parquet(s"$s3path/partition_date=$start_date")
  df.persist().count()
  df.coalesce(10).write.parquet(s"$newS3Path/partition_date=$start_date")
  start_date = start_date.plusDays(1)
}

This code loops through all the days in the range, reads S3 data from the specified path, persists it in memory, coalesces it to ten partitions, and then writes the output to a new location as ten files.

It's very reasonable to think that, because the df variable is reused in this loop, the persisted data associated with that variable will be automatically unpersisted by Spark during the next iteration of the loop. However, that's not the case.

If you look at the Storage tab in the Spark UI for this job, you will find an RDD for every day processed in the loop. When datasets in a loop are not unpersisted, Spark keeps adding executors to make room for the new datasets created in subsequent iterations of the loop.
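Here's a sketch of the same loop with the missing unpersist added at the end of each iteration (same variables as above):

while (!start_date.isAfter(end_date)) {
  val df = sparkSession.read.parquet(s"$s3path/partition_date=$start_date")
  df.persist().count()
  df.coalesce(10).write.parquet(s"$newS3Path/partition_date=$start_date")
  df.unpersist()                      // drop this day's cached data before the next iteration
  start_date = start_date.plusDays(1)
}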

How persist and unpersist can impact job costs

Before I talk about how not using unpersist can increase job costs, let me give a quick overview of how job costs are calculated.

Costs for Spark jobs running on EC2/Azure nodes are calculated simply: node count × job run time (hours) × node price.
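For example, with made-up numbers, a job that keeps six nodes busy for two hours at $0.50 per node-hour costs 6 × 2 × $0.50 = $6.00.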

If you want to learn more in-depth about Spark job costs, please check out my six-part blog series Cloud Spending Efficiency Guide for Apache Spark on EC2 Instances.

The number of executors that appear on a node depends on two factors: the executor configuration and the node type.
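For instance, a five-core executor configuration might be set when building the session; this is only a sketch, and the memory value is purely illustrative:

import org.apache.spark.sql.SparkSession

val sparkSession = SparkSession.builder()
  .config("spark.executor.cores", "5")    // five cores per executor
  .config("spark.executor.memory", "20g") // illustrative value; size this for your node type
  .getOrCreate()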

For this simplified example, let's assume that we have only two executors per node. Because the datasets are being coalesced into ten partitions, only two five-core executors (ten cores total) are needed to process the data, since a single partition can only be written/processed by a single core.

Because the persisted datasets are not unpersisted at the end of the loop, executors are added to the job to make room for the newly persisted datasets. As a result, after all the iterations of the loop were done, ten executors (five nodes) had been added simply to store the extra data. So while the job ended with 12 executors and 60 Spark cores, only the first ten Spark cores wrote data to disk because of the number of partitions.

If you don't unpersist persisted data at the end of a loop, Spark will needlessly add executors to accommodate the new persisted data.

Because the extra five nodes did not improve the run time of the job, job costs increased. After an unpersist was added to the end of the loop, the same Spark job processed the same number of days in the same run time but finished with just two executors. Fewer nodes with the same run time equals lower job costs.

Here's a more extreme example I encountered. A job that created six persisted datasets in a loop with four iterations had its run time reduced from 120 minutes to 90 minutes simply by adding six unpersist statements at the end of the loop. Not only did the job run quicker with the unpersists, it ran on fewer nodes! As a result, the unpersists lowered job costs by 20%!

Only one persisted dataset at end of job

When your job has completed, you should have at most a single dataset (i.e., the final dataset being written by your job) on your Storage tab. Any additional datasets listed are potentially slowing your job down and adding unnecessary costs.
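One rough way to sanity-check this at the end of a job (assuming nothing else is intentionally left cached) is to ask the SparkContext which RDDs are still registered as persistent:

val stillPersisted = sparkSession.sparkContext.getPersistentRDDs
if (stillPersisted.size > 1)
  println(s"Warning: ${stillPersisted.size} datasets are still persisted at the end of the job")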

Conclusion

The persist and unpersist methods can greatly improve performance and lower job costs when used correctly. Persist should only be used when two or more actions are called on a single dataset. And once a dataset is no longer needed, it should be unpersisted. You can check the Storage tab in the Spark UI while a job is running to see if a dataset is being persisted. And most importantly, reusing a dataset variable will not cause Spark to unpersist that dataset. So you should always check your Spark logs after a job is done to see how many datasets were still persisted at the end of the run. If you see more than one dataset, you should consider adding unpersists.

If you liked this blog about persist, be sure to check out my blog about coalesce as well.

