How Ray Uses gRPC (and Arrow) to Outperform gRPC
by Eric Liang, Edward Oakes, and Stephanie Wang
This blog post explains how the Ray 0.8 release uses gRPC and Apache Arrow to provide a distributed Python API that can be both faster and simpler than using gRPC directly.
When deciding on the libraries and tools to use for a project, a good rule of thumb is to pick those only as powerful as you need. It is easy to see why: large dependencies often introduce unwanted complexity, performance overhead, and architectural constraints. Often, it is better to pick from a smaller set of (lower-level) libraries and reimplement any required missing functionality yourself.
Due in part to (1) this principle of least power and (2) the lack of a sufficiently flexible framework, today’s scale-out ML applications typically are built using low-level communication libraries and container orchestrators. While this is often the right choice for each individual project, it has resulted in a fragmented ecosystem where application developers have to do substantial distributed systems engineering, often gluing together multiple distributed systems or building new ones from scratch.
The Ray framework aims to remove this tradeoff by providing a lightweight yet general-purpose distributed API. Importantly, it does not sacrifice the performance and flexibility of lower-level libraries. This means that developers can have their cake and eat it too: Ray handles the complex but necessary aspects of distributed computation such as scheduling, scaling, and fault handling, and does so without the performance or architectural restrictions that heavier or less general frameworks impose.
Ray allows developers to easily build distributed Python applications. The performance of these applications often matches or exceeds that of applications built directly on lower-level libraries. This blog post provides intuition for how that can be the case:
- It explains how with Ray 0.8.0, most Ray operations now translate directly into gRPC and Arrow calls for improved performance.
- It explains how necessary distributed systems functionality such as scheduling, scaling, and fault handling is exposed through Ray’s task and actor APIs.
Overview of Ray
Note: For a more in-depth introduction to Ray and its API, refer to Ray for the Curious.
At a high level, Ray provides:
- Tasks (remote functions): these let you run a function remotely in a cluster. Tasks are for stateless computation.
- Actors (remote classes): these are instances of Python classes running remotely in worker processes in a cluster. Actors are for stateful computation, where evolving state needs to be managed.
Unlike other distributed frameworks, Ray allows applications to nest and compose tasks and actors in arbitrary patterns, similar to how you can work with functions and class instances in a sequential programming language. Tasks and actors interoperate through immutable future value references (object IDs) produced from Ray calls. These object IDs can be freely passed to any task or actor, just like local object references can.
How Ray calls translate into gRPC operations
In this section, we’ll see:
- How Ray schedules tasks and actors.
- How Ray minimizes scheduling overhead over sending RPCs directly.
Note: The following sections describe the task scheduling algorithms of Ray 0.8.0+, which differ significantly from earlier versions of Ray (that do not leverage gRPC).
Basic Ray Task Call
@ray.remote
def double(x):
    return x * 2

fut1 = double.remote(2)
assert ray.get(fut1) == 4
Let’s look at how the above program is executed in Ray. The semantics are as follows: we first declare double as a Ray function, and then call it with the argument 2 via double.remote(2). Then, we retrieve the result of the returned future by calling ray.get(), which should return 4. Importantly, double is scheduled for execution in a separate process by Ray.
The worker process that calls double.remote executes the following gRPC calls:
- It asks the Raylet (the Ray scheduling daemon running on each node) for a lease on a worker to execute double(2). The scheduler inspects the task requirements (i.e., resources required and object dependencies) to find a suitable worker. While the caller holds the lease, no other worker process can schedule tasks on the leased worker.
- It then sends an ExecuteTask RPC to the leased worker, which executes the task and responds with the result. Since both the argument and return value of the task are small (more on this later), they are handled inline with the gRPC call.
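The lease-then-execute protocol above can be modeled with a highly simplified sketch. All class and method names here are illustrative stand-ins for Ray's internal Raylet and worker gRPC services, not real Ray APIs:

```python
# Illustrative model of the two-RPC scheduling path.
class Raylet:
    """Stand-in for the per-node scheduling daemon."""

    def __init__(self, workers):
        self.free_workers = list(workers)

    def request_worker_lease(self, task_spec):
        # Inspect resource requirements / dependencies and, if
        # possible, grant an exclusive lease on a free worker.
        return self.free_workers.pop() if self.free_workers else None

class Worker:
    """Stand-in for a leased Python worker process."""

    def execute_task(self, fn, args):
        # ExecuteTask RPC: small arguments and results travel
        # inline with the call itself.
        return fn(*args)

raylet = Raylet([Worker()])
worker = raylet.request_worker_lease({"num_cpus": 1})  # RPC 1: lease
result = worker.execute_task(lambda x: x * 2, (2,))    # RPC 2: execute
assert result == 4
```

In the real system both steps are gRPC calls between processes; the point of the sketch is only the two-step shape: acquire a lease, then push the task directly to the leased worker.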
If a task fails unexpectedly during execution, the calling driver or worker will attempt to reschedule it up to a given max_retries times. This is possible because the caller retains the task description until it is no longer needed.
Caching Scheduling Decisions
In a real application, there will be not one but many tasks executed. In the above example, we saw that multiple RPCs were needed to schedule a single task. Ray can do better when many tasks of the same “type” (i.e., resource requirements) are submitted at once:
futures = [double.remote(i) for i in range(1000)]
ray.get(futures) # [0, 2, 4, 6, …]
In this scenario, Ray caches scheduling decisions to amortize scheduling RPC overhead for similar tasks. This means it will end up issuing only a fraction more than 1 RPC per task, or about the same number of round trips as using gRPC directly. Any extra RPCs go towards acquiring multiple concurrent workers from the Raylet to parallelize execution:
Consider: what if double were called with a future as an argument (e.g., double.remote(double.remote(2)))? In this case, the calling worker waits for the first task to finish before sending the second. Once the first result's value is known, it can be inlined into the second task's arguments, removing the object dependency, so the scheduling decision can be reused for the second call.
Scaling to Multiple Nodes
Let’s run through double.remote(2) again, but assuming no workers are available on the local machine:
- The driver asks Raylet 1 for a worker to execute double. Raylet 1 has no free workers, but it knows Raylet 2 has free resources, so it redirects the request to Raylet 2.
- The driver sends ExecuteTask over gRPC to the remote Python worker leased from Raylet 2.
The same optimization of caching scheduling decisions applies in the multi-node setting, so Ray is still able to schedule tasks across a cluster with ~1 gRPC call per task.
Creating and Using Actors
Creating an actor is very similar to scheduling a task. In fact, creating an actor amounts to submitting the actor constructor (__init__) as a task. The main difference is that an actor is persistent and acquires a “permanent” lease on its assigned worker. Let’s consider this counter actor:
@ray.remote
class Counter:
    def __init__(self):
        self.value = 0

    def increment(self):
        self.value += 1
        return self.value

c = Counter.remote()
assert ray.get(c.increment.remote()) == 1
The worker that ends up running the actor __init__ task simply becomes the actor:
Once created, actor method calls directly translate into a gRPC call to run the method on the actor. This allows Ray actor workloads to run with very low overhead and scale linearly with the number of actors:
If an actor fails unexpectedly at runtime and max_reconstructions > 0 for the actor, it will be recreated by Ray. Further tasks will be sent to the new actor’s address instead.
Sending and Sharing Large Objects with Arrow
Large objects (>100KB) are not efficient to send over gRPC. Instead, Ray places large objects (which are Arrow or pickle5 serialized) into a shared memory object store provided by Apache Arrow. This means that:
- Large objects sent between tasks on the same node are read through shared memory, with no memory copy required for multiple reads of the same data.
- Objects will be automatically transferred to other nodes if needed. This transfer is done with multiple gRPC streams to maximize network throughput.
Ray 0.8 Performance
The Ray 0.8 release enables gRPC-based direct calls by default, which eliminates the scheduler as a bottleneck and improves performance 5–10x for scheduler-constrained workloads.
First, on a single machine we compare the latency for synchronous actor calls (similar to submitting a normal task to a cached worker) in Ray with a server written using gRPC Python. We see that, despite using gRPC, Ray 0.8 actually has slightly lower latency than gRPC Python. This is because Ray can leverage the more efficient C++ gRPC runtime:
Next we look at throughput. Due to the addition of multiple C++ gRPC polling threads, object transfer is much faster in Ray 0.8, reaching >21Gbps for 1GiB objects between two m4.16xl AWS nodes. On the other hand, straightforward use of gRPC Python to send large objects ends up slower than Ray due to Python runtime overhead and lack of send parallelism:
In the next microbenchmark, we compare task and actor performance using the ray microbenchmark command on a single m4.16xl node (32 CPUs). Thanks to the new scheduling algorithm, 0.8 is faster for the same CPU count:
Ray 0.8 also offers near linear scalability for fine-grained tasks. Thanks to the use of gRPC direct calls between workers, aggregate cluster throughput scales to >200k tasks/s (and would continue scaling with more nodes). Previously the Raylets were scheduling bottlenecks for certain workloads (note that the x-axis is log scale):
These performance improvements also carry over to higher level applications. When running RLlib performance tests for the 0.8.0 release, we observed 10–40% speedups on Atari training. These tests were run concurrently on a g3.16xl GPU node for an hour each:
In summary, with Ray 0.8, Ray task and actor calls now more directly translate into gRPC and Arrow operations. This allows users of Ray to leverage the performance of these underlying frameworks without needing to implement the scheduling, scaling, and fault handling required for a distributed application themselves.
At Anyscale, we’re working on a number of enhancements for Ray 1.0, including:
- Support for tasks/actors written in C++ and Rust (in addition to Python and Java today).
- Distributed reference counting for shared memory objects.
- Accelerated point-to-point GPU transfers.
If you have any questions or thoughts about Ray, please join our community through Discourse or Slack. If you would like to know how Ray is being used in industry, you can attend or watch Ray Summit.
Acknowledgements: We thank Ant Financial and the following contributors for their work towards Ray 0.8: @1beb, @AdamGleave, @adamochayon, @adizim, @ajgokhale, @akharitonov, @alex-petrenko, @alindkhare, @AmeerHajAli, @anthonyhsyu, @antoine-galataud, @ashione, @batzner, @chaokunyang, @Coac, @ConeyLiu, @couturierc, @daiyaanarfeen, @danyangz, @DaveyBiggers, @deanwampler, @dulex123, @ecederstrand, @edoakes, @ericl, @eugenevinitsky, @ffbin, @flying-mojo, @gehring, @gravitywp, @gregSchwartz18, @guoyuhong, @haje01, @hartikainen, @hershg, @hhbyyh, @holli, @idthanm, @istoica, @JasonWayne, @jiangzihao2009, @jichan3751, @jon-chuang, @joneswong, @jovany-wang, @jredondopizarro, @kfstorm, @kiddyboots216, @layssi, @lisadunlap, @llan-ml, @LorenzoCevolani, @ls-daniel, @lsklyut, @lufol, @Maltimore, @mawright, @mehrdadn, @mhgump, @micafan, @michaelzhiluo, @mimoralea, @MissiontoMars, @mitchellstern, @morgangiraud, @MQQ, @nflu, @pcmoritz, @pengzhenghao, @pschafhalter, @Qstar, @qxcv, @raulchen, @RehanSD, @richardliaw, @robertnishihara, @rueberger, @simon-mo, @sotte, @stackedsax, @stefanpantic, @stephanie-wang, @suquark, @sytelus, @TianhongDai, @timgates42, @timonbimon, @TomVeniat, @ujvl, @vakker, @vipulharsh, @visatish, @waldroje, @walterddr, @wsjeon, @wumuzi520, @zhijunfu, @zhu-eric, @zhuohan123, @zplizzi, and @ztangent.