Closeup of a valve and pressure gauge from a 1940s locomotive
Photo by Fred Reiss

An Easy Way to Add Flow Control to Ray Applications

Fred Reiss
IBM Data Science in Practice

One of the coolest features of the Ray framework for distributed computing is that it makes it easy to write high-performance applications that interact with the outside world. In this blog post, we show how you can use Ray’s own APIs to manage these interactions for more predictable end-to-end performance.

Invoking Remote Services from Ray

Ray is an open source framework for parallel computing that uses task-based parallelism. A Ray application can launch many tasks, each of which runs independently of the rest. Tasks are particularly useful for interacting with systems that are outside your Ray application. For example:

  • In Reinforcement Learning (RL) applications, Ray tasks can trigger actions in the real world, such as updating the list of suggested products on your web site.
  • In Extract-Transform-Load (ETL) applications, Ray tasks can send small batches of updated records to a remote database.
  • In AI model inference applications, Ray tasks can send requests to models that run behind web service APIs; or they can send audit information to a remote service that tracks model drift.

This flexibility for interacting with the outside world comes with a potential pitfall: a Ray application can issue a very large number of remote requests in a very short amount of time. Ray was designed from the beginning to support millions of tasks per second. If each of those tasks talks to a remote service, they can cause some serious unintended consequences. The requests could interfere with other users of the remote service. The service’s security software could interpret your network traffic as an attack and block your application. Or a misbehaving Ray application could lead to a surprise cloud bill because your cloud provider charges you for each service call.

Fortunately, you can mitigate all of these risks by adding flow control to your Ray application. Flow control is a computer networking term that basically means, “ensuring that the messages you send do not overwhelm the systems you are sending them through.” Applying flow control to your application’s interactions with a remote service means controlling things like the number of requests per second, the amount of data in flight at any time, or the amount of CPU load that you are putting on the remote service.

In this article, we’ll show you how to implement application-level flow control from inside a Ray application, using Ray’s own parallel processing APIs.

Tracking State with Ray Actors

In addition to tasks, Ray’s parallelism model also includes the concept of actors. A Ray actor is a Python or Java object that Ray tasks can interact with. Actors are useful for storing the state that your application needs to share between multiple tasks. For example, you can use an actor to hold the current set of weights for a model that your application is training, or the actor could hold the state of a simulator that your model is interacting with for reinforcement learning.

In addition to things like models and data, an actor’s state can also store control information about the application itself. That’s what we’re going to do here: use an actor to store information about how quickly the application is currently hitting a remote service.

All of this state-tracking functionality can go into a base class that you can reuse across multiple applications. Here’s an example of such a base class, implemented in Python.

This class, FlowControlledActorBase, has four methods. Let's go over them briefly.

The constructor, __init__(), sets up the object fields that will track the flow control state. In this example, we use a simple form of rate-based flow control. The only information we need to store is the minimum time between requests, the timestamp of the most recent request, the number of pending requests, and a lock that lets multiple threads safely read and write the first three values. If we wanted to, we could enable more sophisticated forms of flow control by tracking things like a moving average of request latency or the amount of data in flight. Use whatever approach is appropriate for your application.

The second method in the FlowControlledActorBase class is set_rate_limit(). This method lets the application adjust the maximum number of requests per second that the actor will allow.

The third method is the process() method, which is where the rate-limiting logic lives. When a Ray task enters the process() method, it checks whether enough time has elapsed since the most recent request. If it’s too soon to issue another request, the task calculates how long it should wait, waits that amount of time, and then issues the request. Multiple tasks can be inside process() at the same time, so we also include some locking to keep everything thread-safe.

The last part of the class is an abstract method, process_internal(). Subclasses of FlowControlledActorBase can override process_internal() with appropriate code to call the remote service.
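
To make the description above concrete, here is a minimal sketch of what a base class like FlowControlledActorBase could look like. It is a reconstruction from the four methods described above, not necessarily the exact code behind this post. The field names (_min_interval_sec, _last_request_time, _num_pending, _lock) are our own assumptions. So is the choice to reserve a time slot while holding the lock and then sleep outside the lock, so that waiting tasks don't block each other.

```python
import threading
import time
from abc import ABC, abstractmethod


class FlowControlledActorBase(ABC):
    """Sketch of a base class that rate-limits calls to a remote service."""

    def __init__(self):
        # Minimum time between requests, in seconds (0 means "no limit").
        self._min_interval_sec = 0.0
        # Time slot (on the time.monotonic() clock) of the most recent request.
        self._last_request_time = float("-inf")
        # Number of requests that are waiting or in flight.
        self._num_pending = 0
        # Guards the three fields above.
        self._lock = threading.Lock()

    def set_rate_limit(self, requests_per_sec):
        """Set the maximum number of requests per second."""
        if requests_per_sec <= 0:
            raise ValueError("requests_per_sec must be positive")
        with self._lock:
            self._min_interval_sec = 1.0 / requests_per_sec

    def process(self, request):
        """Wait until the rate limit allows it, then issue the request."""
        with self._lock:
            now = time.monotonic()
            # Reserve the earliest slot that respects the minimum interval.
            slot = max(now, self._last_request_time + self._min_interval_sec)
            self._last_request_time = slot
            self._num_pending += 1
            wait_sec = slot - now
        try:
            if wait_sec > 0:
                time.sleep(wait_sec)  # Sleep outside the lock.
            return self.process_internal(request)
        finally:
            with self._lock:
                self._num_pending -= 1

    @abstractmethod
    def process_internal(self, request):
        """Subclasses override this method to call the remote service."""
        raise NotImplementedError
```

Because this sketch uses only the Python standard library, you can try it without a Ray cluster: subclass it, override process_internal(), and call process() from a few threads.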

Defining your Actor

Once you’ve defined a base class like FlowControlledActorBase, you can use it to create a Ray actor with flow control. Here is some example code that creates such an actor for talking to the IBM Watson Natural Language Understanding web service.

The above code creates a class FlowControlledActor that inherits flow control functionality from its parent class, the FlowControlledActorBase class we defined earlier. The FlowControlledActor class implements the process_internal() method from its parent class. This process_internal() method opens a connection to Watson Natural Language Understanding, sends a request, and returns the service's response.

Note the @ray.remote decorator immediately before the declaration of FlowControlledActor. This decorator tells Ray to turn this class into a factory object for creating Ray actors.

A Note about Threads

It’s important to note that the Python client library for Watson Natural Language Understanding is thread-safe and non-blocking. Multiple Python threads can be inside the Natural Language Understanding analyze() method at the same time without interfering with each other. If the client for your remote service is not designed for threads, you will want to use a Ray task to isolate each service call in its own process. Here’s what the code above looks like if we run each call to Watson Natural Language Understanding in a separate Ray task.

Using a Flow-Controlled Actor

Regardless of whether you use a thread-safe client or a pool of tasks to talk to the remote service, the code to use the actor looks the same. Here is some Python code that creates a FlowControlledActor actor and routes a single request through it.

Let’s walk through this code one line at a time.

Line 5 of the listing above creates a Ray actor. Remember that the @ray.remote decorator on the FlowControlledActor class turned that class into a factory for Ray actors. Ray's factory objects use method chaining to pass parameters. The code in this line tells Ray to create an actor and to set its max_concurrency parameter to 10. max_concurrency is a Ray parameter that controls how many Ray tasks can use the actor at the same time. If more than this number of tasks try to use the actor, Ray will queue the excess tasks until the actor becomes available.

Line 6 calls the set_rate_limit() method we defined earlier in the FlowControlledActorBase base class. This line uses ray.get() to block the calling process until the actor has put the new policy into place.

Taken together, these two lines of code apply two kinds of flow control to the application’s requests:

  • No more than 5 requests per second.
  • No more than 10 simultaneous requests.

The code on line 11 uses nlu_client_actor to send one request to Watson Natural Language Understanding. The return value from this line is a Ray object reference that lets us access the task’s return value once the task has completed.

Line 14 uses this object reference and the ray.get() function to block until the task has completed and retrieve the results from the Watson service.
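
If you'd like to experiment with these steps without Watson credentials, the sketch below follows the same sequence: create the actor with max_concurrency set to 10, set a limit of 5 requests per second, send one request, and fetch the result. FlowControlledEcho is a hypothetical stand-in that echoes its input instead of calling a remote service, and main() is our own wrapper. The line numbers in the walkthrough above refer to the post's own listing, so they will not match this sketch.

```python
import threading
import time


class FlowControlledEcho:
    """Hypothetical stand-in for FlowControlledActor (no remote service)."""

    def __init__(self):
        self._min_interval_sec = 0.0
        self._last_request_time = float("-inf")
        self._lock = threading.Lock()

    def set_rate_limit(self, requests_per_sec):
        with self._lock:
            self._min_interval_sec = 1.0 / requests_per_sec

    def process(self, request):
        with self._lock:
            now = time.monotonic()
            slot = max(now, self._last_request_time + self._min_interval_sec)
            self._last_request_time = slot
        time.sleep(max(0.0, slot - now))
        # A real actor would call the remote service here.
        return {"echo": request}


def main():
    # Imported here so the class above can be used without Ray installed.
    import ray

    ray.init()
    # ray.remote() turns the class into a factory for Ray actors.
    actor_factory = ray.remote(FlowControlledEcho)
    # Allow at most 10 tasks inside the actor at the same time.
    actor = actor_factory.options(max_concurrency=10).remote()
    # Block until the limit of 5 requests per second is in place.
    ray.get(actor.set_rate_limit.remote(5.0))
    # Send one request; this returns an object reference right away.
    result_ref = actor.process.remote("<html><body>Hello</body></html>")
    # Block until the task completes, then print its result.
    print(ray.get(result_ref))
    ray.shutdown()
```

Call main() on a machine with Ray installed to run the whole sequence on a local Ray instance.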

Flow Control in Action

Let’s use a simple microbenchmark to see how the code we’ve shown so far works. This microbenchmark produces a “burst” of 10 large HTML documents every 2 seconds. It sends the documents to Watson Natural Language Understanding for processing, using the flow control actor to limit the request rate to 5 requests per second.

The plot below shows a timeline of the first seven seconds of this microbenchmark. Each horizontal bar represents the processing for a single document. The part of the bar in blue shows time that the document spent queued due to flow control. The orange part shows the time that the document spent being processed by Watson Natural Language Understanding.

Microbenchmark showing the effects of flow control in a Ray application that communicates with the external Watson Natural Language Understanding service. Each horizontal bar represents the timeline of processing for a single large HTML document. The microbenchmark generates bursts of documents every two seconds (beginning of blue part of each timeline), but the Ray application’s flow control turns this bursty load into a steady, predictable stream of requests to the service (orange parts).

You can see how the flow control smooths out each “burst” of ten documents into a steady, predictable stream of requests.
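
The arithmetic behind this smoothing is easy to check by hand. At 5 requests per second, the minimum gap between requests is 0.2 seconds, so a burst of ten documents takes about 1.8 seconds to drain, just in time for the next burst. The short simulation below works through that schedule. It is a simplified model that we made up for illustration: it covers only the rate limit, ignores the max_concurrency limit and the service's own response time, and the function name release_times() is hypothetical.

```python
def release_times(arrival_times, requests_per_sec):
    """Return the time at which a rate limiter releases each request."""
    min_interval = 1.0 / requests_per_sec
    released = []
    last = float("-inf")
    for arrival in sorted(arrival_times):
        # A request goes out when it arrives, or one interval after
        # the previous request, whichever is later.
        slot = max(arrival, last + min_interval)
        released.append(slot)
        last = slot
    return released


# Three bursts of 10 documents, arriving at t = 0, 2, and 4 seconds.
arrivals = [start for start in (0.0, 2.0, 4.0) for _ in range(10)]
releases = release_times(arrivals, requests_per_sec=5.0)

# Time each document spends queued (the blue part of each bar).
queue_delays = [r - a for r, a in zip(releases, arrivals)]

for arrival, release in zip(arrivals, releases):
    print(f"arrived {arrival:.1f}s, released {release:.1f}s")
```

In this model, the last document of each burst waits the longest, roughly 1.8 seconds, while the first document of each burst goes out almost immediately.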

We hope that you’ve enjoyed this introduction to using Ray Actors to add flow control to Ray applications. A few lines of additional code can give more predictable performance, not only for your application, but for all of the other systems that it interacts with.

If you’d like to try out Ray at scale, take a look at this post on how to use Ray with IBM Cloud Code Engine.

Fred Reiss is a Principal Research Staff Member at IBM Research and Chief Architect at IBM’s Center for Open-Source Data and AI Technologies (CODAIT).