Ray Tips and Tricks, Part I — ray.wait

Dean Wampler
Jan 28

This series of posts provides an expanded update for a RISELab post last year on tips and tricks for using Ray effectively.

Note: Ray Summit 2020 is postponed to Fall. Visit anyscale.com/events to learn about Ray Summit Connect, our Summer series of online events, and plans for Ray Summit 2020 this Fall. You can also sign up for updates.

First, here’s a recap of the core Ray API commands:

  • ray.init(): Initialize the Ray context.
  • @ray.remote: Decorate a Python function to make it a task, or a class to make it an actor. Tasks and actors run in other processes.
  • foo.remote(...): Invoke the remote task foo. This is an asynchronous (nonblocking) operation: a future is returned immediately, and once the function finishes, its return value can be retrieved using that future.
  • Class.remote(...): Invoke the actor's constructor and return an actor handle immediately. Construction itself happens asynchronously, in the process where the actor will live.
  • actor.method.remote(...): Invoke the actor method named method. This is also an asynchronous operation that returns a future.
  • ray.put(thing): Put thing in the distributed object store and return its ID. The ID can be passed in place of the object as an argument to any remote function or method call. This is a synchronous (blocking) operation.
  • ray.get(ids): Get one or more objects for the ID or IDs specified from the distributed object store. This is a synchronous (blocking) operation.
  • ray.wait(ids): Given a list of object IDs, return (1) a list of the IDs of objects that are ready and (2) a list of the IDs of objects that are not ready yet. By default it returns one ready object ID at a time.
  • ray.shutdown(): Disconnect the worker and terminate the processes started by ray.init(). It runs automatically when a Python process that uses Ray exits. It clears any remote function definitions, actor definitions, and existing actors.

There are other methods for more advanced needs. The whole ray package is documented in the Ray API reference, and the Ray documentation includes a more detailed walkthrough.

ray.wait()

The ray.wait method is useful, but it can be confusing to use until you understand how it works. It’s handy when you have many concurrent tasks running, perhaps independent workers, and you want to process the results as they become available.

Let’s walk through an example to better understand how it works and how to use it.

First, the imports and the call to ray.init(), then a “slow” function named busy:

Imports, start Ray, define a “busy” function

Now let’s start five tasks using busy:

Create five “busy” tasks

Here’s the output:

(5, ObjectID(54158c91583effffffff0100000000c001000000))
(10, ObjectID(b58a281983baffffffff0100000000c001000000))
(15, ObjectID(d8bf913e0fa6ffffffff0100000000c001000000))
(20, ObjectID(73962dd385f7ffffffff0100000000c001000000))
(25, ObjectID(26acd2812046ffffffff0100000000c001000000))

I printed the tuples to remind you that calling busy.remote(i) returns an object ID (a future) immediately. Of course, the IDs change with each run. The first number in each tuple is the number of seconds that task will sleep.

Now let’s call ray.wait() using the default values for the optional arguments, 1 for num_returns and no timeout value.

Wait until one task finishes on each iteration; loop until done.

Here’s the output:

iteration: 1
Ready length, values: 1 [5]
Not Ready length: 4
iteration: 2
Ready length, values: 1 [10]
Not Ready length: 3
iteration: 3
Ready length, values: 1 [15]
Not Ready length: 2
iteration: 4
Ready length, values: 1 [20]
Not Ready length: 1
iteration: 5
Ready length, values: 1 [25]
Not Ready length: 0

Each iteration takes about 5 seconds, because ray.wait has to wait for something to finish. With the default argument values, ray.wait() waits until one item is ready, however long that takes. Of the two lists returned, the ready list holds that single item and the not_ready list holds the rest. (We’ll see shortly that this second variable is misnamed.) The order of the items returned should make sense, given their sleep times.

Also, at the end of each iteration, I reset the ids list to be the not_ready list, for reasons that will become clear below.

