Spark Performance Optimization Series: #3. Shuffle
Apache Spark optimization techniques for better performance
A shuffle operation is the natural side effect of a wide transformation. We see it with wide transformations such as join(), distinct(), groupBy(), and orderBy(), among a handful of others.
It does not matter which stage we are in: data needs to be read in from somewhere, either from the source or from the previous stage. As the data is read in, it is processed in a way that is unique to the algorithm requesting the shuffle. If we take a groupBy() on colour, for example, the map phase identifies which record belongs to which colour, as seen in the diagram above.
The data then moves to stage 2, where all like records are brought together into the same partition. The records then go through some kind of reduce operation; with groupBy() followed by count(), the reduce operation is the count of those records per key. As in every case, the stage concludes by writing the result back out: either we finish the job with some kind of termination, or we write out yet another set of shuffle files for the stage that follows.
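The map/reduce walk-through above can be sketched in plain Python (no Spark required). The records, the key "colour", and the two-partition layout are all hypothetical; in Spark the per-partition buckets would be the shuffle files written to disk between stages.

```python
from collections import defaultdict

# Hypothetical records; "colour" plays the role of the groupBy key.
records = [
    {"id": 1, "colour": "red"}, {"id": 2, "colour": "blue"},
    {"id": 3, "colour": "red"}, {"id": 4, "colour": "green"},
    {"id": 5, "colour": "blue"}, {"id": 6, "colour": "red"},
]

NUM_PARTITIONS = 2

# Map phase: tag each record with the reduce-side partition its key hashes
# to; in Spark these per-partition buckets are written out as shuffle files.
shuffle_files = defaultdict(list)
for rec in records:
    partition = hash(rec["colour"]) % NUM_PARTITIONS
    shuffle_files[partition].append(rec)

# Reduce phase: each reducer pulls its partition (all like keys now sit
# together) and applies the aggregation, here a count per colour.
counts = {}
for partition, recs in shuffle_files.items():
    for rec in recs:
        counts[rec["colour"]] = counts.get(rec["colour"], 0) + 1

print(counts)  # {'red': 3, 'blue': 2, 'green': 1} (key order may vary)
```

Because the partition is derived from the key, every record for a given colour lands in the same reducer, which is exactly what makes the per-key count possible.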
The real performance hit is twofold: on the map side we process the data and write it to disk, and on the read side the receiving executors must read it back from disk, send it across the wire between executors, and then do yet more processing on it.
An important point to note about shuffles is that not all shuffles are the same.
distinct — aggregates many records based on one or more keys and reduces all duplicates to one record.
groupBy / count — aggregates many records based on a key and returns one record per key: the count for that key.
join — takes two datasets, aggregates each of them by a common key, and produces one record for each matching combination.
crossJoin — takes two datasets and produces one record for every possible combination of rows, with no key involved. This is a very heavy and expensive shuffle operation.
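The difference in output cardinality between these operations can be seen with a small plain-Python sketch (the two toy datasets are made up for illustration):

```python
from itertools import product

left = [("red", 1), ("blue", 2), ("red", 3)]
right = [("red", "x"), ("green", "y"), ("red", "z")]

# distinct: duplicates collapse to one record per key.
keys = [k for k, _ in left]            # ['red', 'blue', 'red']
distinct_keys = set(keys)              # 2 records

# groupBy + count: one record per key, carrying that key's count.
group_counts = {}
for k in keys:
    group_counts[k] = group_counts.get(k, 0) + 1   # {'red': 2, 'blue': 1}

# join: one record per matching key combination.
joined = [(k, v, w) for k, v in left for k2, w in right if k == k2]
# 'red' appears twice on each side -> 2 x 2 = 4 records

# crossJoin: every possible pairing, no key at all -> 3 x 3 = 9 records.
crossed = list(product(left, right))

print(len(distinct_keys), len(group_counts), len(joined), len(crossed))  # 2 2 4 9
```

Note how crossJoin's output grows as the product of the two input sizes, which is why it is the most expensive of the four.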
Also there are similarities between the different shuffle operations.
- They read data from some source.
- They aggregate records across all partitions together by some key.
- The aggregated records are written to disk (Shuffle files).
- Each executor reads its aggregated records from the other executors.
- This requires expensive disk and network IO.
What can we do to mitigate Performance issues caused by Shuffle? 🤔
- Our number one strategy is to reduce network IO by using fewer, larger workers. The larger the machines and the fewer of them we have, the less data we have to move between them. We still incur the disk IO, but with fewer machines we significantly reduce the network IO.
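As a sketch of this trade-off, here are two ways to slice the same hypothetical capacity (160 cores, 640 GB) with spark-submit; the figures are illustrative, not a sizing recommendation:

```shell
# 40 small executors: an all-to-all shuffle may cross up to 40 processes.
spark-submit --num-executors 40 --executor-cores 4 --executor-memory 16g app.jar

# 10 large executors: far more of the shuffle stays local to a machine.
spark-submit --num-executors 10 --executor-cores 16 --executor-memory 64g app.jar
```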
- The next strategy is to reduce the amount of data being shuffled as a whole. A few of the things we can do: get rid of the columns you don't need, filter out unnecessary records, and optimize data ingestion.
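A rough plain-Python sketch of why column pruning and early filtering help: serialized size is used here as a stand-in for shuffle volume, and the wide records are invented for the example.

```python
import json

# Hypothetical wide records; only "colour" and "amount" matter downstream.
records = [
    {"id": i, "colour": "red" if i % 2 else "blue",
     "amount": i * 10, "comment": "x" * 200, "active": i % 3 == 0}
    for i in range(1000)
]

def shuffled_bytes(rows):
    # Rough proxy for shuffle volume: serialized size of what gets moved.
    return len(json.dumps(rows).encode())

naive = shuffled_bytes(records)

# Prune columns and filter records *before* the wide transformation.
trimmed = [{"colour": r["colour"], "amount": r["amount"]}
           for r in records if r["active"]]
optimized = shuffled_bytes(trimmed)

print(optimized < naive)  # True: far fewer bytes cross the wire
```

In Spark itself this corresponds to placing select() and filter() (or pushed-down predicates at ingestion) ahead of the wide transformation.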
- De-normalize the datasets, especially if the shuffle is caused by a join.
- If you are joining tables, you can employ a BroadcastHashJoin, in which case the smaller of the two tables is redistributed to every executor, avoiding the shuffle operation entirely. This is controlled by the spark.sql.autoBroadcastJoinThreshold property (default 10 MB). If the smaller of the two tables fits under the 10 MB threshold, Spark can broadcast it.
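The mechanics of a broadcast hash join can be sketched in plain Python (the two toy tables are hypothetical): the small side is copied whole to every executor, so the large side is probed in place and never repartitioned by key.

```python
# Small table: fits under the broadcast threshold, so every executor
# receives a full copy instead of a shuffled slice.
small_table = [("red", "warm"), ("blue", "cool")]
large_table = [("red", 1), ("blue", 2), ("red", 3), ("green", 4)]

# Each executor builds the same in-memory hash map from its broadcast copy...
broadcast_map = {k: v for k, v in small_table}

# ...then probes it while streaming its local partition of the large table.
# No exchange of large_table by key is needed.
result = [(k, n, broadcast_map[k]) for k, n in large_table if k in broadcast_map]

print(result)  # [('red', 1, 'warm'), ('blue', 2, 'cool'), ('red', 3, 'warm')]
```

Note this sketch behaves like an inner join: the unmatched ('green', 4) row is dropped.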
- For joins, pre-shuffle the data with a bucketed dataset. The goal is to eliminate the exchange and sort by pre-shuffling the data: the data is aggregated into N buckets, optionally sorted, and the result is saved to a table, available for subsequent reads.
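The idea behind bucketing can be sketched in plain Python (datasets and bucket count are made up): both sides are pre-partitioned into the same N buckets by key hash at write time, so matching keys are guaranteed to share a bucket index and the later join needs no exchange.

```python
N = 4  # hypothetical bucket count, fixed at write time

def bucketize(rows, n):
    # Pre-shuffle step: hash each key into one of n buckets. In Spark this
    # is roughly what bucketBy(n, "key") persists when saving to a table.
    buckets = [[] for _ in range(n)]
    for key, value in rows:
        buckets[hash(key) % n].append((key, value))
    return buckets

orders  = [("red", 1), ("blue", 2), ("red", 3)]
colours = [("red", "warm"), ("blue", "cool"), ("green", "cool")]

orders_b  = bucketize(orders, N)
colours_b = bucketize(colours, N)

# Matching keys always land in the same bucket index, so each bucket pair
# joins independently -- no shuffle at read time.
joined = []
for ob, cb in zip(orders_b, colours_b):
    lookup = dict(cb)
    joined.extend((k, v, lookup[k]) for k, v in ob if k in lookup)

print(sorted(joined))
```

The price is paid once at write time (the bucketed save is itself a shuffle); every subsequent join on that key then skips the exchange.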
To be continued……….
Read more about these strategies at: