This post is mainly for people who are moving from the pandas/scikit-learn world to the PySpark API.

Recently, I was asked to explain the usage of the `aggregate` method in PySpark, and it reminded me that I was also a bit confused when I first saw this method for both RDDs and PySpark DataFrames.

If you only care about how to use it, you can jump straight to the `How it works in Spark` section.

As a takeaway on why we want to use it: we apply `aggregate` because it's faster than `map` + `reduce`, and it also makes it easier to apply multiple abstract functions in the reduce operations.

# Why aggregate function

Before we explain `how it works`, we might need to understand `why we need it`.

**Curried function**

Firstly, we need to know that the `aggregate` function in Spark is a curried function in its Scala API, where it is written `aggregate(zeroValue)(seqOp, combOp)`, thanks to mathematics and Scala. The following is an excerpt from Wikipedia about curried functions.

In mathematics and computer science, currying is the technique of translating the evaluation of a function that takes multiple arguments into evaluating a sequence of functions, each with a single argument. For example, a function that takes two arguments, one from X and one from Y, and produces outputs in Z, by currying is translated into a function that takes a single argument from X and produces as outputs functions from Y to Z. Currying is related to, but not the same as, partial application.

Currying is useful in both practical and theoretical settings. In functional programming languages, and many others, **it provides a way of automatically managing how arguments are passed to functions and exceptions**. In theoretical computer science, it provides a way to study functions with multiple arguments in simpler theoretical models which provide only one argument.

Furthermore, there is a bunch of mathematical theory behind it. If you’re interested, the Wikipedia article is a good place to get a first taste.

**Advantages?**

Currying allows us to apply functions in many ways using a lightweight syntax, and then pass these partially applied functions around to higher-order functions such as `map` or `filter`. Higher-order functions, which take functions as parameters or yield them as results, are the bread and butter of functional programming.
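As a small illustration in plain Python (the names `add`, `curried_add`, `add_ten` and `add_five` are made up for this sketch), here is a two-argument function, its curried form, and a partially applied version, both handed to `map`:

```python
from functools import partial


def add(x, y):
    # Ordinary two-argument function.
    return x + y


def curried_add(x):
    # Curried form: takes x and returns a function that takes y.
    return lambda y: x + y


add_ten = curried_add(10)     # currying: fix x by calling with one argument
add_five = partial(add, 5)    # partial application: related, but not the same thing

# Both results are plain one-argument functions, so they can be passed to map.
result_curried = list(map(add_ten, [1, 2, 3]))
result_partial = list(map(add_five, [1, 2, 3]))

print(result_curried)  # [11, 12, 13]
print(result_partial)  # [6, 7, 8]
```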

In short, with currying, **we can easily reuse lots of abstract functions in one single process since it makes creating anonymous functions much easier**. That’s one of the important things that functional programming aims to provide. Currying and partially applied functions enable higher-order functions to be used much more effectively and concisely.

That’s also why, in `pyspark`, we keep seeing people use `lambda` within the `aggregate` method.

Moreover, `aggregate` is faster than using a combination of `map` and `reduce`.

Here’s a simple benchmark example:

The experiment uses the full KDD Cup 99 data set, which contains about 5 million rows of data.
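The benchmark itself is not reproduced here. The sketch below only shows the two formulations being compared, in plain Python rather than Spark and without any timing: a `map` followed by a `reduce`, versus a single fold that does both steps at once (the role `seqOp` plays in `aggregate`). The sum-of-squares task and the variable names are assumptions chosen for illustration.

```python
from functools import reduce
from operator import add

numbers = [1, 2, 3, 4]

# Style 1: map every element first, then reduce the mapped values.
squares_then_sum = reduce(add, map(lambda x: x * x, numbers))

# Style 2: one fold that transforms and accumulates in the same step,
# starting from an explicit initial value.
single_pass_sum = reduce(lambda acc, x: acc + x * x, numbers, 0)

print(squares_then_sum, single_pass_sum)  # 30 30
```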

# How it works in Spark

I will mainly use Python notation in the examples.

Firstly, in PySpark the `aggregate` method of an RDD is called as `rdd.aggregate(zeroValue, seqOp, combOp)`.

Briefly, `seqOp` aggregates the elements within each partition, and `combOp` merges the per-partition results of `seqOp`. Both operations share the same **initial value**, which is called `zeroValue`.
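To make the two steps concrete, here is a minimal plain-Python sketch of this behaviour. It is not Spark code: `local_aggregate` is a hypothetical helper, and a list of lists stands in for the partitions of an RDD.

```python
from functools import reduce


def local_aggregate(partitions, zero_value, seq_op, comb_op):
    # Hypothetical stand-in for RDD.aggregate, operating on a list of lists.
    # Step 1: fold each partition with seq_op, starting from zero_value.
    partials = [reduce(seq_op, part, zero_value) for part in partitions]
    # Step 2: fold the per-partition results with comb_op,
    # starting from zero_value again.
    return reduce(comb_op, partials, zero_value)


total = local_aggregate(
    [[1, 2], [3, 4]],          # two "partitions"
    0,                         # zeroValue
    lambda acc, x: acc + x,    # seqOp: add one element to the accumulator
    lambda a, b: a + b,        # combOp: add two partial sums
)
print(total)  # 10
```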

`zeroValue` is the initial value for the accumulated result of each partition for the `seqOp` operator, and also the initial value for combining the results from different partitions for the `combOp` operator. It will typically be the neutral element (e.g. an empty list for list concatenation or 0 for summation). In short, it is used in both operations, and **people tend to forget it will be used in the second step!**
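A quick way to see why the neutral element matters is to pick a `zeroValue` that is not neutral. The plain-Python sketch below (again a stand-in for Spark, with a list of lists playing the role of three partitions) sums the numbers 1 to 9 starting from 1 instead of 0:

```python
from functools import reduce
from operator import add

partitions = [[1, 2, 3, 4], [5, 6, 7, 8], [9]]
zero_value = 1  # deliberately NOT the neutral element for addition

# seqOp step: zero_value is added once per partition.
per_partition = [reduce(add, part, zero_value) for part in partitions]

# combOp step: zero_value is added one more time when merging.
total = reduce(add, per_partition, zero_value)

print(per_partition)  # [11, 27, 10]
print(total)          # 49, not 45: three partitions plus the final merge add 4
```

The true sum is 45, so the extra 4 comes from `zeroValue` being applied once in each of the three partitions and once more in the combine step.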

`seqOp` is an operator used to accumulate results within a partition. It walks through all the elements (`T`) of each partition: within a partition, `T[0]` is merged with `zeroValue`, the result is merged with `T[1]`, and so on until every element of that partition has been consumed. In other words, this sequential operation is just like folding a given function over the elements of the RDD.

`combOp` is an associative operator used to combine results from different partitions. The result of `aggregate` can have a different type (`U`) from the elements of the original RDD (`T`). As a result, we use `seqOp` to merge all the `T` in each partition into a `U`, and then we use `combOp` to combine all the `U` together.

These operations follow the `aggregate` of Scala's `TraversableOnce`. Both functions are allowed to modify and return their first argument instead of creating a new `U`, to avoid memory allocation.

That is, a function `op(t1, t2)` is allowed to modify `t1` and return it as its result value to avoid object allocation; however, it should not modify `t2`.
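The sketch below illustrates both points in plain Python: the result type `U` (a list) differs from the element type `T` (an int), and each operator mutates and returns only its first argument. The function names are hypothetical, and `copy.deepcopy` is an assumption standing in for the fact that each partition works on its own copy of `zeroValue`.

```python
import copy
from functools import reduce


def seq_op(acc, element):
    # Mutate and return the first argument; the element is left untouched.
    acc.append(element)
    return acc


def comb_op(left, right):
    # Mutate and return the first argument; `right` is only read.
    left.extend(right)
    return left


zero_value = []
partitions = [[1, 2], [3], [4, 5]]

# Each fold gets its own copy of zero_value, so the in-place
# mutation in one partition cannot leak into another.
partials = [reduce(seq_op, part, copy.deepcopy(zero_value)) for part in partitions]
result = reduce(comb_op, partials, copy.deepcopy(zero_value))

print(partials)    # [[1, 2], [3], [4, 5]]
print(result)      # [1, 2, 3, 4, 5]
print(zero_value)  # [] (the original zero value is unchanged)
```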

# Example

Suppose we have a list of the numbers 1 to 9, and we want to use `aggregate` to calculate its average.

So the process is like this:

- At the beginning, we have an initial value `(0, 0)`.

- `seqOp` is `lambda acc, number: (acc[0] + number, acc[1] + 1)`

- where `number` is the `T` in our previous definition, also known as an element of the list in one partition.

- On the RHS, the first element accumulates the sum of the list in one partition, and the second element simply counts the numbers.

What’s gonna happen? Let’s split the operations into multiple steps:

Assume for now that the whole list sits in one single partition, so `seqOp` computes (initial value[0] + `T`, initial value[1] + 1) over every element:

=> (0 + 45, 0 + 9)

=> (45, 9)

What if Spark splits the list into multiple partitions, let’s say 3 partitions?

- Suppose we have:

  1) Partition 1: [1, 2, 3, 4]

  2) Partition 2: [5, 6, 7, 8]

  3) Partition 3: [9]

- After the operation of `seqOp`, we will get:

  1) Partition 1: (10, 4)

  2) Partition 2: (26, 4)

  3) Partition 3: (9, 1)

- Then, as mentioned, `combOp` combines the results from all the partitions:

  => (45, 9)

Et voilà, we get the same result as in the single-partition case.

Now if we want to calculate the average, we can simply do 45 / 9 = 5.
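The walkthrough above can be reproduced in plain Python. This is a sketch rather than Spark code: a list of lists stands in for the three partitions, and the `comb_op` shown here (element-wise addition of two `(sum, count)` pairs) is an assumption consistent with the numbers above.

```python
from functools import reduce

zero_value = (0, 0)  # (running sum, running count)


def seq_op(acc, number):
    # Fold one element into the (sum, count) accumulator.
    return (acc[0] + number, acc[1] + 1)


def comb_op(left, right):
    # Merge two (sum, count) accumulators.
    return (left[0] + right[0], left[1] + right[1])


partitions = [[1, 2, 3, 4], [5, 6, 7, 8], [9]]

# seqOp step, one result per partition.
partials = [reduce(seq_op, part, zero_value) for part in partitions]

# combOp step, merging the per-partition results.
total, count = reduce(comb_op, partials, zero_value)
average = total / count

print(partials)      # [(10, 4), (26, 4), (9, 1)]
print(total, count)  # 45 9
print(average)       # 5.0
```

With a real RDD, the same two functions would be passed as `rdd.aggregate((0, 0), seq_op, comb_op)`.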

# Exercise

Try to figure out the result of the following process. You might need a calculator 😅:

Results: