Rust concurrency: a streaming workflow, served with a side of back-pressure.

Gregory Terzian
May 4, 2020

As a follow-up to a previous article that explored some basic concurrent workflows in Rust, let’s now explore a slightly more advanced pattern: a concurrent pipeline where work is streamed from one stage to the next, and a way to signal back-pressure in that context. We’ll also look at the difference between “push” and “pull” sources of streaming data.

Code example at: https://github.com/gterzian/streaming_concurrent_workflow

No pressure

Let’s start with the initial code. Unlike the previous article, the code example is pretty big and complicated right out of the gate:

https://github.com/gterzian/streaming_concurrent_workflow/blob/8fdebc7d329cc0e78e9b6073da9fdc6f2ff84e74/src/main.rs#L13

So what’s going on here?

We have three different “components”, each representing a stage in the pipeline:

  1. A “source”, aka the producer of data, which sends it on to the
  2. “processor” component, which processes the data using a pool of workers, whose results are sent to the
  3. “consumer” component, effectively the main thread of the test.

Once again, the “work” consists of nothing: it’s just a number being passed around the various stages, and what we care about is the structure of the concurrent workflow, not the actual work.

Each component runs an “event-loop”, in the sense that the component is driven by receiving and handling messages, in a loop.
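
To make that structure concrete, here is a minimal, self-contained sketch of the wiring: two unbounded crossbeam channels connecting the three stages. The message names (ProcessorMsg, ConsumerMsg) are placeholders of mine and the stage bodies are stubbed out; the real code lives in the repository linked above.

```rust
use crossbeam_channel::unbounded;

// Placeholder message names; the real ones live in the linked repo.
enum ProcessorMsg {
    Work(u8), // a unit of "work" from the source
    Stopped,  // the source has stopped producing
}
enum ConsumerMsg {
    Result(u8), // the result of processing one unit
    Stopped,    // the processor has drained and stopped
}

fn main() {
    // Stage 1 -> stage 2: source to processor.
    let (work_sender, work_receiver) = unbounded();
    // Stage 2 -> stage 3: processor to consumer.
    let (result_sender, result_receiver) = unbounded();

    // The "source": produces work in its own event-loop (fleshed out below).
    std::thread::spawn(move || {
        work_sender.send(ProcessorMsg::Work(1)).unwrap();
        work_sender.send(ProcessorMsg::Stopped).unwrap();
    });

    // The "processor": turns work into results (fleshed out below).
    std::thread::spawn(move || {
        while let Ok(msg) = work_receiver.recv() {
            match msg {
                ProcessorMsg::Work(num) => {
                    result_sender.send(ConsumerMsg::Result(num)).unwrap()
                }
                ProcessorMsg::Stopped => {
                    result_sender.send(ConsumerMsg::Stopped).unwrap();
                    break;
                }
            }
        }
    });

    // The "consumer": the main thread of the test.
    while let Ok(msg) = result_receiver.recv() {
        if let ConsumerMsg::Stopped = msg {
            break;
        }
    }
}
```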

Each event-loop is slightly different, and relatively complicated, so let’s go through them one by one.

Who’s your source?

The “source” component runs “one tick at a time”, using the excellent tick channel from crossbeam. It then simply increments a number and sends it downstream for processing. This stops once an overflow on the number is hit, at which point another message is sent downstream, signalling that production has stopped.
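
Roughly, that loop could look like the following sketch (placeholder names, crossbeam-channel as a dependency; the real code in the repository may differ in the details):

```rust
use std::time::Duration;

use crossbeam_channel::{tick, unbounded};

// Placeholder message type for this sketch.
enum ProcessorMsg {
    Work(u8),
    Stopped,
}

fn run_source(to_processor: crossbeam_channel::Sender<ProcessorMsg>) {
    // Produce one unit of work per tick.
    let ticker = tick(Duration::from_millis(1));
    let mut current: u8 = 0;
    loop {
        ticker.recv().unwrap();
        match current.checked_add(1) {
            Some(next) => {
                current = next;
                let _ = to_processor.send(ProcessorMsg::Work(current));
            }
            None => {
                // The u8 overflowed: signal downstream that we've stopped.
                let _ = to_processor.send(ProcessorMsg::Stopped);
                return;
            }
        }
    }
}

fn main() {
    let (sender, receiver) = unbounded();
    std::thread::spawn(move || run_source(sender));
    // A stand-in for the rest of the pipeline: just drain and count.
    let mut units = 0u32;
    while let Ok(msg) = receiver.recv() {
        match msg {
            ProcessorMsg::Work(_) => units += 1,
            ProcessorMsg::Stopped => break,
        }
    }
    assert_eq!(units, u8::MAX as u32);
}
```

The checked_add is what detects the overflow on the u8 and triggers the “stopped” message.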

So far so good.

The processor

The processor runs an event-loop select’ing over two channels:

  1. A channel from the “source”,
  2. a channel from the workers actually “performing the processing” on a thread-pool.

It also operates on two pieces of state: an exiting flag, used to note when the “source” has stopped producing for good, and an ongoing_work counter, used to track how many units of work are being performed on the pool. (A sketch of the whole loop follows the list below.)

Workers on the pool do three things:

  1. Simulate “work” by sleeping for a bit, and then
  2. send the result to the final “consumer”, and
  3. send a message back to the “main” event-loop of the processor, to let it know a unit of work has been performed.
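
Here is a hedged sketch of that whole event-loop. The message and state names are placeholders, and I’m assuming a rayon thread-pool with two workers plus crossbeam-channel for the select; the repository may organize this differently:

```rust
use std::time::Duration;

use crossbeam_channel::{select, unbounded, Receiver, Sender};

// Placeholder message names for this sketch.
enum ProcessorMsg {
    Work(u8),
    Stopped,
}
enum ConsumerMsg {
    Result(u8),
    Stopped,
}
struct WorkDone;

fn run_processor(from_source: Receiver<ProcessorMsg>, to_consumer: Sender<ConsumerMsg>) {
    // Channel the pool workers use to report back to this event-loop.
    let (worker_sender, from_workers) = unbounded();
    // A pool of two workers (assuming rayon as the thread-pool).
    let pool = rayon::ThreadPoolBuilder::new()
        .num_threads(2)
        .build()
        .unwrap();
    let mut exiting = false;
    let mut ongoing_work: usize = 0;
    loop {
        select! {
            recv(from_source) -> msg => {
                match msg.unwrap() {
                    ProcessorMsg::Work(num) => {
                        ongoing_work += 1;
                        let to_consumer = to_consumer.clone();
                        let worker_sender: Sender<WorkDone> = worker_sender.clone();
                        pool.spawn(move || {
                            // 1. Simulate "work" by sleeping for a bit.
                            std::thread::sleep(Duration::from_millis(1));
                            // 2. Send the result to the consumer.
                            let _ = to_consumer.send(ConsumerMsg::Result(num));
                            // 3. Tell the main event-loop a unit of work is done.
                            let _ = worker_sender.send(WorkDone);
                        });
                    }
                    ProcessorMsg::Stopped => exiting = true,
                }
            },
            recv(from_workers) -> _confirmation => {
                ongoing_work -= 1;
            },
        }
        // Only shut down once the source has stopped and the pool has drained.
        if exiting && ongoing_work == 0 {
            let _ = to_consumer.send(ConsumerMsg::Stopped);
            return;
        }
    }
}

fn main() {
    let (work_sender, work_receiver) = unbounded();
    let (result_sender, result_receiver) = unbounded();
    std::thread::spawn(move || run_processor(work_receiver, result_sender));
    // Stand-in for the source: a few units of work, then the stop signal.
    for num in 1..=5u8 {
        work_sender.send(ProcessorMsg::Work(num)).unwrap();
    }
    work_sender.send(ProcessorMsg::Stopped).unwrap();
    // Stand-in for the consumer: count results until told to stop.
    let mut results = 0;
    while let Ok(msg) = result_receiver.recv() {
        match msg {
            ConsumerMsg::Result(_) => results += 1,
            ConsumerMsg::Stopped => break,
        }
    }
    assert_eq!(results, 5);
}
```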

The consumer

The consumer event-loop, running on the “main-thread” of the test, simply counts the units of work that are received, and quits once the “stopped” signal is received from the processor. Since we know the source will stop producing once an overflow is hit, we can assert that the units of work add up to the maximum value of a u8.
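
A minimal sketch of that consumer loop, with a thread standing in for the processor and, again, placeholder message names:

```rust
use crossbeam_channel::{unbounded, Receiver};

// Placeholder message type; the real one lives in the linked repo.
enum ConsumerMsg {
    Result(u8),
    Stopped,
}

// The consumer: count results until the processor says it has stopped.
fn run_consumer(from_processor: Receiver<ConsumerMsg>) -> u32 {
    let mut count = 0;
    loop {
        match from_processor.recv().unwrap() {
            ConsumerMsg::Result(_) => count += 1,
            ConsumerMsg::Stopped => return count,
        }
    }
}

fn main() {
    let (sender, receiver) = unbounded();
    // A thread standing in for the processor: one result per produced
    // number, then the "stopped" signal.
    std::thread::spawn(move || {
        for num in 1..=u8::MAX {
            sender.send(ConsumerMsg::Result(num)).unwrap();
        }
        sender.send(ConsumerMsg::Stopped).unwrap();
    });
    // The units of work add up to the maximum value of a u8.
    assert_eq!(run_consumer(receiver), u8::MAX as u32);
}
```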

Taking a step back

Looking at the pipeline as a whole, we can discern a few features:

  1. There is no concept of back-pressure: the “source” will simply keep producing at each tick, and send the work on an unbounded channel downstream for processing.
  2. The “source” acts as a “push-source” of work for the “processor”, since it simply keeps pushing work downstream as fast as it can.
  3. The “processor” also acts as a “push-source” to the “consumer”, simply sending the result of processing for consumption as fast as it can, and there is no back-pressure there either.

Despite all this lack of back-pressure, the size of the unbounded channel between the “source” and the “processor” doesn't grow much. Why is that?

The answer is that the work instead piles up on the internal work-stealing queue of the thread-pool.

Also worth highlighting is the overall component-driven design: while it could be tempting to “simplify things” by moving the thread-pool into the “source” component and simply spawning work as it is produced, we get some benefit in terms of encapsulation of business logic by having clearly separated “source”, “processor” and “consumer” components.

The “source” simply sends work on a channel, and isn’t aware of the thread-pool used internally by the “processor”. Also, the “main event-loop” of the processor acts as a clearly encapsulated linear state machine with regard to the exiting and ongoing_work state it owns.

By nesting the work performed on the thread-pool under the event-loop of the “processor” component, we get a clear structural hierarchy.

Now we can look into introducing some back-pressure into the design.

How would you design an autonomous-driving highway?

When you read about back-pressure using channels in Rust, you usually encounter something about the need to use “bounded channels”, or something about the fact that a certain flavor of async streams comes with “built-in” support for back-pressure.

I’d like to present a different angle, by thinking about a simplified design for a highway for autonomously driven vehicles.

So, on such a highway, you want the vehicles to drive as fast as they can, yet you also want to have them slow down, and outright stop, in the case of congestion (and if you think there would be no congestion on such a highway, think again).

And using a bounded channel (or a stream with “built-in” support for back-pressure) to signal back-pressure, to me, is a bit like having those cars drive as fast as they can, and then hit the brakes full-on once they’re about to hit the car in front of them.

Would you be satisfied with a highway where the “back-pressure signal” travels upstream by way of screeching brakes and probably more than a few crashed bumpers?

That would seem like a bumpy ride, to say the least.

What you would probably want instead is for cars (which cannot be re-routed) to start slowing down way ahead of any congestion, and then gradually slow down further, all the way to a grinding halt if the congestion hasn’t dissipated by the time they reach it.

In other words, you want a bit of a buffer, and you want to start signalling in advance of the buffer hitting a limit, according to some purpose-written business logic.

So, my take is to focus on that, the business logic, as a means to bound the work pipeline, and to forget about bounded channels, because what they will bring is mainly deadlocks.

The second example will show how this can be done.

Here comes the back-pressure.

So, I’m just going to put it all here (the full code is in the repository linked above):

Wow, that’s way beyond the “simple example”, but we can get through it step by step.

So, let’s start with the “source” component, which has now also grown a select, covering the ticker we saw earlier and a new channel allowing it to receive messages from the “processor”.

The interesting part is how those messages are handled.

So here’s the nice thing: the ticker can simply be mutable, and we can mutate it from within the select. What we’re doing here is mutating the ticker in response to those RegulateSourceMsg messages, which come in “slow-down”, “speed-up” and “stop” variants.

It’s actually very simple (a sketch follows this list):

  1. When we get a SlowDown message, we double the ticker’s interval, thereby reducing the rate at which we “produce work”.
  2. When we get a SpeedUp message, we halve the ticker’s interval, increasing the rate at which we produce work,
  3. And finally, when we’re told to Stop, we replace the ticker with a never channel, again from the excellent crossbeam crate. That channel does exactly what its name implies: it will never wake up the thread blocking on it. Since it’s one of only two channels in the select, it effectively means that we won’t be producing anything until receiving another RegulateSourceMsg message. Pretty cool, right?
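
Here is a sketch of the regulated source, using placeholder names for the messages; the durations are made up, but the ticker-swapping inside the select is the point:

```rust
use std::time::Duration;

use crossbeam_channel::{never, select, tick, unbounded, Receiver, Sender};

// Placeholder message names for this sketch.
#[allow(dead_code)]
enum RegulateSourceMsg {
    SlowDown,
    SpeedUp,
    Stop,
}
enum SourceMsg {
    TickAdjusted,
}

fn run_source(from_processor: Receiver<RegulateSourceMsg>, to_processor: Sender<SourceMsg>) {
    let mut period = Duration::from_millis(10);
    // The ticker is just a mutable receiver: we can swap it out from within the select.
    let mut ticker = tick(period);
    loop {
        select! {
            recv(ticker) -> _instant => {
                // Produce a unit of work here (elided in this sketch).
            },
            recv(from_processor) -> msg => {
                let msg = match msg {
                    Ok(msg) => msg,
                    // The processor is gone: stop the source.
                    Err(_) => return,
                };
                match msg {
                    RegulateSourceMsg::SlowDown => {
                        // Double the period: half the production rate.
                        period *= 2;
                        ticker = tick(period);
                    }
                    RegulateSourceMsg::SpeedUp => {
                        // Halve the period: double the production rate.
                        period /= 2;
                        ticker = tick(period);
                    }
                    RegulateSourceMsg::Stop => {
                        // A channel that never receives anything: production
                        // halts until another regulation message arrives.
                        ticker = never();
                    }
                }
                // Confirm the adjustment, so the processor can regulate again.
                let _ = to_processor.send(SourceMsg::TickAdjusted);
            },
        }
    }
}

fn main() {
    let (regulate_sender, regulate_receiver) = unbounded();
    let (confirm_sender, confirm_receiver) = unbounded();
    std::thread::spawn(move || run_source(regulate_receiver, confirm_sender));
    // Stand-in for the processor: slow the source down, then stop it outright.
    regulate_sender.send(RegulateSourceMsg::SlowDown).unwrap();
    confirm_receiver.recv().unwrap(); // SourceMsg::TickAdjusted
    regulate_sender.send(RegulateSourceMsg::Stop).unwrap();
    confirm_receiver.recv().unwrap(); // SourceMsg::TickAdjusted
}
```

Swapping the ticker for a never channel is what makes the “stop” case fall out naturally: that arm of the select simply has nothing left to wake up for.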

The new processor

How has the “processor” changed?

Well, first of all, it now also owns a “buffer”, essentially a VecDeque. Then, when it receives work, if the two workers on the pool are already busy, it will buffer the work. When it receives a “work done” message from a worker, it will also inspect the buffer, and if there is anything in it, immediately send it to the pool for processing.
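
Stripped of the channels and the pool, that buffering decision boils down to two small handlers. This is a standalone sketch of my own, with a closure standing in for spawning on the pool and a hard-coded pool size of two:

```rust
use std::collections::VecDeque;

const POOL_SIZE: usize = 2;

/// When work comes in: if both workers are busy, buffer it;
/// otherwise send it straight to the pool.
fn handle_work(
    unit: u8,
    ongoing_work: &mut usize,
    buffer: &mut VecDeque<u8>,
    spawn_on_pool: &mut impl FnMut(u8),
) {
    if *ongoing_work == POOL_SIZE {
        buffer.push_back(unit);
    } else {
        *ongoing_work += 1;
        spawn_on_pool(unit);
    }
}

/// When a worker reports "work done": if anything is buffered,
/// immediately send it to the pool.
fn handle_work_done(
    ongoing_work: &mut usize,
    buffer: &mut VecDeque<u8>,
    spawn_on_pool: &mut impl FnMut(u8),
) {
    *ongoing_work -= 1;
    if let Some(unit) = buffer.pop_front() {
        *ongoing_work += 1;
        spawn_on_pool(unit);
    }
}

fn main() {
    let mut ongoing_work = 0;
    let mut buffer = VecDeque::new();
    let mut spawn_on_pool = |unit: u8| println!("spawned on the pool: {}", unit);

    // Four units arrive while only two workers are available.
    for unit in 1..=4 {
        handle_work(unit, &mut ongoing_work, &mut buffer, &mut spawn_on_pool);
    }
    assert_eq!(buffer.len(), 2);

    // One worker finishes: a buffered unit is dispatched right away.
    handle_work_done(&mut ongoing_work, &mut buffer, &mut spawn_on_pool);
    assert_eq!(buffer.len(), 1);
}
```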

Furthermore, after performing those operations, it will check the size of the buffer, and modulate the source accordingly, as sketched below:

So this is essentially the other side of the message handling we just looked at in the source.
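
Here is a sketch of that regulation step, including the tick_adjusted guard explained just below. The thresholds and names are made up for the sketch; the repository picks its own:

```rust
use std::collections::VecDeque;

#[derive(Debug, PartialEq)]
enum RegulateSourceMsg {
    SlowDown,
    SpeedUp,
}

/// After handling a message, the processor looks at its buffer and decides
/// whether to regulate the source. While `tick_adjusted` is false we are
/// still waiting for the source to confirm the previous adjustment, so we
/// stay quiet. The thresholds are made up for this sketch.
fn regulate_source(buffer: &VecDeque<u8>, tick_adjusted: &mut bool) -> Option<RegulateSourceMsg> {
    if !*tick_adjusted {
        return None;
    }
    let msg = if buffer.len() > 5 {
        Some(RegulateSourceMsg::SlowDown)
    } else if buffer.is_empty() {
        Some(RegulateSourceMsg::SpeedUp)
    } else {
        None
    };
    if msg.is_some() {
        // Don't regulate again until SourceMsg::TickAdjusted comes back.
        *tick_adjusted = false;
    }
    msg
}

fn main() {
    let mut tick_adjusted = true;

    // A buffer that has grown too big: ask the source to slow down.
    let full: VecDeque<u8> = (1..=9).collect();
    assert_eq!(
        regulate_source(&full, &mut tick_adjusted),
        Some(RegulateSourceMsg::SlowDown)
    );
    // Until the source confirms, no further regulation messages go out.
    assert_eq!(regulate_source(&full, &mut tick_adjusted), None);

    // The confirmation arrives (SourceMsg::TickAdjusted in the real code)...
    tick_adjusted = true;
    // ...and an empty buffer now prompts a speed-up.
    let empty = VecDeque::new();
    assert_eq!(
        regulate_source(&empty, &mut tick_adjusted),
        Some(RegulateSourceMsg::SpeedUp)
    );
}
```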

One last thing worth mentioning is this tick_adjusted business. What’s that about?

We’ve added a new SourceMsg::TickAdjusted message, which the source sends when it has processed a RegulateSourceMsg message. This lets us avoid flooding the source with regulation messages: the processor will send one such message, and then only send another one after having received confirmation from the source that the last one has been processed.

Note that this confirmation is asynchronous: the processor doesn’t block on it; instead, it just keeps running, and sets its internal tick_adjusted flag to true whenever it receives the SourceMsg::TickAdjusted message. In the meantime it will not send other RegulateSourceMsg messages.

You’ve made it

Good, in the intro I wrote “We’ll also look at the difference between “push” and “pull” sources of streaming data”, but you know what, I think this is more than enough for today.

Let’s just note that this is definitely a “push source” type of workflow.

It’s also worth reiterating that what we have here is a bounded pipeline of work (with probably some broken overflowing code), and there is not a bounded channel in sight.

This gives us a huge benefit: you can forget about any risk of deadlocks arising from the messaging.

Further reading

A lot of this is based on patterns found in the Streams standard: https://streams.spec.whatwg.org/
