Spark Bucketing is not as simple as it looks

Ajith Shetty · Analytics Vidhya · Jul 18, 2021

Partitioning and bucketing are the most common optimisation techniques we have been using in Hive and Spark.

In this blog I will talk specifically about bucketing and how it should be used.

Bucketing helps only when certain conditions are met. We will walk through those conditions and how to get the most out of bucketing.

What is bucketing?

In Spark and Hive, bucketing is an optimisation technique. We provide a column, and its hashed values decide which bucket each row lands in.

We need to make sure that the bucketing conditions are met to get the most out of it. When applied properly, it improves joins by avoiding shuffles, which have long been a pain point in Spark.
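As a quick sketch of what creating a bucketed table looks like (the table name, column name and input path below are only examples, not from any particular dataset):

```scala
import org.apache.spark.sql.SparkSession

// Minimal sketch: write a table bucketed on the Id column.
val spark = SparkSession.builder()
  .appName("bucketing-demo")
  .enableHiveSupport()              // bucketed tables are registered in the metastore
  .getOrCreate()

val df = spark.read.parquet("/data/salaries")   // hypothetical input path

df.write
  .bucketBy(1024, "Id")             // hash rows into 1024 buckets on Id
  .sortBy("Id")                     // optionally keep each bucket sorted
  .mode("overwrite")
  .saveAsTable("default.salaries")  // bucketing works with saveAsTable
```

Note that bucketBy only takes effect when the DataFrame is saved as a table; writing with save() to a plain path is not supported and will raise an error.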

Advantages of Bucketing the Tables in Spark

Below are some of the advantages of bucketing (clustering) in Spark:

  • We can partition the data based on a given column.
  • Joins are optimised when pre-shuffled bucketed tables are used.
  • The data is distributed evenly across buckets.
  • Data access is optimal and query performance improves.

How is Spark bucketing different from Hive bucketing

Hive Bucketing is not compatible with Spark Bucketing.

Hive uses the Hive hash function to create the buckets, whereas Spark uses Murmur3. So there would be an extra Exchange and Sort when we join a Hive bucketed table with a Spark bucketed table.

In Hive, one reducer is needed per bucket, which determines the number of files created.

In Spark bucketing, however, there is no such reducer stage, so the write ends up creating files based on the number of tasks.

We tend to think that when we define the number of buckets as 1024, the number of files will be 1024.

But this is not the case with Spark.

In Spark, each task writing into the table can create a file per bucket, so the file count is roughly the number of tasks multiplied by the 1024 buckets.

For example in our case,

We have 4 tasks running.

So 4 (tasks) * 1024 (number of buckets) = 4096

Based on this calculation, the number of files created is ~4096.

We need to be very cautious while using bucketing in Spark.

If the number of tasks grows, we might end up creating millions of files.
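To make the multiplication concrete, here is a small sketch (the repartition step at the end is my own suggestion, not something from the post, so treat it as one possible mitigation):

```scala
import org.apache.spark.sql.functions.col

// With 4 write tasks and 1024 buckets, each task can emit one file per
// bucket, so the write can produce up to 4 * 1024 = 4096 files.
println(df.rdd.getNumPartitions)    // number of write tasks, e.g. 4

// Possible mitigation (assumption, not from the original post): repartition
// on the bucket column first so each bucket's rows land in a single task,
// which gives roughly one file per bucket instead of tasks * buckets.
df.repartition(1024, col("Id"))
  .write
  .bucketBy(1024, "Id")
  .sortBy("Id")
  .mode("overwrite")
  .saveAsTable("default.salaries")
```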

Limitation in Spark Bucketing

Spark bucketing has its own limitations, and we need to be very careful both when we create bucketed tables and when we join them together.

To optimise the join and make use of bucketing in Spark, we need to be sure of the following:

  1. Both tables are bucketed with the same number of buckets. If the bucket counts differ between the joining tables, the pre-shuffled data cannot be used and a shuffle is applied.
  2. Both tables are bucketed on the column used for joining. Since the data is partitioned by the bucket column, joining on a different column does not make use of bucketing and hurts performance.

Let’s run some tests with various combinations of bucketed and non-bucketed tables and look at the query plans to see how Spark behaves and where it applies an exchange/shuffle.

Our goal is to reduce the shuffle/exchange and to make use of the pre-shuffled data when joining the tables.

TEST 1 : Join 2 tables where 1 table is bucketed and the other is not

default.salaries: Bucketed on ID with 1024 buckets

default.salaries_temp: Non bucketed table

Let’s look at the query plan.
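The post shows only the plan, so here is a rough sketch of what the query behind it might look like:

```scala
// Test 1: bucketed table joined with a non-bucketed one.
val bucketed    = spark.table("default.salaries")        // bucketed on Id, 1024 buckets
val nonBucketed = spark.table("default.salaries_temp")   // not bucketed

bucketed.join(nonBucketed, "Id").explain()
// Expected: an Exchange (or BroadcastExchange, when the non-bucketed table
// is small) appears only on the non-bucketed side.
```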

Here we can clearly see that there was an exchange on the non-bucketed table. Since that table is small, it got broadcast.

We can confirm that Spark had to run an exchange operation to join the bucketed and non-bucketed tables.

TEST 2: Join 2 Bucketed tables on the bucketed column with the same number of buckets

default.salaries_1: bucketed on Id column with 1024 buckets

default.salaries_2: bucketed on Id column with 1024 buckets
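A sketch of the join for this test (the exact code is in the linked DBC file, so this is just an approximation):

```scala
// Test 2: both tables bucketed on Id with 1024 buckets, joined on Id.
val s1 = spark.table("default.salaries_1")
val s2 = spark.table("default.salaries_2")

// If the tables are small, you may need to disable broadcast joins to see
// the sort-merge plan:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)

s1.join(s2, "Id").explain()
// Expected: SortMergeJoin with no Exchange on either side, because both
// inputs are already hash-partitioned on the bucket column.
```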

Here you can clearly see that there was no exchange and a SortMergeJoin was applied. While preparing the physical plan, the Catalyst optimiser realised that both tables were already partitioned correctly and did not have to shuffle the data.

This is the best case scenario and our goal as well.

Test 3: Join 2 bucketed tables on a non-bucketed column

default.salaries_1: Bucketed on Id with 1024 buckets

default.salaries_2: Bucketed on Id with 1024 buckets

But we will be joining the tables on the column EmployeeName.
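A sketch of the join in this test (an approximation, not the exact notebook cell):

```scala
// Test 3: both tables bucketed on Id, but joined on EmployeeName.
val s1 = spark.table("default.salaries_1")
val s2 = spark.table("default.salaries_2")

s1.join(s2, "EmployeeName").explain()
// Expected: an Exchange (or broadcast) is introduced, because the data is
// hash-partitioned on Id rather than on the join key.
```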

Here we can clearly see that there was an exchange, and a broadcast hash join was applied.

Even though the tables are bucketed on the same column with the same number of buckets, there was a shuffle, because the joining column is different.

Test 4: Joining 2 tables with different numbers of buckets

default.salaries_1: Bucketed on Id with 1024 buckets

default.salaries_3: Bucketed on Id with 1500 buckets
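A sketch of this test (approximate code):

```scala
// Test 4: same bucket column, different bucket counts (1024 vs 1500).
val s1 = spark.table("default.salaries_1")   // 1024 buckets on Id
val s3 = spark.table("default.salaries_3")   // 1500 buckets on Id

s1.join(s3, "Id").explain()
// Expected: an Exchange on at least one side to align the partitioning
// before the join.
```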

Even though both the tables are bucketed and on the same column, there was an exchange of the data.

An exchange is required on one of the bucketed tables when the bucket numbers differ.

Test 5: Union bucketed table and then join

default.salaries_1: Bucketed on Id with 1024 buckets

default.salaries_2: Bucketed on Id with 1024 buckets

Both the tables are bucketed on the Id column and with the same number of buckets.

Here we will run UNION before the join.

Union salaries_1 with salaries_2, and then join the result with salaries_1.
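A sketch of this test (approximate code):

```scala
// Test 5: union two identically bucketed tables, then join back to one of them.
val s1 = spark.table("default.salaries_1")
val s2 = spark.table("default.salaries_2")

val unioned = s1.union(s2)       // the union loses outputPartitioning/outputOrdering
unioned.join(s1, "Id").explain()
// Expected: an Exchange after the Union and before the join, even though
// every input table is bucketed on Id with 1024 buckets.
```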

Upon applying a UNION, the outputPartitioning and outputOrdering are set to unknown, and because of that Spark introduces an exchange after the union and before the join.

Even though the tables involved in this case are bucketed on the same column and with the same number of buckets, Spark had to introduce a shuffle operation because of the UNION.

Lessons learned:

To get the most out of bucketed tables:

  1. Have both tables bucketed.
  2. The number of buckets should be the same in both tables.
  3. The bucketed column should match in both tables.
  4. Spark bucketing is not compatible with Hive bucketing, and mixing them introduces an extra exchange and sort.
  5. Keep an eye on the number of tasks, as this affects the number of files created by Spark bucketing.

I hope this blog was helpful and gave you an understanding of Spark bucketing, how it works under the hood, and how to use it efficiently.

You can find the above code as a DBC file in my GitHub repo:

Ajith Shetty

BigData Engineer — Love for Bigdata, Analytics, Cloud and Infrastructure.
