ML Data Retrieval Using Ray

juniper ai/ml team
juniper-team
Published in
4 min readJan 25, 2021

Authors: Subhabrata Banerjee, Pooja Ayanile, Divyank Garg, Ajit Patankar, Sabyasachi Mukhopadhyay

Introduction

In a previous blog about Initiating Ray clusters we created a cluster of 5 VMs for developing a parallel distributed application for AI/ML. Legacy Python classes and functions can then be converted to Ray enabled code to run on the distributed cluster. Apart from this, Ray provides different AI/ML libraries on top of common defined libraries to improve performance and model accuracy. In this blog post, first we illustrate the implementation of Ray actors and remote functions using a general example, and then a specific example for ML data collection.

Ray Core Walkthrough

The Python API for Ray provides two main constructs: actors and workers. When a new actor is instantiated, a new worker is created, and methods of the actor are scheduled on that specific worker and can access and mutate the state of that worker.

Below a simple python code snippet example shows a part of Ticket Management System to get a total number of inquiries to answer.

Python Function Implementation without ray.remote()

class TotalOpenInqueries(object):def __init__(self):self.value = 0def increment(self):self.value += 1return self.valuedef get_openInqueries(self):return self.valueTotalOpenInqueries_object = TotalOpenInqueries()

It can be converted to ray actor by decorating the python class with ray.remote() function.

Python Function Implementation with ray.remote

@ray.remoteclass TotalOpenInqueries(object):** Repeating the above Code **TotalOpenInqueries_actor = TotalOpenInqueries.remote()

When the above actor is instantiated, the following events happen:

  1. A node in the cluster is chosen and a worker process is created on that node for the purpose of running methods called on the actor.

2. A TotalOpenInqueries object is created on that worker and the TotalOpenInqueries constructor is run.

Methods of the actor can be called remotely. Below, we have implemented a decrement method for our Inquiry System.

Python Function Implementation with ray.remote

TotalOpenInqueries_actor = TotalOpenInqueries.remote()@ray.remoteclass CloseInquery(object):# Method@ray.method()def decrement(self):self.value = self.value — 1return self.valuef = CloseInquery.remote()obj_ref = f.decrement.remote()

We can specify that an actor requires CPUs or GPUs in the decorator. While Ray has built-in support for CPUs and GPUs, Ray can also handle custom resources.

Below, we are restricting our actor to use 2 CPUs and 1 GPU.

Python Function Implementation with ray.remote

@ray.remote(num_cpus=2, num_gpus=1)TotalOpenInqueries_actor = TotalOpenInqueries.remote()

Ray Legacy code for Data Gathering

As mentioned above Ray remote function and actors can be used for fetching the data in parallel manner.

If we know that the majority of data gathering tasks are repeatable in the sense that we can divide the whole big task into small chunks, each will have the same definition but produce their own output and later we can collect all smaller outputs and combine them into one big output. That’s why we can take advantage of parallelism of Ray processes in Data Gathering tasks.

Architecture of Wikipedia Summary Fetching System using RAY is depicted in Fig-1, where a parallel ray process fetches the data from wikipedia. This step is followed by another parallel process that merges the output of the previous step.

Fig-1 : Architecture of Wikipedia Data Retrieval System using RAY

In the above example, a batch size of 256 was selected by tuning. It means if we have four threads (Actors instantiated using the Ray remote function), all actors will work concurrently and fetch 256 articles each. In total, we will have 1024 articles summary in ¼ of total time taken by the sequential approach.

Since each actor returns a dataframe, these need to be combined to get the final results. Ray parallelism can be applied for this task as well. A hierarchical approach is followed where each Ray actor is responsible for combining two dataframes. As all processing is performed simultaneously, this results in log(n) time complexity for the merge process.

Python Function Implementation with ray.remote for fetching summary of Wikipedia articles using Wiki API

@ray.remotedef fetch_from_wikipedia(title_list):# it takes title list as input# returns the df with downloaded article summary for each titleresult=[]for k in title_list:try:summary = wikipedia.summary(k)except Exception:summary = np.NaNresult.append(summary)df = pd.DataFrame()df[“title”] = title_listdf[“article”] = resultreturn df

Results

The table 1 shows the Comparison between time spent on fetching 2048 summaries of Wikipedia Articles using Traditional Serial Python approach and Distributed Parallel RAY approach.

Table-1 : Comparison between the Traditional Serial Python approach and Distributed Parallel RAY approach

In this simple yet illustrative example, we are getting approximately 30x performance improvement which corresponds to the number of cores in the Ray cluster.

Conclusions

In this blog, we have illustrated how to parallelize data retrieval tasks using Ray. This system can be scaled to leverage the full computational power of a cluster of servers. In the next blog, we will use the same methodology for the Model selection stage of ML pipeline, where we will train and test different models parallel using the Ray process.

Previous Blog: https://medium.com/juniper-team/tips-on-installing-and-maintaining-ray-cluster-b5535743f97c

--

--