Shuffling and Bucketing in Spark 3

Twin Health Background

Twin Health is a California based startup, which works to reverse and prevent metabolic disease. It has a product which solves the chronic Metabolic disease problem using the Whole Body Digital Twin™, a technology built on advanced sensors, mobile technologies, Artificial Intelligence and Machine Learning.

Machine Learning is one of the tools which completes the Whole Body Digital Twin. Our Machine Learning Model does daily forecasts based on client input data and provides feedback to Twin Physicians. It helps our physicians to make daily decisions based on specific clients’ input data as each and every person’s metabolism is different and complex.

Problem Statement

At Twin, we run multiple ML models and we are working on scaling the ML data pipeline. We have started using spark on kubernetes to scale our ML pipeline jobs. Spark on kubernetes is a serverless scalable data processing platform and this will become an important part of our data processing infrastructure. We started with moving our data cleaning job for Feature Engineering to Spark to make it horizontally scalable.

Data Processing Problems at High Level

At Twin, While moving our python based data cleaning jobs to spark, we have observed below things while we started working on the data cleaning process.

  • We are using many Group By expression in our daily data processing and it was getting time consuming for few GB sizes of tables in Spark
  • We are using join very extensively and Spark joining problem is bigger in our case as we do join for multiple dataframe
  • Due to the above problem we have data spilling more than 100 GB for small sizes of multiple dataframe joins. I also see an increase in disk IO overhead and increase in IO overhead lead to so much garbage collection
  • The root cause of this problem is Data shuffling which is almost 300 GB total read/write shuffle. We were merging a total 7 to 8 dataframe and the total data size is only a few GBs
  • The data spilling and shuffling cause common errors like running out of space, driver killing executors frequently due to no space. It may lead to your job running for long hours.

As we have observed that group by and shuffling lead to performance downgrade. We have replaced the group with a window partition, which we will discuss in the next article.. So, here we will discuss how we have started working on solving the data shuffling problem which leads by rearrange partition while we do multiple joins in Spark.

How We Solved the Data Shuffling Problem at Twin?

As we discussed above, the shuffling problem is getting bigger for us and we have thought about 3 things.

  • We are using Spark on Kubernetes, so we could add a local SSD to get high throughput and latency to handle data spilling. We didn’t try this option for now due to time constraints.
  • We have tried to cache the data frame, broadcast the certain size of dataframe and repartition data frame at the same partition size. It has almost solved our problem and we got less data shuffling size from 200 GB to a few Gbs. It almost solved the problem as performance improved with less number of storage errors.
  • Last thing we tried was bucketing. In this case we have used hashing technique and bucketing based on the joining key. In this solution data shuffling is 0 with no errors for multiple data frame joining. Here we will discuss bucketing in spark with different examples.

What is Bucketing?

Bucketing is used to organize data in the filesystem where we have to deal with large datasets and leverage that in the subsequent queries by distributing data into partitions in a more efficient way. In Big data pipeline tables and partitions are sub-categorized and sorted into buckets for better structure of data to get efficient access of data.

Here we will see how the Bucketing used to enhance the data locality and improve the performance of data engineering processes. We will see how bucketing works in Spark. We have used bucketing in Spark 3 with pyspark to resolve the data shuffling problem to join multiple dataframe.

Why does Spark need bucketing?

Spark is a distributed data processing engine. It will split the data into partitions to get better performance and assign specific chunks of data to the computational engine. But in a few cases Spark needs to shuffle data between the executors in the cluster. The reason for shuffling is transformations required on partition data that is not present locally in the partition.

  • Data shuffling happens when we join two big tables in Spark. While spark joins two dataframe by key, the partition needs to move the same value of join key in the same executor.
  • Shuffle also happens when we want to perform groupByKey to collect all the values for a key together and perform an action on them. You can reduce the data shuffling by replacing groupByKey with reduceByKey. But there are few cases where you have to use groupByKey.

As we see in the bucketing example, it will organize the data in partition with better structure so query performance will improve. This performance improvement can be achieved by avoiding the shuffling between the executors.

Spark sort merge join, Window Functions and aggregations require the data to be repartitioned by the same keys, to get the rows that have the same value of the joining key in the same partition. We can achieve this in spark by repartitioning the data while we do join, basically spark physically moving the data between executors when it performs the action. We have tried to get performance by cache and repartition the data frame.

