Ray — A cluster computing ML framework for emerging applications

Introduction

Ameya
Published in Coinmonks
8 min read · Oct 14, 2018


The last post covered the design principles, outlined in a paper from RISELab at Berkeley, for a new framework aimed at an emerging class of AI applications. This post builds on those design principles and takes a deep dive into Ray, the framework that was built following them. I highly recommend going over the design principles before jumping into this one, since this post relies on a lot of material covered in the last post.

To recap the last post: at a high level, a framework for this new class of AI applications needs to support low-latency scheduling, high throughput for millions of tasks, dynamic generation of tasks, heterogeneous tasks, and arbitrarily complex dataflow dependencies. Ray supports these requirements and in turn enables AI applications based on reinforcement learning (RL), which need distributed training, model serving and many simulations. Ray handles both lightweight stateless tasks and the time-consuming stateful operations that training needs.

A typical RL algorithm

A typical example of an RL agent interacting with the environment goes through the following iterative algorithm:

  1. Start with some model
  2. Use the policy (from the model) to obtain the recommended action for the current state of the environment (model serving)
  3. Get the reward by applying that action in the simulated environment (simulation)
  4. Repeat steps 2–3 until the simulations have finished
  5. Using the results from the last step, retrain the policy (training)
  6. Repeat steps 2–5 until the results have converged and the reward has been maximized

As this algorithm shows, the framework needs to support model serving, simulation and training on large amounts of data. Model serving is about evaluating the policy quickly: making many decisions (inferences) with low latency, helped by effective load balancing. Simulations use the policy to evaluate its effectiveness in a given environment; they are needed because current algorithms are not efficient enough to learn from the physical world in real time. Model training is based on massively distributed stochastic gradient descent. In RL these three workloads are tightly coupled, in contrast with supervised learning, where model training and model serving are decoupled: training occurs offline and inference occurs online (in real time).
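The loop above can be sketched in a few lines of plain Python. This is a toy, single-process illustration and not Ray code: the one-parameter policy, the `simulate` environment (where the ideal action is assumed to be `2 * state`) and the `train` update rule are all invented for the example.

```python
import random

def serve(policy, state):
    """Model serving: map a state to an action using the current policy."""
    return policy * state

def simulate(policy, rng, steps=20):
    """Simulation: roll out the policy in a toy environment.

    Hypothetical environment: the ideal action is 2 * state, and the reward
    is the negative squared distance from that ideal action.
    """
    trajectory = []
    for _ in range(steps):
        state = rng.uniform(-1.0, 1.0)
        action = serve(policy, state)
        reward = -(action - 2.0 * state) ** 2
        trajectory.append((state, action, reward))
    return trajectory

def train(policy, trajectories, lr=0.5):
    """Training: one gradient-ascent step on the average reward."""
    grad, count = 0.0, 0
    for trajectory in trajectories:
        for state, action, _reward in trajectory:
            # d(reward)/d(policy) = -2 * (action - 2 * state) * state
            grad += -2.0 * (action - 2.0 * state) * state
            count += 1
    return policy + lr * grad / count

def run(iterations=50, rollouts=8, seed=0):
    rng = random.Random(seed)
    policy = 0.0                                  # step 1: start with some model
    for _ in range(iterations):                   # step 6: repeat until converged
        trajectories = [simulate(policy, rng)     # steps 2-4: serve and simulate
                        for _ in range(rollouts)]
        policy = train(policy, trajectories)      # step 5: retrain the policy
    return policy

final_policy = run()
```

In this sketch the rollouts run one after another. In a distributed setting, each `simulate` call is independent of the others within an iteration, which is what makes the simulation step a natural fit for many parallel tasks.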

Ray API primitives

This was covered in the design principles post, but here is a concise summary as a refresher:

The first three rows pertain to stateless tasks, and the last row corresponds to stateful actors and their API.
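For readers who have not seen a futures-style API, here is a rough analogue built only on Python's standard library. It does not use Ray: `pool.submit` merely stands in for a non-blocking remote call such as Ray's `f.remote(...)`, `wait` stands in for `ray.wait`, and `result()` stands in for the blocking `ray.get`. The `add` function and the pool size are arbitrary choices for the sketch.

```python
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED

def add(a, b):
    return a + b

with ThreadPoolExecutor(max_workers=4) as pool:
    # Analogue of a non-blocking remote call: returns a future immediately.
    futures = [pool.submit(add, i, i) for i in range(4)]

    # Analogue of a "wait" primitive: block until at least one result is ready.
    done, pending = wait(futures, return_when=FIRST_COMPLETED)

    # Analogue of a blocking "get": resolve every future to its value.
    results = [f.result() for f in futures]
```

The important property is that submitting work and collecting results are separate steps, so a program can launch many tasks before it blocks on any of them.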

Tasks and Actors

Tasks (the first three rows in the last section) are stateless remote functions that depend only on their inputs. Think of them as functions that can be executed remotely on any node where the input is. Tasks are easy to recover: re-executing them is enough. They are, however, inefficient for small updates.

Actors are stateful entities. They are useful for iterative workloads such as training or parameter servers, which rely on internal state that is updated continuously. Actor methods execute remotely like tasks, but they need a “stateful worker” to run on. Actors also make it easy to wrap third-party packages that cannot be easily serialized and passed as input to a task. Finally, actors suit small, frequent updates, as opposed to the bulk updates that pure tasks are designed for.
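The idea of a “stateful worker” can be illustrated with a small standard-library sketch. It is not how Ray implements actors: `ActorHandle`, its `call` method and the `Counter` class are hypothetical names made up for this example. The point is only that every method call on the same actor runs on one dedicated worker, in order, against state that persists between calls.

```python
import queue
import threading
from concurrent.futures import Future

class Counter:
    """Plain class whose state should live on a single worker."""
    def __init__(self):
        self.value = 0

    def increment(self, by=1):
        self.value += by
        return self.value

class ActorHandle:
    """Hypothetical handle: runs every method call on one dedicated thread."""
    def __init__(self, cls, *args, **kwargs):
        self._instance = cls(*args, **kwargs)
        self._mailbox = queue.Queue()
        self._thread = threading.Thread(target=self._loop, daemon=True)
        self._thread.start()

    def _loop(self):
        # Process calls strictly in arrival order, one at a time.
        while True:
            item = self._mailbox.get()
            if item is None:                      # shutdown sentinel
                break
            method, args, kwargs, future = item
            try:
                result = getattr(self._instance, method)(*args, **kwargs)
                future.set_result(result)
            except Exception as exc:
                future.set_exception(exc)

    def call(self, method, *args, **kwargs):
        """Non-blocking: enqueue the call and return a future for its result."""
        future = Future()
        self._mailbox.put((method, args, kwargs, future))
        return future

    def stop(self):
        self._mailbox.put(None)
        self._thread.join()

counter = ActorHandle(Counter)
futures = [counter.call("increment") for _ in range(3)]
values = [f.result() for f in futures]
counter.stop()
```

Because the three calls share one instance and execute serially, each result depends on the call before it, which is exactly the kind of dependency a stateless task cannot express.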

Task Graphs

Ray builds a dynamic graph from the program that the user has written. This graph forms a dependency structure, and the system executes a task as soon as its inputs become available. The nodes of the graph represent data objects and remote function invocations (tasks). The graph also has edges: dataflow edges capture dependencies between tasks and data objects, and control edges capture nesting, where a task or function creates further remote functions.

So far we have not discussed actors in this model. Actor method invocations are nodes just like tasks; the only difference is that their statefulness has to be captured, which is done by adding a stateful edge. When two methods M1 and M2 are invoked one after the other on the same actor, a stateful edge is added from the task node for M1 to the task node for M2. All the methods invoked on the same actor therefore form a chain of stateful edges, which also helps with the lineage construction that is used to recover from faults.
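To make the three edge types concrete, here is a minimal sketch of such a graph as a plain data structure. The `TaskGraph` class, its method names and the node labels are illustrative assumptions, not Ray internals.

```python
from collections import defaultdict

class TaskGraph:
    def __init__(self):
        self.edges = defaultdict(list)   # source node -> [(kind, target node)]
        self._last_call = {}             # actor name -> its most recent method node

    def _edge(self, kind, src, dst):
        self.edges[src].append((kind, dst))

    def add_task(self, task, inputs, output, parent=None):
        for obj in inputs:
            self._edge("data", obj, task)          # data object -> consuming task
        self._edge("data", task, output)           # task -> data object it produces
        if parent is not None:
            self._edge("control", parent, task)    # parent task launched this task

    def add_actor_call(self, actor, task, inputs, output, parent=None):
        self.add_task(task, inputs, output, parent)
        previous = self._last_call.get(actor)
        if previous is not None:
            self._edge("stateful", previous, task)  # ordering on the same actor
        self._last_call[actor] = task

    def lineage(self, node):
        """Every node that `node` transitively depends on."""
        reverse = defaultdict(set)
        for src, targets in self.edges.items():
            for _kind, dst in targets:
                reverse[dst].add(src)
        seen, stack = set(), [node]
        while stack:
            for dep in reverse[stack.pop()]:
                if dep not in seen:
                    seen.add(dep)
                    stack.append(dep)
        return seen

graph = TaskGraph()
graph.add_task("T_add", inputs=["a", "b"], output="c")
graph.add_actor_call("A", "M1", inputs=["c"], output="r1")
graph.add_actor_call("A", "M2", inputs=[], output="r2")
```

Here `graph.lineage("r2")` includes `M1` and everything `M1` depended on, even though `M2` takes no data inputs. That dependency exists only because of the stateful edge, which is why the chain matters when reconstructing lost actor state.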

Architecture

The system architecture of Ray consists of an application layer and a system layer. Let’s look at how the two layers function.

Application layer containing drivers and workers; system layer encompassing the hybrid scheduler.

Application layer

As you can see, the application layer consists of drivers, workers and actors. Drivers execute the application programs. Workers execute the tasks or functions present in the system. Whenever a remote function is declared, it is pushed out to the workers. Drivers and the local scheduler ask workers to execute tasks. An actor is similar to a worker in that it executes tasks, but it only executes methods that belong to the actor, and each method works on the state left by the previous one.

System layer

The system layer consists of three main components: the global control store (GCS), the global scheduler, and the in-memory distributed object store together with the local scheduler on each node.

Global control store (GCS) stores information such as the remote function table, event logs and the object table. At its core, it is a key-value store augmented with pub-sub functionality. The object table records where objects are located, so that tasks and data can be colocated efficiently. Since GCS maintains all the state, it helps with scaling the scheduler. GCS is also used for recovering from failures. Coarse-grained recovery has been addressed by architectures such as RDDs, but fine-grained recovery needs a different mechanism, especially for tasks that are added dynamically during simulation. Ray addresses this by having GCS maintain the lineage information needed for recovery. As a result, all other components in the system can be stateless and need not worry much about recovering from failures.
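A key-value store with pub-sub on keys is a small idea, and a toy version fits in a few lines. The `ControlStore` class below is a hypothetical in-process sketch, not the real GCS; the table names and the serialized function placeholder are made up for the example.

```python
from collections import defaultdict

class ControlStore:
    """Toy key-value store with publish-subscribe on individual keys."""
    def __init__(self):
        self._tables = defaultdict(dict)        # table name -> {key: value}
        self._subscribers = defaultdict(list)   # (table, key) -> callbacks

    def put(self, table, key, value):
        self._tables[table][key] = value
        # Notify, then forget, everyone who was waiting on this key.
        for callback in self._subscribers.pop((table, key), []):
            callback(key, value)

    def get(self, table, key):
        return self._tables[table].get(key)

    def subscribe(self, table, key, callback):
        """Call `callback` once, as soon as the key has a value."""
        value = self.get(table, key)
        if value is not None:
            callback(key, value)
        else:
            self._subscribers[(table, key)].append(callback)

gcs = ControlStore()
gcs.put("function", "add", "<serialized add>")
gcs.put("object", "a", {"N1"})                  # object "a" lives on node N1

notifications = []
gcs.subscribe("object", "id", lambda k, nodes: notifications.append((k, nodes)))
gcs.put("object", "id", {"N2"})                 # fires the pending callback
```

The subscribe-then-notify pattern is what lets a component wait for an object that does not exist yet without polling the store.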

Global scheduler: Scheduling in Ray is really hybrid and bottom-up. Tasks are first submitted to the local scheduler, which tries to schedule the task on the same node. If it cannot, because of constraints (such as the lack of a GPU) or because of high load on the local node, it forwards the task to the global scheduler. The global scheduler is aware of overall system usage on all the nodes, and of the data needs of different tasks via GCS. It picks a node and submits the task to the local scheduler on that node. The global scheduler may find multiple nodes that satisfy the capacity needs of the task. Since tasks in Ray are very latency sensitive, the scheduler chooses the node that minimizes the wait time for the task. Wait time is defined as the sum of the “time spent in queues on the selected node” and the “time spent in transferring inputs to that node”. The global scheduler learns each node's queue size and available resources through heartbeats, and the locations and sizes of task inputs from GCS. In addition, if the scheduler becomes the bottleneck, it can be scaled out using GCS as the shared data source.
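The placement decision can be sketched as follows. This is a simplified model under stated assumptions rather than Ray's scheduler: the constants `AVG_TASK_SECONDS`, `BANDWIDTH` and `MAX_LOCAL_QUEUE`, the `Node` and `Task` fields, and the way queue time is estimated (queue length times an average task duration) are all choices made for this example.

```python
from dataclasses import dataclass, field

@dataclass
class Node:
    name: str
    free_cpus: int
    free_gpus: int
    queue_len: int                       # tasks already waiting on this node
    objects: set = field(default_factory=set)

@dataclass
class Task:
    name: str
    cpus: int
    gpus: int
    inputs: dict                         # object id -> size in bytes

AVG_TASK_SECONDS = 0.01                  # assumed average task duration
BANDWIDTH = 1e9                          # assumed bytes/second between nodes
MAX_LOCAL_QUEUE = 4                      # assumed local overload threshold

def fits(node, task):
    return node.free_cpus >= task.cpus and node.free_gpus >= task.gpus

def estimated_wait(node, task):
    """Time spent queued on the node plus time to transfer missing inputs."""
    queue_time = node.queue_len * AVG_TASK_SECONDS
    remote_bytes = sum(size for obj, size in task.inputs.items()
                       if obj not in node.objects)
    return queue_time + remote_bytes / BANDWIDTH

def schedule(task, local, cluster):
    """Bottom-up: try the local node first, otherwise take the global minimum."""
    if fits(local, task) and local.queue_len < MAX_LOCAL_QUEUE:
        return local
    candidates = [n for n in cluster if fits(n, task)]
    if not candidates:
        raise RuntimeError(f"no node can run {task.name}")
    return min(candidates, key=lambda n: estimated_wait(n, task))

n1 = Node("N1", free_cpus=4, free_gpus=0, queue_len=1, objects={"a"})
n2 = Node("N2", free_cpus=4, free_gpus=1, queue_len=2, objects={"b"})
n3 = Node("N3", free_cpus=4, free_gpus=1, queue_len=0)
cluster = [n1, n2, n3]

cpu_task = Task("preprocess", cpus=1, gpus=0, inputs={"a": 1_000})
gpu_task = Task("train", cpus=1, gpus=1, inputs={"b": 500_000_000})
```

With these numbers, `schedule(cpu_task, n1, cluster)` stays on N1, while `schedule(gpu_task, n1, cluster)` goes to N2: N3 has an empty queue, but moving 500 MB of input there would cost more than waiting behind two queued tasks on the node that already holds the data.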

See the figure below which makes this task scheduling workflow clearer.

Hybrid scheduler that starts with local scheduler and then goes to global scheduler to find another node if needed.

An in-memory distributed object store is present on every node in the system and is used for storing the inputs and outputs of tasks. If an input is not present on the node where a task runs, the object store replicates it to that node. Replication also helps with hot-object bottlenecks. The object store holds only immutable data, so consistency is not a big issue. In addition, the only objects supported are those that fit on a single node.

Execution of a remote task in Ray

Typical execution of a remote task

Let’s walk through the sequence of steps needed to execute a function in Ray and then to fetch the result. Suppose there are two nodes, N1 and N2, in the system. A remote function is declared, registered with the GCS, and then propagated to the workers on N1 and N2 (step 0 in the figure above). The application is running on N1 and wants to perform an “add” operation on two objects, a and b. This operation returns a future “id”, and the application on N1 then calls get() on future “id”, which is a blocking call. First, let’s step through what happens in the system to execute the add operation. This is shown in figure (a).

  1. The add task is submitted to the local scheduler on N1.
  2. The local scheduler sees that only object “a” is available locally, so it forwards the task to the global scheduler.
  3. The global scheduler looks up in the GCS where objects “a” and “b” are located and decides to schedule the task on N2.
  4. The local scheduler on N2 checks the local object store for the inputs. “b” is available locally.
  5. Since the object store on N2 doesn’t have “a”, it needs to get it from the node where it is present.
  6. The object store on N2 learns from GCS that “a” is on N1.
  7. “a” is replicated to N2 from the object store on N1.
  8. The local scheduler on N2 schedules the add task on a local worker.
  9. The worker fetches “a” and “b” from the local object store and performs the addition.

The second part is to get the result of the addition using the future “id” held by the application on N1. This is shown in figure (b).

  1. N1 executes the get on future “id”. It checks the local object store, which doesn’t have an entry for “id” yet.
  2. The local object store asks GCS for “id”. Since GCS doesn’t have it, a callback is registered with GCS.
  3. Meanwhile, add completes on node N2, and the result is stored in the object store on N2.
  4. The N2 object store informs GCS that the result for “id” is available. This is registered in the object table.
  5. GCS invokes the callback on N1, reporting that “id” is available on N2.
  6. The object store on N1 replicates “id” from N2.
  7. The task on N1 that was blocked on the future get (ray.get) is now unblocked.
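The seven steps above can be traced with a small single-threaded simulation. It is only a sketch: `ObjectTable` and `ObjectStore` are hypothetical stand-ins for the GCS object table and the per-node object stores, and the “blocking” get is modelled with a callback instead of a real blocked thread.

```python
from collections import defaultdict

class ObjectTable:
    """Toy stand-in for the GCS object table: object id -> node name."""
    def __init__(self):
        self.location = {}
        self.callbacks = defaultdict(list)

    def lookup_or_subscribe(self, obj_id, callback):
        if obj_id in self.location:
            callback(self.location[obj_id])
        else:
            self.callbacks[obj_id].append(callback)       # step 2

    def register(self, obj_id, node_name):
        self.location[obj_id] = node_name                 # step 4
        for callback in self.callbacks.pop(obj_id, []):
            callback(node_name)                           # step 5

class ObjectStore:
    def __init__(self, name, table, peers):
        self.name, self.table, self.peers = name, table, peers
        self.data = {}
        peers[name] = self

    def put(self, obj_id, value):
        self.data[obj_id] = value                         # step 3
        self.table.register(obj_id, self.name)

    def fetch(self, obj_id, log):
        if obj_id in self.data:                           # step 1: local check
            return

        def replicate(source):
            self.data[obj_id] = self.peers[source].data[obj_id]   # step 6
            log.append(f"{self.name} copied {obj_id} from {source}")

        self.table.lookup_or_subscribe(obj_id, replicate)

table, peers, log = ObjectTable(), {}, []
n1 = ObjectStore("N1", table, peers)
n2 = ObjectStore("N2", table, peers)

n1.fetch("id", log)           # get() on N1: result not ready, callback registered
pending = "id" not in n1.data
n2.put("id", 3)               # add finishes on N2 and publishes its result
result = n1.data["id"]        # step 7: the waiting get can now return
```

Note that N1 never polls N2. The only coordination point is the object table, which is what allows the stores themselves to stay simple.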

Evaluation for RL applications

The paper also evaluates Ray on two RL algorithms, comparing against their reference implementations. The two algorithms are Evolution Strategies (ES) and Proximal Policy Optimization (PPO). In both cases, the Ray-based implementation performs much better than the one-off solutions built for these problems. Keep in mind that building such one-off solutions is pretty complex. For ES, the reference solution doesn’t scale beyond 1024 cores, while Ray scales up to 8192 cores. Similarly, for PPO, Ray outperforms the OpenMPI implementation at a much lower cost (thus improving cluster utilization) and needs far fewer GPUs. This is possible because tasks and actors can ask for specific heterogeneous resources, and because Ray makes better use of data locality while pinning those resources.

Conclusion

Both the design principles paper and the framework paper make for an interesting read on how new-age ML frameworks need to evolve to support RL techniques. Frameworks that support hybrid scheduling and a combination of task-based and actor-based approaches seem like a good fit for this problem space.
