Rust concurrency patterns: Natural Born Pipelines

There seems to be demand for a “Rust concurrent pipeline” guide à la https://blog.golang.org/pipelines, so let’s give it a try.

Full code example at https://github.com/gterzian/pipeline/blob/master/src/main.rs
Note: When I write “sharing” below, I mean it in plain English, as in “making available”. In fact, the sharing relies on Rust’s move semantics, and emphatically not on shared references.

A generic pipeline

Let’s take the Go example linked above as a starting point (at least the simple version in that article), which involves the following steps:

  1. generate numbers.
  2. square them, using several workers.
  3. merge the results from the various workers.

Some “challenges” of a pipeline

The Go article further highlights an essential challenge that needs to be addressed: stages not exiting when they should, resulting in resource leaks.

The Go article focuses on the particulars of the Go flavor of channels (for an overview of the differences, see a previous article). So instead of building something as close to that example as possible, we’re going to focus on an idiomatic Rust pipeline, which also faces the prospect of hanging forever on a stage, but for different reasons.


Our chosen concurrency “primitives”

Our pipeline in Rust will use some pretty “primitive” tools indeed: first, something referred to as “threads”, and second, the most boring channel implementation out there, the standard library’s std::sync::mpsc.

These tools appear good enough to build intricate patterns of concurrent work, simply by “sharing the Sender” half of our channels in various ways…
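To make “sharing the Sender” concrete, here is a minimal sketch (not taken from the original code; `collect_from_producers` is a hypothetical name): each thread receives its own clone of the Sender by move, while the single Receiver stays with the main thread.

```rust
use std::sync::mpsc::channel;
use std::thread;

/// Hypothetical helper: spawn `n` producer threads, each owning its own
/// clone of the Sender, and collect whatever they send.
fn collect_from_producers(n: u64) -> Vec<u64> {
    let (tx, rx) = channel();
    for id in 0..n {
        // Clone the Sender, then move the clone into the thread.
        let tx = tx.clone();
        thread::spawn(move || {
            tx.send(id).expect("receiver hung up");
            // This thread's Sender is dropped when the closure returns.
        });
    }
    // Drop the main thread's own Sender too, or the iteration below never ends.
    drop(tx);
    // Yields values until every Sender is gone, then stops.
    let mut received: Vec<u64> = rx.iter().collect();
    received.sort(); // threads may send in any order
    received
}

fn main() {
    println!("{:?}", collect_from_producers(4)); // [0, 1, 2, 3]
}
```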

For more specific needs, one could look further to crates such as crossbeam.


You are only as good as your idioms

How can we write the most ‘natural’ form of concurrent pipelines in Rust? What can we define as ‘idiomatic’, and who are we to decide what ‘idiomatic’ is? Well, I’ll settle for the following:

Idiomatic code implicitly uses standard features of the language and/or the standard library (or of a crate so widely used that it is considered standard as well). It’s “clever”, but only in relying implicitly on well-known features, as opposed to custom-built contraptions. Where custom-built contraptions are needed, it will instead be as “dumb” and explicit as possible.

I’m going to take a big reputational risk here in trying to list a few well-known features in Rust:

  1. the Iterator trait.
  2. for loops.
  3. mpsc::Receiver's iter method, and especially its implementation of the IntoIterator trait.
  4. The Drop trait.
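As a quick refresher, the sketch below (with hypothetical helpers `closed_receiver` and `three_ways`) shows the three ways a Receiver plugs into the Iterator machinery. In each case the iteration ends only because the Sender has already been dropped.

```rust
use std::sync::mpsc::{channel, Receiver};

/// Hypothetical helper: a Receiver already holding `values`, whose only
/// Sender has been dropped, so iterating over it terminates.
fn closed_receiver(values: &[i32]) -> Receiver<i32> {
    let (tx, rx) = channel();
    for &v in values {
        tx.send(v).unwrap();
    }
    rx // `tx` is dropped here; the buffered values remain receivable
}

/// The three common ways to drive a Receiver with a `for` loop or an Iterator.
fn three_ways() -> (Vec<i32>, Vec<i32>, Vec<i32>) {
    // 1. `Receiver::iter` borrows the receiver and returns an Iterator.
    let rx = closed_receiver(&[1, 2, 3]);
    let via_iter: Vec<i32> = rx.iter().collect();

    // 2. `&Receiver` implements IntoIterator, so a `for` loop can borrow it.
    let rx = closed_receiver(&[1, 2, 3]);
    let mut via_ref = Vec::new();
    for v in &rx {
        via_ref.push(v);
    }

    // 3. `Receiver` implements IntoIterator by value: the loop consumes it,
    //    and the receiver is dropped when the loop ends.
    let rx = closed_receiver(&[1, 2, 3]);
    let mut via_value = Vec::new();
    for v in rx {
        via_value.push(v);
    }

    (via_iter, via_ref, via_value)
}

fn main() {
    let (a, b, c) = three_ways();
    println!("{:?} {:?} {:?}", a, b, c); // [1, 2, 3] [1, 2, 3] [1, 2, 3]
}
```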

Can we implement a non-blocking, gracefully shutting-down, concurrent pipeline relying only on these concepts? Could such an approach to pipelines be considered ‘idiomatic’ because it relies on standard language (or rather library-provided) features? Let’s see…


A forever-hanging pipeline in Rust

Let’s start with an implementation that will indeed forever hang, and then fix it.

First, an introduction to our components:

Our “generate” stage holds a sender on which it sends generated numbers to the main thread.
Our “square” stage receives a “merge_chan”, on which it sends intermediary results to the “merge” stage, and returns a sender on which work can be sent to it.
Our “merge” stage receives a “merge_result_chan”, on which it sends final results to the main thread, and returns a sender that the outside world uses to send it squared numbers.
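The actual code is in the linked repository; the following is only a sketch of the three components as described above, assuming `u64` numbers, a hypothetical `count` parameter for `generate`, and my own `*_port` naming for Receivers.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread;

/// "generate": sends 1..=count on `chan` from its own thread.
/// (`count` is an assumption of this sketch.)
fn generate(chan: Sender<u64>, count: u64) {
    thread::spawn(move || {
        for n in 1..=count {
            chan.send(n).unwrap();
        }
    }); // `chan` is dropped when the thread finishes
}

/// "square": a worker thread that squares whatever it receives and sends the
/// result to the merge stage. Returns the Sender used to give it work.
fn square(merge_chan: Sender<u64>) -> Sender<u64> {
    let (work_chan, work_port) = channel::<u64>();
    thread::spawn(move || {
        // Ends once every Sender for `work_port` has been dropped.
        for n in work_port {
            merge_chan.send(n * n).unwrap();
        }
    });
    work_chan
}

/// "merge": forwards squared numbers to the main thread on
/// `merge_result_chan`. Returns the Sender the workers use to reach it.
fn merge(merge_result_chan: Sender<u64>) -> Sender<u64> {
    let (merge_chan, merge_port) = channel::<u64>();
    thread::spawn(move || {
        for squared in merge_port {
            merge_result_chan.send(squared).unwrap();
        }
    });
    merge_chan
}

fn main() {
    // Wiring with a single worker, just to exercise the three components.
    let (results_chan, results_port) = channel();
    let worker = square(merge(results_chan));
    let (gen_chan, gen_port) = channel();
    generate(gen_chan, 3);
    for n in gen_port {
        worker.send(n).unwrap();
    }
    drop(worker); // no more work: each stage winds down in turn
    let squares: Vec<u64> = results_port.iter().collect();
    println!("{:?}", squares); // [1, 4, 9]
}
```

Note that each stage’s `for` loop is the only thing keeping its thread busy: it ends exactly when every Sender feeding that stage has been dropped.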

The pledge: create and share channels

The setup of the pipeline is represented by the following steps:

The main thread creates, and shares, the following channels:

  1. Results chan. Sender shared with the final “merge” stage, on which it sends final results back to the main thread.
  2. Merge chan. Sender shared with the “square” workers, on which they send squared numbers to the “merge” stage.
  3. Gen chan. Sender shared with the “generate” stage, to send generated numbers on.
  4. “Square workers”, simply a list of senders, through which we can cycle to distribute work among them.

The turn: start communicating

Communication starts with the main thread:

  1. receiving generated numbers via the “gen chan”,
  2. distributing the work by cycling over the “square_workers”.
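One way to do the “cycling” is to zip the incoming numbers with `iter().cycle()` over the worker Senders; the original code may well use a queue instead (the text below mentions a “queue of worker senders”). A minimal sketch, with hypothetical `distribute` and `round_robin_demo` functions and plain channels standing in for the workers so that each one’s share of the work can be inspected:

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical helper: hand `items` out round-robin over `workers`.
fn distribute(items: Vec<u64>, workers: &[Sender<u64>]) {
    // `cycle` repeats the workers forever; `zip` stops as soon as `items` runs out.
    for (item, worker) in items.into_iter().zip(workers.iter().cycle()) {
        worker.send(item).unwrap();
    }
}

fn round_robin_demo() -> Vec<Vec<u64>> {
    // Two stand-in "workers": plain channels we can read back afterwards.
    let (senders, receivers): (Vec<Sender<u64>>, Vec<Receiver<u64>>) =
        (0..2).map(|_| channel()).unzip();
    distribute(vec![1, 2, 3, 4, 5], &senders);
    drop(senders); // otherwise iterating the receivers below would block forever
    receivers.iter().map(|rx| rx.iter().collect()).collect()
}

fn main() {
    println!("{:?}", round_robin_demo()); // [[1, 3, 5], [2, 4]]
}
```

In the pipeline itself, the items would come from iterating the “gen chan” receiver rather than from a `Vec`.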

Finally, we:

  1. Start receiving over the “results chan”, waiting for the final results from the “merge” stage.
  2. In the background, each worker holds a sender to the “merge” stage, so each squared number is sent to be “merged” via that channel, bypassing the main thread (see the “square” component described above).

This looks great; the only problem is that we’re stuck in a bit of an ‘endless summer’ here, since this pipeline will keep running forever…
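Putting the pieces together, here is a sketch of this forever-hanging version, assuming two workers and hypothetical component signatures `generate(chan, count)`, `square(merge_chan) -> Sender` and `merge(merge_result_chan) -> Sender`. Since a program that really hangs makes a poor example, the final results loop uses `recv_timeout` instead of a plain `for` loop, so we can observe that the results channel times out rather than disconnecting.

```rust
use std::sync::mpsc::{channel, RecvTimeoutError, Sender};
use std::thread;
use std::time::Duration;

fn generate(chan: Sender<u64>, count: u64) {
    thread::spawn(move || {
        for n in 1..=count {
            chan.send(n).unwrap();
        }
    });
}

fn square(merge_chan: Sender<u64>) -> Sender<u64> {
    let (work_chan, work_port) = channel::<u64>();
    thread::spawn(move || {
        for n in work_port {
            merge_chan.send(n * n).unwrap();
        }
    });
    work_chan
}

fn merge(merge_result_chan: Sender<u64>) -> Sender<u64> {
    let (merge_chan, merge_port) = channel::<u64>();
    thread::spawn(move || {
        for squared in merge_port {
            merge_result_chan.send(squared).unwrap();
        }
    });
    merge_chan
}

/// The naive pipeline: returns the squares received and why receiving stopped.
fn run_naive(count: u64) -> (Vec<u64>, RecvTimeoutError) {
    // The pledge: create and share the channels.
    let (results_chan, results_port) = channel();
    let merge_chan = merge(results_chan);
    let square_workers = vec![square(merge_chan.clone()), square(merge_chan.clone())];
    let (gen_chan, gen_port) = channel();
    generate(gen_chan, count);

    // The turn: fan out round-robin over the workers...
    for (n, worker) in gen_port.iter().zip(square_workers.iter().cycle()) {
        worker.send(n).unwrap();
    }
    // ...then wait for results. A `for` loop here would block forever.
    let mut squares = Vec::new();
    loop {
        match results_port.recv_timeout(Duration::from_millis(500)) {
            Ok(sq) => squares.push(sq),
            // `square_workers` and `merge_chan` are still alive in this scope,
            // so this is always `Timeout`, never `Disconnected`.
            Err(e) => return (squares, e),
        }
    }
}

fn main() {
    let (mut squares, stopped) = run_naive(4);
    squares.sort();
    println!("{:?}, stopped with {:?}", squares, stopped);
}
```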


How to “Drop” our channels

How can we stop our forever-iterating pipeline? For that, we need to understand when an iterator over a channel returns None.

A note on iteration in Rust and channels: calling next on an iterator that has no more values returns None, which also ends a for loop over it. A channel is a special case: while it is merely empty, next blocks, waiting for a value.
In the case of a channel, None is returned only once all the Senders of the corresponding Receiver have been dropped (and any values still buffered have been received).

So the answer is: to stop one stage in our pipeline we need to Drop all the senders corresponding to the receiver of that stage.
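The rule “None only once all senders are dropped” is easy to check without any threads. In this sketch (with a hypothetical `disconnection_steps` function), `try_recv` reports `Empty` as long as either the original Sender or its clone is alive, and `Disconnected` only after both are gone.

```rust
use std::sync::mpsc::{channel, TryRecvError};

/// Records what `try_recv` reports as the original Sender and its clone are dropped.
fn disconnection_steps() -> Vec<TryRecvError> {
    let (tx, rx) = channel::<u64>();
    let tx_clone = tx.clone();
    let mut steps = Vec::new();

    steps.push(rx.try_recv().unwrap_err()); // Empty: both Senders alive
    drop(tx);
    steps.push(rx.try_recv().unwrap_err()); // still Empty: the clone is alive
    drop(tx_clone);
    steps.push(rx.try_recv().unwrap_err()); // Disconnected: no Sender left

    // Now the iterator (and so a `for` loop) yields None instead of blocking.
    assert_eq!(rx.iter().next(), None);
    steps
}

fn main() {
    println!("{:?}", disconnection_steps()); // [Empty, Empty, Disconnected]
}
```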

So, in principle, our pipeline should shut down naturally: as each stage finishes iterating over its receiver, it drops the senders for the next stage, causing the next stage to stop its iteration, and so on…

Oops, actually, that doesn’t happen. Our queue of ‘worker senders’ stays in scope in the main thread even after the ‘fan-out’ part is done, so the workers keep iterating on their receivers and never shut down...


The prestige: scopes

Let’s fix the above problem by introducing an extra scope, at the end of which the queue of “worker senders” is dropped.

Careful printing reveals that, with all their senders dropped when “square_workers” goes out of scope, the workers do indeed stop iterating.
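Here is a sketch of the extra-scope fix, reduced to the workers. `square_with_handle` is a hypothetical variant of “square” that also returns its thread’s `JoinHandle`, so that joining it proves the worker’s loop ended, and the merge stage is reduced to its receiving end, `merge_port`, so we can inspect it directly.

```rust
use std::sync::mpsc::{channel, Sender, TryRecvError};
use std::thread::{self, JoinHandle};

/// Hypothetical "square" variant that also returns its JoinHandle.
fn square_with_handle(merge_chan: Sender<u64>) -> (Sender<u64>, JoinHandle<()>) {
    let (work_chan, work_port) = channel::<u64>();
    let handle = thread::spawn(move || {
        for n in work_port {
            merge_chan.send(n * n).unwrap();
        }
    });
    (work_chan, handle)
}

/// Returns the squares produced, plus what the merge channel reports afterwards.
fn scoped_workers(numbers: Vec<u64>) -> (Vec<u64>, TryRecvError) {
    let (merge_chan, merge_port) = channel();
    let handles: Vec<JoinHandle<()>> = {
        // The extra scope: `square_workers` only lives inside these braces.
        let (square_workers, handles): (Vec<Sender<u64>>, Vec<JoinHandle<()>>) =
            (0..2).map(|_| square_with_handle(merge_chan.clone())).unzip();
        for (n, worker) in numbers.into_iter().zip(square_workers.iter().cycle()) {
            worker.send(n).unwrap();
        }
        handles
    }; // `square_workers` is dropped here, so each worker's `for` loop ends

    for handle in handles {
        // Returns only because the worker stopped iterating.
        handle.join().unwrap();
    }
    // Everything the workers sent is already buffered.
    let mut squares: Vec<u64> = merge_port.try_iter().collect();
    squares.sort();
    // But the original `merge_chan` is still alive in this function, so the
    // channel reports `Empty`, not `Disconnected`: a `for` loop would keep waiting.
    let status = merge_port.try_recv().unwrap_err();
    (squares, status)
}

fn main() {
    println!("{:?}", scoped_workers(vec![1, 2, 3])); // ([1, 4, 9], Empty)
}
```

The `Empty` at the end is the symptom discussed next: the workers are gone, but the original `merge_chan` still keeps the merge stage’s channel open.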

However, it is now the merge that seems to keep iterating…

How can that be? Once the workers have stopped iterating, the senders to merge that they hold go out of scope, so merge itself should stop iterating, shouldn’t it?


Moving on…

The problem is that we’ve forgotten that the two workers each own a ‘clone’ of the merge sender, while the original one is still in the ‘main’ scope, so it is never dropped and merge is stuck iterating forever.

The fix? Simply replace one of the clones with an actual “move” of the merge_chan, so that the last worker receives the original sender rather than a clone of it.
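A sketch of the complete pipeline with both fixes applied, assuming two workers, `u64` numbers and a hypothetical `count` parameter for `generate`: the worker senders live in an extra scope, and the second worker receives `merge_chan` itself rather than a clone.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread;

fn generate(chan: Sender<u64>, count: u64) {
    thread::spawn(move || {
        for n in 1..=count {
            chan.send(n).unwrap();
        }
    });
}

fn square(merge_chan: Sender<u64>) -> Sender<u64> {
    let (work_chan, work_port) = channel::<u64>();
    thread::spawn(move || {
        for n in work_port {
            merge_chan.send(n * n).unwrap();
        }
    });
    work_chan
}

fn merge(merge_result_chan: Sender<u64>) -> Sender<u64> {
    let (merge_chan, merge_port) = channel::<u64>();
    thread::spawn(move || {
        for squared in merge_port {
            merge_result_chan.send(squared).unwrap();
        }
    });
    merge_chan
}

fn run_pipeline(count: u64) -> Vec<u64> {
    let (results_chan, results_port) = channel();
    let merge_chan = merge(results_chan);
    let (gen_chan, gen_port) = channel();
    generate(gen_chan, count);
    {
        // Clone for the first worker, *move* the original into the second,
        // so no merge Sender is left behind in this function.
        let square_workers = vec![square(merge_chan.clone()), square(merge_chan)];
        for (n, worker) in gen_port.iter().zip(square_workers.iter().cycle()) {
            worker.send(n).unwrap();
        }
    } // `square_workers` dropped: the workers stop, then merge stops
    // Ends once merge drops `results_chan`: the whole pipeline has shut down.
    let mut squares: Vec<u64> = results_port.iter().collect();
    squares.sort(); // the two workers may deliver in any order
    squares
}

fn main() {
    println!("{:?}", run_pipeline(5)); // [1, 4, 9, 16, 25]
}
```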

Now, when merge_chan is dropped, merge stops iterating, which drops the results_chan sender, which in turn makes the main thread receive a None: our entire pipeline shuts down gracefully.


The End

So this was quite a ride. Fortunately, the answer to “how to implement a pipeline in Rust?” seems to be “just iterate over receivers and make sure to drop senders when you’re done with them”. Basically, when an upstream stage drops the sender(s) to a downstream one, the downstream stage stops iterating on its receiver and drops its own senders for the next stage, resulting in a graceful shutdown.
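The recipe generalizes. As an illustration that is not part of the original article, a hypothetical generic `stage` helper can spawn a thread that iterates over its input Receiver and returns the Receiver for the next stage; the stage’s Sender is owned by its thread, so it is dropped as soon as the input is exhausted. This sketch chains a squaring stage and a forwarding, “merge”-like stage, without the fan-out to several workers.

```rust
use std::sync::mpsc::{channel, Receiver};
use std::thread;

/// Hypothetical helper: apply `f` to every value from `input` on a new
/// thread and return the Receiver feeding the next stage.
fn stage<T, U, F>(input: Receiver<T>, f: F) -> Receiver<U>
where
    T: Send + 'static,
    U: Send + 'static,
    F: Fn(T) -> U + Send + 'static,
{
    let (tx, rx) = channel();
    thread::spawn(move || {
        for value in input {
            // `send` only fails once the next stage is gone; stop early then.
            if tx.send(f(value)).is_err() {
                break;
            }
        }
    }); // `tx` is dropped here, ending the next stage's loop
    rx
}

fn squares_via_stages(count: u64) -> Vec<u64> {
    let (gen_chan, gen_port) = channel();
    thread::spawn(move || {
        for n in 1..=count {
            gen_chan.send(n).unwrap();
        }
    });
    // Square, then forward (like "merge"), each stage on its own thread.
    let squared = stage(gen_port, |n: u64| n * n);
    let merged = stage(squared, |sq: u64| sq);
    // Ends once the generator finishes and each stage has dropped its Sender.
    merged.iter().collect()
}

fn main() {
    println!("{:?}", squares_via_stages(4)); // [1, 4, 9, 16]
}
```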

Of course, that doesn’t cover the case of a downstream stage shutting down before having received all values sent by an upstream one. But do we care about that? As long as we ensure senders are dropped and the entire pipeline shuts down, whatever data was left in the buffers of our unbounded channels will be dropped too…
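Both halves of that claim can be checked with the Drop trait. In this sketch, `Tracked` is a hypothetical type that counts its own drops: after the downstream Receiver hangs up early, the upstream’s next `send` fails (handing the value back inside the `SendError`), and once the Sender is dropped as well, every value, buffered or rejected, has been dropped.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::channel;
use std::sync::Arc;

/// Hypothetical value that counts how many times it is dropped.
struct Tracked(Arc<AtomicUsize>);

impl Drop for Tracked {
    fn drop(&mut self) {
        self.0.fetch_add(1, Ordering::SeqCst);
    }
}

/// Returns whether a send after the early shutdown succeeded, and how many
/// values had been dropped once both ends of the channel were gone.
fn early_shutdown() -> (bool, usize) {
    let drops = Arc::new(AtomicUsize::new(0));
    let (tx, rx) = channel();
    tx.send(Tracked(Arc::clone(&drops))).unwrap();
    tx.send(Tracked(Arc::clone(&drops))).unwrap();

    drop(rx); // downstream stops early, with two values still buffered

    // Upstream notices: `send` now fails; the rejected value is dropped with the error.
    let send_ok = tx.send(Tracked(Arc::clone(&drops))).is_ok();

    drop(tx); // upstream stops too
    // Nothing leaked: the two buffered values and the rejected one are all dropped.
    (send_ok, drops.load(Ordering::SeqCst))
}

fn main() {
    println!("{:?}", early_shutdown()); // (false, 3)
}
```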


And if you really need a downstream stage to be able to signal an upstream one to stop producing values, could you implement such an extra communication line by “sharing your sender(s)”?

To be continued…