Ray Asynchronous and Threaded Actors: A way to achieve concurrency

Jules S. Damji
5 min read · Sep 28, 2022


We are on Day 11 of #30DaysOfRay.

A Ray actor is bound to a Ray worker Python process: the actor runs within that worker's main thread. For example, a Counter actor is bound to a Ray core worker process upon creation and lease allocation, and all method calls on that actor handle are submitted to and executed on that same Ray worker Python process, using and accessing the same resources.

Fig 1. Ray actor mapped to a Ray node’s Python worker process

In a Python worker process, how do you achieve any form of concurrency? If you want some level of concurrency rather than serial, order-guaranteed method execution within a Ray actor, there are two programmatic mechanisms for achieving it with the actor's single-threaded Python worker process: asynchronous execution (with coroutines) and threaded execution (with a pool of threads).

While each mechanism can achieve a degree of concurrency, neither can bypass the global interpreter lock (GIL), which allows only one thread of Python bytecode to execute at a time. The same constraint applies to asynchronous execution: only a single coroutine runs at any moment in the event loop. However, threaded actor methods can achieve true parallelism if they call into a library with C/C++ code; for example, many NumPy operations release the GIL.

By default, all of a Ray actor's methods are executed in the order they are submitted to a single actor instance. For example, let's imagine we have the following simple actor:

Fig. 2 Regular actor and its methods

This guarantees serial execution order: method_a() is executed first, followed by method_b(), on the Ray node's Python worker process to which the RegularSyncActor is bound. Both methods run as regular Python functions within the worker's single main thread. No concurrency, let alone parallelism, is achieved.

Consider this SyncFlagActor as an I/O-bound example, where an actor method downloads a country's flag image from the internet, one at a time, as the method is invoked repeatedly. This is a purely serial approach, taking 5.03 secs.

Fig 3. Execution times for running a regular Ray actor

Such I/O-bound tasks lend themselves well to asynchronous processing: while one task is blocked waiting for data to arrive, another can make progress. Since Python 3.5, you can write concurrent code using the async/await syntax, and Ray natively integrates with asyncio.

Writing asynchronous Ray actors

So let's convert our SyncFlagActor into an AsyncFlagActor using async/await syntax for all its methods. When Ray detects any async method definitions in an actor, it applies async semantics. Examine the converted actor below.

Under the hood, Ray runs all such methods as coroutines inside a single Python event loop. When a coroutine blocks on I/O, it yields control so another coroutine or method can run.

Although only one coroutine runs at any point, coroutines are multiplexed within the Python worker process's single thread. This mechanism achieves a level of concurrency.

To limit the maximum number of concurrently running coroutines, you can pass an argument when instantiating the actor: for example, AsyncFlagActor.options(max_concurrency=n).remote(). By default, the level is set to 1000.

This approach reduces the timing for the I/O-bound method significantly compared to the default serial execution: from 5.03 secs to 0.71 secs.

Fig 4. Execution times for running an AsyncFlagActor

While the speedup for an async actor is impressive, almost a factor of seven in this case, other cases will vary depending on the coroutine. A method converted to a coroutine may perform heavy computation that blocks the event loop without ever yielding via an await call; this hurts the performance of an asynchronous Ray actor.

Writing threaded Ray actors

To alleviate this hurdle with asynchronous Ray actors, you can employ a threaded Ray actor, which uses a pool of threads instead of coroutines to provide threaded concurrency. The code is not dissimilar to our regular SyncFlagActor, except that we specify max_concurrency (which sizes the thread pool) and remove all async syntax. Examine its equivalent code in ThreadedFlagActor here.

When we execute this threaded Ray actor with the same max_concurrency level as the AsyncFlagActor, we achieve a timing of 0.65 secs, down from 0.71 secs: a mild performance gain over the async version.

Fig 5. Execution times for ThreadedFlag Ray Actor

TL;DR

To sum up, Ray actors, by default, execute their methods serially when submitted to the same instance of an actor bound to the Ray node’s Python worker process. The order of execution is the same as the order of submission.

But there are mechanisms to achieve a level of concurrent execution for actor methods.

The above examples illustrated how you can achieve concurrent execution via two schemes: writing asynchronous Ray actors or threaded Ray actors. We demonstrated a significant drop in execution time from the default serial method execution with both.

The takeaway is that for Ray actor methods that are I/O bound, writing asynchronous or threaded actors may achieve a degree of concurrent execution, where execution order is not guaranteed. In contrast, if you want serial execution of actor methods and maintain the execution order as submitted or invoked, use the synchronous (or regular) Ray actors.

Check out other asynchronous and threaded actor examples here and their respective run times charted below. Your times may vary.

Fig 6. Various execution times for sync, async, and threaded Ray Actors

You can read Day 10 and Days 1–9 of #30DaysOfRay to catch up.
