Ray: A Distributed Framework for Reinforcement Learning Applications

Rabindra Nath Nandi
Published in CompositeAI
Apr 9, 2018

Michael I. Jordan, a professor at UC Berkeley, and his research group at the RISE Lab developed a new distributed machine learning framework called Ray that supports building real-time reinforcement learning systems. Ray is a soft replacement for Spark: it uses a dynamic task graph computation model that supports both the task-parallel and the actor programming models. A nice introduction to Ray by M. Jordan can be found here.

The typical systems requirements for emerging AI applications are: support for (a) heterogeneous, concurrent computations, (b) dynamic task graphs, (c) high-throughput and low-latency scheduling, and (d) transparent fault tolerance.

Ray provides a horizontally scalable architecture that meets all of the above requirements, and it offers an actor abstraction alongside the task-parallel programming abstraction.

Ray’s architecture

Ray’s architecture consists of two parts: an application layer and a system layer.

The application layer consists of three types of processes: the driver, the worker, and the actor.

The driver is a process that executes the user program.

A worker is a stateless process that executes tasks (remote functions) invoked by a driver or another worker. Workers are started automatically and assigned tasks by the system layer. When a remote function is declared, it is automatically published to all workers. A worker executes tasks serially.

An actor is a stateful process that executes, when invoked, the methods it exposes. Unlike a worker, an actor is explicitly instantiated by a worker or a driver. Like workers, actors execute methods serially.
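To make the worker/actor distinction concrete, here is a minimal sketch using Ray's Python API (the function and class names are illustrative):

    import ray

    ray.init()  # start Ray on the local machine

    # A remote function: executed by a stateless worker.
    @ray.remote
    def square(x):
        return x * x

    # An actor: a stateful process, explicitly instantiated.
    @ray.remote
    class Counter:
        def __init__(self):
            self.count = 0

        def increment(self):
            self.count += 1
            return self.count

    # Tasks may run on any worker; the two calls can run in parallel.
    a, b = square.remote(3), square.remote(4)

    # The actor is pinned to one process; its methods run serially,
    # so its state is updated in a well-defined order.
    counter = Counter.remote()
    ids = [counter.increment.remote() for _ in range(3)]

    print(ray.get([a, b]))  # [9, 16]
    print(ray.get(ids))     # [1, 2, 3]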

Figure 1: Ray’s architecture.

The system layer consists of three major components: a global control store, a distributed scheduler, and a distributed object store.

The Global Control Store (GCS) stores all up-to-date metadata and control state information in the system. This includes (1) the specification of every task, (2) the code for every remote function, (3) the computation graph, (4) the current locations of all objects, and (5) every scheduling event. The GCS also provides publish-subscribe infrastructure to facilitate communication between components.

By storing and managing the entire control state in a centralized fashion, the GCS enables every other component to be stateless. This not only simplifies the support for fault tolerance (i.e., on failure, components restart and read the latest state from the GCS), but also makes it easy to horizontally scale every other component, as all the state shared by the component’s replicas or shards is accessible via the GCS.
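In the original implementation the GCS is built on a sharded key-value store with pub-sub (Redis). The toy sketch below illustrates only the idea of centralizing control state behind a pub-sub store; it is not Ray's actual code, and every name in it is hypothetical:

    from collections import defaultdict

    class ToyControlStore:
        """Toy stand-in for the GCS: a central key-value store with pub-sub.

        Components stay stateless by writing all control state here and
        subscribing to the keys they care about.
        """

        def __init__(self):
            self._state = {}                     # e.g. task specs, object locations
            self._subscribers = defaultdict(list)

        def put(self, key, value):
            self._state[key] = value
            for callback in self._subscribers[key]:
                callback(value)                  # push the update to subscribers

        def get(self, key):
            return self._state.get(key)

        def subscribe(self, key, callback):
            self._subscribers[key].append(callback)

    # A component that crashes can restart and simply re-read the latest
    # state, because none of that state lived in the component itself.
    gcs = ToyControlStore()
    gcs.subscribe("object:c", lambda loc: print("c is now at", loc))
    gcs.put("object:c", {"node": "N1"})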

Bottom-Up Distributed Scheduler

Ray uses a global scheduler together with per-node local schedulers. Tasks created on a node are submitted to that node’s local scheduler first, not to the global scheduler. Only if the local scheduler cannot schedule a task locally (for example, because the node is overloaded) does it forward the task to the global scheduler.

Each local scheduler sends periodic heartbeats (e.g., every 100 ms) to the GCS containing its load information. The GCS records this information and forwards it to the global scheduler. Upon receiving a task, the global scheduler uses the latest load information from every node, along with the locations and sizes of the task’s inputs (from the GCS’s object metadata), to decide which node to assign the task to.

Figure 2: Bottom-up distributed scheduler. Tasks are submitted bottom-up, from drivers and workers to a local scheduler, and forwarded to the global scheduler only if needed. The thickness of each arrow is proportional to its request rate.

If the global scheduler becomes a bottleneck, more replicas can be instantiated, with each local scheduler randomly picking a replica to send its tasks to. This makes the scheduler architecture highly scalable.
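A toy sketch of the bottom-up decision described above. All class and method names here are hypothetical, purely to illustrate the control flow; this is not Ray's scheduler code:

    import random

    class ToyNode:
        """Hypothetical node with a local scheduler (illustrative only)."""
        def __init__(self, name, capacity):
            self.name, self.capacity, self.load = name, capacity, 0

        def has_capacity(self):
            return self.load < self.capacity

        def run(self, task):
            self.load += 1
            return f"{task} scheduled on {self.name}"

    class ToyGlobalScheduler:
        """Picks a node using the load information reported to the GCS
        (read directly here, for simplicity)."""
        def __init__(self, nodes):
            self.nodes = nodes

        def pick_node(self):
            candidates = [n for n in self.nodes if n.has_capacity()] or self.nodes
            return min(candidates, key=lambda n: n.load)

    def submit(task, local_node, global_replicas):
        # Bottom-up: try the node where the task was created first.
        if local_node.has_capacity():
            return local_node.run(task)
        # Otherwise escalate to a randomly chosen global scheduler
        # replica, so no single replica becomes a bottleneck.
        replica = random.choice(global_replicas)
        return replica.pick_node().run(task)

    nodes = [ToyNode("N1", capacity=1), ToyNode("N2", capacity=4)]
    replicas = [ToyGlobalScheduler(nodes)]
    for t in ["t1", "t2", "t3"]:
        print(submit(t, nodes[0], replicas))  # t1 stays local; t2, t3 escalate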

In-Memory Distributed Object Store

The in-memory distributed object store lets workers and actors share data efficiently. On each node, the object store is implemented via shared memory, which allows zero-copy data sharing between tasks running on the same node.

If a task’s inputs are not local, the inputs are replicated to the local object store on the node before execution. Tasks also write all of their outputs to the local object store. Replication eliminates the potential bottleneck due to hot data objects and minimizes task execution time, as a task only reads and writes data in local memory. This increases throughput for computation-bound workloads, a profile shared by many AI applications.
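A brief sketch of how this looks from Ray's Python API. ray.put and ray.get are the real calls; the array size and task are illustrative:

    import numpy as np
    import ray

    ray.init()

    # Put a large array into the local object store once.
    array = np.zeros(10**6)
    array_ref = ray.put(array)

    @ray.remote
    def total(arr):
        return arr.sum()

    # Each task receives the array from the object store; tasks on the
    # same node read it via shared memory without copying, while tasks
    # on other nodes first get a local replica.
    refs = [total.remote(array_ref) for _ in range(4)]
    print(ray.get(refs))  # [0.0, 0.0, 0.0, 0.0]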

Figure 3: An example of a sum operation, c = a + b, in Ray.

An end-to-end example that adds a and b and returns c. Solid lines are data plane operations and dotted lines are control plane operations. (a) The function add() is registered with the GCS by node 1 (N1), invoked on N1, and executed on N2. (b) N1 gets add()’s result using ray.get(). The Object Table entry for c is created in step 4 and updated in step 6 after c is copied to N1.
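The same example, expressed as a sketch in Ray's Python API (the driver process below plays the role of N1):

    import ray

    ray.init()

    @ray.remote
    def add(a, b):            # registered with the GCS when declared
        return a + b

    c_ref = add.remote(2, 3)  # non-blocking: returns a future immediately
    c = ray.get(c_ref)        # blocks until c is computed (and copied locally)
    print(c)                  # 5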

Ray API Functions

The four major API calls are listed below; a short sketch exercising all four follows the list.

1. futures_list = f.remote(args)

Execute function f() remotely. f() takes either object values or futures as arguments, and returns a list of futures. This is a non-blocking call.

2. obj_list = ray.get(futures_list)

Return the values associated with a list of futures. This is a blocking call.

3. futures_list_done = ray.wait(futures_list, k, timeout)

Given a list of futures, return the futures whose corresponding tasks have completed as soon as either k of the tasks have completed or the timeout expires.

4. actor = Class.remote(args)
futures_list = actor.method.remote(args)

Instantiate class Class as a remote actor and return a reference to it. Call a method on the remote actor and return a list of futures. This is a non-blocking call.
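A short sketch exercising all four calls. One assumption to note: in Ray's released Python API, ray.wait takes the count and timeout as keyword arguments (num_returns and timeout) and returns both the finished and unfinished futures:

    import random
    import time
    import ray

    ray.init()

    @ray.remote
    def simulate(i):
        time.sleep(random.random())  # tasks finish in a random order
        return i

    @ray.remote
    class Accumulator:
        def __init__(self):
            self.total = 0

        def add(self, x):
            self.total += x
            return self.total

    # 1. Launch tasks; f.remote() returns futures without blocking.
    futures = [simulate.remote(i) for i in range(5)]

    # 3. ray.wait returns as soon as 2 tasks finish (or the timeout expires).
    done, not_done = ray.wait(futures, num_returns=2, timeout=10.0)

    # 2. ray.get blocks and returns the values behind the futures.
    print(ray.get(done))

    # 4. Instantiate an actor and call a method on it.
    acc = Accumulator.remote()
    print(ray.get(acc.add.remote(7)))  # 7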

All of this information is drawn from the original paper, Ray: A Distributed Framework for Emerging AI Applications (https://arxiv.org/abs/1712.05889). The GitHub repository of Ray is https://github.com/ray-project.
