Ray: Distributed Computing Framework for AI/ML Applications

Juniper AI/ML Team · Jan 12, 2021

Authors: Ajit Patankar, Sabyasachi Mukhopadhyay, Subhabrata Banerjee, Pooja Ayanile, Divyank Garg

Ray is a distributed computing framework primarily designed for AI/ML applications. Ray originated in the RISELab at UC Berkeley. We have used Ray extensively in our AI/ML development process and summarize our evaluation in this series of blog posts. Our team develops complex AI/ML models, primarily on large telemetry and NLP data sources. We started evaluating a new generation of distributed computing tools in response to the following issues:

  • The inability of a single machine to support our data and model pipelines.
  • The frequent need to develop custom infrastructure tools.

Amongst the many competing frameworks, such as Ray, Dask, Spark, Modin, and Horovod, we selected Ray for detailed review and analysis for the following reasons:

  • Strong open source community.
  • Broad range of functionality including extensions such as Ray Tune
  • Reputation of researchers in the RISE laboratory.

Ray Overview

The Ray project site can be accessed at https://ray.io. The original architecture paper by Robert Nishihara, Philipp Moritz, and others is also an excellent reference and explains many of the architectural decisions. As explained in that paper, the Ray architecture is shown in Figure 1.

Figure 1: Ray Internal Architecture (source)

As Figure 1 shows, the architecture is divided into two layers: the application layer and the system (global control) layer. The salient points of each layer are as follows:

Application layer:

  • Driver: the process that executes the user program. User programs can invoke workers and actors.
  • Worker: a stateless process that executes tasks (remote functions) serially.
  • Actor: a stateful process whose previous executions determine its current state.
  • Workers are started automatically, while actors must be explicitly instantiated (a minimal sketch of a task and an actor follows this list).
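
To make the distinction between tasks and actors concrete, here is a minimal sketch against the Ray 1.x Python API; the function and class names are our own illustrations, not part of Ray.

```python
import ray

ray.init()  # start Ray locally; on a cluster the driver would connect to the head node

# A stateless task: any worker process can execute it, and calls are independent.
@ray.remote
def add(x, y):
    return x + y

# A stateful actor: it must be instantiated explicitly, and its state
# (self.count) persists across method calls.
@ray.remote
class Counter:
    def __init__(self):
        self.count = 0

    def increment(self):
        self.count += 1
        return self.count

print(ray.get(add.remote(1, 2)))             # 3
counter = Counter.remote()                   # explicit instantiation of the actor
print(ray.get(counter.increment.remote()))   # 1
print(ray.get(counter.increment.remote()))   # 2
```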

System/Global Control layer:

  • Maintains the entire control state of the system, called the Global Control State (GCS).
  • Uses sharding to achieve scale, with chain replication per shard.
  • According to the authors, keeping object metadata in the GCS (rather than in the global scheduler) is the better approach.
  • Supports communication-intensive and latency-sensitive applications.
  • Decouples task dispatch from task scheduling.

The Ray Deployment (Cluster) Architecture

The Ray deployment or cluster architecture is shown in Figure 2.

Figure 2: A Ray cluster (source)

A Ray cluster comprises a set of worker nodes and a centralized Global Control Store (GCS) instance.

Components

A Ray instance consists of one or more worker nodes, each of which consists of the following physical processes:

  1. One or more worker processes, responsible for task submission and execution. A worker process is either stateless (it can execute any @ray.remote function) or an actor (it can only execute methods of its @ray.remote class). Each worker process is associated with a specific job. The default number of initial workers is equal to the number of CPUs on the machine. Each worker stores:
      • An ownership table: system metadata for the objects to which the worker holds a reference, e.g., reference counts.
      • An in-process store, used to store small objects.
  2. A raylet. The raylet is shared among all jobs on the same cluster. The raylet has two main threads:
      • A scheduler, responsible for resource management and for fulfilling task arguments that are stored in the distributed object store. The individual schedulers in a cluster together comprise the Ray distributed scheduler.
      • A shared-memory object store (also known as the Plasma object store), responsible for storing and transferring large objects; a short usage sketch appears below. The individual object stores in a cluster together comprise the Ray distributed object store.

Each worker process and raylet is assigned a unique 20-byte identifier and an IP address and port. The same address and port can be reused by subsequent components (e.g., if a previous worker process dies), but the unique IDs are never reused (i.e., they are tombstoned upon process death).
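
To see where objects land in practice, here is a minimal sketch (assuming Ray 1.x and NumPy are installed): ray.put places a large array in the node's shared-memory object store, and passing the resulting ObjectRef to a task avoids copying the array for each invocation, while the small return value lives in the owning worker's in-process store.

```python
import numpy as np
import ray

ray.init()

# Large object: ray.put stores it in the shared-memory (Plasma) object store.
big_array = np.ones((2000, 2000))
array_ref = ray.put(big_array)

@ray.remote
def first_column_sums(arr):
    # The small result returned here is kept in the in-process store
    # of the worker that owns it.
    return arr.sum(axis=0)[:3]

# The ObjectRef is resolved to the shared array when the task runs.
print(ray.get(first_column_sums.remote(array_ref)))  # [2000. 2000. 2000.]
```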

One of the worker nodes is designated as the head node. In addition to the above processes, the head node also hosts:

  1. The Global Control Store (GCS). The GCS is a key-value server that contains system-level metadata, such as the locations of objects and actors. There is an ongoing effort to make the GCS highly available, so that it can run on any node (and on multiple nodes) instead of only on a designated head node.
  2. The driver process(es). A driver is a special worker process that executes the top-level application (e.g., `__main__` in Python). It can submit tasks, but cannot execute any itself. Driver processes can run on any node, but by default they are located on the head node when running with the Ray autoscaler (a minimal driver sketch follows this list).
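
Here is that minimal driver sketch, assuming an already-running Ray cluster started through the Ray cluster launcher or autoscaler, so that address="auto" can locate the head node:

```python
# driver.py -- the top-level application: it submits tasks but does not execute them itself.
import ray

# Connect to the existing cluster; the head node hosts the GCS.
ray.init(address="auto")
print(ray.cluster_resources())  # e.g., total CPUs/GPUs known to the GCS

@ray.remote
def square(x):
    return x * x

# Tasks submitted by the driver are executed by worker processes on the cluster.
print(ray.get([square.remote(i) for i in range(4)]))  # [0, 1, 4, 9]
```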

Related systems

The following comparison (summarized as Table 1) relates Ray to several categories of related systems.

Cluster Orchestrators

Ray can run on top of cluster orchestrators such as Kubernetes or SLURM, offering lighter-weight, language-integrated primitives, i.e., tasks and actors instead of containers and services.

Parallelization Frameworks

Dask: Dask is a parallel processing library with a special focus on data science. Dask is designed to mimic the APIs of pandas, scikit-learn, and NumPy, making it easy for developers to scale their data science applications from a single computer up to a full cluster.

Celery: Celery is a distributed, asynchronous task queue. Celery allows tasks to be executed concurrently, either asynchronously or synchronously. Although Celery is written in Python, its protocol can be used from other languages. Celery powers some of the most data-intensive applications, including Instagram. As such, Celery is extremely powerful but can also be difficult to learn.

Data Processing Frameworks

Spark: Spark focuses on large-scale data processing, while Ray targets machine learning applications in particular, which have somewhat different systems-level requirements: ML workloads may spawn very many tasks, so low-latency scheduling and high task throughput are essential. Compared to Spark, Ray also places more emphasis on keeping the API simple and accessible to Python users.

Dask: For migrating from a pandas codebase, Modin on Ray is a near drop-in change, and its DataFrame API differences are even smaller than Dask's (see the sketch below).
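
To illustrate how small that migration is, here is a hedged sketch assuming modin[ray] is installed (the DataFrame contents are arbitrary):

```python
import ray
import modin.pandas as pd  # the only change from `import pandas as pd`

ray.init()  # optional: Modin starts Ray itself if it is not already initialized

df = pd.DataFrame({"a": range(1_000), "b": range(1_000)})
print(df.describe())  # familiar pandas-style API, executed on Ray workers
```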

Actor Frameworks

Unlike specialized actor frameworks such as Erlang and Akka, Ray integrates with existing programming languages, enabling cross language operation and the use of language native libraries. The Ray system also transparently manages parallelism of stateless computation and explicitly supports memory sharing between actors.

Table 1: Ray and related systems (source: Ray documentation and the authors' analysis)

A very useful benchmarking comparison of Spark, Dask, and Ray is given by Antti Puurula. That work focuses primarily on data pipelines, whereas our work focuses on model training and inference.

Ray Ecosystem

A thriving ecosystem has evolved around Ray. In particular, we also evaluate the following add-ons to Ray:

  • Ray Tune. Ray Tune implements hyperparameter tuning, which is a very important aspect of the model training process (a minimal sketch follows this list). Ray Tune helps in two ways:
      • It utilizes a distributed computing environment (multiple cores or machines), which naturally speeds up the tuning process.
      • It provides built-in advanced optimization algorithms for the tuning process.
  • Ray Serve. Ray Serve is a scalable model-serving library built on Ray. Similar to the advanced tuning algorithms in Ray Tune, Ray Serve provides additional functionality such as splitting traffic between backends (useful for A/B tests), batching, and resource management.
  • RaySGD (distributed deep learning using Ray). RaySGD is a library for distributed deep learning that wraps the native distributed training modules of PyTorch and TensorFlow.
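
Here is the minimal Ray Tune sketch promised above, written against the Ray 1.x Tune API; the objective function and search space are purely illustrative:

```python
from ray import tune

def objective(config):
    # Hypothetical objective: pretend a larger learning rate yields a higher score.
    tune.report(score=config["lr"] * 100)

analysis = tune.run(
    objective,
    config={"lr": tune.grid_search([0.001, 0.01, 0.1])},
)

# Best hyperparameters according to the reported metric.
print(analysis.get_best_config(metric="score", mode="max"))
```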

Use Cases for Evaluation

The productivity of our data science team is affected by the following factors:

  • Very long computation times during the training process
  • The need to evaluate multiple modeling techniques
  • Hyperparameter tuning
  • The need to reliably generate very large numbers (millions or more) of inferences in production (model ‘serving’)

The above context has been used to design the following use cases:

  • Integrating Ray with legacy code
  • Ray for model selection
  • Integrating Ray with scikit-learn
  • Hyperparameter tuning using Ray (Ray Tune)
  • Generating inferences in production (Ray Serve)

In subsequent blog posts, we will evaluate Ray by implementing the above use cases. The evaluation will cover ease of implementation as well as gains in performance and scalability.

We also observed that installing and maintaining a Ray cluster requires considerable effort and systems experience. Thus, our blog series will start with a short ‘cheat sheet’ to help install and debug Ray clusters.

Some of the infrastructure and tooling issues that we plan to address in the future include:

  • Comparison of Ray with existing tools and in particular with Apache Spark.
  • Using Ray with infrastructure software like Kubernetes.

Conclusions

Ray is a very promising technology for distributed AI/ML training and production. It is based on a modern, clean architecture and is not hampered by legacy code. In this series of blog posts, we rigorously evaluate Ray by implementing a broad range of use cases. This evaluation will assist practitioners in selecting and implementing a suitable distributed computing environment for AI/ML applications.

Next series blog reference: https://medium.com/juniper-team/tips-on-installing-and-maintaining-ray-cluster-b5535743f97c
