# Parallel K-Means Clustering with reducer function

Save 37% off Functional Concurrency in .NET with code **fccterrell**.

# Fold in PLINQ: the Aggregate functions

In functional programming, a *fold* is a higher-order function, also known as *reduce and accumulate*, whose purpose is to reduce a given data structure, typically a sequence of elements, into a single value. For example, reduction could return an average value for a series of numbers, calculating a summation, maximum value, or minimum value.

The fold function takes an initial value, commonly called the *accumulator*, which is used as a container for intermediate results. Then as second argument it takes a binary expression that acts as *reduction* function, to apply against each element in the sequence to finally return the new value for the accumulator. In general, reduction works as follows. You take a binary operator — that is, a function with two arguments — and compute it over a vector or set of elements of size n, usually from left to right. Sometimes, a special seed value is used for the first the operation with the first element, since there is no previous value to use. During each step of the iteration, the binary expression takes the current element from the sequence and the accumulator value as inputs, and returns a value that overwrites the accumulator. The final result is the value of the last accumulator as shown in figure 1.

**Figure 1 The fold function reduces a sequence to a single value. The function (f), in this case, is multiplication and takes an initial accumulator with value of 1. Then, for each iteration in the sequence (5, 7, 9), the function applies the calculation to the current item and accumulator, the result is then used to update the accumulator with the nee value.**

The same concepts you may have learned about the fold function can be applied to PLINQ in both F# and C#. In fact, PLINQ has the equivalent of the fold function called Aggregate. The PLINQ Aggregate is a right-fold. For example, here is one of its overloaded signatures:

`public static TAccumulate Aggregate<TSource, TAccumulate>(`

this IEnumerable<TSource> source,TAccumulate seed,Func<TAccumulate, TSource, TAccumulate> func);

The above function takes three arguments that map to the sequence source: the initial accumulator seed and the fold function func, which updates the accumulator for each element.

The best way to understand how Aggregate works is with an example. In this example, we’ll parallelize the k-means clustering algorithm using PLINQ and the Aggregatefunction. The purpose of the example is to show how remarkably simple and performant a program becomes by using this construct. The following implementation of the k-means clustering algorithm leverages functional programming, sequence expressions with PLINQ, and some of the many built-in functions for manipulating data.

# k-means clustering

k-means, also called Lloyd’s algorithm, is an unsupervised machine-learning algorithm that categorizes a set of data points into clusters, each centered on its own centroid. A centroid of a cluster is the sum of points in it divided by the number of points. It represents the center of the mass of a geometric shape having uniform density. The k-means clustering algorithm takes an input data, and a value *k* that indicates the number of clusters to set, then places the centroids randomly in these clusters. The idea of this algorithm is to generate a number of centroids that produce the centers of the clusters. Each point in the data is linked with its nearest centroid. The distance is calculated using a simple Euclidean distance function.[1] Each centroid is then moved to the average of the positions of the points that are associated with it.

A **centroid **is computed with the sum of its points and then dividing that sum by the size of the cluster. The iteration involves these steps:

- Sum, or reduction, which computes the sum of the points in each cluster
- Divide each cluster sum by the number of points in that cluster
- Reassign, or map, the points to the cluster to the closest centroid

The process is iterative, meaning that it repeats until the algorithm reaches a final result or it exceeds the maximum number of steps. When the algorithm runs, it corrects continuously and updates the centroids in each iteration, to better cluster the input data.

For the data source used as input in the k-means clustering algorithm, we’ll use the “white wine quality” public records (figure 2), which is available online to download.

## Figure 2 The result of running the k-means algorithms using C# LINQ for the serial version of code and C# PLINQ for the parallelized version. The centroids are the red points in both clusters. Each image represents one iteration of the k-means algorithm, with 11 centroids in the cluster. Each iteration of the algorithm computes the centroid (in red) of each cluster and then assigns each point to the cluster with the closest centroid.

