Optimizing Distributed Futures over gRPC
by Eric Liang, Edward Oakes, and Stephanie Wang
This blog post explains how gRPC and Apache Arrow can be combined to provide a flexible and high-performance distributed futures API for Python. We implement and evaluate the proposed architecture as part of Ray 0.8.0.
The Ray framework provides a distributed futures API, which is able to express both stateless and stateful computations through task and actor APIs respectively. This blog post describes how these futures are implemented with high performance in a fault-tolerant distributed system:
- It explains how, as of Ray 0.8.0, Ray operations translate directly into gRPC and Arrow calls.
- It explains how necessary distributed systems functionality such as scheduling, scaling, and fault handling is exposed.
Futures with Tasks and Actors
At a high level, Ray provides:
- Tasks (remote functions): these let you run a function remotely in a cluster. Tasks are for stateless computation.
- Actors (remote classes): these are instances of Python classes running remotely in worker processes in a cluster. Actors are for stateful computation, where evolving state needs to be managed.
Unlike other distributed frameworks, Ray allows applications to nest and compose tasks and actors in arbitrary patterns, similar to how you can work with functions and class instances in a sequential programming language. Tasks and actors interoperate through immutable future value references (object IDs) produced from Ray calls. These object IDs can be freely passed to any task or actor, just like local object references can.
As discussed in Ray for the Curious, these qualities allow Ray to support the diverse computational patterns needed to build modern ML/AI systems and applications in a scalable way. The challenge here is in implementing such a general purpose API without trading off performance.
General Purpose Futures with Minimal System Overhead
In this section, we’ll see:
- How Ray schedules tasks and actors.
- How Ray minimizes scheduling overhead over sending RPCs directly.
Basic Ray Task Call
import ray
ray.init()

@ray.remote
def double(x):
    return x * 2

fut1 = double.remote(2)
assert ray.get(fut1) == 4
Let’s look at how the above program is executed in Ray. The semantics are as follows: we first declare double as a Ray function, and then call it with the argument 2 via double.remote(2). Then, we retrieve the result of the returned future by calling ray.get(), which should return 4. Importantly, double is scheduled for execution in a separate process by Ray.
The worker process that calls double.remote executes the following gRPC calls:
- It asks the Raylet (Ray scheduling daemon running on each node) for a lease on a worker to execute double(2). The scheduler inspects the task requirements (i.e., resources required, object dependencies) to find a suitable worker. While it holds the lease, no other worker process can schedule tasks on the leased worker.
- It then sends an ExecuteTask RPC to the leased worker, which executes the task and responds with the result. Since both the argument and return value of the task are small (more on this later), they are handled inline with the gRPC call.
If a task fails unexpectedly during execution, the calling driver or worker will attempt to reschedule it up to a given max_retries times. This is possible since the caller retains the task description until it is no longer needed.
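The lease, execute, and retry flow described above can be sketched without Ray at all. The following is a toy model, not Ray's implementation: ToyRaylet, ToyWorker, WorkerCrashed, and submit are hypothetical names introduced for illustration, the "RPCs" are plain method calls, and a worker crash is modelled as an exception.

```python
class WorkerCrashed(Exception):
    """Models an unexpected worker failure (hypothetical, for illustration)."""


class ToyWorker:
    def __init__(self, worker_id):
        self.worker_id = worker_id
        self.leased = False

    def execute_task(self, func, args):
        # Stands in for the ExecuteTask RPC; the small result is returned inline.
        return func(*args)


class ToyRaylet:
    def __init__(self, num_workers):
        self.workers = [ToyWorker(i) for i in range(num_workers)]

    def request_lease(self):
        # Grant the first free worker; a leased worker is exclusive to its holder.
        for worker in self.workers:
            if not worker.leased:
                worker.leased = True
                return worker
        return None

    def return_lease(self, worker):
        worker.leased = False


def submit(raylet, func, args, max_retries=3):
    # The caller keeps (func, args), so it can resubmit after a failure.
    failures = 0
    while True:
        worker = raylet.request_lease()
        if worker is None:
            raise RuntimeError("no free worker to lease")
        try:
            return worker.execute_task(func, args)
        except WorkerCrashed:
            failures += 1
            if failures > max_retries:
                raise
        finally:
            raylet.return_lease(worker)


attempts = []

def flaky_double(x):
    # Fails on the first two attempts to simulate worker crashes.
    attempts.append(x)
    if len(attempts) < 3:
        raise WorkerCrashed("simulated crash")
    return x * 2


raylet = ToyRaylet(num_workers=1)
result = submit(raylet, flaky_double, (2,), max_retries=3)
```

The point of the sketch is that retrying needs no help from the scheduler: because the caller still holds the task description, it can simply request a new lease and send the task again.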
Caching Scheduling Decisions
In a real application, there will be not one but many tasks executed. In the above example, we saw that multiple RPCs were needed to schedule a single task. Ray can do better when many tasks of the same “type” (i.e., resource requirements) are submitted at once:
futures = [double.remote(i) for i in range(1000)]
ray.get(futures) # [0, 2, 4, 6, …]
In this scenario, Ray caches scheduling decisions to amortize scheduling RPC overhead for similar tasks. This means it will end up issuing only a fraction more than 1 RPC per task, or about the same number of round trips as using gRPC directly. Any extra RPCs go towards acquiring multiple concurrent workers from the Raylet to parallelize execution:
Consider: What if double was called with a future as an argument (e.g., double.remote(double.remote(2)))? In this case, the calling worker will wait for the first task to finish before sending the second. The argument to the second double can be inlined into the task once we know its value, removing the object dependency, so we can now reuse the scheduling decision for the second call.
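To see why caching brings the cost down to roughly one RPC per task, it can help to count RPCs in a simplified model. The sketch below is an assumption-laden illustration rather than Ray's accounting: it assumes an uncached task costs one lease request plus one ExecuteTask call, that a cached lease is reused for every later task with the same scheduling key, and that the caller acquires at most max_leases_per_key concurrent workers (the value 8 is arbitrary). The function names and the "cpu=1" key are hypothetical.

```python
from collections import defaultdict


def rpcs_without_cache(num_tasks):
    # One lease request plus one ExecuteTask call for every task.
    return 2 * num_tasks


def rpcs_with_cache(task_keys, max_leases_per_key):
    # task_keys holds one scheduling key (e.g. resource shape) per task.
    leases_held = defaultdict(int)
    rpcs = 0
    for key in task_keys:
        if leases_held[key] < max_leases_per_key:
            # Still ramping up parallelism: ask the Raylet for another worker.
            leases_held[key] += 1
            rpcs += 1
        # Every task needs exactly one ExecuteTask call to a leased worker.
        rpcs += 1
    return rpcs


baseline = rpcs_without_cache(1000)
cached = rpcs_with_cache(["cpu=1"] * 1000, max_leases_per_key=8)
per_task = cached / 1000
```

Under these assumptions the only overhead beyond one RPC per task is the handful of lease requests used to acquire concurrent workers, which is the "fraction more than 1" mentioned above.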
Scaling to Multiple Nodes
Let’s run through double.remote(2) again, but assuming no workers are available on the local machine:
- The driver asks Raylet 1 for a worker to execute double. Raylet 1 has no free workers, but it knows that Raylet 2 has free resources, so it redirects the request to Raylet 2.
- The driver sends ExecuteTask to the remote Python worker leased from Raylet 2 over gRPC.
The same optimization of caching scheduling decisions applies to multi-node, so Ray is still able to schedule tasks across a cluster with ~1 gRPC call per task.
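The redirect step can be illustrated with a small model of two Raylets. This is a sketch under stated assumptions, not Ray's scheduler: SpillbackRaylet and lease_worker are hypothetical names, each Raylet is assumed to know its peers' free worker counts exactly, and a lease request is a plain method call rather than a gRPC call.

```python
class SpillbackRaylet:
    def __init__(self, name, free_workers):
        self.name = name
        self.free_workers = free_workers
        self.peers = []

    def request_lease(self):
        # Grant locally if possible.
        if self.free_workers > 0:
            self.free_workers -= 1
            return ("granted", self)
        # Otherwise point the caller at a peer that has free resources.
        for peer in self.peers:
            if peer.free_workers > 0:
                return ("redirect", peer)
        return ("unavailable", None)


def lease_worker(local_raylet):
    # Follow redirects until some Raylet grants (or refuses) the lease.
    rpcs = 1
    status, raylet = local_raylet.request_lease()
    while status == "redirect":
        rpcs += 1
        status, raylet = raylet.request_lease()
    return status, raylet, rpcs


raylet1 = SpillbackRaylet("raylet-1", free_workers=0)
raylet2 = SpillbackRaylet("raylet-2", free_workers=1)
raylet1.peers.append(raylet2)

status, granted_by, lease_rpcs = lease_worker(raylet1)
```

In this model the redirect costs one extra lease request. Once the lease from Raylet 2 is cached, later tasks of the same type skip both lease requests and go straight to the remote worker.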
Creating and Using Actors
Creating an actor is very similar to scheduling a task. In fact, creating an actor amounts to submitting the actor constructor (__init__) as a task. The main difference is that an actor is persistent and acquires a “permanent” lease on its assigned worker. Let’s consider this counter actor:
@ray.remote
class Counter:
    def __init__(self):
        self.value = 0

    def increment(self):
        self.value += 1
        return self.value

c = Counter.remote()
assert ray.get(c.increment.remote()) == 1
The worker that ends up running the actor __init__ task simply becomes the actor:
Once created, actor method calls directly translate into a gRPC call to run the method on the actor. This allows Ray actor workloads to run with very low overhead and scale linearly with the number of actors:
If an actor fails unexpectedly at runtime and max_reconstructions > 0 for the actor, it will be recreated by Ray. Further tasks will be sent to the new actor address instead.
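The idea that an actor is a worker holding a permanent lease, with the constructor submitted as its first task, can be mimicked locally with Python's standard library. The sketch below is only an analogy: ToyActorHandle is a hypothetical name, a single-threaded executor stands in for the dedicated worker process, and there is no networking or reconstruction logic.

```python
from concurrent.futures import ThreadPoolExecutor


class Counter:
    def __init__(self):
        self.value = 0

    def increment(self):
        self.value += 1
        return self.value


class ToyActorHandle:
    def __init__(self, cls, *args):
        # One dedicated worker thread plays the role of the leased worker.
        self._executor = ThreadPoolExecutor(max_workers=1)
        # The constructor is submitted as the first task on that worker.
        self._instance = self._executor.submit(cls, *args).result()

    def call(self, method_name, *args):
        # Each method call is sent directly to the worker and returns a future.
        method = getattr(self._instance, method_name)
        return self._executor.submit(method, *args)

    def shutdown(self):
        self._executor.shutdown()


handle = ToyActorHandle(Counter)
futures = [handle.call("increment") for _ in range(5)]
results = [f.result() for f in futures]
handle.shutdown()
```

Because the single worker processes submissions in order, the counter state evolves sequentially even though the caller submits all five calls before waiting on any of them.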
Sending and Sharing Large Objects with Arrow
Large objects (>100KB) are not efficient to send over gRPC. Instead, Ray places large objects (which are Arrow or pickle5 serialized) into a shared memory object store provided by Apache Arrow. This means that:
- Large objects sent between tasks on the same node are read through shared memory, with no memory copy required for multiple reads of the same data.
- Objects will be automatically transferred to other nodes if needed. This transfer is done with multiple gRPC streams to maximize network throughput.
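The split between inline and shared-memory arguments can be sketched as a size check at serialization time. The code below is a toy illustration, not Ray's object store: ToyObjectStore, pack_argument, and unpack_argument are hypothetical names, a plain dictionary stands in for shared memory, standard pickle stands in for Arrow or pickle5 serialization, and the 100 KiB limit is simply the threshold quoted above.

```python
import pickle
import uuid

INLINE_LIMIT_BYTES = 100 * 1024  # threshold taken from the text (>100KB)


class ToyObjectStore:
    def __init__(self):
        self._objects = {}

    def put(self, payload):
        # Store the serialized bytes and hand back an ID that can be passed around.
        object_id = uuid.uuid4().hex
        self._objects[object_id] = payload
        return object_id

    def get(self, object_id):
        return self._objects[object_id]


def pack_argument(value, store):
    payload = pickle.dumps(value)
    if len(payload) <= INLINE_LIMIT_BYTES:
        # Small values travel inside the RPC message itself.
        return ("inline", payload)
    # Large values go to the store; only the ID travels with the RPC.
    return ("by_reference", store.put(payload))


def unpack_argument(packed, store):
    kind, data = packed
    payload = data if kind == "inline" else store.get(data)
    return pickle.loads(payload)


store = ToyObjectStore()
small = pack_argument(2, store)
large = pack_argument(bytes(1_000_000), store)
roundtrip = unpack_argument(large, store)
```

Passing only an ID for large values is what lets several tasks on one node read the same data without each receiving its own copy.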
Figure: passing “array” to “double”. Note that the result is put on Node 2 and not transferred to the caller until retrieved via ray.get(obj_id).
Performance
The Ray 0.8 release enables gRPC based direct calls by default, which eliminates the scheduler as a bottleneck and improves performance 5–10x for scheduler-constrained workloads.
First, on a single machine we compare the latency for synchronous actor calls (similar to submitting a normal task to a cached worker) in Ray with a server written using gRPC Python. We see that, despite using gRPC, Ray 0.8 actually has slightly lower latency than gRPC Python. This is because Ray can leverage the more efficient C++ gRPC runtime:
Next we look at throughput. Due to the addition of multiple C++ gRPC polling threads, object transfer is much faster in Ray 0.8, reaching >21Gbps for 1GiB objects between two m4.16xl AWS nodes. On the other hand, straightforward use of gRPC Python to send large objects ends up slower than Ray due to Python runtime overhead and lack of send parallelism:
In the next microbenchmark, we compare task and actor performance in the ray microbenchmark command on a single m4.16xl node (32 CPUs). Thanks to the new scheduling algorithm, 0.8 is faster for the same CPU count:
Ray 0.8 also offers near linear scalability for fine-grained tasks. Thanks to the use of gRPC direct calls between workers, aggregate cluster throughput scales to >200k tasks/s (and would continue scaling with more nodes). Previously the Raylets were scheduling bottlenecks for certain workloads (note that the x-axis is log scale):
These performance improvements also carry over to higher level applications. When running RLlib performance tests for the 0.8.0 release, we observed 10–40% speedups on Atari training. These tests were run concurrently on a g3.16xl GPU node for an hour each:
In summary, with Ray 0.8, Ray task and actor calls now more directly translate into gRPC and Arrow operations. This allows users of Ray to leverage the performance of these underlying frameworks without needing to implement the scheduling, scaling, and fault handling required for a distributed application themselves.
You can try out Ray at https://ray.io/.