Rust concurrency patterns: No context, no cancel, no leaks
At the end of our previous article I wrote:
if you really need a downstream stage to be able to signal an upstream one to stop producing values, could you implement such an extra communication line by “sharing your sender(s)”?
Now it’s true that the example in that article took a bit of a shortcut, by passing a list of numbers to the “generator”, and having the generator iterate over that list and then shut down.
Edit: I’ve actually since updated that previous article to match what is done in this one…
Let’s face it, if you know exactly what you want generated and can actually pass it along like that, you probably don’t need to have those things generated in the first place…
So let’s put ourselves to the challenge of having an ‘infinite’ generator, which will have to be told to stop generating by the consumer…
Full code example available at https://github.com/gterzian/pipeline/blob/77ca6459a5a98649e37d8ba8a58863a5f675aa56/src/main.rs
On the simplicity of Rust
Last week, I also came across an article entitled “Using contexts to avoid leaking goroutines”, which, as you may have guessed, describes how in Go you can use a “context” to signal a producer in a “goroutine” to stop producing values.
This is obviously relevant for our topic today, since we’re going to do exactly the same thing using Rust.
Also, I get the feeling that Go has the reputation of being simpler than Rust, as if using Rust gives you benefits but also comes at a huge cost in terms of learning curve and complexity.
I personally beg to differ.
I think Rust, with language-wide traits like Iterator or Drop, and with the consistency of semantics like move, actually allows for very simple yet expressive solutions to many problems, which rely implicitly on well-known language features. In some ways, it reminds me of __iter__ and the like in Python...
The good news is that I think we can try to outdo the above-linked example in simplicity with the Rust equivalent, because we won’t be needing a “context”, we won’t have to explicitly close channels, and we won’t be facing any potential resource leaks. All thanks to well-known features of Rust, highlighting its simplicity.
Let’s see how it can be done…
Since we’re using an unbounded channel, the buffer of that channel could grow without bounds if the producer produces values much faster than the consumer consumes them. However, this isn’t quite a ‘resource leak’, since once the receiver is dropped, the producer’s next send will fail and it will stop producing, and any “left-overs” in the internal buffer will be dropped as well. It could however be a problem if the generation is very expensive in itself; see the “note from a reader on reddit” below for a bit more background on that.
Telling a producer to “just drop it”.
So, here we go.
The whole thing comes down to a while loop. Basically, the generator will keep generating numbers, and sending them on a channel, until the “send” operation returns an Err.
Rust’s while let Ok(_) = sender.send(num) really shines here, since the while loop doesn’t need to include any manual boolean logic; instead you simply destructure the Result returned by the send operation. If the result is an Err, the while loop will stop.
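As a rough illustration of that loop, here is a minimal sketch of an ‘infinite’ generator. The helper name spawn_generator, the u64 item type, and the returned counter are assumptions made for this sketch; they are not taken from the linked repository.

```rust
use std::sync::mpsc::{channel, Receiver};
use std::thread::{self, JoinHandle};

/// Spawns an 'infinite' generator thread (hypothetical helper for this sketch).
/// Returns the receiving end of the channel, plus a handle that yields the
/// number of sends that succeeded once the generator has stopped.
fn spawn_generator() -> (Receiver<u64>, JoinHandle<u64>) {
    let (sender, receiver) = channel();
    let handle = thread::spawn(move || {
        let mut num: u64 = 0;
        // No boolean flag, no context: the loop ends as soon as `send`
        // returns an Err, which happens once the receiver has been dropped.
        while let Ok(_) = sender.send(num) {
            num += 1;
        }
        // `num` was only incremented after successful sends.
        num
    });
    (receiver, handle)
}
```

Dropping the returned Receiver is all it takes to stop the thread; joining the handle afterwards returns instead of blocking forever.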
So when is an Err going to be returned? From the docs, we are told that “An unsuccessful send would be one where the corresponding receiver has already been deallocated”.
That means that to stop the while loop of the producer, we need to drop the receiver on which the generator is sending values.
That’s quite easy: just iterate over the receiver until you receive the final value you are looking for, and then stop the iteration. Scoping the iteration will ensure that the receiver is dropped once you break out of it. (While our example requires manual scoping, since a lot is happening in the main thread, in a real-world use case this scoping is likely to flow naturally from the code structure, for example if your consumer is in its own thread, or just in a separate function.)
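The consumer side described above could look roughly like this. It is a sketch under assumptions: consume_until and its target parameter are made-up names, and the producer is inlined so the block stands on its own.

```rust
use std::sync::mpsc::channel;
use std::thread;

/// Reads from an 'infinite' producer until `target` shows up, then returns
/// every value it saw. `consume_until` is a hypothetical name for this sketch.
fn consume_until(target: u64) -> Vec<u64> {
    let (sender, receiver) = channel();
    let producer = thread::spawn(move || {
        let mut num: u64 = 0;
        // Stops on the first failed send, i.e. once the receiver is gone.
        while let Ok(_) = sender.send(num) {
            num += 1;
        }
    });

    let mut seen = Vec::new();
    {
        // Moving the receiver into this block ties its lifetime to the block.
        let receiver = receiver;
        for num in receiver.iter() {
            seen.push(num);
            if num == target {
                break;
            }
        }
        // `receiver` is dropped here, at the end of the scope.
    }

    // The producer's next send fails, its loop ends, and the join returns.
    producer.join().unwrap();
    seen
}
```

For example, consume_until(4) collects 0 through 4 and only returns once the producer thread has actually shut down, so nothing is left running in the background.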
That’s it folks, I can’t write more about this…
A note from a reader on reddit: “I think it’s important to emphasize that it’s tricky to avoid all extra value generation. You could use a sync_channel with bound 0 to get a rendezvous, but you must then wait for the receiver to "approve" the received value somehow before generating the next one. The easiest way to do that would be with a synchronous back channel. Futures would be another way to avoid all extra generation in this example.
No matter how you do this, you’re going to lose all your parallelism: the whole point of generate-and-test parallelism is for the generator to be generating while the tester is testing.
None of this is unique to Rust, of course: any generate-and-test architecture has to make the choice of how much speculative generation is done. No speculation = no parallelism, and too much speculation can be an expensive waste. Efficient parallelism is hard.”
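To make the reader’s point a bit more concrete, here is a small sketch of the rendezvous variant using sync_channel with a bound of 0. It is not code from the reader or from the linked repository; rendezvous_demo and its counters are assumptions for illustration. It does not implement the back channel mentioned in the note, it only shows that a rendezvous limits the speculation to at most one extra generated value.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

/// Returns (values consumed, values generated) for a rendezvous pipeline.
/// `rendezvous_demo` is a hypothetical name used only for this sketch.
fn rendezvous_demo(target: u64) -> (u64, u64) {
    // Bound 0: every `send` blocks until the consumer is ready to receive.
    let (sender, receiver) = sync_channel::<u64>(0);
    let producer = thread::spawn(move || {
        let mut generated: u64 = 0;
        loop {
            // Stand-in for an expensive generation step.
            let value = generated;
            generated += 1;
            if sender.send(value).is_err() {
                // The receiver was dropped: this last value was wasted work.
                break;
            }
        }
        generated
    });

    let mut consumed = 0;
    for value in receiver.iter() {
        consumed += 1;
        if value == target {
            break;
        }
    }
    // Dropping the receiver unblocks the producer's pending send with an Err.
    drop(receiver);
    (consumed, producer.join().unwrap())
}
```

With an unbounded channel the second number can run far ahead of the first; here the generator can only be one value ahead, at the price of spending most of its time blocked in send.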
A bonus: Type your messages with Enums.
Since we got our initial problem out of the way so quickly, let’s add a little bonus. Using channels to communicate really starts shining when you add some meaning to your messages.
Previously, our senders were typed as Sender<u8>. That doesn’t tell us that much about what kind of messages will flow from one stage to the other. So let’s introduce a PipelineMsg enum, which will give more meaning to our messaging.
Our enum has three variants: Generated(u8), Squared(u8), and Merged(u8). This already tells us much more about how data flows from one stage to the other.
Our various stages, like “square” above, can now also assert that they are getting the messages they expect. In general, your code has just gained quite a bit in readability...
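A minimal sketch of what such a typed stage might look like follows. The three variants are the ones named in the text; the exact signature of square, the run_demo helper, and the use of saturating_mul (to keep u8 from overflowing for inputs above 15) are assumptions made for this sketch.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

#[derive(Debug, PartialEq)]
enum PipelineMsg {
    Generated(u8),
    Squared(u8),
    Merged(u8),
}

/// A "square" stage: it only accepts `Generated` messages and emits `Squared`.
fn square(input: Receiver<PipelineMsg>, output: Sender<PipelineMsg>) {
    thread::spawn(move || {
        for msg in input {
            let squared = match msg {
                // Assumption: saturate rather than overflow the u8.
                PipelineMsg::Generated(num) => num.saturating_mul(num),
                // The stage asserts that it gets the message it expects.
                other => panic!("square stage got unexpected message: {:?}", other),
            };
            if output.send(PipelineMsg::Squared(squared)).is_err() {
                break; // downstream is gone, so stop working
            }
        }
    });
}

/// Hypothetical driver: feeds three numbers through the stage and tags the
/// results as `Merged`, the way a final stage might.
fn run_demo() -> Vec<PipelineMsg> {
    let (gen_tx, gen_rx) = channel();
    let (sq_tx, sq_rx) = channel();
    square(gen_rx, sq_tx);
    for num in 1..=3u8 {
        gen_tx.send(PipelineMsg::Generated(num)).unwrap();
    }
    // Dropping the sender ends the stage's input, which ends its output too.
    drop(gen_tx);
    sq_rx
        .iter()
        .map(|msg| match msg {
            PipelineMsg::Squared(num) => PipelineMsg::Merged(num),
            other => panic!("merge step got unexpected message: {:?}", other),
        })
        .collect()
}
```

Reading the match arms is enough to see which message each stage consumes and which one it produces, which is the readability gain the enum is meant to bring.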