(Refinements, February 8, 2020)
TL;DR Ray is a system for scaling Python applications across compute clusters with minimal effort. This post explains the problems Ray solves and how to use it.
What is Ray? What problems does it solve?
Ray (website, GitHub) is an open-source system for scaling Python applications from single machines to large clusters. Its design is driven by the needs of next-generation ML/AI systems, which face several distinctive challenges: diverse computational patterns, management of distributed, evolving state, and the desire to address all of those needs with minimal programming effort.
Typical ML/AI systems require diverse computational patterns to support data cleaning and preparation, hyperparameter tuning, model training and serving, and other chores. The original MapReduce model for big data workloads works well for data cleaning, preparation, and analysis, but machine learning workloads require a mix of fine-grained and coarse-grained tasks, along with varied communication patterns between components. Hyperparameter tuning and model training are very compute-intensive and need cluster resources to finish in a reasonable time. Ray provides a foundation for building modern ML/AI systems and applications by meeting these diverse requirements efficiently, with a minimal and intuitive API.
A second challenge is distributed, evolving state. In the context of ML/AI, distributed state includes the hyperparameters, the model parameters (e.g., the weights of a neural network), and, for reinforcement learning scenarios, the state of the simulations (or interactions with the real world) used for training. This state is often mutable, especially during training, so it needs careful, concurrency-safe updates. One possible approach to distributed computing is to use popular “serverless” systems, but none currently offers facilities for managing distributed, mutable state. Developers using serverless systems must resort to keeping all state in a database, but the database can become a bottleneck and a single point of failure.
Instead, Ray uses the popular Actor model to provide an intuitive mechanism for state management. Ray actors are the stateful complement to Ray tasks, which are stateless. An actor’s state is transparently accessible to any other Ray actor or task through a reference to the corresponding Python object (i.e., an instance of a Python class). Ray tracks where each actor is located in the cluster, so user code never needs to know or manage those locations. Mutations of an actor’s state are handled in a thread-safe way, without explicit concurrency primitives. Hence, Ray provides intuitive, distributed state management, which makes it an excellent platform for implementing stateful serverless applications in general. Furthermore, when actors or tasks on the same machine communicate, the state is transparently shared through shared memory with zero-copy serialization, for optimal performance.
Finally, because most ML/AI systems are Python-based, developers need a way to add these scale-out capabilities with minimal code changes. A decorator, @ray.remote, marks functions and classes as logical units that can be instantiated and executed in a cluster. Ray transparently handles thread-safe mutation of state, distribution of state, and intuitive scheduling of dependent tasks.
The Ray distribution includes several high-performance libraries targeting AI applications, whose problem domains also motivated the creation of Ray. They include RLlib for reinforcement learning and Tune for hyperparameter tuning. Both demonstrate Ray’s unique capabilities. These libraries and other, custom applications written with Ray are already used in many production deployments.
Ray is an open source project started in the UC Berkeley RISELab. It is now developed at Anyscale with major contributions from many other organizations. Commercial users of Ray include Ant Financial, JP Morgan, Intel, Microsoft, Ericsson, Skymind, Primer, and many others.
An Example of the Core Ray API
Note: The full listing of the following code example can be found at the end of this post.
Now that we understand the motivations and advantages of Ray, let’s examine how you would use the Ray API in your applications. Then we’ll look more closely at how Ray improves performance through parallelization and distribution. The Ray API is carefully designed to enable users to scale their applications, even across a cluster, with minimal code changes.
Consider the example of a parameter server, which is a key-value store used for training machine learning models in a cluster. The values are the parameters of a machine-learning model (e.g., a neural network). The keys index the model parameters. If you are unfamiliar with parameter servers, think of any standalone service you might need for serving requests for information or data.
For example, in a movie recommendation system, there might be one key per user and one key per movie. For each user and movie, there are corresponding user-specific and movie-specific parameters. In a language-modeling application, words might be the keys and their embeddings may be the values.
In its simplest form, a parameter server may have a single key and allow all of the parameters to be retrieved and updated at once.
Here is an example of such a simple parameter server, for a single NumPy array of parameters. It is implemented as a Ray actor in under 15 lines of code:
The @ray.remote decorator defines a service. It takes the ordinary Python class, ParameterServer, and allows it to be instantiated as a remote service. Because the instance maintains state (the current parameters, which are mutable), we have a stateful service.
In this example, we assume that an update to the parameters is provided as a gradient that should be added to the current parameter vector. (This gradient can be a single number that is added to all array elements or an array of gradients.) More sophisticated designs are possible, of course, but Ray would be used the same way. As a simple exercise, try changing this to a key-value (dictionary) implementation.
A parameter server typically exists as a remote process or service. Clients interact with it through remote procedure calls. To instantiate the parameter server as a remote actor, we take the following steps at the interactive Python prompt. (We’ll assume you have already defined the ParameterServer class in the same session.) First, you have to start Ray. When using a cluster, you would pass optional parameters to the init() method to specify the cluster location.
Next, create a ParameterServer instance for an array of 10 parameters.
Instead of calling ParameterServer(10) to construct an instance, as you would for a normal Python class, you call the remote() method, which the @ray.remote decorator added to the class. You pass the same arguments you would pass to the regular constructor. Note that what you get back is a handle to the actor, which runs in its own worker process, rather than the object itself.
Similarly, to invoke methods on the actor, you append remote() to the original method name and pass the same arguments you would pass to the original method.
Actor method invocations return futures. To retrieve the actual values, we use the blocking ray.get(id) call.
As expected, the initial parameter values are all zeros. What ray.get(id) actually does is pull the value out of the distributed object store that Ray provides. The value was written to that store when the actor method returned its result. If the value and the client are on the same machine, the value is read from shared memory for fast performance. If they reside on different machines, the value is transferred to the machine that needs it.
For completeness, your code can also write values explicitly to this store using ray.put(value), which returns an object reference to the stored value, just as a remote call does. When you want to retrieve several values as they become available, there is a convenient ray.wait(…) function. See the Ray API for more details.
Following the actor model, when clients invoke these actor methods, the invocations are routed to the actor instance, wherever it may be in the cluster. Since invocations can arrive concurrently, Ray ensures that each one is processed in a thread-safe way, which prevents corruption of the state without explicit thread-synchronization code. However, this doesn’t impose any global ordering on when the invocations are processed; it is first come, first served.
Note: Because of Python’s dynamic nature, Ray could have let you call actor methods without remote(). Requiring the explicit code change was a deliberate choice. It serves as useful documentation for readers of the code, because switching from a local method call to an RPC-like invocation has important performance implications. Also, the returned object is now different: it is a future, and extracting its value requires the blocking call ray.get().
Now, suppose we want to run several worker tasks that continuously compute gradients and update the model parameters. Each worker will run in a loop that does the following three things:
- Get the latest parameters.
- Compute an update to the parameters.
- Update the parameters.
These workers will be stateless, so we’ll use a Ray task (a remote function) instead of an actor. The worker function takes a handle to the parameter server actor as an argument, which allows the worker task to invoke methods on the parameter server.
Then we can start two of these worker tasks. Ray tasks (functions) are started with the same remote() method used for actors and actor methods.
Then we can retrieve the parameters from the driver process repeatedly and see that they are being updated by the workers:
When the updates stop, the final values will be 200.
Note that Ray makes starting a remote service, or actor, as easy as defining a Python class. Handles to the actor can be passed to other actors and tasks, enabling arbitrary and intuitive messaging and communication patterns. Current alternatives are much more involved. For example, consider how the equivalent runtime service creation and service-handle passing would be done with gRPC, as shown in the gRPC documentation.
Unifying Tasks and Actors
We’ve seen that tasks and actors use the same Ray API and are used the same way. This unification of parallel tasks and actors has important benefits, both for simplifying the use of Ray and for building powerful applications through composition.
By way of comparison, popular data processing systems such as Apache Hadoop and Apache Spark allow stateless tasks (functions with no side effects) to operate on immutable data. This assumption simplifies the overall system design and makes it easier for applications to reason about correctness.
However, shared mutable state is common in machine learning applications. That state could be the weights of a neural network, the state of a third-party simulator, or a representation of interactions with the physical world. Ray’s actor abstraction provides an intuitive way to define and manage mutable state in a thread-safe way.
What makes this especially powerful is the way Ray unifies the actor abstraction with the task-parallel abstraction, inheriting the benefits of both approaches. Ray uses an underlying dynamic task graph to implement both actors and stateless tasks in the same framework. As a consequence, these two abstractions are completely interoperable. Tasks and actors can be created from within other tasks and actors. Both return futures, which can be passed into other tasks or actor methods to introduce scheduling and data dependencies in a natural way. As a result, Ray applications inherit the best features of both tasks and actors.
Here are some of the core concepts used internally by Ray:
Dynamic Task Graphs: When you invoke a remote function or actor method, tasks are added to a dynamically growing graph, which Ray schedules and executes across a cluster (or a single multi-core machine). Tasks can be created by the “driver” application or by other tasks.
Data: Ray efficiently serializes data using the Apache Arrow data layout. Objects are shared between workers and actors on the same machine through shared memory, which avoids the need for copies or deserialization. This optimization is absolutely critical for achieving good performance.
Scheduling: Ray uses a distributed scheduling approach. Each machine has its own scheduler, which manages the workers and actors on that machine. Applications and workers submit tasks to the scheduler on their own machine. From there, tasks can be assigned to other workers or passed to other local schedulers. This allows Ray to achieve substantially higher task throughput than a centralized scheduler, which would be a potential bottleneck and single point of failure. This is essential for many machine learning applications.
Systems like parameter servers are normally implemented and shipped as standalone systems with a nontrivial amount of code, much of it boilerplate for handling distribution, invocation, state management, etc. We’ve seen that Ray’s abstractions and features eliminate most of that boilerplate. As a result, enhancing such a service is comparatively easy, and you can spend your effort on the application logic instead.
Many of the common services we need in today’s production environments can be implemented this way, quickly and efficiently. Examples include logging, stream processing, simulation, model serving, graph processing, and many others.
To Learn More
For more information about Ray, take a look at the following links:
- Ray website
- Ray GitHub page
- Ray documentation: landing page, installation instructions
- Ray tutorials
- A research paper describing the Ray system in detail
- A research paper describing the flexible primitives internal to Ray for deep learning
- Fast serialization with Ray and Apache Arrow
- RLlib: Scalable reinforcement learning with Ray (and this RLlib research paper)
- Tune: Efficient hyperparameter tuning with Ray
- Modin: Speeding up Pandas with Ray
- FLOW: a computational framework using reinforcement learning for traffic control modeling
- Anyscale: the company behind Ray
Appendix: Running the Code
To run the complete application, first install Ray with pip install ray (or see the Ray installation instructions). Then run the following code with Python. It implements the parameter server as discussed previously, but adds sharding of the parameters, which the workers handle. A more extensive version of this example is also available as a Jupyter notebook.
Note that this example focuses on simplicity and that more can be done to optimize this code.