Spark Under the Hood: Partition

Spark is a distributed computing engine, and its main abstraction is the resilient distributed dataset (RDD), a distributed collection of objects. Under the hood, these objects are stored in partitions, and when Spark performs computations on an RDD, the partitions can be processed in parallel. Understanding how Spark deals with partitions allows us to control the application's parallelism, which leads to better cluster utilization: lower costs and shorter execution times.
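To make "a collection stored in partitions and processed in parallel" concrete, here is a plain-Scala sketch with no Spark dependency. It splits a collection into a fixed number of near-equal, contiguous partitions (similar in spirit to what `sc.parallelize(data, numSlices)` aims for) and then processes each partition concurrently with a `Future`. `PartitionSketch`, `slice`, and `parallelSum` are hypothetical names for illustration, not Spark APIs.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object PartitionSketch {
  // Split `data` into `numSlices` contiguous partitions of near-equal size.
  // Partition i covers indices [i * n / numSlices, (i + 1) * n / numSlices).
  def slice[T](data: Seq[T], numSlices: Int): Seq[Seq[T]] = {
    require(numSlices > 0, "numSlices must be positive")
    val n = data.length.toLong
    (0 until numSlices).map { i =>
      val start = ((i * n) / numSlices).toInt
      val end = (((i + 1) * n) / numSlices).toInt
      data.slice(start, end)
    }
  }

  // Process every partition concurrently (one Future per partition),
  // then combine the per-partition results, as a driver would.
  def parallelSum(partitions: Seq[Seq[Int]]): Long = {
    val perPartition = partitions.map(p => Future(p.map(_.toLong).sum))
    Await.result(Future.sequence(perPartition), 30.seconds).sum
  }

  def main(args: Array[String]): Unit = {
    val partitions = slice(1 to 10, 3)
    println(partitions.map(_.size)) // sizes 3, 3, 4
    println(parallelSum(partitions)) // 55
  }
}
```

The partitions are independent, so the per-partition sums can run on any thread in any order; only the final combination needs all results.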

Before diving deep into partitioning, let’s look at the steps that get executed under the hood in a Spark application.

When you submit a Spark job, the driver implicitly converts the code containing the transformations and actions performed on RDDs into a logical directed acyclic graph (DAG). The driver also performs certain optimizations, such as pipelining transformations, and then converts the logical DAG into a physical execution plan made up of stages. A stage is a group of transformations that can run without shuffling data: Spark pipelines as many narrow transformations (e.g., map, filter) as possible into one stage, and starts a new stage wherever a wide transformation (e.g., reduceByKey, groupByKey) requires a shuffle.

Figure: Transformations grouped into stages
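Pipelining is easiest to see on a single partition. The plain-Scala sketch below (no Spark dependency) chains a `map` and a `filter` on a lazy `Iterator`, so each element flows through both steps before the next element is read and no intermediate collection is built. This is only an analogy for how narrow transformations run inside one task; `PipelineSketch` and `runPipelined` are hypothetical names.

```scala
import scala.collection.mutable.ListBuffer

object PipelineSketch {
  // Runs map(+1) then filter(even) over one "partition" lazily and records
  // the order in which each step touched each element.
  def runPipelined(partition: Seq[Int]): (List[Int], List[String]) = {
    val trace = ListBuffer.empty[String]
    val result = partition.iterator
      .map { x => trace += s"map $x"; x + 1 }
      .filter { y => trace += s"filter $y"; y % 2 == 0 }
      .toList // forcing the iterator plays the role of the task consuming its partition
    (result, trace.toList)
  }

  def main(args: Array[String]): Unit = {
    val (result, trace) = runPipelined(Seq(1, 2, 3))
    println(result) // List(2, 4)
    println(trace) // map and filter steps interleave element by element
  }
}
```

The trace alternates `map 1, filter 2, map 2, filter 3, ...`, which is the "one pass per partition" behavior that lets Spark fuse narrow transformations into one stage.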

Suppose we read a file in Spark: its content is split into multiple smaller chunks, and in fact every RDD is split by Spark into multiple partitions. When we apply a transformation to an RDD, the transformation is applied to each of its partitions. Spark launches one task per partition, and each task runs inside an executor JVM. Each stage therefore contains as many tasks as the RDD has partitions, and each task performs the transformations (map, filter, etc.) pipelined in that stage.

Figure: Relationship between tasks and partitions in an RDD
Figure: Visualization of an RDD being partitioned
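The one-task-per-partition rule can be modelled with an ordinary thread pool standing in for an executor's cores. In this sketch, `Task`, `runStage`, and the pool size are illustrative assumptions, not Spark classes: each task applies the stage's pipelined function to exactly one partition, so a stage over N partitions produces N tasks and N results.

```scala
import java.util.concurrent.{Callable, Executors}

object StageSketch {
  // Hypothetical model of a task: the stage's pipelined function applied to one partition
  final case class Task[A, B](partitionId: Int, data: Seq[A], f: Iterator[A] => Iterator[B])
      extends Callable[Seq[B]] {
    def call(): Seq[B] = f(data.iterator).toList
  }

  // One task per partition, run on a fixed pool whose size stands in for executor cores
  def runStage[A, B](partitions: Seq[Seq[A]], cores: Int)(f: Iterator[A] => Iterator[B]): Seq[Seq[B]] = {
    val tasks = partitions.zipWithIndex.map { case (p, i) => Task(i, p, f) }
    val pool = Executors.newFixedThreadPool(cores)
    try tasks.map(t => pool.submit(t)).map(_.get()) // results stay in partition order
    finally pool.shutdown()
  }

  def main(args: Array[String]): Unit = {
    val partitions = Seq(Seq(1, 2), Seq(3, 4), Seq(5))
    val results = runStage(partitions, cores = 2)(_.map(_ * 10))
    println(s"${partitions.size} partitions -> ${results.size} tasks -> $results")
  }
}
```

With three partitions and two "cores", two tasks run at once and the third waits for a free thread, which is also how a stage with more tasks than cores is scheduled.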

Sometimes, however, programmers want to change the partitioning scheme, that is, the number and size of the partitions, to suit the application. Partitioning can be controlled at the source of the RDD or changed later with the repartition() method. For example, the jdbc() method of DataFrameReader (reached through sqlContext.read) reads a table from any JDBC data source as a DataFrame.

sqlContext.read.jdbc(url, table, column, lowerBound, upperBound, numberOfPartitions, properties)

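jdbc() takes numberOfPartitions, which determines how many partitions the table is divided into. Spark splits the range between lowerBound and upperBound of the partition column into that many strides and issues one query per partition, each with its own WHERE clause on that column. Note that lowerBound and upperBound only decide the stride; they do not filter rows, so rows outside the range still end up in the first or last partition.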
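To see how the bounds turn into per-partition queries, here is a simplified plain-Scala sketch of the stride logic. It follows the documented behavior (the bounds set the stride and the outer partitions are open-ended), but `partitionPredicates` and its exact boundary arithmetic are assumptions for illustration; Spark's internal implementation differs in details.

```scala
object JdbcStrideSketch {
  // Hypothetical helper: one WHERE-clause predicate per partition on a numeric column.
  // lowerBound/upperBound only set the stride; the first and last predicates are
  // open-ended, so rows outside the range (and NULLs) are still read, not filtered out.
  def partitionPredicates(column: String, lowerBound: Long, upperBound: Long, numPartitions: Int): Seq[String] = {
    require(numPartitions >= 2, "with one partition no predicate is needed")
    require(upperBound - lowerBound >= numPartitions, "range too small for this many partitions")
    val stride = (upperBound - lowerBound) / numPartitions
    (0 until numPartitions).map { i =>
      val lo = lowerBound + i * stride
      val hi = lo + stride
      if (i == 0) s"$column < $hi OR $column IS NULL"
      else if (i == numPartitions - 1) s"$column >= $lo"
      else s"$column >= $lo AND $column < $hi"
    }
  }

  def main(args: Array[String]): Unit =
    partitionPredicates("id", 0L, 100L, 4).zipWithIndex.foreach { case (p, i) =>
      println(s"partition $i: SELECT * FROM t WHERE $p")
    }
}
```

For lowerBound = 0, upperBound = 100 and four partitions, this yields `id < 25 OR id IS NULL`, `id >= 25 AND id < 50`, `id >= 50 AND id < 75`, and `id >= 75`. If the column values are skewed, the partitions will be too, which is worth checking before picking the bounds.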

df.repartition(numberOfPartitions)

repartition() performs a full shuffle of the data across executors to produce the requested number of partitions. Because a shuffle moves data over the network, this can be an expensive operation. The ideal place to partition is at the data source, while fetching the data. Partitioning the data the right way can speed things up greatly, while doing it wrong can slow them down dramatically, mostly because of the shuffle.
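A rough model of what a full-shuffle repartition achieves: records from unevenly sized input partitions are dealt out round-robin into the requested number of output partitions, so the output is balanced but many records change partition. This plain-Scala sketch makes a simplifying assumption (one global round-robin counter); Spark's actual implementation differs in details, for example in where each input partition starts its round-robin, and moves the data through its shuffle machinery. `RepartitionSketch` is a hypothetical name.

```scala
object RepartitionSketch {
  // Simplified full-shuffle repartition: deal records out round-robin into
  // `numPartitions` outputs using one global counter (an assumption, see above).
  // Returns the new partitions and how many records ended up in a different partition index.
  def repartition[T](partitions: Seq[Seq[T]], numPartitions: Int): (Seq[Seq[T]], Int) = {
    require(numPartitions > 0, "numPartitions must be positive")
    // Tag each record with the index of the partition it came from
    val tagged = partitions.zipWithIndex.flatMap { case (p, src) => p.map(r => (r, src)) }
    // The k-th record overall goes to output partition k % numPartitions
    val assigned = tagged.zipWithIndex.map { case ((r, src), k) => (r, src, k % numPartitions) }
    val output = (0 until numPartitions).map { dst =>
      assigned.collect { case (r, _, d) if d == dst => r }
    }
    // In a real cluster, records that change partition may have to cross the network
    val moved = assigned.count { case (_, src, dst) => src != dst }
    (output, moved)
  }

  def main(args: Array[String]): Unit = {
    // Skewed input: 10 records, 2 records, and an empty partition
    val (output, moved) = repartition(Seq((1 to 10).toList, List(11, 12), Nil), 3)
    println(output.map(_.size)) // balanced sizes
    println(s"$moved of 12 records changed partition")
  }
}
```

The skewed input (10, 2, 0 records) becomes three partitions of 4 records each, but 7 of the 12 records change partition along the way, which is the cost the paragraph above warns about.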

When the data is key-value oriented, partitioning becomes especially important, because key-based transformations such as reduceByKey, groupByKey, or join shuffle data across the network. If all records with the same key (or the same range of keys) are already stored in the same partition, for example after partitionBy() with a HashPartitioner or RangePartitioner, the shuffle is minimized and processing becomes substantially faster.
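The idea behind key-based partitioning can be shown without Spark. The sketch below assigns each key to a partition with a non-negative `hashCode` modulo, the rule Spark's `HashPartitioner` is built on (null keys go to partition 0); `partitionFor` and `groupLocally` are hypothetical helpers. Because equal keys always land in the same partition, each partition can aggregate its own keys without any further data movement.

```scala
object HashPartitionSketch {
  // Non-negative modulo, so negative hash codes still map into [0, numPartitions)
  def nonNegativeMod(x: Int, mod: Int): Int = {
    val raw = x % mod
    if (raw < 0) raw + mod else raw
  }

  // Partition index for a key; null keys go to partition 0
  def partitionFor(key: Any, numPartitions: Int): Int =
    if (key == null) 0 else nonNegativeMod(key.hashCode, numPartitions)

  // Place (key, value) records by key, then sum the values per key inside each partition
  def groupLocally(records: Seq[(String, Int)], numPartitions: Int): Seq[Map[String, Int]] = {
    val byPartition = records.groupBy { case (k, _) => partitionFor(k, numPartitions) }
    (0 until numPartitions).map { p =>
      byPartition.getOrElse(p, Seq.empty)
        .groupBy(_._1)
        .map { case (k, kvs) => k -> kvs.map(_._2).sum }
    }
  }

  def main(args: Array[String]): Unit = {
    val records = Seq("a" -> 1, "b" -> 2, "a" -> 3, "c" -> 4)
    groupLocally(records, numPartitions = 2).zipWithIndex.foreach { case (m, p) =>
      println(s"partition $p: $m")
    }
  }
}
```

`partitionFor(-3, 4)` returns 1 rather than -3, which is why the modulo is made non-negative. Every occurrence of `"a"` ends up in one partition, so its total (4) is computed locally.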

Given that the data is partitioned, when we run different transformations on an RDD (map, flatMap, filter and others), the transformation code (closure) is:

  1. serialized on the driver node,
  2. shipped to the appropriate nodes in the cluster,
  3. deserialized,
  4. and finally executed on the nodes.
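The four steps can be imitated in plain Scala with Java serialization, which is also what Spark uses for closures. In this sketch the "driver" and "executor" are just two methods in one JVM, and `ClosureShipping` with its methods is a hypothetical helper; a real cluster would send the bytes over the network to the executors.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object ClosureShipping {
  // The transformation a user would pass to map(); it captures nothing
  val addOne: Int => Int = _ + 1

  // 1. Serialize the closure on the "driver"
  def serialize(f: Int => Int): Array[Byte] = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(f) finally out.close()
    bytes.toByteArray
  }

  // 3. Deserialize it on the "executor"
  def deserialize(bytes: Array[Byte]): Int => Int = {
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try in.readObject().asInstanceOf[Int => Int] finally in.close()
  }

  // 2 + 4. Receive the "shipped" bytes and execute the function on one partition
  def runOnPartition(partition: Seq[Int], shipped: Array[Byte]): Seq[Int] = {
    val f = deserialize(shipped)
    partition.map(f)
  }

  def main(args: Array[String]): Unit = {
    val shipped = serialize(addOne)
    println(s"closure size: ${shipped.length} bytes")
    println(runOnPartition(Seq(1, 2, 3), shipped)) // List(2, 3, 4)
  }
}
```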
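For example, consider the following class:

import org.apache.spark.SparkContext

class SparkApp(sc: SparkContext) {
  // rdd has 20 partitions
  val rdd = sc.parallelize(1 to 10000, 20)

  def run(): Unit = {
    // someFunc is a method of SparkApp, so the function passed to map()
    // holds a reference to the enclosing SparkApp instance
    val result = rdd.map(someFunc)
    println(result.count())
  }

  def someFunc(a: Int): Int = a + 1
}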

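When we run the above example, Spark has to serialize the function passed to map(). Because someFunc is a method of SparkApp rather than a standalone function, and methods cannot be serialized on their own, that function holds a reference to the enclosing SparkApp instance. Spark therefore tries to serialize the whole SparkApp object so that the code still works when executed in another JVM. We need to make the SparkApp class serializable (by having it extend Serializable) or move someFunc into a standalone object. If Spark is not able to serialize the closure, it throws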

Task not serializable: java.io.NotSerializableException
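The failure, and two common fixes, can be reproduced without a cluster, because the check boils down to Java serialization of the closure. In this plain-Scala sketch the class names and the `isSerializable` helper are hypothetical; the point is that a function built from a method of a non-serializable class drags `this` along with it.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Like the original SparkApp: NOT Serializable
class NonSerializableApp {
  def someFunc(a: Int): Int = a + 1
  // Eta-expanding the method creates a function that captures `this`
  val closure: Int => Int = someFunc
}

// Fix 1: make the enclosing class Serializable
class SerializableApp extends Serializable {
  def someFunc(a: Int): Int = a + 1
  val closure: Int => Int = someFunc
}

// Fix 2: keep the function in a standalone object, so the closure captures nothing
object Funcs {
  def someFunc(a: Int): Int = a + 1
}

object ClosureCheck {
  val objectClosure: Int => Int = Funcs.someFunc

  // True if Java serialization of `f` succeeds, false on NotSerializableException
  def isSerializable(f: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      try out.writeObject(f) finally out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    println(isSerializable(new NonSerializableApp().closure)) // false
    println(isSerializable(new SerializableApp().closure)) // true
    println(isSerializable(objectClosure)) // true
  }
}
```

Fix 2 is often preferable in practice, since making a large class Serializable means everything it references is shipped with every task.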

Clusters will not be fully utilized unless the level of parallelism for each operation is high enough. Having too few partitions results in:

  • Less concurrency,
  • Increased memory pressure for transformations that involve a shuffle,
  • Greater susceptibility to data skew.
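A back-of-the-envelope way to see the concurrency problem: if every task takes about the same time (a simplifying assumption), a stage with `tasks` tasks on `cores` cores runs in ceil(tasks / cores) waves, and the last wave may leave cores idle. `waves` and `utilization` below are illustrative helpers, not Spark metrics.

```scala
object WaveSketch {
  // Waves needed when each core runs one task at a time: ceil(tasks / cores)
  def waves(tasks: Int, cores: Int): Int = {
    require(tasks > 0 && cores > 0)
    (tasks + cores - 1) / cores
  }

  // Share of core-time slots doing useful work, ASSUMING all tasks take equally long
  def utilization(tasks: Int, cores: Int): Double =
    tasks.toDouble / (waves(tasks, cores) * cores)

  def main(args: Array[String]): Unit = {
    val cores = 8
    for (partitions <- Seq(4, 10, 32)) {
      val pct = utilization(partitions, cores) * 100
      println(f"$partitions%2d partitions on $cores cores: ${waves(partitions, cores)} wave(s), $pct%.1f%% busy")
    }
  }
}
```

With 8 cores, 4 partitions leave half the cores idle for the whole stage, 10 partitions need two waves with only 62.5% of the slots busy, and 32 partitions keep every core busy.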

However, having too many partitions can also have a negative impact:

  • Too much time is spent scheduling many small tasks relative to the useful work each one does.

The recommended number of partitions is around 3 or 4 times the number of CPU cores in the cluster, so that the work gets distributed more evenly among the cores; the official Spark tuning guide suggests 2 to 3 tasks per core.
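Turning the rule of thumb into a number is simple arithmetic. This sketch assumes you know the executor count and cores per executor; `targetPartitions` is a hypothetical helper, and the resulting value is what you might pass to `repartition()` or set as `spark.default.parallelism` (for RDDs) or `spark.sql.shuffle.partitions` (for DataFrame shuffles).

```scala
object PartitionCountSketch {
  // Total cores available to the application
  def totalCores(executors: Int, coresPerExecutor: Int): Int = executors * coresPerExecutor

  // Rule of thumb: a small multiple of the total core count (factor 3 by default)
  def targetPartitions(executors: Int, coresPerExecutor: Int, factor: Int = 3): Int = {
    require(executors > 0 && coresPerExecutor > 0 && factor > 0)
    totalCores(executors, coresPerExecutor) * factor
  }

  def main(args: Array[String]): Unit = {
    // e.g. 10 executors x 4 cores = 40 cores -> 120 to 160 partitions for factors 3 to 4
    println((3 to 4).map(f => targetPartitions(10, 4, f)))
  }
}
```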