How Ray Uses gRPC (and Arrow) to Outperform gRPC

Distributed Computing with Ray · Jan 14, 2020

by Eric Liang, Edward Oakes, and Stephanie Wang

This blog post explains how the Ray 0.8 release uses gRPC and Apache Arrow to provide a distributed Python API that can be both faster and simpler than using gRPC directly.

gRPC and Apache Arrow in Ray

When deciding on the libraries and tools to use for a project, a good rule of thumb is to pick ones that are only as powerful as you need. It is easy to see why: large dependencies often introduce unwanted complexity, performance overhead, and architectural constraints. Often, it is better to pick from a smaller set of (lower-level) libraries and reimplement any missing functionality yourself.

Due in part to (1) this principle of least power and (2) the lack of a sufficiently flexible framework, today’s scale-out ML applications typically are built using low-level communication libraries and container orchestrators. While this is often the right choice for each individual project, it has resulted in a fragmented ecosystem where application developers have to do substantial distributed systems engineering, often gluing together multiple distributed systems or building new ones from scratch.

The Ray framework aims to remove this tradeoff by providing a lightweight yet general-purpose distributed API. Importantly, it does not sacrifice the performance and flexibility of lower-level libraries. This means that developers can have their cake and eat it too: Ray handles the complex but necessary aspects of distributed computation such as scheduling, scaling, and fault handling, and does so without the performance or architectural restrictions that heavier or less general frameworks impose.

Ray allows developers to easily build distributed Python applications. The performance of these applications often matches or exceeds that of applications built directly on lower-level libraries. This blog post provides intuition for how this is possible:

  1. It explains how with Ray 0.8.0, most Ray operations now translate directly into gRPC and Arrow calls for improved performance.
  2. It explains how necessary distributed systems functionality such as scheduling, scaling, and fault handling is exposed through Ray’s task and actor APIs.

Overview of Ray

Note: For a more in-depth introduction to Ray and its API, refer to Ray for the Curious.

At a high level, Ray provides:

  • Tasks (remote functions): these let you run a function remotely in a cluster. Tasks are for stateless computation.
  • Actors (remote classes): these are instances of Python classes running remotely in worker processes in a cluster. Actors are for stateful computation, where evolving state needs to be managed.

Unlike other distributed frameworks, Ray allows applications to nest and compose tasks and actors in arbitrary patterns, similar to how you can work with functions and class instances in a sequential programming language. Tasks and actors interoperate through immutable future value references (object IDs) produced from Ray calls. These object IDs can be freely passed to any task or actor, just like local object references can.
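To make this concrete, here is a minimal sketch (the preprocess function and Accumulator actor are illustrative names, not part of the Ray API) that passes the object ID returned by a task directly into an actor method, without ever fetching the intermediate value on the caller:

import ray

ray.init()

@ray.remote
def preprocess(x):
    # Stateless task: returns a transformed value.
    return x * 10

@ray.remote
class Accumulator:
    # Stateful actor: keeps a running total across method calls.
    def __init__(self):
        self.total = 0

    def add(self, value):
        self.total += value
        return self.total

acc = Accumulator.remote()
obj_id = preprocess.remote(4)       # returns an object ID (a future)
result_id = acc.add.remote(obj_id)  # Ray resolves obj_id before running add
assert ray.get(result_id) == 40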

The flexibility of Ray’s API enables it to support an ecosystem of high-performance applications, including RLlib, Tune, and more.

How Ray calls translate into gRPC operations

In this section, we’ll see:

  1. How Ray schedules tasks and actors.
  2. How Ray minimizes scheduling overhead compared to sending RPCs directly.

Note: The following sections describe the task scheduling algorithms of Ray 0.8.0+, which differ significantly from earlier versions of Ray (that do not leverage gRPC).

Basic Ray Task Call

@ray.remote
def double(x):
    return x * 2

fut1 = double.remote(2)
assert ray.get(fut1) == 4

Let’s look at how the above program is executed in Ray. The semantics are as follows: we first declare double as a Ray function, then call it with the argument 2 via double.remote(2). Finally, we retrieve the result of the returned future by calling ray.get(), which returns 4. Importantly, double is scheduled for execution in a separate process by Ray.

The worker process that calls double.remote executes the following gRPC calls:

  1. It asks the Raylet (Ray scheduling daemon running on each node) for a lease on a worker to execute double(2). The scheduler inspects the task requirements (i.e., resources required, object dependencies) to find a suitable worker. While it holds the lease, no other worker process can schedule tasks on the leased worker.
  2. It then sends an ExecuteTask RPC to the leased worker, which executes the task and responds with the result. Since both the argument and return value of the task are small (more on this later), they are handled inline with the gRPC call.
Components in green boxes represent Python code. Components in white boxes are part of the Ray common runtime written in C++. Joined boxes represent a process. Any Python driver or worker can call into the Ray C++ core worker library to execute further tasks. In this figure, all processes are running on the same machine. Ray uses gRPC as a unified communication layer for both local and remote procedure calls.

If a task fails unexpectedly during execution, the calling driver or worker will attempt to reschedule it up to a given max_retries times. This is possible since the caller retains the task description until it is no longer needed.
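As a rough sketch of this behavior (the exact option name and default may vary across Ray versions), the retry count can be set on the decorator:

import ray

ray.init()

# Sketch: assumes the max_retries option mentioned above is accepted by
# the decorator; some Ray versions name or default this differently.
@ray.remote(max_retries=3)
def flaky_double(x):
    # If the worker running this task dies, the caller resubmits the
    # cached task description up to 3 more times.
    return x * 2

assert ray.get(flaky_double.remote(21)) == 42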

Caching Scheduling Decisions

In a real application, many tasks will be executed, not just one. In the above example, we saw that multiple RPCs were needed to schedule a single task. Ray can do better when many tasks of the same “type” (i.e., resource requirements) are submitted at once:

futures = [double.remote(i) for i in range(1000)]
ray.get(futures) # [0, 2, 4, 6, …]

In this scenario, Ray caches scheduling decisions to amortize scheduling RPC overhead for similar tasks. This means it will end up issuing only a fraction more than 1 RPC per task, or about the same number of round trips as using gRPC directly. Any extra RPCs go towards acquiring multiple concurrent workers from the Raylet to parallelize execution:

Once a scheduling decision is made by the Raylet, the worker returned can be reused for other tasks with the same resource requirements and input dependencies. This amortizes scheduling RPC overhead when executing many similar tasks. To avoid unfair monopolization of workers when multiple processes are trying to submit tasks, callers are only allowed to reuse workers within a few hundred milliseconds of the initial grant.

Consider: what if double were called with a future as an argument (e.g., double.remote(double.remote(2)))? In this case, the calling worker waits for the first task to finish before sending the second. Once the first result is known, the argument to the second double can be inlined into the task, removing the object dependency, so the scheduling decision can be reused for the second call.
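Spelled out, reusing the double function defined above, the chained call looks like this:

inner = double.remote(2)      # first task; produces an object ID for 4
outer = double.remote(inner)  # sent only after `inner` completes; 4 is inlined
assert ray.get(outer) == 8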

Scaling to Multiple Nodes

Let’s run through double.remote(2) again, but assuming no workers are available on the local machine:

  1. The driver asks Raylet 1 for a worker to execute double. It has no free workers, but Raylet 1 knows Raylet 2 has free resources, and redirects the request to Raylet 2.
  2. The driver sends ExecuteTask to the remote Python worker leased from Raylet 2 over gRPC.
Tasks are sent to remote workers if there are no local resources available, transparently scaling Ray applications out to multiple nodes.

The same optimization of caching scheduling decisions applies to multi-node, so Ray is still able to schedule tasks across a cluster with ~1 gRPC call per task.
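Notably, the application code does not change when moving from one node to many; only how you connect to the cluster differs. A minimal sketch (assuming a running multi-node cluster; the address value is a placeholder):

import ray

# Connect to an existing multi-node cluster rather than starting a
# single-node instance ("auto" discovers the local cluster address).
ray.init(address="auto")

@ray.remote
def double(x):
    return x * 2

# Tasks spill over to workers on other nodes once local resources are busy.
futures = [double.remote(i) for i in range(1000)]
print(ray.get(futures)[:4])  # [0, 2, 4, 6]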

Creating and Using Actors

Creating an actor is very similar to scheduling a task. In fact, creating an actor amounts to submitting the actor constructor (__init__) as a task. The main difference is that an actor is persistent and acquires a “permanent” lease on its assigned worker. Let’s consider this counter actor:

@ray.remote
class Counter:
    def __init__(self):
        self.value = 0

    def increment(self):
        self.value += 1
        return self.value

c = Counter.remote()
assert ray.get(c.increment.remote()) == 1

The worker that ends up running the actor __init__ task simply becomes the actor:

Creating an actor is done by scheduling the __init__ task for the actor. The actor creation task holds its lease on the worker until the actor is destroyed.

Once created, actor method calls directly translate into a gRPC call to run the method on the actor. This allows Ray actor workloads to run with very low overhead and scale linearly with the number of actors:

Once created, actor tasks translate into direct gRPC calls to the actor process. An actor can handle many concurrent calls, though here we only show one.

If an actor fails unexpectedly at runtime and max_reconstructions > 0 for the actor, it will be recreated by Ray. Further tasks will be sent to the new actor address instead.
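As an illustrative sketch (max_reconstructions is the option referenced above; later Ray releases renamed it to max_restarts), actor restart is requested on the decorator:

import ray

ray.init()

# Sketch: the actor is recreated up to 2 times if its process dies.
@ray.remote(max_reconstructions=2)
class Counter:
    def __init__(self):
        self.value = 0

    def increment(self):
        self.value += 1
        return self.value

c = Counter.remote()
# If the actor process crashes, Ray re-runs __init__ on a fresh worker and
# routes subsequent calls to the new address. Note that in-memory state
# (self.value) starts over from the constructor after reconstruction.
assert ray.get(c.increment.remote()) == 1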

Sending and Sharing Large Objects with Arrow

Large objects (>100KB) are not efficient to send over gRPC. Instead, Ray places large objects (which are Arrow or pickle5 serialized) into a shared memory object store provided by Apache Arrow. This means that:

  • Large objects sent between tasks on the same node are read through shared memory, with no memory copy required for multiple reads of the same data.
  • Objects will be automatically transferred to other nodes if needed. This transfer is done with multiple gRPC streams to maximize network throughput.
Large objects are placed in shared memory and transferred across nodes as needed over multiple gRPC connections. Here we show the object transfer operations that would happen if we passed a large numpy array "array" to "double". Note that the result is put on Node 2 and not transferred to the caller until retrieved via ray.get(obj_id).
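As a sketch of this path, a large numpy array can be placed in the object store explicitly with ray.put() (it is also stored there automatically when passed as a task argument), and workers on the same node read it from shared memory without copying:

import numpy as np
import ray

ray.init()

@ray.remote
def double(array):
    # On the executing node, the argument is read zero-copy from the
    # shared memory object store (as a read-only view).
    return array * 2

# A ~80 MB array: too large to inline in a gRPC message, so only its
# object ID travels with the task.
array = np.ones(10_000_000)
obj_id = ray.put(array)

result_id = double.remote(obj_id)
assert ray.get(result_id)[0] == 2.0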

Ray 0.8 Performance

The Ray 0.8 release enables gRPC-based direct calls by default, which eliminates the scheduler as a bottleneck and improves performance 5–10x for scheduler-constrained workloads.

First, on a single machine we compare the latency for synchronous actor calls (similar to submitting a normal task to a cached worker) in Ray with a server written using gRPC Python. We see that, despite using gRPC, Ray 0.8 actually has slightly lower latency than gRPC Python. This is because Ray can leverage the more efficient C++ gRPC runtime:

Next we look at throughput. Due to the addition of multiple C++ gRPC polling threads, object transfer is much faster in Ray 0.8, reaching >21Gbps for 1GiB objects between two m4.16xl AWS nodes. On the other hand, straightforward use of gRPC Python to send large objects ends up slower than Ray due to Python runtime overhead and lack of send parallelism:

In the next microbenchmark, we compare task and actor performance in the ray microbenchmark command on a single m4.16xl node (32 CPUs). Thanks to the new scheduling algorithm, 0.8 is faster for the same CPU count:

Ray 0.8 also offers near linear scalability for fine-grained tasks. Thanks to the use of gRPC direct calls between workers, aggregate cluster throughput scales to >200k tasks/s (and would continue scaling with more nodes). Previously the Raylets were scheduling bottlenecks for certain workloads (note that the x-axis is log scale):

These performance improvements also carry over to higher level applications. When running RLlib performance tests for the 0.8.0 release, we observed 10–40% speedups on Atari training. These tests were run concurrently on a g3.16xl GPU node for an hour each:

In summary, with Ray 0.8, Ray task and actor calls now more directly translate into gRPC and Arrow operations. This allows users of Ray to leverage the performance of these underlying frameworks without needing to implement the scheduling, scaling, and fault handling required for a distributed application themselves.

At Anyscale, we’re working on a number of enhancements for Ray 1.0, including:

  • Support for tasks/actors written in C++ and Rust (in addition to Python and Java today).
  • Distributed reference counting for shared memory objects.
  • Accelerated point-to-point GPU transfers.

If you have any questions or thoughts about Ray, please join our community through Discourse or Slack. If you would like to know how Ray is being used in industry, you can attend or watch Ray Summit.

Acknowledgements: We thank Ant Financial and the following contributors for their work towards Ray 0.8: @1beb, @AdamGleave, @adamochayon, @adizim, @ajgokhale, @akharitonov, @alex-petrenko, @alindkhare, @AmeerHajAli, @anthonyhsyu, @antoine-galataud, @ashione, @batzner, @chaokunyang, @Coac, @ConeyLiu, @couturierc, @daiyaanarfeen, @danyangz, @DaveyBiggers, @deanwampler, @dulex123, @ecederstrand, @edoakes, @ericl, @eugenevinitsky, @ffbin, @flying-mojo, @gehring, @gravitywp, @gregSchwartz18, @guoyuhong, @haje01, @hartikainen, @hershg, @hhbyyh, @holli, @idthanm, @istoica, @JasonWayne, @jiangzihao2009, @jichan3751, @jon-chuang, @joneswong, @jovany-wang, @jredondopizarro, @kfstorm, @kiddyboots216, @layssi, @lisadunlap, @llan-ml, @LorenzoCevolani, @ls-daniel, @lsklyut, @lufol, @Maltimore, @mawright, @mehrdadn, @mhgump, @micafan, @michaelzhiluo, @mimoralea, @MissiontoMars, @mitchellstern, @morgangiraud, @MQQ, @nflu, @pcmoritz, @pengzhenghao, @pschafhalter, @Qstar, @qxcv, @raulchen, @RehanSD, @richardliaw, @robertnishihara, @rueberger, @simon-mo, @sotte, @stackedsax, @stefanpantic, @stephanie-wang, @suquark, @sytelus, @TianhongDai, @timgates42, @timonbimon, @TomVeniat, @ujvl, @vakker, @vipulharsh, @visatish, @waldroje, @walterddr, @wsjeon, @wumuzi520, @zhijunfu, @zhu-eric, @zhuohan123, @zplizzi, and @ztangent.