Now, notice what happens when I ask for two at a time, num_returns = 2:

Returning two at a time.

Hilarity ensues: the final call to ray.wait raises an exception.

It works nicely until we ask for two when only one remains. A future release of Ray may simply return up to num_returns and not throw an exception (https://github.com/ray-project/ray/issues/6667).

But for now, here is the same example with num_returns adjusted as required:

Ensure the “num_returns” is ≤ size of the ids list.
iteration: 1
Ready length, values: 2 [5, 10]
Not Ready length: 3
iteration: 2
Ready length, values: 2 [15, 20]
Not Ready length: 1
iteration: 3
Ready length, values: 1 [25]
Not Ready length: 0

Okay, what happens if we add a timeout value? I’ll use 2.5 seconds, half the 5-second gap between the tasks’ sleep times:

Now with timeout!

Tip: The timeout value should be a float or you’ll get a warning.

Can you understand this output?

iteration: 1
Ready length, values: 0 []
Not Ready length: 5
iteration: 2
Ready length, values: 1 [5]
Not Ready length: 4
iteration: 3
Ready length, values: 0 []
Not Ready length: 4
iteration: 4
Ready length, values: 1 [10]
Not Ready length: 3
iteration: 5
Ready length, values: 0 []
Not Ready length: 3
iteration: 6
Ready length, values: 1 [15]
Not Ready length: 2
iteration: 7
Ready length, values: 0 []
Not Ready length: 2
iteration: 8
Ready length, values: 1 [20]
Not Ready length: 1
iteration: 9
Ready length, values: 0 []
Not Ready length: 1
iteration: 10
Ready length, values: 1 [25]
Not Ready length: 0

Because we reset the passed-in ids, each call waits only on the remaining tasks. Either the “fastest” remaining task has 5 seconds to go, in which case the 2.5-second timeout expires first and a zero-length ready list is returned, or we just finished one of those empty iterations and the fastest task has only 2.5 seconds to go, so it finishes within the timeout!

Tip: Notice how num_returns is interpreted without a timeout: ray.wait waits as long as it takes for that many tasks to finish. With a timeout, it returns when num_returns tasks are ready or when the timeout expires, whichever comes first, so the ready list may hold fewer than num_returns items.

Try running this last example again with a timeout longer than 2.5 seconds.

What if we pass 0 to all calls to busy.remote(), so that all finish immediately?

All tasks actually finish immediately.
iteration: 1
Ready length, values: 2 [0, 0]
Not Ready length: 3
iteration: 2
Ready length, values: 2 [0, 0]
Not Ready length: 1
iteration: 3
Ready length, values: 1 [0]
Not Ready length: 0

All of them are ready immediately, but we asked for only two, so ready contains two items (except on the last iteration, where we ask for the one remaining). The not_ready list is misnamed here, because its items are actually “ready” as well. Unlike the previous runs, these iterations run instantaneously, since no waiting is required.

Finally, I pointed out previously that we set ids to not_ready on each iteration. Here’s why: comment out the line ids = not_ready in the previous example and try again.

Here’s the (truncated) output:

iteration: 1
Ready length, values: 2 [0, 0]
Not Ready length: 3
iteration: 2
Ready length, values: 2 [0, 0]
Not Ready length: 3
...
iteration: 38
Ready length, values: 2 [0, 0]
Not Ready length: 3
iteration: 39
Ready length, values: 2 [0, 0]
Not Ready length: 3

Because we pass in the same list of ids, it keeps returning the same two items as ready (the first two in the list; order is preserved) and the rest are returned in the not_ready list, until we hit the end of the loop counter, 39.

Random Factoids and Ancient Mysteries Revealed!

  1. If ray.init() times out with a connection error, even on a local setup, maybe it’s because you are running a VPN on your machine.

For Next Time

We’ll explore various performance tips we’ve learned.

Distributed Computing with Ray

Ray is a fast and simple framework for distributed computing