Ray-supported high-performance distributed clustering algorithm

Evan (NavePnow)
May 23, 2020 · 9 min read

Building a complete MapReduce framework on top of Ray and implementing K-means with it.


Abstract

Why Ray + clustering ?

Due to the growth of AI applications in recent years, more people have adopted distributed systems for model training. To meet these demanding system requirements, Ray was proposed as a new distributed computing framework at the end of 2017. After two years of development, many high-level machine learning libraries have been built on Ray, but for a traditional AI workload such as clustering, Ray still lacks high-level API support and related optimizations.

To improve Ray's support for clustering libraries, this article proposes a design for advanced clustering algorithms based on Ray. Using the APIs provided by Ray, a complete MapReduce parallel computing framework is built. On top of the basic K-means algorithm, related optimizations such as K-means++ and Elkan K-means are implemented within this framework, resulting in an optimized MapReduce framework running on Ray. Compared with traditional K-means clustering, the results show that the Ray-supported MapReduce framework has clear advantages in clustering efficiency and scalability, yielding a high-performance distributed clustering algorithm.

Study of Ray

Programming Model

1. Task

In general, a Task is Ray's version of a function. When a program calls such a function, Ray assigns it to another machine and executes it remotely; this remote execution of the function is called a Task. When a remote function is invoked, it immediately returns a future, even though the execution has not yet completed. The future, in this scenario, contains one object ID or a list of object IDs for its results. A remote function in Ray is as stateless as a traditional function: it simply processes its input and returns the result without keeping any intermediate state.
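
For instance, a minimal task sketch (the function and values here are purely illustrative):

```python
import ray

ray.init()  # start Ray on the local machine

@ray.remote
def square(x):
    # This body runs remotely as a Task on some worker process.
    return x * x

# Calling .remote() returns a future (an object ref) immediately.
futures = [square.remote(i) for i in range(4)]

# ray.get() blocks until the results behind the futures are ready.
print(ray.get(futures))  # [0, 1, 4, 9]
```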

2. Actor

Similarly, an Actor is Ray's version of a class. Compared with an ordinary Python class, when an Actor is initialized, Ray treats it as a separate process and executes it remotely. All the methods of an Actor are treated as Tasks. The key difference between these methods and plain Tasks is that they execute on a stateful worker, which holds state that persists across method calls and can be modified by the various tasks within Ray.
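
A minimal actor sketch (again illustrative, not the article's actual code):

```python
import ray

@ray.remote
class Counter:
    """A stateful worker: its fields persist across method calls."""

    def __init__(self):
        self.count = 0

    def increment(self):
        self.count += 1
        return self.count

counter = Counter.remote()            # starts a separate worker process
refs = [counter.increment.remote() for _ in range(3)]  # each call is a task
print(ray.get(refs))                  # [1, 2, 3]: the state is preserved
```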

System Design

Dataset

For the demo, I use Gowalla as the dataset. Gowalla was a location-based social networking website where users shared their locations by checking in. Each record contains a user ID, check-in time, latitude, longitude, and location ID. I use only latitude and longitude for clustering, because these two features can easily be plotted.
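
A rough sketch of the preprocessing, assuming the tab-separated SNAP dump of the Gowalla check-ins (the filename and column names below are my assumptions):

```python
import pandas as pd

# Assumption: the tab-separated SNAP dump of Gowalla check-ins, with columns
# [user, check-in time, latitude, longitude, location id].
cols = ["user", "checkin_time", "latitude", "longitude", "location_id"]
df = pd.read_csv("loc-gowalla_totalCheckins.txt", sep="\t", names=cols)

# Only latitude and longitude are used for clustering.
points = df[["latitude", "longitude"]].to_numpy()
```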

Framework

Figure 1: MapReduce on Ray

At the beginning, I filter out non-location data from the Gowalla check-in dataset and compute the initial centroid vectors for a given number of clusters k. Two methods can be used to generate the centroid vectors. The first selects k points randomly from the dataset as the initial centroid vectors, as sketched below. The other is the K-means++ algorithm, which is discussed in the Algorithm Used section.
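
A minimal sketch of the first (random) initialization strategy; the function name is mine, not from the original code:

```python
import numpy as np

def random_centroids(points, k, seed=0):
    """Pick k distinct points from the dataset as the initial centroid vectors."""
    rng = np.random.default_rng(seed)
    idx = rng.choice(len(points), size=k, replace=False)
    return points[idx]
```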

After the initial centroid vectors are obtained, the dataset is split into many mini-batches. In traditional MapReduce, each batch flows into a mapper operator for computation; here, each mini-batch invokes an Actor in the Ray system as a mapper operator. A broadcast function then sends the current centroid vectors to all actors. Given these inputs, each actor processes its data and generates a clusterAssment, which contains, for every point, the index of the cluster the point belongs to and its distance to the nearest centroid vector. In Ray, actors execute independently of one another, so this mapper stage can run very efficiently in parallel.
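
The mapper stage might look roughly like the following sketch; the class and method names (Mapper, broadcast, assign_cluster) follow the description in this article, but the implementation details are my own assumptions:

```python
import numpy as np
import ray

@ray.remote
class Mapper:
    """One mapper actor per mini-batch (illustrative, not the original code)."""

    def __init__(self, batch):
        self.batch = batch        # the mini-batch of (latitude, longitude) points
        self.centroids = None

    def broadcast(self, centroids):
        # Receive the current centroid vectors before each iteration.
        self.centroids = centroids

    def assign_cluster(self):
        # For every point, find the nearest centroid and the distance to it.
        dists = np.linalg.norm(
            self.batch[:, None, :] - self.centroids[None, :, :], axis=2
        )
        index = dists.argmin(axis=1)
        distance = dists.min(axis=1)
        # clusterAssment: one (cluster index, distance) pair per point.
        return np.stack([index, distance], axis=1)

# Usage sketch: one actor per mini-batch, centroids broadcast to all of them.
# batches = np.array_split(points, num_mappers)
# mappers = [Mapper.remote(b) for b in batches]
# ray.get([m.broadcast.remote(centroids) for m in mappers])
# cluster_assments = ray.get([m.assign_cluster.remote() for m in mappers])
```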

Each Actor has a remote function called assign_cluster that handles every point in its mini-batch. In general, given k clusters and n points, we would compute k×n distances sequentially to assign each point to its nearest centroid vector. This process is time-consuming, and several advanced algorithms have been proposed to optimize it, as discussed in the Algorithm Used section. Each remote function is treated as a separate process and executed on one worker. The issue is that every task invocation carries a non-trivial overhead (e.g., scheduling, inter-process communication, updating the system state), and for very small tasks this overhead can dominate the actual execution time. Therefore, instead of running a purely sequential program, the data processing has to be organized differently to make the program faster.

Figure 2: Calculation in parallel

In Fig. 2, I implement a parallel calculation that amortizes the remote invocation overhead. Within each mapper (actor), I split the mini-batch into n much smaller subsets and invoke n workers (separate processes) to handle each subset. The idea is that although invoking workers has some cost, the workers can compute distances in parallel, so the sequential work is cut into many parts that execute concurrently. With this treatment, performance improves, and the whole pipeline becomes better suited to running the distributed clustering algorithm across more machines.
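
A sketch of this chunked, parallel distance computation (function names and the chunking scheme are illustrative):

```python
import numpy as np
import ray

@ray.remote
def nearest_centroid(chunk, centroids):
    """Assign one chunk of points to their nearest centroids (runs on one worker)."""
    dists = np.linalg.norm(chunk[:, None, :] - centroids[None, :, :], axis=2)
    return np.stack([dists.argmin(axis=1), dists.min(axis=1)], axis=1)

def assign_in_parallel(batch, centroids, n_tasks):
    # Split the mini-batch into n_tasks chunks so the per-task overhead is
    # amortized over many points instead of paid once per point.
    chunks = np.array_split(batch, n_tasks)
    centroids_ref = ray.put(centroids)  # share the centroids via the object store
    refs = [nearest_centroid.remote(chunk, centroids_ref) for chunk in chunks]
    return np.concatenate(ray.get(refs))
```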

Back in the main pipeline, after the mapper operators we have the full clusterAssment list, containing for every point its (index, distance) pair together with its coordinates (latitude and longitude). In the reducer stage, we start as many reducer actors as there are clusters; for example, if we want 5 clusters, 5 reducer actors are invoked. Reducer 1 receives, through the shuffle step, all points whose cluster index is 1, and the other reducers work in the same way. Finally, each reducer takes its points and computes the mean of their coordinates (latitude and longitude) to produce a new center point.
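
A reducer sketch under the same assumptions (one actor per cluster, averaging coordinates):

```python
import numpy as np
import ray

@ray.remote
class Reducer:
    """One reducer actor per cluster (illustrative sketch)."""

    def __init__(self, cluster_id):
        self.cluster_id = cluster_id

    def reduce(self, points, assignments):
        # Shuffle step (simplified): keep only the points assigned to this
        # reducer's cluster, then average their coordinates as the new centroid.
        mask = assignments[:, 0] == self.cluster_id
        return points[mask].mean(axis=0)

# reducers = [Reducer.remote(i) for i in range(k)]
# points / assignments are the concatenated outputs of the mapper stage:
# new_centroids = ray.get([r.reduce.remote(points, assignments) for r in reducers])
```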

Once all reducers finish their calculations in parallel, we obtain k new centroid vectors. To refine the result and decide when to stop iterating, we compare the new centroid vectors with the old ones using some loss function and then update the old ones. With the new centroid vectors, we process the data again in a new iteration until the final result is reached.
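
One possible convergence check, with a tolerance value that is my own choice rather than the article's:

```python
import numpy as np

def converged(old_centroids, new_centroids, tol=1e-4):
    """Stop iterating once no centroid moves more than tol."""
    shift = np.linalg.norm(new_centroids - old_centroids, axis=1).max()
    return shift < tol

# Outer loop sketch:
# while not converged(centroids, new_centroids):
#     centroids = new_centroids
#     ...broadcast, map, shuffle, reduce again...
```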

Algorithm Used

To optimize further, I use some advanced algorithms to improve the performance of the distributed clustering algorithm.

1. K-means

The K-means algorithm is an unsupervised clustering algorithm. It is relatively simple to implement and produces good clustering results, so it is widely used for large, continuous datasets.

2. K-means++

Although the traditional method of choosing the k centroid vectors randomly seems fast and efficient, it does not converge well on large datasets; with a random choice, much time is spent searching for good centroid vectors. K-means++ offers a smarter strategy for choosing the k initial centroid vectors, so that good results are found much more efficiently.
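
A compact sketch of K-means++ seeding, where each new centroid is sampled with probability proportional to its squared distance to the nearest centroid already chosen (function name and structure are mine):

```python
import numpy as np

def kmeans_pp_init(points, k, seed=0):
    """K-means++ seeding sketch: sample each new centroid with probability
    proportional to its squared distance to the nearest centroid chosen so far."""
    rng = np.random.default_rng(seed)
    centroids = [points[rng.integers(len(points))]]  # first centroid: uniform pick
    for _ in range(k - 1):
        dists = np.linalg.norm(
            points[:, None, :] - np.array(centroids)[None, :, :], axis=2
        )
        d2 = dists.min(axis=1) ** 2        # squared distance to nearest centroid
        probs = d2 / d2.sum()
        centroids.append(points[rng.choice(len(points), p=probs)])
    return np.array(centroids)
```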

3. Elkan K-means

In the traditional K-means algorithm, we compute the distance from every sample point to every centroid in each iteration, which is time-consuming. Elkan K-means uses the triangle inequality (the sum of two sides of a triangle is greater than the third side) to skip many of these distance calculations.
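
A sketch of the pruning rule at the heart of Elkan's method: if d(c, c') ≥ 2·d(x, c) for a point x currently assigned to centroid c, the triangle inequality guarantees d(x, c') ≥ d(x, c), so the distance to c' never needs to be computed. (Variable names and loop structure are mine; the full algorithm also maintains per-point upper and lower bounds.)

```python
import numpy as np

def reassign_with_elkan_bound(points, centroids, current_idx, current_dist):
    """Reassign points to centroids while skipping distances that the triangle
    inequality rules out: if d(c, c') >= 2 * d(x, c), then d(x, c') >= d(x, c)."""
    center_dists = np.linalg.norm(
        centroids[:, None, :] - centroids[None, :, :], axis=2
    )
    for i, x in enumerate(points):
        for j in range(len(centroids)):
            c = current_idx[i]
            if j == c or center_dists[c, j] >= 2 * current_dist[i]:
                continue  # pruned: centroid j cannot beat the current assignment
            d = np.linalg.norm(x - centroids[j])
            if d < current_dist[i]:
                current_dist[i] = d
                current_idx[i] = j
    return current_idx, current_dist
```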

Testing

Environment

Processor: 2.4 GHz 8-Core Intel Core i9

Memory: 64 GB 2667 MHz DDR4

Result

1. Optimization — Time

In this section, I set the number of mappers to 5 and the number of tasks to 2, and use the Elkan K-means optimization. For comparison, I also implemented K-means using only the plain Ray API, to highlight the effect of the optimization effort. The results are shown in Fig. 3.

Figure 3: Optimization — Time

Using the MapReduce parallel framework implemented on Ray together with the Elkan K-means algorithm yields a large improvement in overall clustering efficiency. This is because the framework accelerates a linear, sequentially executed computation through data slicing and parallel computing. The figure also shows that as the dataset grows, the efficiency of the traditional method degrades more significantly. With these optimizations, the system can therefore be deployed on a large cluster for high-performance clustering of large-scale datasets.

2. Size of Dataset, Mapper — Time

In this part, I vary the scale of the dataset and the number of mappers to see which combination of these two factors yields the most efficient clustering. The results are shown in Fig. 4.

Figure 4: Size of Dataset, Mapper — Time

In Fig. 4, different colors represent different numbers of mappers used in the system. If only one mapper is used (no data splitting), the cost grows linearly with the dataset size. As the number of mappers increases, efficiency improves. However, as the chart also shows, too many mappers invoke excessive actors and tasks to handle the data; because the number of workers that can run concurrently and the size of the object store are limited, efficiency then drops. Different machines have different configurations, so one should experiment to find the number of mappers that gives the best performance.

3. Number of Workers, Size of Dataset — Time

In this part, I vary the scale of the dataset and the number of tasks to see whether efficiency improves as the number of workers increases. The result is shown in Fig. 5.

Figure 5: Number of Workers, Size of Dataset — Time

In Fig. 5, different colors represent different dataset scales (in units of 10k points). From 1 task to 2 tasks, efficiency increases sharply, because the computation previously done by a single task is now shared between two. However, as more tasks join in, performance improves only slightly and eventually degrades. On my machine, Ray cannot create arbitrarily many workers at the same time, so unfinished tasks queue up and wait for idle workers; all of this queuing reduces the efficiency of the Ray system and cancels out the benefit of additional tasks.

Conclusion

With the simple wrappers and APIs of Ray, anyone can build their own distributed system, especially for machine learning. Ray's original purpose was reinforcement learning: building a distributed training framework. As Ray has grown, many people wonder whether it can take Spark's place and become a brand-new general-purpose distributed computing framework. In this report, I tried one specific example, a distributed clustering algorithm, to measure Ray's performance. With several optimizations and distributed strategies, Ray handles this problem easily. However, Ray is not yet as efficient as it could be, and there are many aspects to optimize, such as task execution overhead, the upper bound on the number of workers, and the upper bound on the object store. Ray is therefore more like a key that opens the door to distributed applications for its users. The future will be a distributed world, and that trend will not change. I firmly believe Ray will become stronger and more robust with the maintenance of developers and users from all over the world.

Reference

1. Lloyd, Stuart P. (1982). "Least squares quantization in PCM". IEEE Transactions on Information Theory. 28 (2): 129–137. (Written in 1957.)

2. Arthur, D.; Vassilvitskii, S. (2007). “k-means++: the advantages of careful seeding”. Proceedings of the eighteenth annual ACM-SIAM symposium on Discrete algorithms. Society for Industrial and Applied Mathematics Philadelphia, PA, USA. pp. 1027–1035.

3. B. Bahmani, B. Moseley, A. Vattani, R. Kumar, S. Vassilvitskii “Scalable K-means++” 2012 Proceedings of the VLDB Endowment.

4. Elkan, Charles (2003). "Using the triangle inequality to accelerate k-means". Proceedings of the Twentieth International Conference on Machine Learning (ICML).

5. “MapReduce Tutorial”. Apache Hadoop. Retrieved 3 July 2019.

6. Marozzo, F.; Talia, D.; Trunfio, P. (2012). "P2P-MapReduce: Parallel data processing in dynamic Cloud environments". Journal of Computer and System Sciences. 78 (5): 1382–1402.

7. “Example: Count word occurrences”. Google Research. Retrieved September 18, 2013.

8. Berlińska, Joanna; Drozdowski, Maciej (2010–12–01). “Scheduling divisible MapReduce computations”. Journal of Parallel and Distributed Computing. 71 (3): 450–459.

9. Philipp Moritz et al. 2018. Ray: A Distributed Framework for Emerging AI Applications. In 13th USENIX Symposium on OSDI ’18. 561–577.

10. M. Zaharia, M. Chowdhury, M. J. Franklin, S. Shenker, and I. Stoica. Spark: cluster computing with working sets. In Proceedings of the 2nd USENIX conference on Hot topics in cloud computing, HotCloud’10, pages 10–10, Berkeley, CA, USA, 2010. USENIX Association.
