Optimizing Distributed Futures over gRPC

Eric Liang · Published in riselab · 7 min read · Jan 18, 2020

by Eric Liang, Edward Oakes, and Stephanie Wang

This blog post explains how gRPC and Apache Arrow can be combined to provide a flexible and high-performance distributed futures API for Python. We implement and evaluate the proposed architecture as part of Ray 0.8.0.

The Ray framework provides a distributed futures API, which is able to express both stateless and stateful computations through task and actor APIs respectively. This blog post describes how these futures are implemented with high performance in a fault-tolerant distributed system:

  1. It explains how, as of Ray 0.8.0, Ray operations translate directly into gRPC and Arrow calls.
  2. It explains how necessary distributed systems functionality such as scheduling, scaling, and fault handling is exposed.

Futures with Tasks and Actors

At a high level, Ray provides:

  • Tasks (remote functions): these let you run a function remotely in a cluster. Tasks are for stateless computation.
  • Actors (remote classes): these are instances of Python classes running remotely in worker processes in a cluster. Actors are for stateful computation, where evolving state needs to be managed.

Unlike other distributed frameworks, Ray allows applications to nest and compose tasks and actors in arbitrary patterns, similar to how you can work with functions and class instances in a sequential programming language. Tasks and actors interoperate through immutable future value references (object IDs) produced from Ray calls. These object IDs can be freely passed to any task or actor, just like local object references can.

As discussed in Ray for the Curious, these qualities allow Ray to support the diverse computational patterns needed to build modern ML/AI systems and applications in a scalable way. The challenge here is in implementing such a general purpose API without trading off performance.

General Purpose Futures with Minimal System Overhead

In this section, we’ll see:

  1. How Ray schedules tasks and actors.
  2. How Ray minimizes scheduling overhead over sending RPCs directly.

Basic Ray Task Call

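import ray

ray.init()  # start Ray before calling any remote function

@ray.remote
def double(x):
    return x * 2

fut1 = double.remote(2)  # returns a future (object ID) immediately
assert ray.get(fut1) == 4  # blocks until the result is available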

Let’s look at how the above program is executed in Ray. The semantics are as follows: we first declare double as a Ray remote function, then invoke it with the argument 2 by calling double.remote(2), which immediately returns a future. We then retrieve the future’s result with ray.get(), which returns 4. Importantly, Ray schedules double for execution in a separate process.

The worker process that calls double.remote executes the following gRPC calls:

  1. It asks the Raylet (Ray scheduling daemon running on each node) for a lease on a worker to execute double(2). The scheduler inspects the task requirements (i.e., resources required, object dependencies) to find a suitable worker. While it holds the lease, no other worker process can schedule tasks on the leased worker.
  2. It then sends an ExecuteTask RPC to the leased worker, which executes the task and responds with the result. Since both the argument and return value of the task are small (more on this later), they are handled inline with the gRPC call.

Components in green boxes represent Python code. Components in white boxes are part of the Ray common runtime written in C++. Joined boxes represent a process. Any Python driver or worker can call into the Ray C++ core worker library to execute further tasks. In this figure, all processes are running on the same machine. Ray uses gRPC as a unified communication layer for both local and remote procedure calls.

If a task fails unexpectedly during execution, the calling driver or worker will attempt to reschedule it up to a given max_retries times. This is possible since the caller retains the task description until it is no longer needed.
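The retry behaviour can be pictured with a minimal sketch. This is not Ray's implementation: `TaskSpec`, `WorkerCrashed`, `submit_with_retries`, and `make_flaky_executor` are hypothetical names, and a local function call stands in for the ExecuteTask RPC. The only point it illustrates is that the caller keeps the task description, so it can resubmit the same work after a failure.

```python
from dataclasses import dataclass
from typing import Any, Callable, Tuple


class WorkerCrashed(Exception):
    """Hypothetical error standing in for an unexpected worker failure."""


@dataclass(frozen=True)
class TaskSpec:
    """Task description retained by the caller until the result is known."""
    func: Callable[..., Any]
    args: Tuple[Any, ...]
    max_retries: int = 3


def submit_with_retries(spec: TaskSpec, execute: Callable[[TaskSpec], Any]) -> Any:
    """Run execute(spec), resubmitting up to spec.max_retries times on failure."""
    retries = 0
    while True:
        try:
            return execute(spec)
        except WorkerCrashed:
            if retries >= spec.max_retries:
                raise  # retries exhausted: surface the failure to the caller
            retries += 1


def make_flaky_executor(failures: int) -> Callable[[TaskSpec], Any]:
    """Return an executor that crashes `failures` times, then succeeds."""
    remaining = [failures]

    def execute(spec: TaskSpec) -> Any:
        if remaining[0] > 0:
            remaining[0] -= 1
            raise WorkerCrashed("worker died mid-task")
        return spec.func(*spec.args)

    return execute


def double(x):
    return x * 2


# Two simulated crashes, then success on the third attempt.
result = submit_with_retries(TaskSpec(double, (2,)), make_flaky_executor(2))
```

Because the spec is immutable and held by the caller, every retry submits exactly the same function and arguments.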

Caching Scheduling Decisions

In a real application, there will be not one but many tasks executed. In the above example, we saw that multiple RPCs were needed to schedule a single task. Ray can do better when many tasks of the same “type” (i.e., resource requirements) are submitted at once:

futures = [double.remote(i) for i in range(1000)]
ray.get(futures) # [0, 2, 4, 6, …]

In this scenario, Ray caches scheduling decisions to amortize scheduling RPC overhead for similar tasks. This means it will end up issuing only a fraction more than 1 RPC per task, or about the same number of round trips as using gRPC directly. Any extra RPCs go towards acquiring multiple concurrent workers from the Raylet to parallelize execution:

Once a scheduling decision is made by the Raylet, the worker returned can be reused for other tasks with the same resource requirements and input dependencies. This amortizes scheduling RPC overhead when executing many similar tasks. To avoid unfair monopolization of workers when multiple processes are trying to submit tasks, callers are only allowed to reuse a worker within a few hundred milliseconds of the initial grant.
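To make the amortization concrete, here is a small sketch of a caller-side lease cache. It is an illustration under stated assumptions rather than Ray's code: `FakeRaylet` and `LeaseCachingSubmitter` are hypothetical, the 0.5 s reuse window is an assumed value, the cache is keyed on resource requirements only (input dependencies are ignored), and RPCs are modelled as counters around local calls.

```python
import time
from typing import Any, Callable, Dict, Tuple


class FakeRaylet:
    """Stand-in for the scheduling daemon; it only counts lease requests."""

    def __init__(self) -> None:
        self.lease_rpcs = 0

    def request_worker_lease(self, resource_shape: tuple) -> str:
        self.lease_rpcs += 1
        return f"worker-{self.lease_rpcs}"


class LeaseCachingSubmitter:
    """Reuses a leased worker for tasks with the same resource shape."""

    def __init__(self, raylet: FakeRaylet, reuse_window_s: float = 0.5,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.raylet = raylet
        self.reuse_window_s = reuse_window_s  # assumed value, not Ray's setting
        self.clock = clock
        self.execute_rpcs = 0
        self._leases: Dict[tuple, Tuple[str, float]] = {}  # shape -> (worker, grant time)

    def submit(self, func: Callable[..., Any], args: tuple,
               resources: Dict[str, float]) -> Any:
        shape = tuple(sorted(resources.items()))
        now = self.clock()
        lease = self._leases.get(shape)
        if lease is None or now - lease[1] > self.reuse_window_s:
            # Cache miss or expired lease: one scheduling RPC to the raylet.
            lease = (self.raylet.request_worker_lease(shape), now)
            self._leases[shape] = lease
        # One ExecuteTask RPC per task, modelled here as a local call.
        self.execute_rpcs += 1
        return func(*args)


# Drive the sketch with a manual clock so the behaviour is deterministic.
now = [0.0]
raylet = FakeRaylet()
submitter = LeaseCachingSubmitter(raylet, clock=lambda: now[0])
results = [submitter.submit(lambda x: x * 2, (i,), {"CPU": 1}) for i in range(1000)]
rpcs_per_task = (raylet.lease_rpcs + submitter.execute_rpcs) / len(results)
```

In this toy run a single lease serves all 1000 tasks, so `rpcs_per_task` is only slightly above 1. Advancing the clock past the reuse window, or changing the resource shape, forces a fresh lease request.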

Consider: What if double was called with a future as an argument (e.g., double.remote(double.remote(2)))? In this case, the calling worker will wait for the first task to finish before sending the second. The argument to the second double can be inlined into the task once we know its value, removing the object dependency, so we can now reuse the scheduling decision for the second call.
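The dependency-inlining idea can be sketched with the standard library alone. This is a stand-in, not Ray's mechanism: a `ThreadPoolExecutor` plays the role of the leased worker, and `submit_when_ready` is a hypothetical helper that holds a task back until all of its future arguments have resolved, then sends it with plain values in their place.

```python
import threading
from concurrent.futures import Future, ThreadPoolExecutor


def submit_when_ready(pool, func, *args):
    """Send func to the pool only after every Future argument has resolved."""
    out = Future()
    deps = [a for a in args if isinstance(a, Future)]
    remaining = [len(deps)]
    lock = threading.Lock()

    def forward(task):
        # Copy the outcome of the pool task into the future we handed out.
        err = task.exception()
        if err is not None:
            out.set_exception(err)
        else:
            out.set_result(task.result())

    def send():
        try:
            # Inline resolved values so the submitted task has no dependencies.
            inlined = [a.result() if isinstance(a, Future) else a for a in args]
            pool.submit(func, *inlined).add_done_callback(forward)
        except Exception as exc:  # a dependency failed or the pool rejected the task
            out.set_exception(exc)

    def on_dep_done(_):
        with lock:
            remaining[0] -= 1
            ready = remaining[0] == 0
        if ready:
            send()

    if not deps:
        send()
    for dep in deps:
        dep.add_done_callback(on_dep_done)
    return out


def double(x):
    return x * 2


with ThreadPoolExecutor(max_workers=2) as pool:
    inner = submit_when_ready(pool, double, 2)
    outer = submit_when_ready(pool, double, inner)  # analogous to double.remote(double.remote(2))
    value = outer.result(timeout=5)
```

By the time the second call reaches the pool it carries the value 4 rather than a reference, which is why it looks like any other dependency-free task to the scheduler.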

Scaling to Multiple Nodes

Let’s run through double.remote(2) again, but assuming no workers are available on the local machine:

  1. The driver asks Raylet 1 for a worker to execute double. It has no free workers, but Raylet 1 knows Raylet 2 has free resources, and redirects the request to Raylet 2.
  2. The driver sends ExecuteTask to the remote Python worker leased from Raylet 2 over gRPC.

Tasks are sent to remote workers if there are no local resources available, transparently scaling Ray applications out to multiple nodes.

The same optimization of caching scheduling decisions applies in the multi-node case, so Ray is still able to schedule tasks across a cluster with ~1 gRPC call per task.
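The redirect between Raylets can be modelled in a few lines. The sketch below is hypothetical throughout: the `Raylet` class, the reply dictionaries, and `lease_worker` are invented for illustration, and it assumes the caller follows the redirect itself, which the description above does not specify.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class Raylet:
    """Toy scheduling daemon that knows its own free workers and its peers."""
    name: str
    free_workers: List[str]
    peers: List["Raylet"] = field(default_factory=list)

    def request_lease(self) -> dict:
        if self.free_workers:
            return {"granted": f"{self.name}/{self.free_workers.pop()}"}
        for peer in self.peers:
            if peer.free_workers:
                # No local capacity: point the caller at a node that has some.
                return {"retry_at": peer}
        return {"queued": True}


def lease_worker(local: Raylet, max_hops: int = 4) -> Optional[str]:
    """Ask the local raylet first and follow redirects until a worker is granted."""
    raylet = local
    for _ in range(max_hops):
        reply = raylet.request_lease()
        if "granted" in reply:
            return reply["granted"]
        if "retry_at" in reply:
            raylet = reply["retry_at"]
            continue
        return None  # request stays queued; nothing is free anywhere we know of
    return None


raylet2 = Raylet("raylet-2", ["w0"])
raylet1 = Raylet("raylet-1", [], peers=[raylet2])
first = lease_worker(raylet1)   # redirected to raylet-2, which grants w0
second = lease_worker(raylet1)  # no free workers remain on either node
```

Once the lease is granted, the caller talks to the remote worker directly, so the extra hop is paid once per lease rather than once per task.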

Creating and Using Actors

Creating an actor is very similar to scheduling a task. In fact, creating an actor amounts to submitting the actor constructor (__init__) as a task. The main difference is that an actor is persistent and acquires a “permanent” lease on its assigned worker. Let’s consider this counter actor:

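import ray

ray.init(ignore_reinit_error=True)  # safe if Ray was already started earlier

@ray.remote
class Counter:
    def __init__(self):
        self.value = 0

    def increment(self):
        self.value += 1
        return self.value

c = Counter.remote()  # schedules __init__ as the actor creation task
assert ray.get(c.increment.remote()) == 1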

The worker that ends up running the actor __init__ task simply becomes the actor:

Creating an actor is done by scheduling the __init__ task for the actor. That actor creation task leases the worker forever until the actor is destroyed.

Once created, actor method calls directly translate into a gRPC call to run the method on the actor. This allows Ray actor workloads to run with very low overhead and scale linearly with the number of actors:

Once created, actor tasks translate into direct gRPC calls to the actor process. An actor can handle many concurrent calls, though here we only show one.
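As a rough mental model, an actor behaves like a worker that is permanently dedicated to one object and receives method calls as messages. The sketch below imitates that with a thread and a queue. It is an analogy built on assumptions, not Ray's actor implementation: `ActorHandle` and its `call`/`stop` methods are hypothetical, calls are processed strictly one at a time, and there is no failure handling or reconstruction.

```python
import queue
import threading
from concurrent.futures import Future


class ActorHandle:
    """Runs one instance of `cls` on a dedicated thread, one call at a time."""

    def __init__(self, cls, *args, **kwargs):
        self._mailbox = queue.Queue()
        self._thread = threading.Thread(
            target=self._run, args=(cls, args, kwargs), daemon=True
        )
        self._thread.start()

    def _run(self, cls, args, kwargs):
        # Plays the role of the __init__ task: this thread now "is" the actor.
        instance = cls(*args, **kwargs)
        while True:
            item = self._mailbox.get()
            if item is None:  # sentinel sent by stop(): release the worker
                break
            method, m_args, fut = item
            try:
                fut.set_result(getattr(instance, method)(*m_args))
            except Exception as exc:
                fut.set_exception(exc)

    def call(self, method, *args):
        """Send a method call straight to the actor and return a future."""
        fut = Future()
        self._mailbox.put((method, args, fut))
        return fut

    def stop(self):
        self._mailbox.put(None)
        self._thread.join()


class Counter:
    def __init__(self):
        self.value = 0

    def increment(self):
        self.value += 1
        return self.value


c = ActorHandle(Counter)
futures = [c.call("increment") for _ in range(3)]
values = [f.result(timeout=5) for f in futures]
c.stop()
```

Each `call` goes directly to the actor's mailbox with no scheduler in between, which mirrors why actor method calls avoid per-call scheduling overhead.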

If an actor fails unexpectedly at runtime and max_reconstructions > 0 for the actor, it will be recreated by Ray. Further tasks will be sent to the new actor address instead.

Sending and Sharing Large Objects with Arrow

Large objects (>100KB) are not efficient to send over gRPC. Instead, Ray places large objects (which are Arrow or pickle5 serialized) into a shared memory object store provided by Apache Arrow. This means that:

  • Large objects sent between tasks on the same node are read through shared memory, with no memory copy required for multiple reads of the same data.
  • Objects will be automatically transferred to other nodes if needed. This transfer is done with multiple gRPC streams to maximize network throughput.

Large objects are placed in shared memory and transferred across nodes as needed over multiple gRPC connections. Here we show the object transfer operations that would happen if we passed a large numpy array “array” to “double”. Note that the result is put on Node 2 and not transferred to the caller until retrieved via ray.get(obj_id).
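The split between inline arguments and object-store references can be sketched as a size check at submission time. Everything here is an illustrative assumption: `ObjectStore` is a plain dictionary standing in for the shared-memory store, `pack_argument` and `unpack_argument` are hypothetical helpers, pickle is used in place of Arrow serialization, and the cutoff simply reuses the 100KB figure quoted above.

```python
import pickle

INLINE_THRESHOLD_BYTES = 100 * 1024  # "large" per the text; exact cutoff is assumed


class ObjectStore:
    """Dictionary standing in for the shared-memory object store."""

    def __init__(self):
        self._objects = {}
        self._next_id = 0

    def put(self, payload: bytes) -> str:
        object_id = f"obj-{self._next_id}"
        self._next_id += 1
        self._objects[object_id] = payload
        return object_id

    def get(self, object_id: str) -> bytes:
        return self._objects[object_id]


def pack_argument(value, store: ObjectStore):
    """Serialize value; send small payloads inline, large ones by reference."""
    payload = pickle.dumps(value, protocol=pickle.HIGHEST_PROTOCOL)
    if len(payload) > INLINE_THRESHOLD_BYTES:
        return ("by_reference", store.put(payload))
    return ("inline", payload)


def unpack_argument(packed, store: ObjectStore):
    """Recover the value on the receiving side, fetching from the store if needed."""
    kind, data = packed
    payload = store.get(data) if kind == "by_reference" else data
    return pickle.loads(payload)


store = ObjectStore()
small = pack_argument(2, store)                 # travels inside the RPC message
large = pack_argument(bytes(1_000_000), store)  # only the object ID travels
```

The receiver of `large` gets a short identifier and reads the bytes from the store, which is the step that shared memory makes cheap on a single node.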

Performance

The Ray 0.8 release enables gRPC based direct calls by default, which eliminates the scheduler as a bottleneck and improves performance 5–10x for scheduler-constrained workloads.

First, on a single machine we compare the latency for synchronous actor calls (similar to submitting a normal task to a cached worker) in Ray with a server written using gRPC Python. We see that, despite using gRPC, Ray 0.8 actually has slightly lower latency than gRPC Python. This is because Ray can leverage the more efficient C++ gRPC runtime:

Next we look at throughput. Due to the addition of multiple C++ gRPC polling threads, object transfer is much faster in Ray 0.8, reaching >21Gbps for 1GiB objects between two m4.16xl AWS nodes. On the other hand, straightforward use of gRPC Python to send large objects ends up slower than Ray due to Python runtime overhead and lack of send parallelism:

In the next microbenchmark, we compare task and actor performance across Ray releases using the ray microbenchmark command on a single m4.16xl node (32 CPUs). Thanks to the new scheduling algorithm, 0.8 is faster for the same CPU count:

Ray 0.8 also offers near linear scalability for fine-grained tasks. Thanks to the use of gRPC direct calls between workers, aggregate cluster throughput scales to >200k tasks/s (and would continue scaling with more nodes). Previously the Raylets were scheduling bottlenecks for certain workloads (note that the x-axis is log scale):

These performance improvements also carry over to higher level applications. When running RLlib performance tests for the 0.8.0 release, we observed 10–40% speedups on Atari training. These tests were run concurrently on a g3.16xl GPU node for an hour each:

In summary, with Ray 0.8, Ray task and actor calls now more directly translate into gRPC and Arrow operations. This allows users of Ray to leverage the performance of these underlying frameworks without needing to implement the scheduling, scaling, and fault handling required for a distributed application themselves.

You can try out Ray at https://ray.io/.
