How to improve performance and lower job costs on EC2 instances when using Spark’s coalesce method

Brad Caffey
7 min read · Mar 27, 2024


image courtesy of pixabay.com

In this blog, we’ll look at how to use coalesce in a performant manner that lowers job costs. The coalesce method is commonly used to reduce output data to a specific number of files. However, the number of files you choose can greatly impact both the performance of the write and the cost of your job, so I’ll cover the considerations to make when choosing that number. I’ll also explain why coalesce is more performant than repartition.

Coalesce vs Repartition

First, let’s talk about the difference between repartition and coalesce. Repartition allows you to increase or decrease the number of partitions in a dataset. Coalesce, on the other hand, can only decrease the number of partitions, but it does so in a more efficient manner than repartition.
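As a quick illustration, here is a minimal sketch of the two calls (the dataframe and partition counts are placeholders rather than values from a real job):

val df = spark.range(0, 1000000) // placeholder dataset; its initial partition count depends on your cluster's default parallelism
val more = df.repartition(200) // repartition can increase the partition count (full shuffle)
val fewer = df.repartition(2) // ...or decrease it, still with a full shuffle
val merged = df.coalesce(2) // coalesce can only decrease the count, and it merges partitions locally instead of performing a full shuffle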

Coalesce basics

Let’s dive into how the coalesce method works. We’ll start with a simple example using two executors that contain a dataset divided into ten partitions.

Two executors with five partitions each

If we perform a coalesce(2) on this dataset, then Spark will simply merge the five partitions on each executor into a single partition per executor.

Same two executors after a coalesce(2) is performed

The operation is extremely efficient because there is no shuffle/transfer of data between the two executors.

However, if we perform a repartition(2) on this same dataset, a hash partitioner is applied to the entire dataset, which causes data to be swapped between the two executors.

Same two executors when repartition(2) is performed instead

This shuffling of data between the two executors causes a performance hit.
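Either way, you can confirm the resulting partition count at run time; a minimal sketch (df is a placeholder for the ten-partition dataset above):

println(df.coalesce(2).rdd.getNumPartitions) // 2: the five partitions on each executor were merged locally
println(df.repartition(2).rdd.getNumPartitions) // also 2, but producing those partitions requires a full shuffle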

Now, let’s perform a coalesce(1) instead. When we do this, Spark will shuffle all data from one executor to the other and merge all ten partitions into a single partition.

Same two executors after coalesce(1) is performed

Unfortunately, this shuffle of data between executors adds a performance hit to your job. In addition, if the data from the ten freshly merged partitions won’t fit in the memory space of the first executor, then there will be a shuffle spill…which is another performance hit. Finally, even though one of the executors no longer contains data, it will still be running on your cluster, contributing to your job costs.

Now, let’s look at a bigger example. Let’s see what happens when performing a coalesce(1) against a dataset with 50 partitions spread across ten executors.

Ten executors with five partitions each when a coalesce(1) is performed

This time we have nine shuffles transferring 45 partitions from nine executors to a single partition within the tenth executor. This means a lot of shuffle spill will happen and the performance hit is even greater.

Whenever the number of coalesced partitions is smaller than the number of your executors, you will experience a performance hit that scales with the difference between those two counts. If you have a small dataset, these shuffles shouldn’t be too expensive in terms of a performance hit. However, if your dataset is large relative to your executors, the performance hit will be significant.

If you have a really large dataset, you should consider coalescing to (or close to) the number of executors you have running at that time. You can determine the number of executors at run time using this code.

val executorCount = sparkSession.sparkContext.getExecutorMemoryStatus.size - 1
// getExecutorMemoryStatus includes the driver, so subtract 1 to get the executor count
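For example, a hedged sketch of how this might be applied to a large dataset before writing it out (large_df and the output path are placeholders):

large_df.coalesce(executorCount).write.parquet("<s3 location>") // one coalesced partition per executor, written in parallel without a cross-executor shuffle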

Data skew in coalesced partitions

It’s important to point out that if data skew exists in your original partitions, it will also be present in your coalesced partitions. (Data skew happens when partitions are not of equal size.)

Let’s revisit our original example, but this time with partitions that have data skew.

Two executors with five partitions each that have data skew

When a coalesce(2) is performed on this dataset, the coalesced partitions will be skewed as well.

Same two executors after coalesce(2) is performed on skewed partitions
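If you want to check for skew before (or after) coalescing, one way is to count the rows that land in each partition; a minimal sketch (df is a placeholder for your dataset):

import org.apache.spark.sql.functions.spark_partition_id

df.groupBy(spark_partition_id().alias("partition_id")) // tag each row with the partition it lives in
  .count() // rows per partition; large differences indicate skew
  .orderBy("partition_id")
  .show()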

Importance of persisting before a coalesce

If you don’t persist your dataset before coalescing, Spark will calculate the dataset using only as many cores as there are coalesced partitions. So, if you use .coalesce(1), Spark will calculate the entire dataframe using just a single core.

val final_df_1 = spark.sql("<lots of SQL>")
final_df_1.coalesce(1).write.parquet("<s3 location>") // the entire dataset will be calculated by a single core before being written to disk by that same single core

However, if we persist this dataset before we coalesce then all Spark cores in your job will be utilized when processing this dataset.

val final_df_all = spark.sql("<lots of SQL>")
final_df_all.persist().count() // dataset is calculated by all available cores on all executors before being persisted
final_df_all.coalesce(1).write.parquet("<s3 location>") // persisted data is consolidated onto one executor and written to disk by a single core

The latter code block will run much more quickly than the former one because of the extra Spark cores used to process the dataset.
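One small follow-up worth adding to the example above: once the coalesced write has finished, the cached copy is no longer needed and can be released.

final_df_all.unpersist() // free the cached dataset from executor memory once the write completes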

How coalesce can impact job costs

Before I talk about job costs and coalesce, let me give a quick overview about how job costs are calculated.

Costs for Spark jobs running on EC2 or Azure nodes are calculated simply: node count * job run time (hours) * hourly node price.
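For example, with purely hypothetical numbers: a job that runs for two hours on five nodes priced at $0.50 per node-hour costs 5 * 2 * $0.50 = $5.00, and if a poorly chosen coalesce doubles the run time, the same job costs $10.00.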

If you want to learn more in-depth about Spark job costs, please check out my six-part blog series Cloud Spending Efficiency Guide for Apache Spark on EC2 Instances.

The number of executors that appear on a node depends on two factors: the executor configuration and the node type.
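For example, with hypothetical sizes: on a node with 8 vCPUs and 32 GB of memory, an executor configured with 4 cores and roughly 14 GB of memory fits twice, giving two executors per node; that matches the assumption in the example below.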

Let’s go back to our example of coalescing 50 partitions in ten different executors. In this simplified example, we’ll assume two executors fit on a single node which means we have five nodes containing ten executors.

As we noted above, if we perform a coalesce(1) on this dataset, then all 50 partitions will be merged into a single partition on a single executor. Because only a single core can write a single partition at a time, the entire dataset will now be written by this single Spark core.

After performing a coalesce(1) on dataset, we end up with a single partition on a single executor

Having a single Spark core write our entire dataset will obviously be slow because we have no parallelism and this single partition contains 100% of the data. On top of that, the nine remaining empty and idle executors will still reside on your nodes…which will still count toward your job costs. So in this example, you will be paying for four extra nodes doing nothing while a single node does 100% of the work.

Now if we perform a coalesce(10) on our dataset instead, we’ll end up with a single partition on each of our ten executors.

After performing a coalesce(10) on same dataset, we end up with 10 partitions on 10 executors

This means that when we write the dataset, we get the benefit of parallelism because we now have ten cores writing ten smaller partitions. More parallelism means a faster run time during the write…which results in lower job costs.

Conclusion

The next time you need to use a coalesce to reduce the number of partitions in your dataset, follow these considerations when choosing the number of coalesced partitions. Specifying a number equal to or larger than your executor count will result in no shuffle transfers and will still give you good parallelism when writing the dataset. Specifying a number smaller than your executor count will trigger shuffles and reduce parallelism during the write, which will increase your job costs in proportion to how small your coalesce partition count is.

When coalescing a complex dataset, make sure to persist it first so that all Spark cores are used when calculating it. If the dataset is not persisted, it will be calculated with only as many Spark cores as the number of partitions you specify.

If you liked this blog about coalesce, be sure to check out my blog about persist as well.
