
GUIDE TO SPARK PARTITIONING

Determining the Number of Partitions in Apache Spark: Part I

The number of partitions plays a critical role in the execution of Spark applications. This two-part story serves as a guide to reasoning about the number of partitions contained in an RDD or Dataset.

Ajay Gupta
Published in DataSeries
9 min read, Jun 9, 2020

Data in Spark always remains partitioned: right after it is read from a data source, during intermediate transformations, and up to the point where an action is performed to produce the desired output. The partitioned data at each stage is represented by a low-level abstraction called RDD. Programmers can use RDDs directly to write Spark applications. Optionally, a higher-level abstraction built on top of RDD, called Dataset, is also available for writing Spark applications.

The Spark execution pipeline is likewise built around data partitions. A typical pipeline reads one or more partitions of an input RDD, computes the intermediate partition(s) of intermediate RDD(s), and finally applies an action on the computed partition(s) of the desired RDD. Further, each partition of an RDD is processed independently of the other partitions.

You can now also read my recently published book, “Guide to Spark Partitioning”, which takes a deep dive into all aspects of Spark partitioning, with multiple examples that explain each aspect in detail: https://www.amazon.com/dp/B08KJCT3XN/

Therefore, the number of partitions chosen for the input, intermediate, and final RDDs plays a very important role in the execution efficiency of a Spark application. Spark chooses the number of partitions for an RDD either implicitly or explicitly. Here is a comprehensive description of how Spark chooses the number of partitions while reading from a data source and at certain popular transformations where the number of partitions could change implicitly:

Union Transformation: The number of partitions in the resultant data after a union of multiple RDDs/Datasets is decided as follows:

(a) Union transformation on multiple RDDs: In this case, the number of partitions in the resultant RDD (after the union) is chosen as follows:

  • If all upstream RDDs have the same Partitioner and the same number of partitions, then the resultant RDD has that same number of partitions.
  • If the Partitioner or the number of partitions differs in one or more upstream RDDs, the Union operation adds up the number of partitions in the upstream RDDs to determine the number of partitions in the resultant RDD.
  • If the Partitioner is absent in even one of the upstream RDDs, the Union operation likewise adds up the number of partitions in the upstream RDDs to determine the number of partitions in the resultant RDD.

Illustration of the Union operation on RDDs. The deciding factors for the final number of partitions in the Union RDD are: (a) the number of partitions in the upstream RDDs and (b) the Partitioner of the upstream RDDs.

(b) Union transformation on multiple Datasets: For a union of multiple Datasets, the number of partitions in the resultant Dataset is always equal to the sum of the number of partitions across all upstream Datasets.
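The union rules above can be sketched as a small, Spark-free model. This is only an illustration of the rules as described in this story, not Spark's implementation: the class UnionPartitionSketch, the RddInfo holder, and the partitioner labels are hypothetical names introduced here.

```java
/** Illustrative model only; these types are hypothetical and are not Spark classes. */
final class UnionPartitionSketch {

    /** Stand-in for an RDD: its partition count and a partitioner label (null means no partitioner). */
    static final class RddInfo {
        final int numPartitions;
        final String partitioner;

        RddInfo(int numPartitions, String partitioner) {
            this.numPartitions = numPartitions;
            this.partitioner = partitioner;
        }
    }

    /** Partition count after a union of RDDs; expects at least one upstream RDD. */
    static int unionOfRdds(RddInfo... upstream) {
        RddInfo first = upstream[0];
        // The "same partitioning" rule can only apply if a partitioner is present at all.
        boolean samePartitioning = first.partitioner != null;
        int sum = 0;
        for (RddInfo rdd : upstream) {
            sum += rdd.numPartitions;
            if (samePartitioning) {
                samePartitioning = first.partitioner.equals(rdd.partitioner)
                        && first.numPartitions == rdd.numPartitions;
            }
        }
        // Same partitioner and count everywhere: keep the count. Otherwise: add the counts up.
        return samePartitioning ? first.numPartitions : sum;
    }

    /** Partition count after a union of Datasets: always the sum of the upstream counts. */
    static int unionOfDatasets(int... upstreamPartitionCounts) {
        int sum = 0;
        for (int count : upstreamPartitionCounts) {
            sum += count;
        }
        return sum;
    }

    public static void main(String[] args) {
        RddInfo a = new RddInfo(4, "hash-on-key");
        RddInfo b = new RddInfo(4, "hash-on-key");
        RddInfo c = new RddInfo(6, null);
        System.out.println(unionOfRdds(a, b));     // 4: same partitioner, same count
        System.out.println(unionOfRdds(a, b, c));  // 14: one RDD has no partitioner, so counts add up
        System.out.println(unionOfDatasets(4, 4)); // 8: Datasets always add up
    }
}
```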

The Partitioner of an RDD, if present, tells how the RDD is partitioned. A partitioner gets associated with an RDD after certain transformations are performed on it, such as partitionBy, reduceByKey, etc. Certain other transformations, such as mapPartitions, mapToPair, etc., remove the previous partitioner. For more information, please refer to this link.
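To make the idea of a partitioner more concrete, here is a minimal sketch of a hash-based partitioner: it maps a key to a partition index by taking a non-negative modulo of the key's hash code. The class HashPartitionerSketch is a hypothetical stand-in written for this story, not a Spark class, and it assumes a strictly positive number of partitions.

```java
/** Illustrative hash partitioner; a hypothetical class, not part of Spark. */
final class HashPartitionerSketch {
    private final int numPartitions;

    HashPartitionerSketch(int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        this.numPartitions = numPartitions;
    }

    int numPartitions() {
        return numPartitions;
    }

    /** Maps a key to a partition index in the range [0, numPartitions). */
    int getPartition(Object key) {
        if (key == null) {
            return 0; // assumption of this sketch: null keys go to partition 0
        }
        // floorMod keeps the index non-negative even when hashCode() is negative.
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        HashPartitionerSketch partitioner = new HashPartitionerSketch(4);
        System.out.println(partitioner.getPartition(7));    // 3
        System.out.println(partitioner.getPartition(-1));   // 3
        System.out.println(partitioner.getPartition(null)); // 0
    }
}
```

Because equal keys always land in the same partition, an RDD that carries such a partitioner can be aggregated or joined on that key without moving the data again.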

Aggregation Transformation: The number of partitions in the resultant data differs depending on whether the aggregation is performed on a raw RDD or on a Dataset. Below is a description of how the resultant number of partitions is decided in each case:

(1) Aggregation on an RDD: To aggregate a raw RDD in a Spark application, a programmer has two flavors of aggregation APIs. One flavor implicitly chooses the number of partitions for the resultant aggregated RDD, while the other lets the programmer explicitly specify it.

Illustration of the Aggregation operation on RDDs. The deciding factors for the final number of partitions in the aggregated RDD are: (a) the number of partitions in the parent RDD, (b) the ‘spark.default.parallelism’ configuration, (c) the numPartitions value explicitly provided to the aggregation APIs, and (d) whether the parent RDD is partitioned on the aggregation key.

Some of the aggregation APIs on raw RDDs that determine the resultant number of partitions implicitly are shown below:

(1) reduceByKey(scala.Function2<V,V,V> func)
(2) groupByKey()
(3) aggregateByKey(U zeroValue, scala.Function2<U,V,U> seqOp, scala.Function2<U,U,U> combOp, scala.reflect.ClassTag<U> evidence$2)

Spark also provides another flavor of the above aggregation APIs, in which the resultant number of partitions is passed explicitly. These APIs are shown below:

(1) reduceByKey(scala.Function2<V,V,V> func, int numPartitions)
(2) groupByKey(int numPartitions)
(3) aggregateByKey(U zeroValue, int numPartitions, scala.Function2<U,V,U> seqOp, scala.Function2<U,U,U> combOp, scala.reflect.ClassTag<U> evidence$2)

To implicitly determine the resultant number of partitions, the aggregation APIs first look for the configuration property ‘spark.default.parallelism’. If this property is not set, the number of partitions in the aggregated RDD is always equal to the number of partitions in the parent RDD on which the aggregation is performed.

If ‘spark.default.parallelism’ is set to some value, then there are two paths:

(a) If the parent RDD has a partitioner on the aggregation key(s), then the number of partitions in the aggregated RDD is equal to the number of partitions in the parent RDD.

(b) If the parent RDD does not have a partitioner, then the number of partitions in the aggregated RDD is equal to the value of ‘spark.default.parallelism’.
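The decision path for RDD aggregations described above can be written down as a single function. The sketch below is a plain-Java model of those rules only; the class and parameter names are hypothetical, a null defaultParallelism stands for "‘spark.default.parallelism’ is not set", and a null explicitNumPartitions stands for "the implicit API flavor was used".

```java
/** Illustrative model of the RDD aggregation rules; hypothetical names, not Spark code. */
final class RddAggregationPartitionSketch {

    static int aggregatedPartitions(int parentPartitions,
                                    boolean parentHasPartitioner,
                                    Integer defaultParallelism,
                                    Integer explicitNumPartitions) {
        // Explicit flavor, e.g. reduceByKey(func, numPartitions): the caller decides.
        if (explicitNumPartitions != null) {
            return explicitNumPartitions;
        }
        // Implicit flavor, config not set: keep the parent's partition count.
        if (defaultParallelism == null) {
            return parentPartitions;
        }
        // Implicit flavor, config set: an existing partitioner wins, otherwise the config value.
        return parentHasPartitioner ? parentPartitions : defaultParallelism;
    }

    public static void main(String[] args) {
        System.out.println(aggregatedPartitions(100, false, null, null)); // 100
        System.out.println(aggregatedPartitions(100, true, 20, null));    // 100
        System.out.println(aggregatedPartitions(100, false, 20, null));   // 20
        System.out.println(aggregatedPartitions(100, true, 20, 50));      // 50
    }
}
```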

(2) Aggregation on a Dataset: When an aggregation is performed on a Dataset in a Spark application, the number of partitions in the resultant aggregated Dataset is decided in one of the following two ways:

Illustration of the Aggregation operation on Datasets. The deciding factors for the final number of partitions in the aggregated Dataset are: (a) the number of partitions in the parent Dataset, (b) the ‘spark.sql.shuffle.partitions’ configuration, and (c) whether the parent Dataset is partitioned on the aggregation key.

(a) If the parent Dataset (to be aggregated) is already partitioned based on the aggregation key, then the aggregated Dataset has the same number of partitions as in the parent Dataset.

(b) If the parent Dataset is not partitioned based on the aggregation key, then the number of partitions in the aggregated Dataset is governed by the Spark config ‘spark.sql.shuffle.partitions’, whose default value is 200.

A Dataset undergoing an aggregation is said to be partitioned on the corresponding aggregation key if it is the result of a previous repartition, aggregation, or join transformation that was performed using the same aggregation key. For example, in the code snippet below, Dataset ‘ds2’, which is aggregated on ‘empId’, is the result of a previous repartition operation on ‘empId’.

// ds2 is partitioned on empId, so the aggregation below keeps its 2000 partitions
Dataset<Row> ds2 = ds1.repartition(2000, functions.col("empId"));
ds2.groupBy(functions.col("empId")).agg(functions.sum("sal").as("sumsal"));
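The two cases for Dataset aggregations reduce to a very small rule, sketched below as a plain-Java model. The class and method names are hypothetical, and the model follows only the description in this story; in particular it ignores adaptive query execution, which in newer Spark versions can coalesce shuffle partitions at runtime.

```java
/** Illustrative model of the Dataset aggregation rules; hypothetical names, not Spark code. */
final class DatasetAggregationPartitionSketch {

    /** Default of 'spark.sql.shuffle.partitions' when the user does not set it. */
    static final int DEFAULT_SHUFFLE_PARTITIONS = 200;

    static int aggregatedPartitions(int parentPartitions,
                                    boolean parentPartitionedOnAggregationKey,
                                    int shufflePartitions) {
        // Already partitioned on the key: no reshuffle, so the parent's count carries over.
        // Otherwise the data is shuffled into 'spark.sql.shuffle.partitions' partitions.
        return parentPartitionedOnAggregationKey ? parentPartitions : shufflePartitions;
    }

    public static void main(String[] args) {
        // Mirrors ds2 above: repartitioned on empId into 2000 partitions, then grouped by empId.
        System.out.println(aggregatedPartitions(2000, true, DEFAULT_SHUFFLE_PARTITIONS)); // 2000
        // A Dataset that is not partitioned on the aggregation key.
        System.out.println(aggregatedPartitions(48, false, DEFAULT_SHUFFLE_PARTITIONS));  // 200
    }
}
```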

Join Transformation: A join is performed over multiple RDDs or Datasets and results in a single RDD or Dataset with a certain number of partitions. The way the number of partitions is decided for the joined data differs between RDDs and Datasets, as described below:

(1) Join on RDDs: When a join is performed on raw RDDs, two sets of Join APIs are available to programmers.

Illustration of the Join operation on RDDs. The deciding factors for the final number of partitions in the joined RDD are: (a) the number of partitions in the upstream RDDs, (b) the ‘spark.default.parallelism’ configuration, (c) the numPartitions value explicitly provided to the Join APIs, and (d) whether the upstream RDDs are partitioned on the join key.

In one set, the programmer explicitly specifies the number of partitions in the various Join APIs, and the specified number becomes the number of partitions in the resultant joined RDD. In the other set, Spark implicitly determines the number of partitions in the resultant joined RDD.

Here are examples of the two sets for some of the Join APIs:

Explicit set:
join(RDD<scala.Tuple2<K,W>> other, int numPartitions)
leftOuterJoin(RDD<scala.Tuple2<K,W>> other, int numPartitions)
rightOuterJoin(RDD<scala.Tuple2<K,W>> other, int numPartitions)
fullOuterJoin(RDD<scala.Tuple2<K,W>> other, int numPartitions)

Implicit set:
join(RDD<scala.Tuple2<K,W>> other)
leftOuterJoin(RDD<scala.Tuple2<K,W>> other)
rightOuterJoin(RDD<scala.Tuple2<K,W>> other)
fullOuterJoin(RDD<scala.Tuple2<K,W>> other)

For the implicit set, the number of partitions in the resultant joined RDD is decided as follows:

(a) If none of the upstream RDDs (participating in the join) has a partitioner on the join key(s), then the number of partitions in the resultant joined RDD is equal to the value configured for ‘spark.default.parallelism’. If this config property is not set, then the number of partitions in the resultant joined RDD is equal to the maximum number of partitions among all upstream RDDs. (This behavior is the same across old and recent Spark versions.)

(b) If one or more upstream RDDs have a partitioner on the join key(s), then the maximum number of partitions among such RDDs (the "maximum value" below) is compared with the config property ‘spark.default.parallelism’ in the following ways (in accordance with the latest versions of Spark) to choose the number of partitions in the resultant joined RDD:

  • If the config is set and the maximum value is above the config value, then the maximum value is chosen.
  • If the config is set and the maximum value is below the config value, but the maximum value is within a single order of magnitude of the highest number of partitions among all upstream RDDs, then the maximum value is chosen. For example, if the maximum value is 120, the highest number of partitions among upstream RDDs is 1000, and the config value for ‘spark.default.parallelism’ is 1050, then 120 is chosen.
  • If the config is set, the maximum value is below the config value, and the maximum value is not within a single order of magnitude of the highest number of partitions among all upstream RDDs, then the config value is chosen. For example, if the maximum value is 80, the highest number of partitions among upstream RDDs is 1000, and the config value for ‘spark.default.parallelism’ is 1050, then 1050 is chosen.
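  • If the config is not set, the highest number of partitions among all upstream RDDs takes the place of the config value in the rules above: the maximum value is chosen when it is within a single order of magnitude of that highest number; otherwise the highest number itself is chosen.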

In older versions of Spark, if one or more upstream RDDs have a partitioner on the join key(s), then the maximum number of partitions among such RDDs is always chosen as the number of partitions for the resultant joined RDD, irrespective of any value set for the config property ‘spark.default.parallelism’.
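The implicit rules for recent Spark versions are easier to follow as code. The sketch below is a plain-Java model, not Spark's implementation: the class, the RddInfo holder, and the method name are hypothetical, a null defaultParallelism stands for "‘spark.default.parallelism’ is not set", and "within a single order of magnitude" is modelled as a difference of less than 1 between the base-10 logarithms. When the config is not set, the sketch assumes the highest partition count among all upstream RDDs takes the config's place. Partition counts are assumed to be positive.

```java
/** Illustrative model of the implicit RDD join rules; hypothetical names, not Spark code. */
final class RddJoinPartitionSketch {

    /** Stand-in for an upstream RDD: partition count and whether it has a partitioner on the join key. */
    static final class RddInfo {
        final int numPartitions;
        final boolean hasPartitioner;

        RddInfo(int numPartitions, boolean hasPartitioner) {
            this.numPartitions = numPartitions;
            this.hasPartitioner = hasPartitioner;
        }
    }

    /** Expects at least one upstream RDD with a positive partition count. */
    static int joinedPartitions(Integer defaultParallelism, RddInfo... upstream) {
        int highest = 0;        // highest partition count among all upstream RDDs
        int maxPartitioned = 0; // highest partition count among RDDs that have a partitioner
        for (RddInfo rdd : upstream) {
            highest = Math.max(highest, rdd.numPartitions);
            if (rdd.hasPartitioner) {
                maxPartitioned = Math.max(maxPartitioned, rdd.numPartitions);
            }
        }
        // Assumption: with the config unset, the highest upstream count acts as the default.
        int fallback = (defaultParallelism != null) ? defaultParallelism : highest;
        if (maxPartitioned == 0) {
            return fallback; // case (a): no upstream RDD has a partitioner
        }
        boolean withinOneOrderOfMagnitude =
                Math.log10(highest) - Math.log10(maxPartitioned) < 1;
        // Case (b): keep the existing partitioning if it is "large enough", else fall back.
        return (withinOneOrderOfMagnitude || fallback <= maxPartitioned) ? maxPartitioned : fallback;
    }

    public static void main(String[] args) {
        // The two worked examples from the text, with 'spark.default.parallelism' = 1050.
        System.out.println(joinedPartitions(1050, new RddInfo(120, true), new RddInfo(1000, false))); // 120
        System.out.println(joinedPartitions(1050, new RddInfo(80, true), new RddInfo(1000, false)));  // 1050
        // No partitioner anywhere: config value if set, else the highest upstream count.
        System.out.println(joinedPartitions(50, new RddInfo(100, false), new RddInfo(300, false)));   // 50
        System.out.println(joinedPartitions(null, new RddInfo(100, false), new RddInfo(300, false))); // 300
    }
}
```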

Join APIs for RDDs give programmers flexibility. Each Join API has two versions: in one, the programmer specifies the resultant number of partitions; in the other, Spark determines it from the number of partitions in the upstream RDDs, compared against the value of the config property ‘spark.default.parallelism’ set by the user.

(2) Join on Datasets: When a join is performed on multiple Datasets in a Spark application, the number of partitions in the resultant joined Dataset is determined in one of the following two ways:

(a) If none of the upstream Datasets (input to Join) is already partitioned on the basis of Join Key, then the configured value ‘spark.sql.shuffle.partitions’ is chosen as the final number of partitions in the resultant joined Dataset.

(b) If one or more of upstream Datasets are already partitioned on the basis of Join Key, then the maximum of the number of partitions among such Datasets is compared against the configured value of ‘spark.sql.shuffle.partitions’ in the following ways to decide on the resultant number of partitions:

  • If the maximum value is above the configured value of ‘spark.sql.shuffle.partitions’, then the maximum value is chosen for the resultant number of partitions in the joined Dataset.
  • If the maximum value is below the configured value of ‘spark.sql.shuffle.partitions’, then the configured value is chosen for the resultant number of partitions in the joined Dataset.

Illustration of the Join operation on Datasets. The deciding factors for the number of partitions in the resultant joined Dataset are: (a) the number of partitions in the upstream Datasets, (b) the number of shuffle partitions, and (c) whether the upstream Datasets are partitioned on the join key.

Join APIs for Datasets do not allow programmers to explicitly specify the number of partitions, unlike the Join APIs for RDDs. Spark always determines the resultant number of partitions by itself, based on the number of partitions in the upstream Datasets, how those Datasets are partitioned, and the configuration property ‘spark.sql.shuffle.partitions’. This property always has a value, even if the user does not set it explicitly; its default is 200.
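Taken together, the two Dataset join cases amount to taking the larger of two numbers. The sketch below models that in plain Java, following only the description in this story; the class and method names are hypothetical, and the varargs list holds the partition counts of just those upstream Datasets that are already partitioned on the join key.

```java
/** Illustrative model of the Dataset join rules; hypothetical names, not Spark code. */
final class DatasetJoinPartitionSketch {

    static int joinedPartitions(int shufflePartitions, int... countsPartitionedOnJoinKey) {
        int maxPartitioned = 0;
        for (int count : countsPartitionedOnJoinKey) {
            maxPartitioned = Math.max(maxPartitioned, count);
        }
        // With no Dataset partitioned on the join key, maxPartitioned stays 0 and the
        // 'spark.sql.shuffle.partitions' value wins; otherwise the larger of the two wins.
        return Math.max(maxPartitioned, shufflePartitions);
    }

    public static void main(String[] args) {
        System.out.println(joinedPartitions(200));          // 200: nothing partitioned on the join key
        System.out.println(joinedPartitions(200, 500));     // 500: maximum value above the config
        System.out.println(joinedPartitions(200, 50));      // 200: maximum value below the config
        System.out.println(joinedPartitions(200, 50, 300)); // 300
    }
}
```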

This part covered how Spark arrives at the number of partitions of the resultant data for important and ubiquitous transformations such as Union, Join, and Aggregation. The next part of this story describes how the number of partitions is arrived at when RDDs or Datasets are created from various data sources.

If you have any questions or feedback about this story, please write in the comments section.

Follow the second part of this story here.

Get a copy of my recently published book on Spark Partitioning: https://www.amazon.com/dp/B08KJCT3XN/

Ajay Gupta
DataSeries

Leading Data Engineering Initiatives @ Jio, Apache Spark Specialist, Author, LinkedIn: https://www.linkedin.com/in/ajaywlan/