Rust concurrency patterns: communicate by sharing your Sender
Doing concurrency in ‘share by communicating’ style has been popularized by the Go community. It’s a valuable approach to concurrency in Rust too. However, one has to be aware of the different semantics of Rust vs Go channels when doing so.
Each end of a Rust channel can only be ‘owned’ by one thread at a time. However, the “Sender” half can be “cloned”, and through such cloning the conceptual “sender” part of a channel can be shared among threads, which is how you do the “multi-producer, single-consumer” part…
Go channels seem to consist of a single value(or is it a reference?), and to behave like a “multi-producer, multi-consumer queue” that can be shared among “goroutines”.
By default, a Go channel has a 0 buffer size, meaning that any sending on it will block until the other end has received the message.
In the Rust standard library, we can choose between using the default channel, that comes with a Sender with an unlimited buffer and hence will never block sending, and the sync_channel that comes with a SyncSender that will block sending if the buffer is full.
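To make the difference concrete, here is a minimal sketch using only std::sync::mpsc. The function names are made up for illustration. The first function sends on an unbounded channel without anybody receiving, the second fills a bounded channel and uses try_send to check that one more send would have blocked.

```rust
use std::sync::mpsc::{channel, sync_channel, TrySendError};

/// Sends `n` messages on an unbounded channel before anything is received.
/// (Hypothetical helper, for illustration only.)
fn unbounded_never_blocks(n: usize) -> usize {
    let (sender, receiver) = channel();
    for i in 0..n {
        // `send` on the default channel returns immediately.
        sender.send(i).expect("receiver is still alive");
    }
    // Dropping the only Sender lets the iteration below terminate.
    drop(sender);
    receiver.iter().count()
}

/// Fills a bounded channel, then reports whether one more send would block.
/// (Hypothetical helper, for illustration only.)
fn bounded_is_full(capacity: usize) -> bool {
    let (sender, _receiver) = sync_channel(capacity);
    for i in 0..capacity {
        sender.send(i).expect("receiver is still alive");
    }
    // `try_send` reports a full buffer instead of blocking.
    matches!(sender.try_send(capacity), Err(TrySendError::Full(_)))
}
```

Calling sender.send instead of try_send on the full sync_channel would block the calling thread until the receiver makes room.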
In both languages, channels can be passed around inside messages sent on them. So one can send a message on a channel containing a sender, and then immediately start waiting for an answer(which is like making a “cross-thread procedure call” and waiting for the return value).
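A sketch of such a “cross-thread procedure call” could look like the following. The Request enum, spawn_doubler and call_double are names invented for this example, they are not taken from the article’s code.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread;

/// A request that carries the Sender on which the answer should be sent.
/// (Hypothetical message type, for illustration only.)
enum Request {
    Double(u32, Sender<u32>),
    Quit,
}

/// Spawns a service thread and returns the Sender used to reach it.
fn spawn_doubler() -> (Sender<Request>, thread::JoinHandle<()>) {
    let (sender, receiver) = channel();
    let handle = thread::spawn(move || {
        for request in receiver {
            match request {
                Request::Double(n, reply) => {
                    // The caller may have gone away; ignore a failed send.
                    let _ = reply.send(n * 2);
                }
                Request::Quit => break,
            }
        }
    });
    (sender, handle)
}

/// Sends a request, then blocks until the answer comes back.
fn call_double(service: &Sender<Request>, n: u32) -> u32 {
    let (reply_sender, reply_receiver) = channel();
    service
        .send(Request::Double(n, reply_sender))
        .expect("service is running");
    reply_receiver.recv().expect("service replied")
}
```

The caller keeps the Receiver of the reply channel to itself and only moves the Sender into the message.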
Communicate by sharing (clones of) your Sender, and keep the Receiver to yourself.
However, some ‘examples’ are still in order…
Note: when I write “sharing”, I mean “making available”, not sharing via a shared reference. In fact the whole “sharing” described below relies on move semantics, not shared references.
Rust mpsc channels are a library, not a language feature
One thing worth highlighting is that the current mpsc channels in Rust are part of the standard library, not the language.
The drawback, and the benefit, is: things can change more easily than if they were part of the language like in Go.
See for an example of how implementations and API’s of channels are debated and iterated on in the ecosystem: https://github.com/stjepang/rfcs-crossbeam/blob/channel/text/2017-11-09-channel.md
Implementing concurrent workers in Rust
The idea is that you’re going to distribute some work among a set of “workers”(fan-out), and aggregate the results back from such workers(fan-in).
Let’s see how both can be implemented using Rust channels.
Full code example available here.
For some excellent visuals and more info on these patterns(with examples in Go), see http://divan.github.io/posts/go_concurrency_visualize/
Let’s say our main thread needs to receive messages from a ‘producer’, then distribute the work received from the producer among a pool of workers, aggregate results in some way via a single consumer, and finally get those final results back to the main thread.
That gives us a couple of concurrent components:
- The producer, who will ‘produce’ something to be sent to ->
- The main thread, who will distribute this work to be done by ->
- A pool of workers/executors, who will do the work and communicate results to the ->
- Consumer, who will keep track of work done in the pool and ->
- Signal back to the main thread when all is done.
Let’s start the flow:
1. Create, and share(or rather move) channels
2. Start the flow: receive work, or results
If you can only have a single receiver, how can you distribute work among workers using a channel in Rust? Should you find a way to share the receiver among threads after all?
A solution to this problem is simply to turn the design on its head, and have each worker provide a “sender” for the producer to send work on. Each worker creates a new channel, and shares the Sender half.
Your worker pool ends up being represented by a queue of senders, and work can be distributed among the workers for example by taking one from the front, sending a message, and putting it at the back of the queue.
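As a rough sketch of that idea (the fan_out function is hypothetical, and the “work” is just a number that each worker counts), the pool can be a VecDeque of Senders:

```rust
use std::collections::VecDeque;
use std::sync::mpsc::{channel, Sender};
use std::thread;

/// Distributes `jobs` round-robin among `num_workers` workers and returns
/// how many jobs each worker received. Panics if there are jobs but no workers.
fn fan_out(num_workers: usize, jobs: Vec<u32>) -> Vec<usize> {
    let mut pool: VecDeque<Sender<u32>> = VecDeque::new();
    let mut handles = Vec::new();
    for _ in 0..num_workers {
        // One channel per worker: the worker keeps the Receiver,
        // the pool only holds on to the Sender.
        let (sender, receiver) = channel();
        pool.push_back(sender);
        handles.push(thread::spawn(move || receiver.iter().count()));
    }
    for job in jobs {
        // Take a worker from the front, send it work, put it at the back.
        let worker = pool.pop_front().expect("pool is not empty");
        worker.send(job).expect("worker is alive");
        pool.push_back(worker);
    }
    // Dropping every Sender ends each worker's iteration.
    drop(pool);
    handles
        .into_iter()
        .map(|handle| handle.join().expect("worker panicked"))
        .collect()
}
```

With three workers and seven jobs, the first worker ends up with three jobs and the other two with two each.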
Incidentally, such ‘single producer, single consumer’ use of an mpsc channel appears to be one of the most optimized ways to send messages from one thread to another, in the current Rust ecosystem.
Let’s say we want to aggregate the work from the various workers and have it all go through a single consumer.
Fan-in becomes quite easy when you share senders around and keep a single receiver owned by a single thread. At that point, it’s really just a matter of iterating on the receiver and handling the incoming messages.
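A small sketch of fan-in, assuming workers that simply square a number (fan_in is a hypothetical name): every worker gets a clone of the same Sender, and a single thread iterates on the Receiver.

```rust
use std::sync::mpsc::channel;
use std::thread;

/// Each worker squares its input and sends the result to one shared receiver.
fn fan_in(inputs: Vec<u64>) -> u64 {
    let (results_sender, results_receiver) = channel();
    let mut handles = Vec::new();
    for input in inputs {
        // Every worker gets its own clone of the Sender.
        let sender = results_sender.clone();
        handles.push(thread::spawn(move || {
            sender.send(input * input).expect("consumer is alive");
        }));
    }
    // Drop the original, so the iteration ends once all clones are gone.
    drop(results_sender);
    let total: u64 = results_receiver.iter().sum();
    for handle in handles {
        handle.join().expect("worker panicked");
    }
    total
}
```

Results arrive in whatever order the workers finish, which is fine here because summing does not depend on order.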
A few other things
Does the ‘consumer’ need to communicate with the main, or another, thread? Again, just have that thread share its sender(or perhaps a clone thereof, in case you have multiple consumers). The consumer can then simply send messages to signal things to whatever component shared the sender.
Do you need to ‘close’ all your workers at some point? Easy, since your only reference to a worker is through the sender they’ve shared with you, just send them each a message telling them to shut down!
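A sketch of what such a shutdown could look like, with a made-up WorkerMsg enum and run_and_shut_down function:

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread;

/// Messages understood by a worker. (Hypothetical, for illustration only.)
enum WorkerMsg {
    Work(u32),
    Quit,
}

/// Starts `n` workers, gives each one job, then tells each to shut down.
/// Returns the number of jobs each worker handled before quitting.
fn run_and_shut_down(n: usize) -> Vec<u32> {
    let mut workers: Vec<(Sender<WorkerMsg>, thread::JoinHandle<u32>)> = Vec::new();
    for _ in 0..n {
        let (sender, receiver) = channel();
        let handle = thread::spawn(move || {
            let mut done = 0;
            for msg in receiver {
                match msg {
                    WorkerMsg::Work(_) => done += 1,
                    WorkerMsg::Quit => break,
                }
            }
            done
        });
        workers.push((sender, handle));
    }
    for (sender, _) in &workers {
        sender.send(WorkerMsg::Work(1)).expect("worker is alive");
    }
    // The Sender is our only handle on a worker, so shutdown is just a message.
    for (sender, _) in &workers {
        sender.send(WorkerMsg::Quit).expect("worker is alive");
    }
    workers
        .into_iter()
        .map(|(_, handle)| handle.join().expect("worker panicked"))
        .collect()
}
```

Since messages from one Sender arrive in the order they were sent, each worker handles its job before it sees the Quit.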
Really quite a lot of patterns can be expressed by sharing one, or several clones of, a Sender.
If you feel that sending a “Quit” message to the workers and the producer is too much, see the follow-up article for an example of a pipeline where each downstream stage shuts down in response to an upstream one shutting down:
A note on looping inside a thread
As a side note, the code also contains examples of a few “non-event-loops”. Basically, loops that can combine receiving messages from other threads, with performing some “thread-local work”.
“Non-event-loop”, because it’s not doing any async I/O, which is what is often meant by an “event-loop”. Just trying to highlight that a loop combined with messaging can be a nice way to structure the execution of your program, even when it doesn’t have anything to do with async I/O.
The “event-loop” in our case is not “a place to handle events from the system in the context of async I/O”, rather the event-loop is a place for a concurrent component to execute things sequentially in a thread-local way, while perhaps letting the outside world in via receiving and handling of messages.
Essentially, the “event-loop” is the “local” processing model of a concurrent component(and is in some ways the direct result of message based concurrency, since you tend to loop in order to receive on a channel…)
For example, the executor’s “run loop”.
- It all starts with a “while” loop, that will repeatedly call “run” until it returns false.
Let’s take a look at what this WorkflowExecutor actually looks like from the inside:
- The actual “run” function consists of:
- First, try to receive a message. The result of this can include things like receiving work, or being told to quit.
- How do you quit? By returning ‘false’, which will end the while loop.
- How do you handle work? In this case, you just add it to a local queue of work. Since this queue is not shared among threads, it doesn’t require any locking.
- Secondly, once you’re done receiving a message, if you haven’t quit yet, then do one ‘step’ of work. In this case, pop a piece of work from the front of your queue and do something with it. This might also involve sending messages with results from this work.
- The great benefit is that, while you are dealing with potentially complex concurrent logic, what you are looking at is quite simply a loop, with a predictable sequential type of logic at each iteration that is quite easy to reason about.
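The steps above can be sketched roughly as follows. This is a reconstruction under assumptions, not the article’s actual code: the ExecutorMsg enum, the field names and the drive function are invented, and the “work” is just adding one to a number.

```rust
use std::collections::VecDeque;
use std::sync::mpsc::{channel, Receiver, Sender, TryRecvError};

/// Messages an executor can receive. (Hypothetical, for illustration only.)
enum ExecutorMsg {
    Execute(u32),
    Quit,
}

struct WorkflowExecutor {
    port: Receiver<ExecutorMsg>,
    results: Sender<u32>,
    queue: VecDeque<u32>, // local to this component, so no locking needed
}

impl WorkflowExecutor {
    /// One iteration: handle at most one message, then do one step of work.
    /// Returns false when the surrounding while loop should stop.
    fn run(&mut self) -> bool {
        match self.port.try_recv() {
            Ok(ExecutorMsg::Execute(work)) => self.queue.push_back(work),
            Ok(ExecutorMsg::Quit) => return false,
            Err(TryRecvError::Empty) => {}
            // Every Sender is gone: nobody can send work or Quit anymore.
            Err(TryRecvError::Disconnected) => return false,
        }
        if let Some(work) = self.queue.pop_front() {
            // One 'step' of work; the result goes out as a message.
            let _ = self.results.send(work + 1);
        }
        true
    }
}

/// Queues up some work followed by Quit, then drives an executor to the end.
fn drive(work: Vec<u32>) -> Vec<u32> {
    let (sender, port) = channel();
    let (results, results_port) = channel();
    for item in work {
        sender.send(ExecutorMsg::Execute(item)).unwrap();
    }
    sender.send(ExecutorMsg::Quit).unwrap();
    let mut executor = WorkflowExecutor { port, results, queue: VecDeque::new() };
    while executor.run() {}
    // Dropping the executor drops its Sender, which ends the iteration below.
    drop(executor);
    results_port.iter().collect()
}
```

Note that this sketch spins when there is neither a message nor queued work; a real executor would probably want to block on recv in that situation.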
Here is another one, the consumer’s loop.
Remember this is happening at the ‘fan-in’ stage, and the consumer is receiving messages sent by the pool of executors. This looks like an infinite loop, but it’s not, because once all the executors have dropped their end of the channel, the iteration will stop and the consumer thread will exit.
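A sketch of such a consumer, under the assumption that executors simply report their ID for each step they complete (the consume function and the shape of track_steps are guesses made for this example):

```rust
use std::collections::HashMap;
use std::sync::mpsc::channel;
use std::thread;

/// Spawns executors that each report `steps_each` steps, plus a consumer
/// that counts the steps per executor and hands the totals to the caller.
fn consume(num_executors: u32, steps_each: u32) -> HashMap<u32, u32> {
    let (step_sender, step_receiver) = channel();
    let (done_sender, done_receiver) = channel();
    let consumer = thread::spawn(move || {
        let mut track_steps: HashMap<u32, u32> = HashMap::new();
        // Ends once every clone of `step_sender` has been dropped.
        for executor_id in step_receiver {
            *track_steps.entry(executor_id).or_insert(0) += 1;
        }
        // Signal the main thread on the Sender it shared with us.
        done_sender.send(track_steps).expect("main thread is waiting");
    });
    let executors: Vec<_> = (0..num_executors)
        .map(|id| {
            let sender = step_sender.clone();
            thread::spawn(move || {
                for _ in 0..steps_each {
                    sender.send(id).expect("consumer is alive");
                }
            })
        })
        .collect();
    // Without this drop, the consumer's loop would never end.
    drop(step_sender);
    let totals = done_receiver.recv().expect("consumer sent its totals");
    for handle in executors {
        handle.join().expect("executor panicked");
    }
    consumer.join().expect("consumer panicked");
    totals
}
```

Forgetting to drop the original Sender is the classic way to turn this into a real infinite loop.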
The main thread’s loop is another candidate.
On the power, and simplicity, of event-loops
The important thing to note is that such an ‘event-loop’ is not multi-threaded in itself. In fact, you could have one in a completely single threaded context. Rather, this “event-loop” is the place where stuff is executed inside a thread in a sequential manner.
An example of the power of this approach: When the consumer receives messages, it will mutate “track_steps”, in order to “aggregate” the work that is being done by the workers. So while this mutation is done “in response” to receiving a message from another thread, “when”, and “whether”, this is done as part of an iteration of the consumer’s loop is entirely predictable.
On the “whether” something will happen as part of an iteration: looking at the run loop of an executor, you can see quite clearly that no work will ever be done in an iteration, if a Quit message is received at the beginning of the loop.
On the “when”, taking again the executor as an example, a ‘step of work’ will always be done “after” checking if there is a message first.
These little things give you tremendous power to control the behavior of your components at each iteration of their loop, and therefore of your system as a whole.
For example, your consumer could decide to call it quits and not mutate “track_steps” any further, ignoring messages from the workers, while sending the main thread a message to shut the pipeline down. Since the workers themselves always check for a shutdown message before doing a ‘step’ of work, the system will shut down sequentially as those “Quit” messages are handled by each component’s event-loop.
There is simply no guesswork involved regarding how this Quit message will travel from one event-loop to the other, and affect the behavior of individual components and the entire system.
That is the power, and simplicity, of messaging and event-loops, versus shared mutable state, in the context of concurrency.
For a real world example of an event-loop with a finely tuned list of ‘steps’, performed in specific order and under specific conditions as part of each iteration, see the event-loop of a web page: https://html.spec.whatwg.org/multipage/#event-loops, and for a good description of how such an event-loop deals with logic happening ‘in-parallel’ of it, see https://html.spec.whatwg.org/multipage/webappapis.html#event-loop-for-spec-authors.
Your workers might not have unlimited memory
The current example uses channels with unlimited buffers, and grows the ‘internal work queue’ of workers without limit.
One method to put a limit on this could be switching to using a SyncSender to send work, and have the worker implement logic in its run loop that would stop receiving messages when it has “enough work”.
But what about the main thread sending work, would it not block when a worker’s channel buffer is full? Perhaps using “try_send”, and cycling through your workers until you find one that accepts your message, could be a step in the right direction?
The changes required to implement this would look something like this…
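As a minimal sketch of the “try_send and cycle” idea (not the article’s original changes; dispatch and bounded_pool are hypothetical helpers), the main thread could do something like:

```rust
use std::collections::VecDeque;
use std::sync::mpsc::{sync_channel, Receiver, SyncSender, TrySendError};

/// Tries each worker in turn, without ever blocking.
/// Hands the work back as an error if no worker could accept it.
fn dispatch(pool: &mut VecDeque<SyncSender<u32>>, mut work: u32) -> Result<(), u32> {
    for _ in 0..pool.len() {
        let worker = match pool.pop_front() {
            Some(worker) => worker,
            None => break,
        };
        match worker.try_send(work) {
            Ok(()) => {
                pool.push_back(worker);
                return Ok(());
            }
            // Buffer full: keep the worker around and try the next one.
            Err(TrySendError::Full(returned)) => {
                work = returned;
                pool.push_back(worker);
            }
            // Worker is gone: its sender is not put back in the pool.
            Err(TrySendError::Disconnected(returned)) => work = returned,
        }
    }
    Err(work)
}

/// Builds `n` bounded channels, each able to buffer `capacity` messages.
fn bounded_pool(n: usize, capacity: usize) -> (VecDeque<SyncSender<u32>>, Vec<Receiver<u32>>) {
    (0..n).map(|_| sync_channel(capacity)).unzip()
}
```

What to do when dispatch hands the work back is a design decision: the main thread could hold on to it and retry later, or apply backpressure to the producer.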
Another way to achieve a similar result could be to have the main thread share a sender to each worker for them to signal when they want to be “taken off” or “put back” on the queue to receive more work, which could look something like this.
That would require a form of keeping track of workers by their “ID”, preferably baked right into the messaging. That can be done by wrapping your sender into a struct containing some extra data, like:
This new “ExecutorControlChan” is a good example of associating a channel with a piece of data, in this case an “ExecutorId”, and sending this piece of data along with each message. In this particular example, a worker just ‘sends a message’, while the main thread receives a tuple containing the message and the ID of the worker.
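A possible shape for such a wrapper is sketched below. Only the names ExecutorControlChan and ExecutorId come from the text above; the message variants, the fields and the control_chans helper are assumptions made for the sake of the example.

```rust
use std::sync::mpsc::{channel, Receiver, SendError, Sender};

#[derive(Clone, Copy, Debug, PartialEq)]
struct ExecutorId(usize);

/// Control messages a worker can send. (Hypothetical variants.)
#[derive(Debug, PartialEq)]
enum ExecutorControlMsg {
    TakeMeOff,
    PutMeBack,
}

/// Wraps a Sender so that every message is tagged with the worker's ID.
struct ExecutorControlChan {
    id: ExecutorId,
    sender: Sender<(ExecutorControlMsg, ExecutorId)>,
}

impl ExecutorControlChan {
    /// The worker only supplies the message; the ID is attached here.
    fn send(
        &self,
        msg: ExecutorControlMsg,
    ) -> Result<(), SendError<(ExecutorControlMsg, ExecutorId)>> {
        self.sender.send((msg, self.id))
    }
}

/// Creates one control channel per worker, all feeding a single Receiver.
fn control_chans(n: usize) -> (Vec<ExecutorControlChan>, Receiver<(ExecutorControlMsg, ExecutorId)>) {
    let (sender, receiver) = channel();
    let chans = (0..n)
        .map(|i| ExecutorControlChan { id: ExecutorId(i), sender: sender.clone() })
        .collect();
    (chans, receiver)
}
```

Because the ID is attached inside the wrapper’s send, a worker cannot forget it or claim to be another worker.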
The pattern of wrapping a Sender inside a struct, and associating a “send” with data, or another operation, can have different real world use cases, such as:
A few notes on the wonderful semantics of Rust
I cannot finish this article without highlighting some of the wonderful things that Rust allows you to do.
Notice how this match statement assigns to ‘result’? While most of you probably are aware that a match statement can assign a value to a ‘let’, have you noticed that one arm of the match statement doesn’t assign anything? It only does a ‘continue’.
How is that possible, isn’t this like assigning () to ‘result’? Well, Rust understands that this ‘continue’ isn’t assigning anything, but rather ending that particular arm of the program flow: a ‘continue’ expression has the never type(written !), which is compatible with whatever type the other arms produce(You can also use ‘return’ in a similar way).
Since we are inside the body of a loop, all the stuff happening below will not occur if that match arm is met, and instead the loop will start again from the top of its body for another iteration.
So that means that we’ll only get to this point at the end when all the workflows have been executed and we’ve received a message from the consumer.
Of course, at that point one indeed needs to ‘break’ out of the loop…
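Here is a stand-alone sketch of the same shape, unrelated to the workflow code: a hypothetical sum_until_zero function where the match feeds a ‘let’, and two of its arms never produce a value.

```rust
use std::sync::mpsc::{channel, TryRecvError};

/// Sums the received numbers until a zero arrives or every sender is gone.
fn sum_until_zero(numbers: Vec<u32>) -> u32 {
    let (sender, receiver) = channel();
    for n in numbers {
        sender.send(n).expect("receiver is alive");
    }
    drop(sender);
    let mut total = 0;
    loop {
        // `continue` and `break` have the never type `!`,
        // so those arms do not need to produce a u32.
        let result: u32 = match receiver.try_recv() {
            Ok(n) => n,
            Err(TryRecvError::Empty) => continue,
            Err(TryRecvError::Disconnected) => break,
        };
        // Only reached when the first arm matched.
        if result == 0 {
            break;
        }
        total += result;
    }
    total
}
```

In this sketch the Sender is dropped before the loop starts, so the ‘continue’ arm is there to show the shape rather than to be exercised.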
To me, this is really one of the things that makes Rust an absolute joy to use.
When doing concurrency in Rust, in order to be ‘sharing by communicating’, please consider ‘communicating by sharing (clones of) senders’….
To be continued: