Improve SQL Limit Parallelism by leveraging Map Output Statistics in Spark

Liang-Chi Hsieh
2 min read · Sep 11, 2017


Limit is a common SQL operator. In Spark SQL, a limit operation is planned as a LocalLimit followed by a GlobalLimit. The reason for splitting a limit into local and global parts is to take advantage of data partitioning and reduce the amount of data to process. For example, imagine you have 10 data partitions, each holding 10 million rows (10 GB). If we want just 100 rows in the end, we don’t need to gather all the data before counting out 100 rows. We just take 100 rows from each partition and then shuffle those rows into one single partition, where the final 100 are picked. This reduces the amount of data to shuffle.
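To make the two-step idea concrete, here is a minimal sketch in plain Python rather than Spark. Partitions are modeled as ordinary ranges, and the names `local_limit` and `global_limit` are hypothetical helpers for illustration only, not Spark APIs.

```python
from itertools import islice

def local_limit(partition, n):
    """Keep at most n rows of one partition; runs independently per partition."""
    return list(islice(partition, n))

def global_limit(partitions, n):
    """Gather the truncated partitions into one place and keep the first n rows."""
    # The flattening below stands in for the shuffle into a single partition.
    gathered = [row for part in partitions for row in part]
    return gathered[:n]

# Toy data: 10 partitions of 1,000 rows each (scaled down from the example).
partitions = [range(i * 1000, (i + 1) * 1000) for i in range(10)]

limited = [local_limit(p, 100) for p in partitions]
shuffled_rows = sum(len(p) for p in limited)   # rows that would be shuffled
result = global_limit(limited, 100)

print(shuffled_rows, len(result))
```

With the local step in place, only 100 rows per partition move to the single partition instead of every row in the table.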

However, if we want a large number of rows from a vast table, e.g., 1 billion rows, each partition can contribute up to 1 billion rows to be shuffled into that one single partition. That is a huge burden.

I currently have a pull request to Spark that has not been merged yet. It tries to perform the GlobalLimit without shuffling the data from all partitions into one single partition. My test shows it can boost performance by 3x when the limit number is big.

What the pull request does is perform only the map stage of the shuffle. We don’t change the data partitioning, so it looks like we conduct a local shuffle. From the map stage we can collect map output statistics. By inspecting the statistics, we know how many rows are in each partition.

In the GlobalLimit operator, we take only the necessary rows from each partition until the limit is satisfied. For example, if the partitions have row counts like [10, 50, 20, 30] and we want to limit the result to 50 rows, we only need to take rows with a distribution like [10, 15, 15, 10] from the partitions. So in the GlobalLimit operator, we simply invoke the mapPartitions API on the locally shuffled data and take the necessary rows from each partition.
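As a rough illustration of how per-partition row counts can be turned into a per-partition quota, here is a small sketch in plain Python. The function `rows_to_take` and its even-spreading strategy are my own assumptions for illustration; the pull request may choose the split differently, and any split that sums to the limit without exceeding a partition's row count is valid.

```python
def rows_to_take(counts, limit):
    """Spread `limit` rows across partitions, never exceeding a partition's count.

    Hypothetical strategy: hand out an equal share per round to every
    partition that still has unused rows, until the limit is reached.
    """
    take = [0] * len(counts)
    remaining = min(limit, sum(counts))
    while remaining > 0:
        open_parts = [i for i, c in enumerate(counts) if take[i] < c]
        share = max(1, remaining // len(open_parts))
        for i in open_parts:
            if remaining == 0:
                break
            n = min(share, counts[i] - take[i], remaining)
            take[i] += n
            remaining -= n
    return take

# Row counts per partition, as they might be read from map output statistics.
counts = [10, 50, 20, 30]
quota = rows_to_take(counts, 50)

# Stand-in for mapPartitions: each partition keeps only its own quota.
partitions = [list(range(c)) for c in counts]
limited = [part[:n] for part, n in zip(partitions, quota)]

print(quota, sum(len(p) for p in limited))
```

For the counts above this sketch yields a slightly different split from the one in the text, but it still totals 50 rows and keeps rows in several partitions, so no partition has to receive data from the others.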

That is to say, we don’t do any real shuffling of data. Besides, after GlobalLimit we still have effective parallelism instead of one single partition, so the following operators, if any, can also benefit.

The actual code change isn’t large. Given the performance boost it shows, I would personally like to see it merged and made available to Spark users soon, if possible.
