Rust concurrency patterns: Natural Born Pipelines
There seems to be demand for a “Rust concurrent pipeline” guide à la https://blog.golang.org/pipelines, so let’s give it a try.
Full code example at https://github.com/gterzian/pipeline/blob/master/src/main.rs
Note: when I write “sharing” below, I mean it in plain English, as in “making available”. In fact, the sharing relies on Rust’s move semantics, and emphatically not on shared references.
Some “challenges” of a pipeline
The Go article further highlights an essential challenge that needs to be addressed: stages not exiting when they should, resulting in resource leaks.
The Go article focuses on the particulars of the Go flavor of channels (see a previous article for an overview of the differences). So instead of building something as close to that example as possible, we’re going to focus on an idiomatic Rust pipeline, which also faces the prospect of forever hanging on a stage, but for different reasons.
Our chosen concurrency “primitives”
Our pipeline in Rust will involve the use of some pretty “primitive” tools indeed: firstly something referred to as “threads”, as well as the most boring channel implementation out there, the standard library’s `std::sync::mpsc`.
These tools appear good enough to build intricate patterns of concurrent work by sticking to “sharing the `Sender`” half of our channel in various ways…
For more specific needs, one could look further to crates such as `crossbeam`.
You are only as good as your idioms
How can we write the most ‘natural’ form of concurrent pipelines in Rust? What can we define as ‘idiomatic’, and who are we to decide what ‘idiomatic’ is? Well, I’ll settle for the following:
Idiomatic code implicitly uses standard features of the language and/or the standard library (or a crate so widely used as to be considered standard). It’s “clever”, but only in relying implicitly on well-known features, as opposed to custom-built contraptions. Where a custom-built contraption is needed, it will instead be as “dumb” and explicit as possible.
I’m going to take a big reputational risk here in trying to list a few well-known features in Rust:
- the `Iterator` trait.
- `for` loops.
- `mpsc::Receiver`'s `iter` method, and especially its implementation of the `IntoIterator` trait.
- the `Drop` trait.
Can we implement a non-blocking, and gracefully shutting-down, concurrent pipeline relying only on these concepts? Could such an approach to pipelines be considered ‘idiomatic’ because it relies on standard language (or rather, library-provided) features? Let’s see…
A forever-hanging pipeline in Rust
Let’s start with an implementation that will indeed forever hang, and then fix it.
First an introduction to our components:
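The snippets from the original post aren’t reproduced here, so below is a hedged reconstruction of the “square” component (the exact code lives in the linked repo; the signature and names are my assumption):

```rust
use std::sync::mpsc::{channel, Receiver, Sender};
use std::thread;

// "square" component: squares each received number and sends the
// result on to the "merge" stage, bypassing the main thread.
fn square(receiver: Receiver<u64>, merge_sender: Sender<u64>) -> thread::JoinHandle<()> {
    thread::spawn(move || {
        // Iteration ends when all senders to `receiver` are dropped.
        for num in receiver {
            merge_sender.send(num * num).unwrap();
        }
    })
}

fn main() {
    let (work_sender, work_chan) = channel();
    let (merge_sender, merge_chan) = channel();
    let handle = square(work_chan, merge_sender);

    work_sender.send(3).unwrap();
    drop(work_sender); // let the worker's loop end
    handle.join().unwrap();
    assert_eq!(merge_chan.iter().collect::<Vec<u64>>(), vec![9]);
}
```

The “generate” and “merge” stages follow the same shape: a thread iterating over a receiver and sending onwards.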
The pledge: create and share channels
The setup of the pipeline is represented by the following steps:
The main thread creates, and shares, the following channels:
- Results chan. Sender shared with the final “merge” stage, to send final results back to the main thread.
- Merge chan. Sender shared with the “square” workers, to send squared numbers to the “merge” stage.
- Gen chan. Sender shared with the “generate” stage, to send generated numbers on.
- “Square workers”: simply a list of senders, through which we can cycle to distribute work among them.
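A minimal sketch of that wiring (channel names follow the article; the worker count of two is my assumption — see the linked repo for the actual setup):

```rust
use std::sync::mpsc::channel;

fn main() {
    // results chan: "merge" stage -> main thread
    let (results_sender, results_chan) = channel::<u64>();
    // merge chan: "square" workers -> "merge" stage
    let (merge_sender, _merge_chan) = channel::<u64>();
    // gen chan: "generate" stage -> main thread
    let (gen_sender, _gen_chan) = channel::<u64>();

    // one sender per "square" worker, kept in a list to cycle over
    let mut square_workers = Vec::new();
    for _ in 0..2 {
        let (worker_sender, _worker_chan) = channel::<u64>();
        square_workers.push(worker_sender);
    }
    assert_eq!(square_workers.len(), 2);

    // in the full pipeline, the senders and receivers above would
    // now be moved into their stages' threads
    let _ = (results_sender, results_chan, merge_sender, gen_sender);
}
```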
The turn: start communicating
Communication starts by:
- receiving generated numbers via the “gen chan”,
- distributing the work by cycling over the “square_workers”.
Finally, we:
- Start receiving over the “results chan”, waiting for the final results from the “merge” stage.
- In the background, each worker has a sender to the “merge” stage, hence each number that is squared is sent to be “merged” via that channel, bypassing the main thread (see the “square” component code snippet above).
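The fan-out mechanic in isolation can be sketched as follows (a standalone example with arbitrary values, not the full pipeline; variable names are mine):

```rust
use std::sync::mpsc::channel;

fn main() {
    // stand-ins for the "gen chan" and two "square worker" channels
    let (gen_sender, gen_chan) = channel();
    let (first_sender, first_chan) = channel();
    let (second_sender, second_chan) = channel();
    let square_workers = vec![first_sender, second_sender];

    for num in 1..=4 {
        gen_sender.send(num).unwrap();
    }
    drop(gen_sender); // lets the fan-out loop below terminate

    // distribute the work by cycling over the worker senders
    for (i, num) in gen_chan.iter().enumerate() {
        square_workers[i % square_workers.len()].send(num).unwrap();
    }
    drop(square_workers);

    // each worker receiver got every other number
    assert_eq!(first_chan.iter().collect::<Vec<i32>>(), vec![1, 3]);
    assert_eq!(second_chan.iter().collect::<Vec<i32>>(), vec![2, 4]);
}
```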
This looks great; the only problem is we’re stuck in a bit of an ‘endless summer’ here, since this pipeline will keep running forever…
How to “Drop” our channels
How can we stop our forever-iterating pipeline? For that, we need to understand when an iterator over a channel will return `None`.
A note on iteration in Rust and channels: calling `next` on an iterator that doesn’t have any values will return `None`, and will also cause a `for` loop over it to stop. In the case of a channel, `None` will be returned only when all the `Sender`s of the corresponding `Receiver` have been dropped.
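A minimal demonstration of that rule:

```rust
use std::sync::mpsc::channel;

fn main() {
    let (sender, receiver) = channel();
    sender.send(1).unwrap();
    sender.send(2).unwrap();
    drop(sender); // all senders gone: iteration can now end

    // the buffered values are still delivered...
    let received: Vec<i32> = receiver.iter().collect();
    assert_eq!(received, vec![1, 2]);
    // ...and then the iterator returns `None`
    assert_eq!(receiver.iter().next(), None);
}
```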
So the answer is: to stop one stage in our pipeline, we need to drop all the senders corresponding to the receiver of that stage.
So you see, our pipeline will naturally shut down: as each stage is done iterating over its receiver, it will drop the senders for the next stage, resulting in the next stage stopping its iteration, and so on…
Oops, actually, that doesn’t happen. Our queue of ‘worker senders’ stays in scope in the main thread, even after the ‘fan-out’ part is done, hence the workers will keep iterating on their receivers and never shut down...
The prestige: scopes
Let’s fix the above problem and introduce an extra scope, which will cause the queue of “worker senders” to be dropped.
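A standalone sketch of the scope trick, assuming two workers and a hypothetical `done` channel (my addition, just to observe the shutdown):

```rust
use std::sync::mpsc::channel;
use std::thread;

fn main() {
    let (done_sender, done_chan) = channel();
    let mut handles = Vec::new();
    {
        // extra scope: `square_workers` drops at the closing brace
        let mut square_workers = Vec::new();
        for _ in 0..2 {
            let (worker_sender, worker_chan) = channel();
            square_workers.push(worker_sender);
            let done = done_sender.clone();
            handles.push(thread::spawn(move || {
                for num in worker_chan {
                    let _ = num; // real work would happen here
                }
                // iteration ended: all our senders were dropped
                done.send(()).unwrap();
            }));
        }
        for (i, num) in (1..=4).enumerate() {
            square_workers[i % 2].send(num).unwrap();
        }
    } // `square_workers` dropped here, workers stop iterating
    drop(done_sender);
    assert_eq!(done_chan.iter().count(), 2); // both workers stopped
    for handle in handles {
        handle.join().unwrap();
    }
}
```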
Careful printing reveals that, with all their senders having been dropped when `square_workers` goes out of scope, the workers indeed stop iterating.
However, it is now `merge` that seems to keep iterating…
How can that be? Once the workers have stopped their iteration, the senders to `merge` that they hold go out of scope, so `merge` itself should stop iterating, shouldn’t it?
Moving on…
The problem is that we’ve forgotten that the two workers each own a ‘clone’ of the merge sender, while the original one is still in the ‘main’ scope; hence it does not drop, and `merge` is stuck iterating forever.
The fix? Simply replace one of the clones with an actual “move” of the `merge_chan`, like this:
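A standalone sketch of the clone-versus-move distinction (variable names are mine, not the repo’s):

```rust
use std::sync::mpsc::channel;
use std::thread;

fn main() {
    let (merge_sender, merge_chan) = channel();

    // first worker: gets a *clone* of the merge sender
    let (first_sender, first_chan) = channel();
    let first_merge = merge_sender.clone();
    thread::spawn(move || {
        for num in first_chan {
            first_merge.send(num * num).unwrap();
        }
    });

    // second worker: the *original* sender is moved in, so no
    // merge sender lingers in the main scope
    let (second_sender, second_chan) = channel();
    thread::spawn(move || {
        for num in second_chan {
            merge_sender.send(num * num).unwrap();
        }
    });

    first_sender.send(2).unwrap();
    second_sender.send(3).unwrap();
    drop(first_sender);
    drop(second_sender);

    // with the original moved into a worker, this iteration ends
    let mut results: Vec<i32> = merge_chan.iter().collect();
    results.sort();
    assert_eq!(results, vec![4, 9]);
}
```

Had we cloned for both workers instead, the original `merge_sender` would still be alive in `main`, and the final `collect` would block forever.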
Now, when `merge_chan` is dropped, `merge` stops iterating, resulting in the `results_chan` sender dropping, resulting in our main thread receiving a `None`, and our entire pipeline gracefully shutting down.
The End
So this was quite a ride. Fortunately, the answer to “how to implement a pipeline in Rust?” seems to be: “just iterate over receivers, and make sure to drop senders when you’re done with them”. Basically, as an upstream stage drops the sender(s) to the downstream one, this will cause the downstream one to stop iterating on its receiver, and allow it to drop the senders for the next stage, resulting in graceful shutdown.
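Putting the whole recipe together, here is a minimal end-to-end sketch (stage and channel names follow the article, but this is a reconstruction, not the repo’s exact code):

```rust
use std::sync::mpsc::channel;
use std::thread;

fn main() {
    let (results_sender, results_chan) = channel();
    let (merge_sender, merge_chan) = channel();
    let (gen_sender, gen_chan) = channel();

    // "generate": produce some numbers; gen_sender drops at thread end.
    thread::spawn(move || {
        for num in 1u64..=4 {
            gen_sender.send(num).unwrap();
        }
    });

    // "merge": funnel squared numbers back to the main thread;
    // results_sender drops here when this loop ends.
    thread::spawn(move || {
        for num in merge_chan {
            results_sender.send(num).unwrap();
        }
    });

    // Two "square" workers: a clone of the merge sender for the
    // first, the original moved into the second.
    let (first_worker, first_chan) = channel();
    let merge_clone = merge_sender.clone();
    thread::spawn(move || {
        for num in first_chan {
            merge_clone.send(num * num).unwrap();
        }
    });
    let (second_worker, second_chan) = channel();
    thread::spawn(move || {
        for num in second_chan {
            merge_sender.send(num * num).unwrap();
        }
    });

    // Fan out generated numbers, cycling over the workers; the
    // extra scope drops the worker senders when we're done.
    {
        let square_workers = vec![first_worker, second_worker];
        for (i, num) in gen_chan.iter().enumerate() {
            square_workers[i % square_workers.len()].send(num).unwrap();
        }
    } // square_workers dropped here

    // Every sender eventually drops, so this loop ends gracefully.
    let mut results: Vec<u64> = results_chan.iter().collect();
    results.sort();
    assert_eq!(results, vec![1, 4, 9, 16]);
}
```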
Of course, that doesn’t cover the case of a downstream stage shutting down before having received all the values sent by an upstream one. But do we care about that? As long as we ensure senders are dropped and the entire pipeline shuts down, whatever data was left in the buffers of our unbounded channels will be dropped too…
And if you really need a downstream stage to be able to signal an upstream one to stop producing values, could you implement such an extra communication line by “sharing your sender(s)”?
To be continued…