Full code example at https://github.com/gterzian/pipeline/blob/master/src/main.rs
Note: When I write “sharing” below, I mean it in plain English, as in “making available”. In fact, the sharing relies on Rust’s move semantics, and emphatically not on shared references.
Some “challenges” of a pipeline
The Go article further highlights an essential challenge that needs to be addressed: stages not exiting when they should, resulting in resource leaks.
The Go article focuses on the particulars of the Go flavor of channels (see a previous article for an overview of the differences). So instead of building something that is as close to that example as possible, we’re going to focus on an idiomatic Rust pipeline, which also faces the prospect of hanging forever on a stage, but for different reasons.
Our chosen concurrency “primitives”
Our pipeline in Rust will involve the use of some pretty “primitive” tools indeed, firstly something referred to as “threads”, as well as the most boring channel implementation out there, the standard lib
For more specific needs, one could look further to crates such as
You are only as good as your idioms
How can we write the most ‘natural’ form of concurrent pipelines in Rust? What can we define as ‘idiomatic’, and who are we to decide what ‘idiomatic’ is? Well, I’ll settle for the following:
Idiomatic code implicitly uses standard features of the language and/or the standard library (or a crate that is so widely used as to be considered standard as well). It’s “clever”, but only in relying implicitly on well-known features, as opposed to custom-built contraptions. In the case of custom-built contraptions, it will instead be as “dumb” and explicit as possible.
I’m going to take a big reputational risk here in trying to list a few well-known features in Rust:
iter method, and especially its implementation of the
Can we implement a non-blocking, gracefully shutting-down, concurrent pipeline relying only on these concepts? Could such an approach to pipelines be considered ‘idiomatic’ because it relies on standard language (or rather library-provided) features? Let’s see…
A forever-hanging pipeline in Rust
Let’s start with an implementation that will indeed forever hang, and then fix it.
First an introduction to our components:
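As a minimal sketch (not the code from the linked repository), the three components could look something like the following. The stage names generate, square and merge follow this article; the signatures, the count parameter, and the choice of u8 inputs and u64 squares are assumptions made for illustration.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread;

/// "generate" stage: sends 1..=count on `num_chan`. The sender is dropped
/// when the thread's closure returns.
fn generate(count: u8, num_chan: Sender<u8>) {
    thread::spawn(move || {
        for num in 1..=count {
            // Sending only fails if the receiver is gone; ignored in this sketch.
            let _ = num_chan.send(num);
        }
    });
}

/// "square" worker: owns a sender to the "merge" stage, and hands back the
/// sender through which it can be fed numbers to square.
fn square(merge_chan: Sender<u64>) -> Sender<u8> {
    let (chan, port) = channel::<u8>();
    thread::spawn(move || {
        // Iteration ends once every sender for `port` has been dropped.
        for num in port {
            let _ = merge_chan.send(u64::from(num) * u64::from(num));
        }
    });
    chan
}

/// "merge" stage: forwards every squared number to the results channel.
fn merge(results_chan: Sender<u64>) -> Sender<u64> {
    let (chan, port) = channel::<u64>();
    thread::spawn(move || {
        for squared in port {
            let _ = results_chan.send(squared);
        }
    });
    chan
}

fn main() {
    // Exercise each stage on its own rather than as a full pipeline.
    let (gen_chan, gen_port) = channel();
    generate(3, gen_chan);
    println!("generated: {:?}", gen_port.iter().collect::<Vec<u8>>());

    let (merge_chan, merge_port) = channel();
    let worker = square(merge_chan);
    worker.send(4).unwrap();
    drop(worker); // lets the worker finish, which in turn ends `merge_port`
    println!("squared: {:?}", merge_port.iter().collect::<Vec<u64>>());

    let (results_chan, results_port) = channel();
    let merger = merge(results_chan);
    merger.send(16).unwrap();
    drop(merger);
    println!("merged: {:?}", results_port.iter().collect::<Vec<u64>>());
}
```

Each stage takes ownership of the sender for the stage after it, which is the “sharing by moving” mentioned in the note at the top.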
The pledge: create and share channels
The setup of the pipeline is represented by the following steps:
The main thread creates, and shares, the following channels:
- Results chan. Sender shared with the final “merge” stage, to send final results back to the main thread.
- Merge chan. Sender shared with the “square” workers, to send squared numbers to the “merge” stage.
- Gen chan. Sender shared with the “generate” stage, to send generated numbers on.
- “Square workers”, simply a list of senders, through which we can cycle to distribute work among them.
The turn: start communicating
Communication starts by:
- Receive generated numbers via the “gen chan”,
- distribute the work by cycling over the “square_workers”.
- Start receiving over the “results chan”, waiting for the final results from the “merge” stage.
- In the background, each worker has a sender to the “merge” stage, hence each number that is squared is sent to be “merged” via that channel, bypassing the main thread (see the “square” component code snippet above).
This looks great; the only problem is that we’re stuck in a bit of an ‘endless summer’ here, since this pipeline will keep running forever…
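To make the setup and the hang concrete, here is a rough sketch of the steps above, under a few assumptions: the stage functions are simplified stand-ins, there are two workers held in a VecDeque, and the hypothetical run_until_idle function reads results with recv_timeout instead of plain iteration, purely so that the example terminates and can report what it saw.

```rust
use std::collections::VecDeque;
use std::sync::mpsc::{channel, RecvTimeoutError, Sender};
use std::thread;
use std::time::Duration;

// Hypothetical stand-ins for the three stages.
fn generate(count: u8, num_chan: Sender<u8>) {
    thread::spawn(move || {
        for num in 1..=count {
            let _ = num_chan.send(num);
        }
    });
}

fn square(merge_chan: Sender<u64>) -> Sender<u8> {
    let (chan, port) = channel::<u8>();
    thread::spawn(move || {
        for num in port {
            let _ = merge_chan.send(u64::from(num).pow(2));
        }
    });
    chan
}

fn merge(results_chan: Sender<u64>) -> Sender<u64> {
    let (chan, port) = channel::<u64>();
    thread::spawn(move || {
        for squared in port {
            let _ = results_chan.send(squared);
        }
    });
    chan
}

/// Runs the pipeline and returns the results received, plus the error that
/// ended the receive loop once nothing arrived for `idle`.
fn run_until_idle(count: u8, idle: Duration) -> (Vec<u64>, RecvTimeoutError) {
    // The pledge: create and share the channels.
    let (results_chan, results_port) = channel();
    let (gen_chan, gen_port) = channel();
    let merge_chan = merge(results_chan);
    let mut square_workers: VecDeque<Sender<u8>> = VecDeque::new();
    for _ in 0..2 {
        square_workers.push_back(square(merge_chan.clone()));
    }
    generate(count, gen_chan);

    // The turn: fan out by cycling over the workers.
    for num in gen_port {
        let worker = square_workers.pop_front().unwrap();
        let _ = worker.send(num);
        square_workers.push_back(worker);
    }

    // A plain `for result in results_port` would block forever at this point,
    // because `square_workers` and `merge_chan` are still alive in this scope.
    let mut results = Vec::new();
    loop {
        match results_port.recv_timeout(idle) {
            Ok(squared) => results.push(squared),
            Err(err) => return (results, err),
        }
    }
}

fn main() {
    let (results, err) = run_until_idle(4, Duration::from_millis(500));
    println!("received {:?}, then gave up with: {:?}", results, err);
}
```

The receive loop ends with a timeout rather than a disconnection: all the squares arrive, but the results channel is never closed. The order of the results is not fixed, since two workers send to the “merge” stage concurrently.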
How to “Drop” our channels
How can we stop our forever-iterating pipeline? For that, we need to understand when iteration over a channel comes to an end.
A note on iteration in Rust and channels: calling next on an iterator that doesn’t have any values will return None, and will also cause a for loop over it to stop. In the case of a channel, None will be returned only when all the Senders of the corresponding Receiver have been dropped.
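A small sketch of that behaviour, using only the standard library’s mpsc channel. The function name collect_until_disconnected is made up for this example.

```rust
use std::sync::mpsc::channel;
use std::thread;

/// Sends 0..count from a second thread, then lets the only sender drop.
fn collect_until_disconnected(count: u32) -> Vec<u32> {
    let (chan, port) = channel();
    thread::spawn(move || {
        for n in 0..count {
            let _ = chan.send(n);
        }
        // `chan` goes out of scope here, dropping the only sender.
    });

    // The loop blocks while waiting for values, and stops when the channel's
    // iterator returns `None`: all senders are gone and the buffer is empty.
    let mut received = Vec::new();
    for n in port {
        received.push(n);
    }
    received
}

fn main() {
    println!("{:?}", collect_until_disconnected(3));
}
```

Values that were already sent are still delivered before iteration stops, so dropping a sender does not lose what is sitting in the buffer, as long as the receiver keeps iterating.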
So the answer is: to stop one stage in our pipeline, we need to drop all the senders corresponding to the receiver of that stage.
So you see, our pipeline will naturally shut down: as each stage is done iterating over its receiver, it will drop the senders for the next stage, resulting in the next stage stopping its iteration, and so on…
Oops, actually, that doesn’t happen. Our queue of ‘worker senders’ stays in scope in the main thread, even after the ‘fan-out’ part is done, hence the workers will keep iterating on their receivers and never shut down…
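The difference between “nothing to receive yet” and “no sender left” can be observed directly with try_recv. The sketch below is only an illustration: probe is a made-up function, and a single sender stands in for the whole queue of worker senders.

```rust
use std::collections::VecDeque;
use std::sync::mpsc::{channel, Sender, TryRecvError};

/// Probes a receiver while a queue still holds its sender, and again after
/// the queue has been dropped.
fn probe() -> (TryRecvError, TryRecvError) {
    let (chan, port) = channel::<u8>();
    let mut worker_senders: VecDeque<Sender<u8>> = VecDeque::new();
    worker_senders.push_back(chan);

    // The sender is still alive: the channel is merely empty, and a blocking
    // iteration over `port` would keep waiting.
    let while_alive = port.try_recv().unwrap_err();

    // Once the queue (and the sender in it) is dropped, the channel is
    // disconnected, which is what ends a blocking iteration.
    drop(worker_senders);
    let after_drop = port.try_recv().unwrap_err();

    (while_alive, after_drop)
}

fn main() {
    println!("{:?}", probe());
}
```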
The prestige: scopes
Let’s fix the above problem by introducing an extra scope, which will result in the queue of “worker senders” being dropped.
Careful printing reveals that, with all their senders having been dropped when “square_workers” goes out of scope, the workers do indeed stop iterating.
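Instead of printing, the sketch below observes the workers stopping by joining their threads. It makes several assumptions for the sake of a short example: the worker returns a JoinHandle alongside its sender, the numbers 1 to 4 are sent directly instead of coming from a “generate” stage, and the main thread reads the merge channel itself, with a timeout so that the example terminates.

```rust
use std::collections::VecDeque;
use std::sync::mpsc::{channel, RecvTimeoutError, Sender};
use std::thread::{self, JoinHandle};
use std::time::Duration;

/// Hypothetical worker that also returns its JoinHandle, so that the caller
/// can observe the thread finishing.
fn square(merge_chan: Sender<u64>) -> (Sender<u8>, JoinHandle<()>) {
    let (chan, port) = channel::<u8>();
    let handle = thread::spawn(move || {
        for num in port {
            let _ = merge_chan.send(u64::from(num) * u64::from(num));
        }
    });
    (chan, handle)
}

fn fan_out_in_scope(idle: Duration) -> (Vec<u64>, RecvTimeoutError) {
    let (merge_chan, merge_port) = channel::<u64>();
    let mut handles = Vec::new();
    {
        // The extra scope: the queue of worker senders lives only in here.
        let mut square_workers = VecDeque::new();
        for _ in 0..2 {
            let (worker, handle) = square(merge_chan.clone());
            square_workers.push_back(worker);
            handles.push(handle);
        }
        for num in 1..=4u8 {
            let worker = square_workers.pop_front().unwrap();
            let _ = worker.send(num);
            square_workers.push_back(worker);
        }
    } // `square_workers` is dropped here, so the workers' loops can end.

    // Joining returns only because each worker has stopped iterating.
    for handle in handles {
        handle.join().unwrap();
    }

    // The original `merge_chan` is still alive in this function, so the merge
    // side is never disconnected; reading from it eventually times out.
    let mut squares = Vec::new();
    loop {
        match merge_port.recv_timeout(idle) {
            Ok(squared) => squares.push(squared),
            Err(err) => return (squares, err),
        }
    }
}

fn main() {
    let (squares, err) = fan_out_in_scope(Duration::from_millis(100));
    println!("received {:?}, then: {:?}", squares, err);
}
```

The workers finish, yet the receive loop ends with a timeout rather than a disconnection: one sender for the merge channel is still alive somewhere.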
However, it is now the merge that seems to keep iterating…
How can that be? When the workers have stopped their iteration, the sender to merge that they hold goes out of scope, so merge itself should stop iterating, shouldn’t it?
The problem is that we’ve forgotten that the two workers each own a ‘clone’ of the merge sender, and the original one is still in the ‘main’ scope, hence it does not drop, and merge is stuck iterating forever.
The fix? Simply replace one of the clones with an actual “move” of the merge_chan, like this:
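A minimal sketch of that change, assuming two workers: the first gets a clone, and the second takes the original merge_chan by move, so that no sender is left behind in the enclosing scope. As a simplification, the hypothetical squares_via_merge function plays the role of the “merge” stage itself by iterating over the merge receiver directly.

```rust
use std::collections::VecDeque;
use std::sync::mpsc::{channel, Sender};
use std::thread;

// Hypothetical stand-in for the "square" worker.
fn square(merge_chan: Sender<u64>) -> Sender<u8> {
    let (chan, port) = channel::<u8>();
    thread::spawn(move || {
        for num in port {
            let _ = merge_chan.send(u64::from(num) * u64::from(num));
        }
    });
    chan
}

/// Squares 1..=count using two workers, and returns the sorted results.
fn squares_via_merge(count: u8) -> Vec<u64> {
    let (merge_chan, merge_port) = channel::<u64>();
    {
        let mut square_workers = VecDeque::new();
        // The first worker gets a clone of the sender...
        square_workers.push_back(square(merge_chan.clone()));
        // ...and the second takes the original by move, so this function no
        // longer owns any sender for the merge channel.
        square_workers.push_back(square(merge_chan));

        for num in 1..=count {
            let worker = square_workers.pop_front().unwrap();
            let _ = worker.send(num);
            square_workers.push_back(worker);
        }
    } // Worker senders drop here; each worker then drops its merge sender.

    // Plain blocking iteration now terminates, because every sender for the
    // merge channel ends up being dropped.
    let mut squares: Vec<u64> = merge_port.iter().collect();
    squares.sort();
    squares
}

fn main() {
    println!("{:?}", squares_via_merge(4));
}
```

With more workers, the same idea would be to clone for all but the last one. An explicit drop(merge_chan) after spawning the workers would have the same effect, at the cost of one extra clone.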
Now, once the last remaining sender, merge_chan, is dropped, merge stops iterating, resulting in the results_chan sender dropping, resulting in our main thread receiving a None, and our entire pipeline gracefully shutting down.
So this was quite a ride. Fortunately, the answer to “how to implement a pipeline in Rust?” seems to be: “just iterate over receivers and make sure to drop senders when you’re done with them”. Basically, as an upstream stage drops its sender(s) to a downstream one, this will cause the downstream one to stop iterating on its receiver, and allow it to drop its senders for the next stage, resulting in graceful shutdown.
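Putting the pieces together, a complete pipeline along these lines might look like the sketch below. It is not the code from the linked repository: the stage functions are simplified stand-ins, run_pipeline and its count parameter are made up for the example, and the results are sorted only because the two workers may deliver them in any order.

```rust
use std::collections::VecDeque;
use std::sync::mpsc::{channel, Sender};
use std::thread;

// Hypothetical stand-ins for the three stages.
fn generate(count: u8, num_chan: Sender<u8>) {
    thread::spawn(move || {
        for num in 1..=count {
            let _ = num_chan.send(num);
        }
    }); // `num_chan` drops when the closure returns, ending the fan-out loop.
}

fn square(merge_chan: Sender<u64>) -> Sender<u8> {
    let (chan, port) = channel::<u8>();
    thread::spawn(move || {
        for num in port {
            let _ = merge_chan.send(u64::from(num) * u64::from(num));
        }
    });
    chan
}

fn merge(results_chan: Sender<u64>) -> Sender<u64> {
    let (chan, port) = channel::<u64>();
    thread::spawn(move || {
        for squared in port {
            let _ = results_chan.send(squared);
        }
    });
    chan
}

/// Runs the whole pipeline for the numbers 1..=count.
fn run_pipeline(count: u8) -> Vec<u64> {
    let (results_chan, results_port) = channel();
    let (gen_chan, gen_port) = channel();
    let merge_chan = merge(results_chan);
    generate(count, gen_chan);
    {
        let mut square_workers = VecDeque::new();
        square_workers.push_back(square(merge_chan.clone()));
        // Move, not clone: no merge sender stays behind in this function.
        square_workers.push_back(square(merge_chan));

        // Fan out: ends when the "generate" stage drops its sender.
        for num in gen_port {
            let worker = square_workers.pop_front().unwrap();
            let _ = worker.send(num);
            square_workers.push_back(worker);
        }
    } // Worker senders drop: workers stop, then merge stops.

    // Ends once the "merge" stage has dropped `results_chan`.
    let mut results: Vec<u64> = results_port.iter().collect();
    results.sort();
    results
}

fn main() {
    println!("{:?}", run_pipeline(5));
}
```

Nothing in this sketch signals shutdown explicitly. Each stage ends only because the senders feeding it have been dropped.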
Of course, that doesn’t cover the case of a downstream stage shutting down before having received all values sent by an upstream one. But do we care about that? As long as we ensure senders are dropped and the entire pipeline shuts down, whatever data was left in the buffer of our unbounded channels will be dropped too…
And if you really need a downstream stage to be able to signal an upstream one to stop producing values, could you implement such an extra communication line by “sharing your sender(s)”?
To be continued…