The full implementation of the k-means program is omitted because of the length of the code and only the relevant excerpt of the code is shown in the example in listing 1.

There are two core functions to review: GetNearestCentroid and UpdateCentroids. First, the function GetNearestCentroid is used to update the clusters. For every data input, this function finds the closest centroid assigned to the cluster to which the input belongs.

## Listing 1 Function to find the closest centroid (used to update the clusters)

double[] GetNearestCentroid(double[][] centroids, double[] center){ return centroids.Aggregate((centroid1, centroid2) => //#A Dist(center, centroid2) < Dist(center, centroid1) ? centroid2 : centroid1);

}

}

**#A Aggregate LINQ function to find the closest centroid**

The *GetNearestCentroid *implementation uses the Aggregate function to compare the distances between the centroids to find the nearest one. During this step, if the inputs in any of the clusters are not updated because a closest centroid is not found, then the algorithm is complete and returns the result.

The next step, shown in Listing 2, after the clusters are updated, is to update the centroid locations. The function *UpdateCentroids *calculates the center for each cluster and shifts the centroids to that point. Then, with the updated centroids values, the algorithm repeats the previous step running the GetNearestCentroid function until it finds the closing result. These operations continue running until a convergence condition is met, and the positions of the cluster centers become stable.

## Listing 2 Update the location of the centroids according to the center of the cluster

double[][] UpdateCentroids(double[][] centroids){ var partitioner = Partitioner.Create(data, true); //#A var result = partitioner.AsParallel() //#B .WithExecutionMode(ParallelExecutionMode.ForceParallelism) //#C .GroupBy(u => GetNearestCentroid(centroids, u)) .Select(points => points

.Aggregate(new double[N], //#D

(acc, item) => acc.Zip(item, (a, b) => a + b).ToArray()) //#E

.Select(items => items / points.Count())

.ToArray());

return result.ToArray();

}

**#A Tailored partitioner for maximizing the performance**

**#B Run the query in parallel from the partitioner**

**#C Force the parallelism**

**#D Use the aggregate function to find the center of the centroids in the cluster; the seed initial value is a double array with size N (the dimensionality of data)**

**#E Use the Zip function to thread the centroid-locations sequence and the accumulator sequence**

With the *UpdateCentroids *function, there is a great deal of processing to compute, so the usage of PLINQ can effectively parallelize the code, thereby increasing the speed.

**NOTE **Even if centroids do not move on the plane, they may change their indices in result array due to GroupBy and AsParallel nature.

The PLINQ query in the body of the function *UpdateCentroids *performs aggregation in two steps. The first step uses the *GroupBy *function, which takes as an argument a function that provides the key used for the aggregation. In this case, the key is computed by the previous function *GetNearestCentroid*. The second mapping step, which runs the Select function, calculates the centers of new clusters for each given point. This calculation is performed by the Aggregatefunction, which takes the list of points as inputs (the location coordinates of each centroid) and calculates their centers mapped to the same cluster using the local accumulator acc as shown in listing 3. The accumulator is an array of doubles with size *N*, which is the dimensionality of the data to process (*the dimensionality of data is also known as the number of characteristics/measurements*). The value *N* is defined as a constant in the upper class because it never changes and can be safely shared. The Zip function is used to thread together the nearest centroids (points) and the accumulator sequences. Then, the center of that cluster is recomputed by averaging the position of the points in the cluster. The implementation details of the algorithm are not crucial, the key point is that the description of the algorithm is translated quite precisely and directly into PLINQ using Aggregate. If you tried to re-implement the same functionality without the Aggregate function, the program would run in ugly and hard-to-understand loops with mutable shared variables. For example, listing 3 shows the equivalent of the UpdateCentroids function without the help of the Aggregate function.

## Listing 3 UpdateCentroids function implemented without Aggregate

`double[][] UpdateCentroidsWithMutableState(double[][] centroids)`

