4 Performance improving techniques to make Spark Joins 10X faster

Suryakant Pandey
Published in Analytics Vidhya
7 min read · Jul 26, 2021

Spark is a lightning-fast computing framework for big data that supports in-memory processing across a cluster of machines. In this blog, we will cover optimizations related to the JOIN operation in Spark.

Joining two datasets is a heavy operation that needs a lot of data movement (shuffling) across the network to ensure that rows with matching join keys are physically co-located (on the same node).

Writing code for dataset joins in Spark is very easy, but making it performant is tricky because one needs to understand how datasets are joined internally in Spark. Having recently looked into multiple Spark join jobs and optimized them to complete more than 10X faster, I am going to share my learnings on optimizing Spark joins. We first look at the internals of a join to set the context.

Popular types of Joins

Broadcast Join

This join strategy is suitable when one of the datasets in the join is fairly small. (The threshold can be configured using "spark.sql.autoBroadcastJoinThreshold", which is 10 MB by default.)

Consider the following example where Table A and a small Table B (less than 10 MB) have to be joined. In this case, the Spark driver broadcasts table B to all nodes on the cluster where partitions of table A are present.

Since Table B is now present on all the nodes that hold data for table A, no more data shuffling is required and each partition of table A can join with the required entries of table B.

This is the fastest type of join (as the bigger table requires no data shuffling), but it has the limitation that one table in the join has to be small.
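To make the mechanism concrete, here is a minimal plain-Python model of a broadcast join. It is not Spark code: the function name `broadcast_hash_join`, the column names, and the sample rows are made up for illustration. The point it shows is that the small table is turned into one lookup structure, and every partition of the large table is joined in place against it.

```python
from collections import defaultdict

def broadcast_hash_join(large_partitions, small_rows, key):
    """Inner-join each partition of the large table against a full copy of the small table."""
    # Build the lookup once; in Spark, this copy is what every node receives.
    lookup = defaultdict(list)
    for row in small_rows:
        lookup[row[key]].append(row)

    joined = []
    for partition in large_partitions:  # each partition is joined where it already lives
        for left in partition:
            for right in lookup.get(left[key], []):
                joined.append({**left, **right})
    return joined

# Two partitions of the large table (hypothetical data); they are never reshuffled.
table_a = [
    [{"id": 1, "amount": 10}, {"id": 2, "amount": 20}],
    [{"id": 1, "amount": 30}, {"id": 3, "amount": 40}],
]
table_b = [{"id": 1, "country": "IN"}, {"id": 2, "country": "US"}]

result = broadcast_hash_join(table_a, table_b, "id")
```

The row with id 3 has no match in table B, so an inner join drops it; the other three rows of table A each pick up their country without table A moving at all.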

Sort Merge Join

This is the standard join type, suitable when the datasets on both sides of the join are medium or large.

This join happens in 3 stages.

  1. Shuffle partitions: The goal of this step is to reshuffle the data of table A and table B so that rows that should be joined land in the same partition, and are therefore co-located on the same node. The number of output partitions of this stage defaults to 200 and can be changed using spark.sql.shuffle.partitions. The partition for a row is determined as hash(join key) % 200 (the value of spark.sql.shuffle.partitions), using the same hash function for both tables A and B. As a result, all rows in table A and table B with an equal value in the join column go to the same partition after the shuffle, since their hash values are the same. This type of repartitioning is called hash partitioning. For DataFrame and SQL joins Spark computes the hash with Murmur3; the RDD HashPartitioner uses the key's .hashCode() method instead.
  2. Sorting within each partition: Each partition is sorted on the join key.
  3. Join the sorted partitions: Depending on the join type (INNER, LEFT, etc.), we produce the final output. This approach is similar to the standard solution for "merging two sorted arrays" using two pointers.

[Figure: SortMerge Join]
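
The three stages can be sketched in plain Python as follows. This is a model of the idea, not Spark's implementation: `zlib.crc32` is only a deterministic stand-in for Spark's hash function, `NUM_PARTITIONS` stands in for spark.sql.shuffle.partitions, and the function names and rows are hypothetical.

```python
import zlib

NUM_PARTITIONS = 4  # stand-in for spark.sql.shuffle.partitions (default 200)

def partition_id(key):
    # crc32 is a deterministic stand-in hash, not the function Spark uses.
    return zlib.crc32(str(key).encode()) % NUM_PARTITIONS

def shuffle(rows):
    """Stage 1: send every (key, value) row to the partition chosen by its key hash."""
    partitions = [[] for _ in range(NUM_PARTITIONS)]
    for key, value in rows:
        partitions[partition_id(key)].append((key, value))
    return partitions

def merge_join(left, right):
    """Stage 3: inner join of two key-sorted lists using two pointers."""
    out, i, j = [], 0, 0
    while i < len(left) and j < len(right):
        if left[i][0] < right[j][0]:
            i += 1
        elif left[i][0] > right[j][0]:
            j += 1
        else:
            key = left[i][0]
            # Find the run of equal keys on each side, then emit their cross product.
            i_end = i
            while i_end < len(left) and left[i_end][0] == key:
                i_end += 1
            j_end = j
            while j_end < len(right) and right[j_end][0] == key:
                j_end += 1
            for _, left_value in left[i:i_end]:
                for _, right_value in right[j:j_end]:
                    out.append((key, left_value, right_value))
            i, j = i_end, j_end
    return out

def sort_merge_join(a_rows, b_rows):
    joined = []
    for part_a, part_b in zip(shuffle(a_rows), shuffle(b_rows)):
        # Stage 2: sort each partition on the join key before merging.
        part_a = sorted(part_a, key=lambda row: row[0])
        part_b = sorted(part_b, key=lambda row: row[0])
        joined.extend(merge_join(part_a, part_b))
    return joined

table_a = [("k1", "a1"), ("k2", "a2"), ("k1", "a3")]
table_b = [("k1", "b1"), ("k3", "b3")]
result = sort_merge_join(table_a, table_b)
```

Because both tables go through the same `partition_id`, the partition holding "k1" rows from table A is paired with the partition holding "k1" rows from table B, which is what makes a purely local merge sufficient.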

We can use the SQL tab in the Spark UI to find out which type of join is happening in a job. Most of the job failures we observed occurred when the datasets on both sides of the join were huge (that is, when sort merge join was picked). We will now look at what causes failures and slowness in sort merge join and how to overcome them.

Optimizations

When we checked the job logs for the failures, there were error messages like

ExecutorLostFailure (executor 235 exited unrelated to the running tasks) Reason: Container container_e327_1624530648585_229354_01_000378 on host:FetchFailed(BlockManagerId(76, prod-***-nm-0001, 7337, None), shuffleId=5, mapId=6477, reduceId=2,

These error messages did not point directly at the root cause, and we had to apply multiple optimizations to fix the issue. Here are the key takeaways.

Data skewness

Even when the input data to the join step is uniformly partitioned, the intermediate data produced by the shuffle in sort merge join may not be uniformly sized across the newly created partitions.

Recall from the discussion above that in the first stage of sort merge join, each row is hashed on the join key to decide its new partition. If the values in the join column are not uniformly distributed, the shuffle produces skewed partitions.

Examples:

Say we have to join the following table with another table on the searchSessionId column. The majority of the rows have a null searchSessionId, so all of those rows hash to the same value and go to the same partition after the shuffling step.

In such cases, most tasks complete very fast while a single task takes very long. For the above data being joined with another dataset, the diagram below captures the processing time for each shuffled partition. Most partitions are processed very fast and a single partition takes a long time, which indicates skew in the shuffled partitions produced for the join (caused by the skewed distribution of join key values).

[Figure: Data skewness leading to slow joins]

In general, any non-uniform distribution of values in the join column can lead to this skew (e.g. value x is very frequent in the first table below, and all those rows go to a single partition after shuffling). Data skew is the most common cause of join failures and slowness.
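
A small plain-Python experiment shows the effect. The proportions (70% null keys), the partition count, and the `session-N` key format are made-up illustration values, and `zlib.crc32` is again only a stand-in for Spark's hash.

```python
import zlib
from collections import Counter

NUM_PARTITIONS = 8  # illustration value, not a recommendation

def partition_id(key):
    # Deterministic stand-in hash. None models a null join key:
    # every null produces the same hash, hence the same partition.
    return zlib.crc32(repr(key).encode()) % NUM_PARTITIONS

# 1,000 rows: 700 with a null key, 300 with distinct keys (hypothetical data).
keys = [None] * 700 + [f"session-{i}" for i in range(300)]

rows_per_partition = Counter(partition_id(k) for k in keys)
largest = max(rows_per_partition.values())
```

Whatever the hash function, `largest` is at least 700 here, because all null keys are forced into one partition while the remaining 300 rows are spread over all eight.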

Handling data skewness

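a) Replace the join of A and B with: ((A where the join column is non-null) joined with B) UNION (A where the join column is null). Null keys never match in an equi-join, so, assuming a left join, the result is unchanged while the null rows stay out of the shuffle. This is for cases where, apart from null values, the other values in the join column are uniformly distributed. A more generic approach is discussed in b).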
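
Approach a) can be checked with a small plain-Python model, assuming the original join is a left join. The helper names `left_join` and `left_join_skipping_nulls`, the `click` and `query` columns, and the rows are hypothetical; only searchSessionId comes from the example above.

```python
def left_join(a_rows, b_rows, key, b_columns):
    """Plain left join on `key`; unmatched rows get None for the B columns."""
    lookup = {}
    for row in b_rows:
        lookup.setdefault(row[key], []).append(row)
    out = []
    for a in a_rows:
        # A null key never matches anything, mirroring SQL equi-join semantics.
        matches = lookup.get(a[key], []) if a[key] is not None else []
        if matches:
            out.extend({**a, **b} for b in matches)
        else:
            out.append({**a, **{col: None for col in b_columns}})
    return out

def left_join_skipping_nulls(a_rows, b_rows, key, b_columns):
    non_null = [r for r in a_rows if r[key] is not None]
    nulls = [r for r in a_rows if r[key] is None]
    joined = left_join(non_null, b_rows, key, b_columns)  # only these rows need the join
    padded = [{**r, **{col: None for col in b_columns}} for r in nulls]
    return joined + padded  # the UNION step

table_a = [
    {"searchSessionId": None, "click": 1},
    {"searchSessionId": "s1", "click": 2},
    {"searchSessionId": None, "click": 3},
    {"searchSessionId": "s9", "click": 4},
]
table_b = [{"searchSessionId": "s1", "query": "shoes"}]

direct = left_join(table_a, table_b, "searchSessionId", ["query"])
split = left_join_skipping_nulls(table_a, table_b, "searchSessionId", ["query"])
```

Both variants return the same four rows (possibly in a different order); the difference is that in the split version the null-key rows never take part in the join.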

b) Add random numbers to the join column: Here we append a random value (a salt) to the join key to spread its rows across partitions.

In Table A (which is skewed): concatenate the key with a separator character and a random integer from 1 to 3: key -> key + "_" + random(1,3)

In Table B (which is uniformly distributed): explode each row into three, with join keys x_1, x_2, and x_3 for a row whose key is x.

After this, all partitions are of uniform size because the new join column is no longer skewed, and the join completes much faster.

The code snippet which helps us achieve this is presented below.
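
What follows is a minimal plain-Python sketch of the salting idea, not the Spark code used in the job. The names `salt_skewed_side`, `explode_uniform_side`, `salted_key`, and the sample rows are hypothetical; `NUM_SALTS = 3` mirrors the range 1 to 3 used above.

```python
import random

NUM_SALTS = 3  # salts 1, 2 and 3, as in the text

def salt_skewed_side(rows, key, rng):
    """Table A: append a random salt so one hot key becomes NUM_SALTS distinct keys."""
    return [{**r, "salted_key": f"{r[key]}_{rng.randint(1, NUM_SALTS)}"} for r in rows]

def explode_uniform_side(rows, key):
    """Table B: replicate each row once per salt so every salted key of A still finds a match."""
    return [
        {**r, "salted_key": f"{r[key]}_{salt}"}
        for r in rows
        for salt in range(1, NUM_SALTS + 1)
    ]

def inner_join(a_rows, b_rows, key):
    lookup = {}
    for b in b_rows:
        lookup.setdefault(b[key], []).append(b)
    return [{**a, **b} for a in a_rows for b in lookup.get(a[key], [])]

# Key "x" is hot in table A (9 of 10 rows); table B has one row per key.
table_a = [{"key": "x", "a_val": i} for i in range(9)] + [{"key": "y", "a_val": 9}]
table_b = [{"key": "x", "b_val": "bx"}, {"key": "y", "b_val": "by"}]

rng = random.Random(0)  # seeded only to make the sketch repeatable
salted_a = salt_skewed_side(table_a, "key", rng)
exploded_b = explode_uniform_side(table_b, "key")
joined = inner_join(salted_a, exploded_b, "salted_key")
```

Each row of table A matches exactly one of the three copies of its table B row, so the join result is the same as joining on the original key, while the hot key "x" is now spread over up to three join keys. The cost is that table B grows by a factor of `NUM_SALTS`, which is why the exploded side should be the smaller, uniformly distributed one.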

Keep input data lean

If the join is too slow, remove the columns that are not required after the join. Write this intermediate data somewhere and read it as the input to the join step. This reduces the size of the data that moves across the network during shuffling.

Also, filter out any rows that are not required after the join.
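
As a plain-Python illustration of the same idea, the sketch below filters rows and keeps only the needed columns before anything is joined. The helper `prune` and the columns `payload` and `is_test` are made up for the example.

```python
def prune(rows, needed_columns, keep_row):
    """Drop unneeded rows, then keep only the columns used after the join."""
    return [
        {col: row[col] for col in needed_columns}
        for row in rows
        if keep_row(row)
    ]

events = [
    {"id": "u1", "ts": 100, "payload": "x" * 1000, "is_test": False},
    {"id": "u2", "ts": 200, "payload": "y" * 1000, "is_test": True},
]

# Only id and ts are needed downstream, and test traffic is not needed at all.
lean_events = prune(events, ["id", "ts"], keep_row=lambda r: not r["is_test"])
```

The large `payload` column and the filtered-out row never reach the join, so they never have to cross the network.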

Split big join into multiple smaller joins

If you can split your input data into smaller batches without affecting the functionality, try doing the join in small batches. In our case, we were joining two datasets on a UUID field, and both datasets also had a timestamp column. So, instead of joining a whole day of data at once, we divided the day into multiple slots and joined only the corresponding slots of each dataset. The idea came from an observation: joining the full-day data was taking 2–3 days to complete, but sampling it down to 25% of the data let the join finish in 1–2 hours, indicating that smaller joins are faster.

A sample code snippet that breaks a big join into multiple smaller joins (based on timestamp) is presented below.
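
Here is a plain-Python sketch of slot-wise joining, not the original Spark snippet. It assumes that rows which should match always fall into the same time slot; the slot length `SLOT_SECONDS`, the helper names, and the columns `a_val` and `b_val` are hypothetical.

```python
from collections import defaultdict

SLOT_SECONDS = 6 * 60 * 60  # four slots per day; an arbitrary choice for this sketch

def bucket_by_slot(rows):
    """Group rows by the time slot their timestamp (in seconds) falls into."""
    slots = defaultdict(list)
    for row in rows:
        slots[row["ts"] // SLOT_SECONDS].append(row)
    return slots

def inner_join(a_rows, b_rows, key):
    """Return (a_row, b_row) pairs whose join keys are equal."""
    lookup = {}
    for b in b_rows:
        lookup.setdefault(b[key], []).append(b)
    return [(a, b) for a in a_rows for b in lookup.get(a[key], [])]

def join_in_slots(a_rows, b_rows, key):
    a_slots, b_slots = bucket_by_slot(a_rows), bucket_by_slot(b_rows)
    joined = []
    # Join only corresponding slots; each of these is a much smaller join.
    for slot in sorted(a_slots.keys() & b_slots.keys()):
        joined.extend(inner_join(a_slots[slot], b_slots[slot], key))
    return joined

table_a = [
    {"uuid": "u1", "ts": 100, "a_val": 1},
    {"uuid": "u2", "ts": 30000, "a_val": 2},
]
table_b = [
    {"uuid": "u1", "ts": 250, "b_val": "p"},
    {"uuid": "u2", "ts": 30500, "b_val": "q"},
]

joined = join_in_slots(table_a, table_b, "uuid")
```

If matching rows can land in different slots (for example, an event just before a slot boundary and its counterpart just after), this split would miss them, so the slot boundaries have to respect how the two datasets relate in time.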

Job parameters tuning

The following are the important parameters that we can increase. (This might not help much if you have any of the fundamental issues described above.)

executor-memory, spark.executor.memoryOverhead, spark.sql.shuffle.partitions, executor-cores, num-executors
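
As a sketch of where these settings go, the snippet below assembles a spark-submit command line. Every value is a placeholder rather than a recommendation, and the application name `join_job.py` and the helper `build_submit_command` are hypothetical.

```python
import shlex

# Placeholder values only; the right numbers depend on cluster size and data volume.
submit_flags = {
    "--num-executors": "50",
    "--executor-cores": "4",
    "--executor-memory": "8g",
}
spark_conf = {
    "spark.executor.memoryOverhead": "2g",
    "spark.sql.shuffle.partitions": "800",
}

def build_submit_command(app, flags, conf):
    """Combine spark-submit flags and --conf settings into one argument list."""
    cmd = ["spark-submit"]
    for flag, value in flags.items():
        cmd += [flag, value]
    for name, value in conf.items():
        cmd += ["--conf", f"{name}={value}"]
    return cmd + [app]

command = build_submit_command("join_job.py", submit_flags, spark_conf)
print(shlex.join(command))
```

The first three parameters are spark-submit flags, while the last two are Spark configuration properties passed through `--conf`.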

Conclusion

With the above optimizations, we were able to improve our job performance by more than 10X.

To summarise, the key things we discussed in this blog are:

  1. How a join happens internally: broadcast join vs. sort merge join.
  2. Why data skew happens (skew in the input data vs. no skew in the input partitions but skew induced by hash partitioning before the join)
  3. Handling data skewness
  4. Keep the input data to join as lean as possible
  5. Split a big join into multiple smaller joins
  6. Tuning the Spark job parameters for the join

Data skew is the predominant reason for join failures and slowness. Other wide transformations (like distinct(), reduceByKey(), etc.) also reshuffle data in a similar way at the start, to bring keys with the same hash value into the same partition.

Thanks for reading! Any comments or suggestions are welcome!

Contact at: LinkedIn, Instagram