With Bucketing we can save the data in a pre-shuffle state. If two dataframe have the same bucket key and sort key. then in this case Spark is aware about data distribution and no need to do shuffle.

We have tried the repartition and caching approach for joining multiple data frames but got better results with bucketing.

How to Create data Bucket in Spark

  • Below is the example to create the bucket in SparkAPI. bucketBy is the function to create the bucket in spark. We need to save the information about the bucket somewhere, so we need to use saveAsTable here to save the metadata information about the bucket table.

# n is number of buckets that will be created

df.write.mode(“save_mode”)

.option(“path”, “s3 path/hdfs path”) \

.bucketBy(n, ‘col1’, ‘col2’..) \

.sortBy(‘col1’, ‘col2’) \

.saveAsTable(‘table_name’, format=’parquet’)

df = spark.table(‘table_name’)

  • In the above example we used bucketBy and sortBy as in some cases we have multiple join keys and wanted to put integer key in bucketBy and String key in sortBy. sortBy is optional while we do data Bucket.
  • One can decide the number of bucket sizes based on data size and query we run on the data. Usually one can prefer the 100 MB to 200 MB per bucket.
  • Bucket table will save the tables in path with below naming convention. It has task (043, 049, 051) and bucket(01, 06, 03) information. See in the image below.

Image 1 bucket parquet file from pyspark

  • The main problem inSpark bucketBy function is that it may create so many small files and it may lead to performance problems. It can be avoided by creating a custom partition before writing the bucket data.

This problem in Spark is very different from bucketing in Hive.

Spark creates the bucket files per the number of buckets and task writer. So, it will usually create number of file = Number of buckets * number of spark writer

If you assign 200 buckets to the dataframe and no partition set for the spark data frame. In this case it may create number of files 200 buckets * 200 default partitions(1 task per partition). So, the simple way we can understand that spark jobs have 200 tasks and each task carries the data for 200 buckets. So, to resolve this, we need to create one bucket per partition and it can be achieved by custom partitions. Spark uses the same expression to distribute the data across the buckets and will generate one file per bucket.

df.repartition(F.expr(“pmod(hash(col1), n)”)) .write.mode(“save_mode”)

.option(“path”, “s3 path/hdfs path”) \

.bucketBy(n, ‘col1’, ‘col2’..) \

.sortBy(‘col1’, ‘col2’) \

.saveAsTable(‘table_name’, format=’parquet’)

How does Bucketing work in Spark?

So far we have seen why we need bucketing and how it can be created in Spark. In this section we are going to discuss how it internally works in Spark. We are focusing on Spark join, so let see how you can confirm that your join is shuffle free after using the bucket.

Spark automatically applies broadcast join based on size threshold and does the shuffle sort merge join on other cases by default. Lets see different approaches given below for bucketing.

Bucketing is enabled by default in Spark. You can check it by the config property.

spark.conf.get(“spark.sql.sources.bucketing.enabled”)

  1. Let see in the first example that we have two tables which are bigger than broadcast threshold size. In this case Spark distributes both the tables across clusters such a way that joining key data gets in the same partition. It requires shuffling across the executors. In below image Exchange operator gives the shuffle details and also shuffle operator.

Image 2 Sort Merge Join without Bucketed Table

As we discussed, the shuffle can be removed by the bucket. So, if we create two bucketed dataframe with the same joining key and same number of buckets, it does not require shuffle and will get pre shuffled data on the cluster. In the image below, you can see spark created test_bucketed1 and test_bucketed2 tables. It has no Exchange operator and it will improve your performance as it dont require any shuffling.

We have set below config for the spark session to get the sort merge join. It will disable the broadcast join.

spark.conf.set(“spark.sql.autoBroadcastJoinThreshold”, -1)

Image 3 Sort Merge Join with Bucketed Table and disable broadcast join

2. In the above example we have disable the broadcast. In case if we don’t disable it then Spark will by default do Broadcast join if data size meets the threshold value. Below Image shows that if we join two bucketed tables without disabling broadcast join. The dataframe reads small 10 files and it fits for the broadcast, so use the Broadcast Hash Join. In broadcast hash join first it will broadcast a smaller data set to all executors, which represent the BroadcastExchange operator.

Image 4 Sort Merge Join with Bucketed Table and active broadcast join

