Accelerating object storage processing for Ray framework

Gil Vernik
Published in CodeFlare
6 min read · Jul 15, 2021

Ray is a distributed execution framework that makes it easy to scale computing applications. Originally created at UC Berkeley, Ray was designed to address deep learning and machine learning workloads, while providing a “serverless” user experience. Ray clusters can be started locally, over virtual machines, on Kubernetes, or on cloud platforms. For example, you can run Ray on IBM Cloud Code Engine.

A growing ecosystem of libraries on top of Ray allows users and developers to use SQL or data frames over data that is stored in object storage (e.g., IBM Cloud Object Storage, Amazon S3, etc.). For example, Modin, Dask, and even Apache Spark now support integration with Ray, and their data pre-processing capabilities can be exploited before the core Ray computations. However, in certain cases, users may have proprietary code, packages, or software that doesn’t naturally fit into the Dask, Modin, or Apache Spark execution models. As more workloads start to leverage and benefit from Ray, those requirements need to be addressed as well. In this blog, we discuss the challenges in applying Ray computation to object storage data and demonstrate how the Lithops framework can be used within a Ray application to simplify data handling. All the code used in this blog and additional documentation can be found under the CodeFlare project.

Ray and object storage

Let’s start with an example of images stored in the IBM Cloud Object Storage and assume we want to extract the colors of each image, along with its dominant color range. Having the colors will enable us to run various deep learning algorithms, like the one described in the “Movie recommendation system” keynote talk at the Ray Summit 2020.

We initially write a helper function that reads a single image from the object storage and extracts its colors.

Next, we write a Ray task that, once invoked, calls the color extraction function, identifies the dominant color range, and returns the results.
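The post does not define how the dominant color range is computed, so the following is only a minimal sketch of one possible approach, not the original code. It assumes the extracted colors are (r, g, b) tuples with 0-255 channels. The `HUE_RANGES` table and the function names `hue_range` and `dominant_color_range` are hypothetical and chosen for illustration.

```python
import colorsys
from collections import Counter

# Hypothetical hue ranges in degrees; the original post does not define them.
HUE_RANGES = [
    ("red", 0, 30),
    ("yellow", 30, 90),
    ("green", 90, 150),
    ("cyan", 150, 210),
    ("blue", 210, 270),
    ("magenta", 270, 330),
    ("red", 330, 360),
]


def hue_range(rgb):
    """Map one (r, g, b) tuple with 0-255 channels to a named hue range."""
    r, g, b = (channel / 255.0 for channel in rgb)
    hue, saturation, _ = colorsys.rgb_to_hsv(r, g, b)
    if saturation == 0:
        return "gray"  # black, white, and grays carry no hue
    degrees = hue * 360.0
    for name, low, high in HUE_RANGES:
        if low <= degrees < high:
            return name
    return "red"  # only reachable if rounding pushes the hue to 360


def dominant_color_range(colors):
    """Return the most frequent hue range among the given RGB colors."""
    counts = Counter(hue_range(rgb) for rgb in colors)
    return counts.most_common(1)[0][0]
```

In a Ray application, a function like `dominant_color_range` would run inside the task body after the colors have been extracted. It expects a non-empty list of colors.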

Now let’s implement the main method that lists the images in the bucket and submits our Ray task once per image.

Deep dive into the example

While the code above is simple enough, it demonstrates some challenges for the developer. First of all, the developer needs to be familiar with object storage semantics. For example, we need to use a paginator to list the objects in a bucket, since a single response is usually limited to 1000 entries. Moreover, each Ray task initiates its own object storage connection and performs authentication. Thus, if there are 100K images, there will be 100K tasks, each setting up a new object storage connection and authenticating separately. In such a scenario, a cloud vendor is likely to throttle (ignore) requests due to the large number of concurrent authentication requests.
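As an illustration of the pagination point, here is a minimal sketch of listing every image key in a bucket. It assumes `client` is a boto3-compatible S3 client (for example one created with boto3 or ibm_boto3). The function name `list_image_keys` and the suffix filter are hypothetical and not taken from the original code.

```python
def list_image_keys(client, bucket, suffixes=(".jpg", ".jpeg", ".png")):
    """Collect every matching key in a bucket, one page of results at a time."""
    # A single list call returns a limited number of entries, so the
    # paginator keeps requesting pages until the listing is exhausted.
    paginator = client.get_paginator("list_objects_v2")
    keys = []
    for page in paginator.paginate(Bucket=bucket):
        # "Contents" is absent from a page that holds no objects.
        for entry in page.get("Contents", []):
            if entry["Key"].lower().endswith(suffixes):
                keys.append(entry["Key"])
    return keys
```

Even this small helper is storage-specific: it is exactly the kind of code that has to be rewritten when the data moves to a backend with a different API.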

To address the authentication issue, developers can potentially implement a Ray actor and share a token between tasks. However, this involves much more effort when it comes to writing boilerplate code. The boilerplate code will have to list and read objects, and will grow even larger if data is stored in other storage locations, like Amazon S3, Azure, Google, Ceph, etc. This essentially forces users to be aware of the different storage backend implementations and APIs, and to write more code.
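To give a sense of the extra code involved, below is a minimal sketch of the state such an actor would hold. It is written as a plain Python class so it can be read and run without a cluster. The class name `TokenCache` and the `fetch_token` callable, which is assumed to return a `(token, ttl_seconds)` pair, are hypothetical and do not correspond to any real storage SDK call.

```python
import time


class TokenCache:
    """Fetch an auth token once and reuse it until shortly before it expires."""

    def __init__(self, fetch_token, refresh_margin=60.0, clock=time.monotonic):
        # fetch_token is a hypothetical callable: () -> (token, ttl_seconds)
        self._fetch_token = fetch_token
        self._refresh_margin = refresh_margin
        self._clock = clock
        self._token = None
        self._expires_at = 0.0

    def get(self):
        """Return the cached token, refreshing it when it is about to expire."""
        now = self._clock()
        if self._token is None or now >= self._expires_at - self._refresh_margin:
            self._token, ttl = self._fetch_token()
            self._expires_at = now + ttl
        return self._token
```

With Ray, a class like this could be turned into an actor with `ray.remote(TokenCache)`, and each task would then obtain the token through `ray.get(cache.get.remote())` instead of authenticating on its own. The developer still has to write and maintain this code, along with the listing and reading logic, for every storage backend.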

Clearly, an efficient method for object storage access requires significant effort by the developer. This is exactly where the Lithops framework can be leveraged within Ray to significantly simplify data access and handling for Ray applications.

Data adapters for Ray

The Lithops framework is designed to address two main challenges: deploying and executing user code on a massive scale against any serverless backend, and enabling user applications to process large volumes of data, especially when the data is persisted in object storage. As part of the CodeFlare effort, we focus on enabling and leveraging Lithops' data loading and processing features to simplify data handling within a Ray context. Lithops has recently been extended to provide native support for Ray, where Lithops uses a Ray cluster as its compute backend. It currently supports IBM Cloud Object Storage, Amazon S3, Google Cloud Storage, Azure Blob Storage, Ceph, and others.


Leveraging Lithops within Ray

Now let’s see how Lithops can help in our example. We need to modify the extract_color function by removing all the object storage semantics. Instead of using boto3 directly, we let Lithops inject a reference to the data object, hiding all boto3 object storage semantics from the user.

At runtime, Lithops will fill in the value of ‘obj’ with a reference to the actual data. This reference has two properties: ‘obj.key’ is the data object name and ‘obj.data_stream’ is a data stream to the actual data in the object storage bucket. More information on ‘obj’ can be found here.
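A function written against this reference needs no storage client at all. The sketch below is not the original code: it only relies on the ‘obj.key’ and ‘obj.data_stream’ properties described above, and for illustration it assumes the payload is raw RGB bytes (three per pixel) rather than an encoded image format. The `top_n` parameter is hypothetical.

```python
from collections import Counter


def extract_color(obj, top_n=3):
    """Return the object's key and its most frequent RGB colors."""
    raw = obj.data_stream.read()
    # Assumption for illustration only: the payload is raw RGB bytes.
    # A real JPEG or PNG would first be decoded with an imaging library.
    usable = len(raw) - len(raw) % 3
    pixels = [tuple(raw[i:i + 3]) for i in range(0, usable, 3)]
    colors = [color for color, _ in Counter(pixels).most_common(top_n)]
    return {"key": obj.key, "colors": colors}
```

Because the function only touches `obj`, it can be exercised locally with any stand-in object that exposes a `key` and a readable `data_stream`.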

We make a small modification to the Ray task. Instead of a direct call to extract_color, Lithops is being used behind the scenes (through the data object) to call it only at the right moment.

Let’s tie it all together with a main method.

By enabling Lithops within Ray, we are able to remove all the boilerplate code required to list data from the object storage. Lithops also inspects the data source by using its internal data partitioner and creates a lazy execution plan, where each entry maps an “extract_color” function to a single image. Moreover, Lithops creates a single authentication token that is used by all the tasks, instead of letting each task perform authentication. The parallelism is controlled by Ray, and once a Ray task is executed, it calls Lithops to execute the extract_color function directly in the context of the calling task. Thus, by using Lithops, we can allow code to access object storage data without requiring additional coding effort from the user.

Summary

By enabling Lithops within Ray, we just saw how we can hide all storage semantics and enable Ray users to focus on the business logic, rather than on the boilerplate code required to access the underlying storage. Since Lithops supports various storage backends, a simple URI scheme change will allow it to access other storage vendors, such as Amazon S3, which uses “s3://”, and so on. Here you can see the supported storage backends and how to configure them. Lithops also supports the chunking of data that resides in big CSV files without breaking lines in the middle. This enables Ray tasks to process chunks of data, instead of loading an entire CSV file into a single task or actor.
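To make the idea of line-preserving chunking concrete, here is a minimal, self-contained sketch. It is not the Lithops partitioner, only an illustration of the general technique on an in-memory byte string, and the function name `split_on_line_boundaries` is hypothetical.

```python
def split_on_line_boundaries(data: bytes, chunk_size: int):
    """Yield chunks of roughly chunk_size bytes that always end at a newline."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    start = 0
    while start < len(data):
        end = min(start + chunk_size, len(data))
        if end < len(data):
            # Extend the chunk to the end of the line it would otherwise cut.
            newline = data.find(b"\n", end - 1)
            end = len(data) if newline == -1 else newline + 1
        yield data[start:end]
        start = end
```

Each chunk may be slightly larger than `chunk_size`, since it grows until the current line is complete. Concatenating the chunks in order reproduces the original data.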

Next steps

We plan to publish more blogs and tutorials showing how Lithops can use Ray as its backend compute engine. We will also explain how to use Ray actors, how to reduce the number of Ray tasks, and even demonstrate how Lithops can be leveraged to reduce overload on the entire Ray cluster, while helping to enable a hybrid cloud experience for Ray use cases.

Stay tuned!

Gil Vernik
CodeFlare

I am a hands-on senior architect and a technical and team leader at IBM Research. I am an expert in Big Data analytic engines and serverless computing.