{

var result = data.AsParallel()

.GroupBy(u => GetNearestCentroid(centroids, u))

.Select(points => {

var res = new double[N];

foreach (var x in points) //#A

for (var i = 0; i < N; i++)

res[i] += x[i]; //#B

var count = points.Count();

for (var i = 0; i < N; i++)

res[i] /= count; //#B

return res;

});

return result.ToArray();

}

**#A Imperative loop to calculate the center of the centroids in the cluster**

**#B Use of the mutable state**

Figure 3 shows benchmark results of running the k-means clustering algorithm. The benchmark has been executed in a four-core machine with 8 GB of RAM. The algorithms being tested are the sequential LINQ, the parallel PLINQ, and the parallel PLINQ using a custom partitioner.

## Figure 3 Benchmark running the k-means algorithm using a quad-core machine with 8GB of RAM. The algorithms being tested are the sequential LINQ and the parallel PLINQ with a variant of tailored partitioner. The parallel PLINQ runs in 0.481 seconds, which is three times faster than the sequential LINQ version, which runs in 1.316 seconds. A slight improvement is the PLINQ with tailored partitioner that runs in 0.426 second, which is 12% faster than the original PLINQ version.

**NOTE **When multiple threads are used on a multiprocessor, more than one CPU may be used to complete a task. In this case, the CPU time may be more than the elapsed time.

The benchmark results are quite impressive. The parallel version of the k-means algorithm using PLINQ runs three times faster than the sequential version in a four-core machine. The PLINQ partitioner version, shown in listing 3, is 10% faster than the PLINQ version. There are two interesting PLINQ extensions used in the function UpdateCentroids. The WithExecutionMode(ParallelExecution Mode.ForceParallelism) extension is used to notify the TPL scheduler that the query must be performed concurrently.

The two options to configure the ParallelExecutionMode are ForceParallelism and Default. The ForceParallelism enumeration forces parallel execution. The Default value defers to the PLINQ query for the appropriate decision on execution.

In general, a PLINQ query is not absolutely guaranteed to run in parallel. The TPL scheduler can decide, based upon factors such as the size, complexity of the operations, and the current state of the available computer resources, that the query should run sequentially. However, there are cases when you’ll want to force parallelism.

The other interesting extension used in the UpdateCentroids function is the custom Partitioner:

**var partitioner = Partitioner.Create(data, true)**

The Partitioner<T> class is an abstract class that allows for both static and dynamic partitioning. The default TPL Partitioner has built-in strategies that automatically handle the partitioning, offering good performance for a wide range of data sources. The goal of the TPL Partitioner is to find the balance between having too many partitions (which introduces overhead) and having too few partitions (which under-utilizes the available resources). However, there are situations where the default partitioning may not be appropriate, and you can gain better performance from a PLINQ query by using a tailored partitioning strategy.

In the example, the custom partitioner is created simply by using an overloaded version of the Partitioner.Create method, which takes as an argument the data source and a flag indicating which strategy to use, either dynamic or static. When the flag is true, the partitioner strategy is dynamic, and static otherwise. A static partitioning often provides speedup on a multicore computer with a small number of cores (two or four). A dynamic partitioning aims to load balance the work between tasks by assigning an arbitrary size of chunks and then incrementally expanding the length, after each iteration. It is possible to build very sophisticated partitioners with complex strategies.

# Understanding how partition works:

In PLINQ, there are four kinds of partitioning algorithms:

**Range partitioning**works with a data source with a well-known size. Arrays are part of this category:

`int[] data = Enumerable.Range(0, 1000).ToArray();`

data.AsParallel().Select(n => Compute(n));

**Stripped partitioning**is the opposite of Range. The data source size is not predefined, thus the PLINQ query fetches one item at a time and assigns it to a task until the data source becomes empty. The main benefit of this strategy is that the load can be balanced between tasks.

`IEnumerable<int> data = Enumerable.Range(0, 1000);`

Data.AsParallel().Select(n => Compute(n));