It’s ok not to disable broadcast join as broadcast hash join will help if data size is small for one of the dataframe. But if the data size is bigger or if you disable the broadcast join then spark will prefer sort merge join.

3. Here we will see, if we don’t have the same bucket key and join key, then what will happen. Assume we have created two bucketed tables with col1(int) and sort by col2(string) and col3(date) as given below. We have done joins based on col1, col2 and col3.

df.repartition(F.expr(“pmod(hash(col1),n)”)) .write.mode(“save_mode”)

.option(“path”, “s3 path/hdfs path”) \

.bucketBy(n, ‘col1’) \

.sortBy(‘col2’, ‘col3’) \

.saveAsTable(‘table_name’, format=’parquet’)

df_out = df.join(

other=df1, on=[col1, col2, col3],

how=’left’)

Below image looks exactly like the case when we don’t have a bucket and the join key are the same. We have observed that it has less data spilling and less shuffled data. We have seen that timing for saving bucket tables is also less compared to multiple bucket keys without sorting. Make Note that from Spark 3.0 onward sorting is present for one file per bucket, so while you use multiple keys as bucket no need of sorting.

Image 5 Sort Merge Join with Bucketed Table but different bucket and join key

4. Lets see one more scenario when one table is bucketed and another is not. In this case shuffling depends on the number of shuffle partitions.

spark.conf.set(“spark.sql.shuffle.partitions”, n)

If the number of buckets is less than the number of shuffle partitions, then shuffle will happen for the both tables as Spark does for without bucketed tables.

If the number of buckets is equal to or greater than the number of Shuffle partitions, then spark will shuffle for the tables which are not bucketed. Please see below image where you find the bucket table test_bucketed2 dont have an Exchange operator(No Shuffle) as we set the number of shuffle partitions less than the number of buckets.

Image 6 Sort Merge Join with Bucketed Table and Non Bucketed Table

5. Finally we will see if both the tables are bucketed but with different numbers of buckets. In this case it will be a similar scenario as we discussed above that it depends on the number of shuffle partitions and only one table will be shuffled.

  • Case 1, table1 has 40 buckets, table2 has 80 buckets and the number of shuffle partitions set 100. In this case both the tables will shuffle as we have seen above.
  • Case 2, table1 has 40 buckets, table2 has 80 buckets and the number of shuffle partitions set 50. In this case only table1 will shuffle into 80 partitions and it will generate a sql graph as given in Image 6.

But after Spark 3.1 the table with a higher bucket will coalesce into a smaller number of buckets. It can be set by the configuration below.

spark.conf.set(“spark.sql.bucketing.coalesceBucketsInJoin.enabled”, “true”)

What are we using and Future Work?

Currently At twin, we are using approach 1 as some of our data size is smaller than default broadcast threshold.

  • we have not disabled the broadcast join. So, for small sizes Spark still does broadcast join.
  • There are three columns which we used for joining multiple data frames, so bucket and hash partition size is based on all those 3 columns.
  • We store all this bucket dataframe on S3 and We may revisit it once it is used by our downstream jobs.

df.repartition(F.expr(“pmod(hash(col1,col2,col3),200)”)) .write.mode(“save_mode”)

.option(“path”, “s3 path”) \

.bucketBy(200, ‘col1’, ‘col2’, ‘col3’) \

.saveAsTable(‘table_name’, format=’parquet’)

We will do below improvement as Future work before our daily data size goes above 20 GB. We also write on this in our next blogs.

  • Add Local SSD in Spark Kubernete pods.
  • Choose right technique like group By, Windows and RDD map reduce operation
  • Spark Performance Tuning for Feature Engineering

Conclusion

At Twin, We have seen how data shuffling problems lead to performance issues in spark. We have used different approaches to solve this problem and still we need to improve our kubernetes configuration for spark like attaching Local SSD. We achieved 0 size shuffling from 200 GB read/write shuffling using bucketing.

In this article We have seen how bucketing works in Spark. We have written this article by keeping in mind what basics you should know before using the bucketing in Spark. We have discussed all the bucketing scenarios in case of joining the two tables. We have seen a couple of problems which are definitely faced by Data Engineers. Normally Spark jobs get more failures and run time overhead in case of joining the data. We have focused on shuffling problems in a distributed environment which can be handled by bucket.

--

--