Scaling Python Asyncio with Ray

Simon Mo
Distributed Computing with Ray
4 min read · Mar 2, 2020

TL;DR: Scale your existing asyncio application to thousands of cores with 20 lines of code. Ray has added two features that enable seamless integration with the Python asyncio ecosystem. Together, they let you scale an existing asyncio application to multiple cores and multiple nodes. You can try it out today!

This post explains how Ray natively supports Python’s single-threaded asyncio coroutines and enables seamless scaling of coroutines to multiple processes and to a cluster of machines.

Ray’s native integration with asyncio

Python’s asyncio provides a built-in mechanism for writing concurrent applications. It enables efficient cooperative multitasking of multiple coroutines in a single Python process. Using the keywords async and await, developers can write structured I/O-intensive programs in which coroutines yield control to other coroutines while blocked.
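For instance, here is a minimal, self-contained asyncio sketch (fetch and main are illustrative names, not from this post):

```python
import asyncio

async def fetch(name: str) -> str:
    # awaiting the sleep yields control to the event loop,
    # letting other coroutines run in the meantime
    await asyncio.sleep(1)
    return f"{name} done"

async def main():
    # both coroutines make progress concurrently in one thread,
    # so this takes about 1 second instead of 2
    print(await asyncio.gather(fetch("a"), fetch("b")))

asyncio.run(main())
```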

Ray is a framework for scaling Python applications from single machines to large clusters. It is built to solve several common problems in scalable, distributed computing, such as efficient distributed execution of fine-grained “tasks” with intuitive sequencing of dependencies, and support for distributed state. You can learn more in Ray for the Curious.

When using Ray, you call func.remote() or actor.method.remote() to schedule a task on one of the workers. To get the result, you call the ray.get or ray.wait API to wait for it (see the sketch after this list). These APIs are blocking, which means two things:

  • Inside a worker, only a single task can run at a time.
  • When you retrieve a result, you have to wait for it to become available. There is no way to wait for it asynchronously.
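As a concrete illustration of this blocking behavior, here is a minimal sketch (slow_task is a hypothetical function, not from the original post):

```python
import time
import ray

ray.init()

@ray.remote
def slow_task() -> int:
    time.sleep(1)  # stand-in for real work
    return 42

ref = slow_task.remote()  # schedules the task on a worker, returns immediately
result = ray.get(ref)     # blocks the calling thread until the result is ready
```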

In this blog post, we will discuss two improvements to Ray that integrate it with Python’s asyncio features.

Concurrent actor calls

Before asyncio support, all tasks submitted to a Ray actor were queued and executed one at a time. Asyncio actors allow all async methods to execute concurrently inside a single Python event loop. To create an asyncio actor, you simply mark a method with the async keyword and call await inside the method implementation.
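Here is a minimal sketch of what such an actor might look like (AsyncActor and run_task are illustrative names):

```python
import asyncio
import ray

ray.init()

@ray.remote
class AsyncActor:
    async def run_task(self, i: int) -> int:
        # while this coroutine is suspended at await, other
        # invocations of run_task can run on the same event loop
        await asyncio.sleep(1)
        return i

actor = AsyncActor.remote()
# all 10 invocations run concurrently inside the actor,
# so this takes about 1 second rather than 10
print(ray.get([actor.run_task.remote(i) for i in range(10)]))
```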

All of the actor’s methods are executed inside a dedicated Python event loop thread. Synchronous methods are wrapped in an async wrapper and executed on the same event loop, so you don’t need to worry about race conditions or state-access issues.

Additionally, asyncio actors have a built-in backpressure mechanism. Using the AsyncActor.options(max_concurrency=...) flag, you can limit how many coroutines run at once. The default limit is 1,000 concurrent coroutines, but it can be raised if desired. This prevents the disastrous situation where you accidentally enqueue millions of coroutines onto the same event loop!
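For instance, reusing the AsyncActor sketch above, the cap might be set like this:

```python
# allow at most 8 coroutines to run concurrently in the actor;
# further calls queue up until a slot frees
actor = AsyncActor.options(max_concurrency=8).remote()
```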

Asynchronous ray.get

Before asyncio support was implemented, ray.get and ray.wait were both blocking calls. This meant that even when you called ray.get inside an async method, the whole event loop would block and no other task could execute. With the new async API, Ray futures (the object IDs returned by remote calls) can be awaited directly, behaving like asyncio.Future objects that integrate with the rest of the Python asyncio ecosystem.
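Concretely, reusing the slow_task sketch from earlier, a coroutine might await a result like this:

```python
async def wait_for_result() -> int:
    ref = slow_task.remote()
    # awaiting the reference suspends only this coroutine;
    # the event loop stays free to run other coroutines
    return await ref
```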

Combining concurrent actor methods and async ray.get creates more efficient Ray code. For example, a service load balancer can now be implemented in a few lines of code:
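One possible sketch, assuming a pool of Backend actors and a round-robin policy (both illustrative; only the proxy_request method is named in the post):

```python
from itertools import cycle

import ray

ray.init()

@ray.remote
class Backend:
    async def handle(self, request: str) -> str:
        # placeholder for real request handling
        return f"handled {request}"

@ray.remote
class LoadBalancer:
    def __init__(self, backends):
        # round-robin over the available backend actors
        self.backends = cycle(backends)

    async def proxy_request(self, request: str) -> str:
        backend = next(self.backends)
        # awaiting frees the event loop to serve other requests
        # while this one is in flight on the backend
        return await backend.handle.remote(request)

backends = [Backend.remote() for _ in range(4)]
lb = LoadBalancer.remote(backends)
responses = ray.get([lb.proxy_request.remote(f"req-{i}") for i in range(100)])
```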

In the load balancer actor, many instances of the proxy_request method execute concurrently.

Another example is offloading a compute-heavy task from the event loop to a Ray worker.

Compute-heavy tasks are typically offloaded to a Python thread. However, because of the global interpreter lock, threading still affects the performance of the current Python process, since all threads share the interpreter’s global state.
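With Ray, the heavy function can instead run in a separate worker process, leaving the event loop’s GIL untouched. A minimal sketch (compute_heavy and handler are illustrative names):

```python
import asyncio

import ray

ray.init()

@ray.remote
def compute_heavy(n: int) -> int:
    # CPU-bound work runs in a separate Ray worker process,
    # so it never holds the event loop's GIL
    total = 0
    for i in range(n):
        total += i * i
    return total

async def handler() -> int:
    # schedule the heavy work on a worker and await the result
    # without blocking the event loop
    return await compute_heavy.remote(10_000_000)

print(asyncio.run(handler()))
```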

Appendix: How is it implemented?

Ray uses C++ for worker-to-worker communication. Each task is enqueued on the C++ side and executed in the Python interpreter through Cython bindings. This approach has many performance benefits; however, it makes modifying task execution to be asynchronous less than straightforward.

We use the boost::fiber library to suspend the execution of each call after submitting the Python coroutine to a dedicated thread. When a coroutine completes, its Python future callback executes and notifies the fiber that it can resume.

The asynchronous get feature uses Ray’s in-memory object store and the plasma object store interface. We create asyncio.Future objects and complete them when the corresponding object fetch finishes.

If you have any questions or thoughts about Ray or want to learn more about parallel and distributed Python, please join our community through Discourse or Slack. If you would like to know how Ray is being used in industry, consider attending Ray Summit.
