How Apache Spark Scales to Big Data
Over the past decade, Apache Spark has grown from a UC Berkeley research project to become the de facto standard for distributed computing in commercial organisations all over the world. Whether hosted by Databricks or self-managed on premise with Cloudera (or the now-merged Hortonworks), they all share a similar underlying compute engine for processing data at scale.
This article takes a look under the hood to examine how compute is distributed across nodes in a cluster, and what bottlenecks we should be aware of.
Prelude: A Tale of Three Spark APIs
RDDs
Back in 2016, Apache Spark users worked with Resilient Distributed Datasets, aka RDDs, to process datasets too large to fit in a single computer. Logically, an RDD is simply a collection of objects, similar toList[int]
or List[str]
or even List[MyModel]
which could give further type-safety information like MyModel.age
being an int
.
Runtime optimisations are primarily the responsibility of the programmer
While RDDs were good for type-safety and precisely executing data transformations as defined, they are relatively low-level. If you added a filter at the end of a long pipeline instead of at the beginning, that’s exactly what you’ll get: a whole lot of data clogging up most of the pipeline till the end, only to be discarded by the filter just before the output. This is why RDDs are considered relatively low-level, with runtime optimisations heavily dependent on the programmer.
For the next generation API, Spark took a page from all the major RDBMSes: have the programmer declare the transformations in SQL, then internally construct an optimum logical + physical plan. Introducing…
DataFrames
With this API, we can truly start to think of our data as rows in a table, rather than collections of objects. Instead of being sub-attributes of objects in a collection, columns became a better-supported concept that can be queried and aggregated by themselves.
Apart from making it easier to work with columns, DataFrames also take the initiative to optimise the query both logically and physically. A simple example is shown in the Scala code snippet below:
import org.apache.spark.sql.functions.{udf, col}val expensive_function = (x: Int) => {
println(s"expensive_function invoked on element #$x")
x
}
val expensive_udf = udf(expensive_function)spark.range(10)
.withColumn("col", expensive_udf(col("id")))
.filter("id==1 or id==2")
.show()
Output:
expensive_function invoked on element #1
expensive_function invoked on element #2
+---+---+
| id|col|
+---+---+
| 1| 1|
| 2| 2|
+---+---+
Even though the .filter()
method is called after expensive_udf
, Spark’s Catalyst optimiser moved the filter to before, thus invoking the expensive function only for the two relevant rows, potentially speeding up the runtime by orders of magnitudes for highly selective filters.
With the Catalyst optimiser, Spark abstracts away low-level optimisation, freeing up programmers’ attention to focus on analytics logic
However, with this new API, we lose type-safety: a DataFrame is now a collection of generic Row objects. At compile time, there is no mechanism to know what datatype each column is, or whether a given column even exists at all! In fact, since Spark 2.0, DataFrames are simply just an alias of Dataset[Row]
, which is the 3rd generation API.
Datasets
Datasets bring together the best of RDD
and DataFrame
: type-safety + Catalyst optimizer, but alas, it is only available in Java/Scala, not Python.
This article is written as more of an academic study than a practical guide. We’ll be using the RDD Scala API to investigate the inner workings of Spark. This way, the query optimiser won’t kick in to compact our code into an opaque WholeStageCodeGen
. In production, DataFrames/DataSets are typically the API of choice.
Transformations vs Actions
There are two types of methods we can call on RDDs. Transformations and Actions.
Transformations
Transformations are lazily evaluated. Meaning that they return the resulting RDD near-instantly, but the contents of the RDD are not evaluated yet until absolutely necessary (which is when an action is called!). In the meantime, Spark merely records these transformations as a sequence of computations to be carried out, termed as Directed Acyclic Graphs (DAGs).
Examples of RDD transformations:
rdd.map(_ * 2)
rdd.filter(txn => txn.status == "success")
rdd.groupBy(user => user.country)
rdd.distinct()
Actions
Actions on an RDD properly start the processing of data through the transformations that we defined earlier in the DAG.
Examples of RDD actions:
rdd.count()
rdd.foreach(row => db.write(row))
rdd.saveAsTextFile("/path/to/output.txt")
val myList = rdd.collect()
A quick way to guess whether a method is an action or transformation, is to consider whether the output actually returns data (be it a variable in memory or a file in a filesystem), or does it return yet another RDD. If it’s the former, it’s likely to be an action.
Note that in the case of .collect()
, data flows from many remote executors to a single Spark Driver (or the Scala/Python client), so we must ensure that the result is small enough to fit in memory. .collect()
essentially brings together all elements in all remote executors into the Spark Driver and subsequently into a Scala/Python array. So the onus is on us not to call .collect()
on a large RDD!
A simple DAG
We’ve been mentioning that transformation methods build a series of processing steps in a DAG. Let’s see that in practice by constructing a simple DAG in Scala:
val rdd1 = sc.range(1,11) // [ 1, 2, 3, ..., 10]
var rdd2 = sc.range(10,21) // [11, 12, 13, ..., 20]
rdd2 = rdd2.map(_*2) // [22, 24, 26, ..., 40]
val rdd3 = sc.union(Seq(rdd1, rdd2)) // [1...10, 22...40]
var rdd4 = rdd3.filter(_%2==0) // [2, 4...10, 22...40] evens
rdd4 = rdd4.sample(withReplacement=false, fraction=0.5) // take half
rdd4.collect()
To visualise this DAG, the last line above calls an action .collect()
and from the Stages tab in Spark UI at http://localhost:4040/jobs/job/?id=0, we get something like the following:
Data flows from the top, starting with the two sc.range()
RDDs. The one on the right has an additional map which creates rdd2
. Both are combined together in a union
, followed by a filter
and a sample
. This is exactly following our code, with no automatic re-ordering or other optimisations.
Spark Cluster Architecture
Now that we know how to program in Spark, let’s take a look at how it carries out the transformations in a YARN cluster.
YARN ResourceMananger & NodeManager
The NodeManager is a Java service that runs on every node in the cluster. It allocates node resources (vCPUs, RAM, GPUs) to YARN applications, allowing them to start Spark Executors. By reserving resources, we avoid overallocating resources that a node simply does not have! All NodeManagers in a cluster report to a single ResourceManager, which administrates the cluster as a whole.
Whenever we set the following settings, we’re actually receiving resources (via ResourceManager) from the various NodeManagers.
spark.executor.instances 3
spark.executor.cores 4
spark.executor.memory 12g
A Spark application with the above settings indicates that it requires 3 Spark Executors to run. Each executor must be allocated 4 cores along with 12GB of memory. If each node in a cluster is configured to serve 8 cores and 32GB RAM, then this application will be distributed across at least 2 nodes. One node would run two Executors while the other runs the 3rd Executor. Of course, YARN may also decide to allocate 1 Executor across 3 different nodes instead to better distribute the load.
Partitions
While an RDD is logically thought of as an array, in implementation, it is further divided up into partitions, which are not immediately apparent in typical usage. For example, an RDD with [1,2,3,4] may have [1,2] in one partition and [3,4] in another partition, or any other combination.
The point of having partitions within an RDD is so that each vCPU can work on different partitions independently. This way, a vCPU starting work on the 9000th element wouldn’t have to keep checking in with all the other vCPUs on whether they might already be working on the same element! By grouping up tens of millions of elements into just hundreds of partitions, YARN has a much easier time bulk-assigning blocks of elements (ie partitions) to the tens of vCPUs.
But wait, hundreds of partitions given to tens of vCPUs? Does that mean that each vCPU is forced to work on tens of partitions? Well, yes, but not at the same time. The Spark Driver organises the partitions into a single queue, and each vCPU “races” to get the chance to work on a partition. Whichever vCPU completes its partition the earliest will get assigned the next partition, and so on. The analogy would be diners (partitions) queueing up for a table outside a busy restaurant: the earliest table (vCPU) that becomes available gets assigned the next diner (partition).
Ideally, partitions should be similarly sized so that work can be distributed evenly. However, if the sizes are imbalanced (as example above) with one partition being significantly larger, then at the tail-end of the stage, we’ll just have the last vCPU chugging along while all the other vCPUs sit idle.
Now, you may be wondering how partitions are decided in the first place. The answer is that it initially depends on the input source; but how exactly, is rather complicated. There are lots of factors that determine the partitioning, like the number of input part files, spark.default.parallelism
, whether the file format is splittable, and the number of vCPUs available. The quickest way to find out, is to simply try it out in your target environment and view the Spark UI, or call rdd.getNumPartitions()
.
So, what can we do if we find that our partitions are skewed, or if there are too few/many? Well, the naïve thing to do would be to call the rdd.repartition(200)
method and continue from there. However, this immediately causes an additional stage boundary, which could cost more time than it saves later on. But first, what even is a stage? To explain, let’s revisit the hierarchy of Spark workloads in the UI:
Spark UI
Application
A Spark Application is tied to the SparkSession
instance. A batch job Scala/Python program typically starts with initialising a SparkSession
and ends after calling spark.stop()
, so most batch jobs simply correspond to a single Spark Application. The Spark UI is also tied to the application lifetime.
Jobs
During the lifetime of a Spark Application, we call many Spark actions like .count()
, .saveAsXxxFile()
. Every action maps to a Spark Job, which contains a DAG to describe all the steps necessary to process the data from the input source(s) to the output action.
Stages
A job’s DAG consists of one or more stages, with each stage representing a series of steps needed to transform from its input to its output. For a given stage, data is processed directly from the stage’s input to the stage’s output.
However, in between stages are intermediate results, which are ideally stored in memory, but can spillover to the hard disk (or SSD) at spark.local.dir
(usually the /tmp
of each node).
Tasks
Within each stage, many tasks represent the actual processing of the input partitions. Each task corresponds to one vCPU doing work on a different input partition. (unless there are failed and auto-retried tasks on the same partition!) These would be the green bars in the animation we saw earlier.
Narrow Transformations
Circling back to how stage boundaries come about, let’s talk about narrow transformations. A narrow transformation means that each partition can be processed independently. A vCPU doesn’t need to know anything within other partitions to perform the transformation.
For example, an rdd.map(x => x * 2)
is a narrow transformation, because we only need to look at the inputs of one partition to determine its corresponding outputs. An input element of 10 is always transformed to 20 regardless of what’s stored in other partitions.
With some thought, we may realise that a series of many narrow transformations one after the other would still remain narrow. This is what makes up a Stage: a series of consecutive narrow transformations chained together. Within each stage, elements flow directly from the input partition to the output partition. There are no intermediate results stored between consecutive narrow transformations.
Usually, these transformations are not only independent of other partitions, but are also independent element-wise. Unless, of course, if you’ve applied some custom partitioner to specifically redefine the partition splits with special logic for a special transformation.
Wide Transformations
On the other hand, wide transformations must retrieve information from other partitions to make decisions on how to process any given partition.
A simple example would be rdd.distinct()
on a non-sorted RDD. Looking within a single partition, it’s impossible to decide whether to emit a given input element without first knowing whether that same element might also be found in another partition. This other partition could possibly be somewhere far away, being processed by another vCPU, in a different part of the datacentre.
Hence, in order to carry out the transformation, we must first ensure that all identical elements are guaranteed to be co-located within the same partition. Typically, this is already done by default with a HashPartitioner, which hashes every element into an unpredictable but deterministic number, like 337700102
. However, since we probably don’t (and shouldn’t) have millions of partitions, we modulo (%
) it by num_partitions
to uniformly allocate a similar number of elements to each partition. Crucially, duplicate elements always produce the same hash and get sorted into the same partition. From this point on, a Spark task is now be able to process each partition independently to achieve the .distinct()
transformation.
It should be noted that a single input partition on a node was converted into many output partitions, but they would initially still stay within the same node (in memory or spark.local.dir
). When the next Stage picks up a partition, they would have to collate elements of that partition from each node all over the entire cluster. This process, known as a shuffle, is usually time-consuming as it involves sending data through the network. As such, we typically try to avoid repartitioning unless absolutely necessary, or if it saves more time than it costs (recall the severely imbalanced partitions!)
A note on GroupBy
In SQL, it’s common to run GROUP BY queries to get statistics like so:
SELECT word, COUNT(1)
FROM book
GROUP BY 1
As such, a common pitfall for Spark beginners coming from SQL backgrounds is to write code like this:
sc.textFile("sherlock.txt")
.flatMap(_.split(" "))
.groupBy(word=>word)
.mapValues(_.size)
.collect()
In the above code, the groupBy()
truly brings together all elements of the same key into the same partition. However, this is oftentimes not strictly necessary to continue on to the next step of counting words. Instead of shuffling elements all across the cluster with groupBy()
, we can try the following:
sc.textFile("sherlock.txt")
.flatMap(_.split(" "))
.keyBy(word=>word)
.aggregateByKey(0)( (tally,word) => tally+1, (t1,t2) => t1+t2 )
.collect()
In this version, we first tally up words within the same input partition ((tally, word) => tally+1
), then send only the intermediate tallies throughout the cluster for collating the final tally ((t1,t2) => t1+t2
). Because we’ve avoided sending all elements across the network, we’ve saved ourselves a lot of shuffle write and shuffle read times.
However, for an operation as common as counting, there’s already a shortcut method built into the RDD API:
sc.textFile("sherlock.txt")
.flatMap(_.split(" "))
.countByValue()
By peeking at the source code, we’ll find that it’s ultimately based on reduceByKey()
, with the same idea of pre-aggregating some metadata before shuffling just the metadata for further combining.
Tip for non-commutative aggregations
In some cases, we may wish to calculate a non-commutative aggregation like average or variance. With a little creativity, we can reframe the problem by first calculating the sum and counts, which are both commutative operations. By dividing the sum over counts, we’ll get the final desired average.
Nevertheless, while many common aggregations can be reframed as a series of commutative aggregations, some aggregations like percentiles or string concatenation will remain stubbornly non-commutative. In these cases, we use groupBy
as a last resort.
Wrapping it up
Using the RDD API, we’ve shown how a Spark application is divided up into Jobs, Stages and Tasks. Upon every action, tasks are assigned to vCPUs across the cluster to work on different partitions. If some cross-partition information is necessary, we can also perform a shuffle operation to convert it into an intra-partition-only task for the next stage, at the cost of sending data through the cluster network.
By understanding how partitioning works to distribute workloads across many nodes, I hope you’ve gained the confidence to address Spark bottlenecks and recommend more performant solutions.