How Spark dataframe shuffling can hurt your partitioning

Manuel Mourato
3 min read · Feb 21, 2018


For those who work with Spark as an ETL processing tool in production scenarios, the term shuffling is nothing new. It happens when we perform RDD operations like groupBy or groupByKey, which require data to be moved between partitions: within the same machine if you are lucky, or across executors if not. But what does shuffling have to do with partitioning?

Well, nothing really if you are working with RDDs… but with DataFrames, that's a different story.

Let’s take a look at a simple example:

>>> sc.defaultParallelism 
8
>>> number_list=sc.parallelize(xrange(0, 100, 1))
>>> number_list.collect()
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99]
>>> pair_number_list=number_list.map(lambda x:(1,x))
>>> pair_number_list.getNumPartitions()
8
>>> grouped_rdd=pair_number_list.groupByKey()
>>> grouped_rdd.getNumPartitions()
8

So we see that with RDDs the partitioning does not change. Let's now do the same, but with a DataFrame:

>>> df=sqlContext.createDataFrame(pair_number_list)
>>> df.show(4)
+---+---+
| _1| _2|
+---+---+
| 1| 0|
| 1| 1|
| 1| 2|
| 1| 3|
+---+---+
>>> df_grouped=df.groupBy("_1").count()
>>> # groupBy needs an aggregation function to apply to each group, hence the .count()
>>> df_grouped.show()
+---+-----+
| _1|count|
+---+-----+
| 1| 100|
+---+-----+
>>> df_grouped.rdd.getNumPartitions()
200

As you can see, the partition number suddenly increases. This happens because the Spark SQL module ships with the configuration spark.sql.shuffle.partitions set to 200 by default, and that value determines how many partitions a DataFrame shuffle produces.
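If 200 partitions is too many (or too few) for your data volume, you can change this setting at runtime before triggering the shuffle. Here is a minimal sketch, assuming the same sqlContext and df from the example above; the value 8 is just an illustration, chosen to match the cluster's default parallelism:

>>> # lower the number of post-shuffle partitions before running the aggregation
>>> sqlContext.setConf("spark.sql.shuffle.partitions", "8")
>>> df.groupBy("_1").count().rdd.getNumPartitions()
8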

Depending on your use case, this can be beneficial or harmful. In this particular case, the data volume is nowhere near enough to fill 200 partitions, which causes unnecessary map-reduce work and ultimately produces lots of very small files in HDFS, which is not desirable. If, on the other hand, you have too few partitions and lots of data to process, each executor may not have enough memory available to handle its share at one given time, causing errors like java.lang.OutOfMemoryError: Java heap space. Remember that each partition is a unit of processing in Spark.
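One way to mitigate the small-files problem after an aggregation like this is to reduce the number of partitions just before writing, for example with coalesce. A quick sketch (the output path is hypothetical):

>>> # coalesce(1) merges the 200 mostly empty shuffle partitions into a single
>>> # partition, so one file is written to HDFS instead of 200 tiny ones
>>> df_grouped.coalesce(1).write.parquet("/tmp/grouped_counts")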

So, to conclude: look at your application with care and figure out what your ideal partition number is. Working with DataFrames usually brings more optimizations, but don't forget to set the spark.sql.shuffle.partitions configuration to a value that suits your use case.
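Depending on how you launch your jobs, you can also set this once at submit time instead of in code. For example (the script name and value here are just placeholders):

spark-submit --conf spark.sql.shuffle.partitions=50 my_etl_job.py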

A smaller one this time, but hopefully it can help you somehow!

Please leave your opinions and suggestions below!

Manuel Duarte Vilarinho Mourato @ Big Data Engineer