**Hash partitioning**uses the values hash code to assign elements with the same hash code to the same task (for example, when a PLINQ query performs a GroubBy).**Chunk partitioning**works with incremental chunk size, where each task fetches a chunk of items from the data source, whose length expands with the number of iterations. Thus, with each iteration, there are larger chunks to keep the task busy as much as possible.

# Implementing a parallel Reduce function for PLINQ

Now you have learned about the power of aggregate operations, which are particularly suited to scalable parallelization on multicore hardware due to low memory consumption and deforesting optimization. The low memory bandwidth is due to the fact that aggregate functions produce less data than they ingest. For example, other aggregate functions like Sum() and Average() reduce a collection of items to a single value. That is the concept of reduction: it takes a function to reduce a sequence of elements to single value. The PLINQ list extensions do not have a specific function Reduce, like in F# list comprehension or other functional programming languages like Scala and Elixir. However, after having gained familiarity with the Aggregate function, the implementation of a reusable Reduce function is an easy job. Listing 4 shows an implementation of a Reduce function.

## Listing 4 A parallel Reduce function implementation using Aggregate

static TSource Reduce<TSource>(this ParallelQuery<TSource> source, Func<TSource, TSource, TSource> func)

{

return ParallelEnumerable.Aggregate(source, //#A

(item1, item2) => func(item1, item2)); //#B

}int[] source = Enumerable.Range(0, 100000).ToArray();

int result = source.AsParallel()

.Reduce((value1, value2) => value1 + value2);//#C

**#A Use the Aggregate function to develop a custom Reduce**

**#B For each iteration, the function func is applied to the current item and the previous value is used as an accumulator**

**#C Use the Reduce function passing an anonymous lambda to apply as a reducing function**

The Reduce function takes two arguments: the sequence to reduce and the function to apply for the reduction. The underlying implementation uses Aggregate, treating the first item from the source sequence as accumulator. The rest of the code is an example to apply the Reduce function to calculate the Sum of a given sequence in parallel.

Associativity and Commutativity for deterministic Aggregation

The order of computation of an aggregation that runs in parallel using PLINQ (or PSeq) applies the Reduction function differently than the sequential version.

In the previous k-means example, the sequential result is computed in a different order than the parallel result, but the two outputs are guaranteed to be equal. This is because the operator + (plus), used to update the centroid distances, has the special properties of associativity and commutativity. This is the line of code used to find the nearest centroid:

**Dist(center, centroid2) < Dist(center, centroid1)**

This is the line of code used to find updates to the centroids:

In functional programming, the mathematical operators are functions. The + (plus) is a binary operator, which means that it performs on two values and manipulates them to return a result.

`points`

.Aggregate(new double[N],

(acc, item) => acc.Zip(item, (a, b) => a + b).ToArray())

.Select(items => items / points.Count())

A function is *associative* when the order in which it is applied does not change the result. This property is important for **reduction operations**. The + (plus) operator and the * (multiply) operator are associative because:

(a + b) + c = a + (b + c)

(a * b) * c = a * (b * c)

A function is *commutative* when the order of the operands does not change its output, so long as each number is accounted for. This property is important for **combiner operations.** The + (plus) operator and the * (multiply) operator are commutative because:

a + b + c = b + c + a

a * b * c = b * c * a

## Why does this matter?

This matters because, using these properties, it is possible to partition the data and have multiple threads operating independently on their own chunks, achieving parallelism, and still return the correct result at the end.

The combination of these properties permits the implementation of parallel patterns like divide-and-conquer, Fork/Join, and MapReduce.

In order for a parallel aggregation in PLINQ/PSeq to work correctly, the applied operation must be both associative and commutative. The good news is that many of the most popular kinds of reduction functions *are* both associative and commutative.

Hopefully you found this article informative. For more information on the intersection of concurrency and the functional programming paradigm, download the free first chapter of Functional Concurrency in .NET and see this Slideshare presentation for more details.