Spark’s Skew Problem: Does It Impact Performance?

Aditya Sahu
Oct 30, 2021 · 6 min read

This is the 2nd post in the series 5 Most Common Spark Performance Problems, and here we discuss our 1st common problem. It is advisable to take a look at the 1st post to get an idea of how we will be walking through this 6-blog-long story.

Again, a note of thanks to Databricks’

Optimizing Apache Spark on Databricks course

Let’s start by taking a look at Spark’s skew problem. So, what is skew?

Spark typically reads data in blocks of 128MB, evenly distributed across partitions (this behaviour can be tuned using maxPartitionBytes; I’ll create a separate post on this, as it is really a good property to keep track of). The moment we perform some transformations (e.g. aggregations) on our dataset, it is very possible to end up with uneven amounts of data on different partitions. A small amount of skew is still ignorable, but large skew can result in spill or even OOM errors. So, it is very important to keep track of this issue while working with big datasets.
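To make this concrete, here is a minimal sketch of how an evenly distributed read can turn into a skewed shuffle. The trxDF / trxPath / state_id names are borrowed from the snippets later in this post, and the property shown is spark.sql.files.maxPartitionBytes, the full name of the setting mentioned above:

// Target size of input partitions when reading files (128MB is also the default).
spark.conf.set("spark.sql.files.maxPartitionBytes", "128m")

// Reading gives us roughly even ~128MB input partitions.
val trxDF = spark.read.format("delta").load(trxPath)

// A wide transformation shuffles all rows with the same key into the same partition;
// if one state_id dominates the dataset, its partition ends up much larger than the rest.
val countsDF = trxDF.groupBy("state_id").count()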

Let’s see a small example to better compare the 2 scenarios.

As we can see, partition 6 got skewed after the aggregation (i.e. it has roughly 2X the data of the other partitions), so it will take about 2X as long to process and need 2X as much RAM. This makes the overall job execution time longer (all other tasks will just be waiting for this one to complete; sounds insane, right? Why wait idle?), and in the worst case we might not even have enough RAM to process this single partition.
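If you want to spot this kind of imbalance without digging through the Spark UI, one quick check (a sketch only, reusing the countsDF from the example above and the built-in spark_partition_id function) is to count rows per partition after the aggregation:

import org.apache.spark.sql.functions.spark_partition_id

// Rows per shuffle partition after the aggregation; one partition holding
// far more rows than the rest is exactly the skew described above.
countsDF
  .groupBy(spark_partition_id().alias("partition_id"))
  .count()
  .orderBy("partition_id")
  .show(200, truncate = false)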

So, how do we solve this? Take your time and think a bit before scrolling down.

I believe some of you have definitely come up with some interesting solutions. So, let’s discuss them together and see if I am able to cover all of them; if not, please do let me know in the comments. I’ll be more than happy to include those too in my post.

So, simple question: not enough RAM to process that large single partition? Simple answer: increase the RAM then (after all, in the world of cloud it’s just a matter of one click to increase it, agree?).

Wait, wait, that’s not the ideal solution 😂, you know it too, right?

Solving the RAM problem is only treating the symptoms, not the actual root cause. The RAM problem actually maps to spill or OOM errors and should not be the first thing to solve for (we will discuss more on spills in subsequent blogs).

The very first problem we need to solve is the uneven distribution of data over partitions, which directly maps to tasks taking longer to complete.

I have mainly 3 ways to mitigate this; if you know some more, the comment section is yours.

  • Using a skew hint (available only on Databricks)
  • AQE (Adaptive Query Execution, available from Spark 3 onwards)
  • Salted columns (covered in a separate follow-up blog)

Skew Hints: Something Good to Know About

As mentioned previously, this option is only available on Databricks’ platform and might not be useful for many native Spark users, but I believe it is still worth knowing about.

In this case we will not mainly be focusing on execution time; rather, we will be focusing on the overall health of the executors, spills, and shuffle reads, thus getting rid of OOM errors and ensuring at least a successful completion of the job.

Let’s take a look at the code snippets below.

Normal Join Operation

val statesDF = spark.read.format("delta").load(statesPath)
val trxDF = spark.read.format("delta").load(trxPath)

trxDF.join(statesDF, statesDF("state_id") === trxDF("state_id"))
  .write.format("noop").mode("overwrite").save()
execution time: ~29 mins

Join using Skew Hint

val statesDF = spark.read.format("delta").load(statesPath)
val trxDF = spark.read.format("delta").load(trxPath)
  .hint("skew", "state_id")

trxDF.join(statesDF, statesDF("state_id") === trxDF("state_id"))
  .write.format("noop").mode("overwrite").save()
execution time: ~35 mins

Note: As mentioned, we are not focusing on execution time but rather on other factors. Let’s have a side-by-side comparison of the different parameters to better understand the scenario.

Let’s first look at Spark’s Event Timeline to check the health of the executors.

Now let’s take a look at the Summary Metrics; we are mainly interested in Shuffle Read Size / Records.

If we take a look at the above screenshot, we can easily identify that the normal join operation has a value of 0 in the Min and 25th percentile columns, which means we have partitions with 0 records alongside partitions with ~430MB of data: clearly a skew issue. On the contrary, the skew hint version has a fairly even distribution of records across the percentiles and thus an even distribution of data.

Finally, let’s look at Aggregated Metrics by Executor, mainly the Spill (Memory) and Spill (Disk) columns.

That’s all for the skew hint. We can see that the skew hint gives us a more even distribution of data; it did take a little more time to execute, but that is a tradeoff we have to deal with.

Now that we have covered the skew hint, our first mitigation strategy, I’ll suggest you guys take a break (if you are not, I am taking one 😛), grab a cup of coffee (a black one, if you want my suggestion 😇), and then come back and let’s walk together.

Holla !!! We are back.

AQE (Adaptive Query Execution):

As mentioned earlier (just a reminder), this feature is available from Spark 3 onwards.

Let’s take a look at the code snippet below and see what changed.

spark.conf.set("spark.sql.adaptive.enabled", true)
spark.conf.set("spark.sql.adaptive.skewedJoin.enabled", true)
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128m")val statesDF = spark.read.format("delta").load(statesPath)val trxDF = spark.read.format("delta").load(trxPath) trxDF.join(statesDF, ctyDF("states_id") === trxDF("states_id"))
.write.format("noop").mode("overwrite").save()
Execution time: ~25 mins

If we look closely, we need to enable 2 parameters to let Spark know we are asking it to use the adaptive query engine: spark.sql.adaptive.enabled and spark.sql.adaptive.skewJoin.enabled. Apart from this, we are also setting the spark.sql.adaptive.advisoryPartitionSizeInBytes parameter to 128MB. Spark’s normal behaviour is to read fresh partitions in 128MB blocks, so I want to stick to that (the default value of this property is 64MB). Everything else remains the same, and if we look at the execution time, we really got good performance compared to our first 2 scenarios.
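For completeness, AQE also exposes two more skew-join knobs that we are leaving at their defaults here. As I understand it, AQE flags a partition as skewed when it is both several times larger than the median partition size and above an absolute size threshold, and then splits it into smaller tasks. The sketch below is not part of the run above and the values shown are only the documented defaults, which may vary by Spark version:

// Not used in the run above; shown only to illustrate how the thresholds can be tuned.
// A partition is treated as skewed when it is larger than
// skewedPartitionFactor * (median partition size) AND larger than
// skewedPartitionThresholdInBytes.
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "5")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256m")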

So, now let’s take a look at some of the Spark metrics to better understand what’s happening.

Let’s look at Event Timeline

That’s great!! Don’t you guys think the data is way better distributed across partitions now? This is one of the reasons why we have a performance gain here: we no longer have straggler tasks.

Now, Let’s look at Aggregated Metrics by Executors

We don’t have the Spill (Memory) and Spill (Disk) columns anymore, which means we don’t have any spill whatsoever (on a side note, the Spark UI doesn’t show the spill columns if there is no spill; that seems kind of weird to me, it should rather show 0 there, but let’s leave that aside). This is the 2nd reason for the performance gain: no spill means no more disk-memory IO.

Let’s see one more thing, just out of curiosity.

So, that’s all on AQE. I’ll try to create a separate blog on AQE; it’s a great feature with some really good optimization fixes.

With this we have concluded our 2 mitigation strategies, and it now looks like this blog is stretching more than expected, which I don’t want. So, I’ll be writing a separate blog on our 3rd mitigation strategy, i.e. salted columns, just to make things easier to grasp.

With this, I would like to close our discussion here. We will be coming up with the next blog, Salted Columns, very soon. Till then, keep surfing and stay tuned 🤞.

Bye Byeee !!
