This blog is mainly for people who have just come from the pandas/scikit-learn flavour of data work to the PySpark API. Recently, I was asked to explain the usage of the aggregate method in PySpark, and it reminded me that I was also a bit confused when I first saw this method, for both RDDs and PySpark DataFrames. If you only care about how to use it, you can simply jump to the How it works in Spark section. As a takeaway on why we want to use it, you can just remember that we apply aggregate because it’s faster than reduce, and it also makes it easier to apply multiple abstract functions in the reduce operations.
Why aggregate function
Before we explain how it works, we need to understand why we need it. Firstly, we need to know that the aggregate function in Spark is a curried function, thanks to mathematics and Scala. The following is an excerpt from Wikipedia about curried functions.
In mathematics and computer science, currying is the technique of translating the evaluation of a function that takes multiple arguments into evaluating a sequence of functions, each with a single argument. For example, a function that takes two arguments, one from X and one from Y, and produces outputs in Z, by currying is translated into a function that takes a single argument from X and produces as outputs functions from Y to Z. Currying is related to, but not the same as, partial application.
Currying is useful in both practical and theoretical settings. In functional programming languages, and many others, it provides a way of automatically managing how arguments are passed to functions and exceptions. In theoretical computer science, it provides a way to study functions with multiple arguments in simpler theoretical models which provide only one argument.
Furthermore, it has a bunch of mathematical theory behind it. If you’re interested in it, you can visit the Wikipedia article first to get more of a taste of it.
Currying allows us to apply functions in many ways using a lightweight syntax, and then pass these partially applied functions around to higher-order functions such as filter. Higher-order functions, which take functions as parameters or yield them as results, are the bread and butter of functional programming. In short, with currying we can easily reuse lots of abstract functions in one single process, since it makes creating anonymous functions much easier. That’s one of the important things functional programming aims to serve: currying and partially applied functions enable higher-order functions to be used much more effectively and concisely.
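To make this concrete, here is a small plain-Python sketch of currying and partial application (Python doesn’t curry automatically, so we build it by hand; the function names are just for illustration):

```python
# A plain two-argument function...
def add(x, y):
    return x + y

# ...and its curried form: a function that takes x and returns
# a new function that takes y.
def curried_add(x):
    def inner(y):
        return x + y
    return inner

add_five = curried_add(5)   # partially applied: waiting for the second argument
print(add_five(3))          # 8

# Partially applied functions slot neatly into higher-order
# functions such as filter.
def greater_than(threshold):
    return lambda value: value > threshold

print(list(filter(greater_than(4), [1, 3, 5, 7])))  # [5, 7]
```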
That’s also why in pyspark we keep seeing people use a lambda within the aggregate method. Besides being convenient, aggregate is a faster computation than using a combination of map and reduce.
Here’s a simple benchmark example: the experiment uses data from the KDD Cup 99 full data set, which contains about 5 million rows of data.
How it works in Spark
I will mainly use Python notation in the examples.
Firstly, the definition of the aggregate function is as follows:
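In PySpark the signature is `RDD.aggregate(zeroValue, seqOp, combOp)`. Spark executes this distributed and in parallel, but the semantics can be modelled in a few lines of plain Python (a sketch for intuition only, not Spark’s real implementation):

```python
from functools import reduce

# Model of RDD.aggregate(zeroValue, seqOp, combOp), with an RDD
# represented as a list of partitions (lists of elements).
def aggregate(partitions, zeroValue, seqOp, combOp):
    # 1) fold each partition with seqOp, starting from zeroValue
    partials = [reduce(seqOp, part, zeroValue) for part in partitions]
    # 2) fold the per-partition results with combOp,
    #    again starting from zeroValue
    return reduce(combOp, partials, zeroValue)

# e.g. summing over two partitions:
print(aggregate([[1, 2], [3, 4]], 0,
                lambda u, t: u + t,      # seqOp
                lambda u1, u2: u1 + u2)) # combOp -> 10
```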
seqOp aggregates the elements within each partition, and combOp merges the results of seqOp from all the partitions. Both operations share the same initial value, which is called zeroValue.
zeroValue is the initial value for the accumulated result of each partition for the seqOp operator, and also the initial value for combining the results from different partitions for the combOp operator. This will typically be the neutral element (e.g. Nil for list concatenation, or 0 for summation). In short, it is used in both operations, and people tend to forget that it is also used in the second step!
seqOp is an operator used to accumulate results within a partition. This operation walks through all the elements (of type T) in each partition: the first T is merged with zeroValue, its result is merged with the next T, and so on until it has looped over the whole partition. In other words, this sequential operation is just like rolling a given function over the RDD.
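Within a single partition, seqOp behaves just like a plain fold. In Python terms (a sketch of the semantics, not Spark internals):

```python
from functools import reduce

# seqOp accumulating (running_sum, running_count),
# starting from the zeroValue (0, 0)
seqOp = lambda acc, t: (acc[0] + t, acc[1] + 1)

# Rolling seqOp over one partition [1, 2, 3, 4]:
# (0,0) -> (1,1) -> (3,2) -> (6,3) -> (10,4)
print(reduce(seqOp, [1, 2, 3, 4], (0, 0)))  # (10, 4)
```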
combOp is an associative operator used to combine results from different partitions. The result can have a different type (U) from the elements of the original RDD (T). As a result, we need seqOp to merge all the T in each partition into a U, and then we use combOp to combine all the U from the partitions into the final result.
These operations are TraversableOnce, i.e. they walk over their input only once. Both of these functions are allowed to modify and return their first argument instead of creating a new U, to avoid memory allocation. That is, a function op(t1, t2) is allowed to modify t1 and return it as its result value to avoid object allocation; however, it should not modify t2.
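For example, an accumulator that collects elements into a list can mutate and return its first argument instead of building a new list on every step (a plain-Python sketch of the same contract):

```python
# seqOp that modifies its first argument (the accumulator) in place.
def seq_op(acc, t):
    acc.append(t)   # modify t1 ...
    return acc      # ... and return it; t (t2) is left untouched

acc = []
for t in [1, 2, 3]:
    acc = seq_op(acc, t)
print(acc)  # [1, 2, 3]
```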
Now suppose we have a list of the numbers 1 through 9, and we want to use aggregate to calculate its average.
So the process is like this:
- At the beginning, we have an initial value (zeroValue) of (0, 0), representing (sum, count).
- The seqOp is: (acc, number) => (acc[0] + number, acc[1] + 1)
- where number is the T in our previous definition, which is also known as an element of the list in one partition.
- On the RHS, the first element is an accumulation of the sum over a list in one partition, and the second element simply counts the numbers.
What’s gonna happen? Let’s split the operations into multiple steps:
Assume for now that we put the whole list in one single partition, so the process is just:
(initial value + each T, initial value + 1) rolled through seqOp
=> (0 + 45, 0 + 9)
=> (45, 9)
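In code, with plain Python standing in for the single partition:

```python
from functools import reduce

numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9]

# (sum, count) accumulator, as described above
seqOp = lambda acc, n: (acc[0] + n, acc[1] + 1)

# One partition: just roll seqOp over the whole list.
total, count = reduce(seqOp, numbers, (0, 0))
print(total, count)  # 45 9
```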
What if Spark splits the list into multiple partitions? Let’s say 3 partitions.
- Suppose we have:
1) Partition 1: [1, 2, 3, 4]
2) Partition 2: [5, 6, 7, 8]
3) Partition 3: [9]
- After the operation of
seqOp, we will get:
1) Partition 1: (10, 4)
2) Partition 2: (26, 4)
3) Partition 3: (9, 1)
- Then, as we mentioned, what combOp does is combine all the results from all the partitions:
=> (10 + 26 + 9, 4 + 4 + 1)
=> (45, 9)
Et voilà, we get the same result as the single-partition one.
Now if we want to calculate the average, we can simply do 45 / 9 = 5.
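The whole multi-partition flow can be simulated in plain Python (Spark runs this distributed and in parallel; this sketch only mirrors the semantics):

```python
from functools import reduce

partitions = [[1, 2, 3, 4], [5, 6, 7, 8], [9]]

seqOp  = lambda acc, n: (acc[0] + n, acc[1] + 1)
combOp = lambda a, b: (a[0] + b[0], a[1] + b[1])

zeroValue = (0, 0)

# Step 1: roll seqOp within each partition
partials = [reduce(seqOp, part, zeroValue) for part in partitions]
print(partials)  # [(10, 4), (26, 4), (9, 1)]

# Step 2: combine across partitions with combOp
# (note that zeroValue is used again in this step!)
total, count = reduce(combOp, partials, zeroValue)
print(total / count)  # 5.0
```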
Try to figure out the result of the following process, you might need a calculator 😅:
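(The original snippet didn’t survive here, so the following is a puzzle in the same spirit rather than the author’s original: the same list and operators, but with a deliberately non-neutral, hypothetical zeroValue. Notice how it sneaks into both steps.)

```python
from functools import reduce

partitions = [[1, 2, 3, 4], [5, 6, 7, 8], [9]]

seqOp  = lambda acc, n: (acc[0] + n, acc[1] + 1)
combOp = lambda a, b: (a[0] + b[0], a[1] + b[1])

# A deliberately non-neutral zeroValue:
zeroValue = (1, 0)

partials = [reduce(seqOp, part, zeroValue) for part in partitions]
result = reduce(combOp, partials, zeroValue)
print(result)  # (49, 9): the extra 1 is added once per partition AND once more in combOp
```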