Backpressure in Reactive Streams

Ravi Aggarwal
Aug 19, 2019 · 6 min read
A natural stream of beautiful events 😆

Backpressure as a term has been derived from fluid dynamics, where it means

Resistance or force opposing the desired flow of fluid through pipes.

But what does it mean in the Reactive environment?

A build-up of events at the subscriber level when the producer is churning out events faster than the subscriber can consume them.

Well, that doesn’t help much! I know! Let’s look at a video.
This is one of the famous scenes from an American sitcom, I Love Lucy.

Lucy and the concept of Backpressure

Here, Lucy and Ethel are trying to wrap the chocolates as they move on the conveyor belt. As the scene proceeds, things start becoming hilarious when the conveyor belt starts moving faster.
The conveyor belt can be seen as a producer churning out chocolates at a faster rate than Lucy (the consumer) can wrap them.

Let’s take a look at some “Technical” examples for an “a-ha” moment

Let’s take a scenario where you have to fetch 1500+ records from a database, where each record is ~1 MB in size, and upload them to a server one at a time.
I know, you don’t need to but just humor me, alright!

What do you think is going to happen here?
It’s highly probable that by the time you finish uploading the first record, your producer has completed the fetch operation and ~1.5 GB of records are sitting in an in-memory buffer.

More often than not, you are going to get an out of memory exception and the app is going to crash!

Writing a file is generally slower than reading it. So, if a system has a read speed of 100 MB/s and a write speed of 50 MB/s, reading a file from one place and writing it to another leaves you with a deficit that grows by 50 MB every second. With a big 10 GB file, by the time you finish reading it you’d still have a 5 GB buffer waiting to be written.

10 GB / 100 MB/s = 100 seconds
50 MB/s deficit x 100 seconds ~= 5 GB!
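
The same back-of-the-envelope calculation as a small Swift sketch. The variable names are made up for illustration, and it treats 1 GB as 1000 MB, like the numbers above do.

```swift
// Figures from the example above (1 GB treated as 1000 MB).
let fileSizeMB = 10_000.0
let readSpeedMBps = 100.0
let writeSpeedMBps = 50.0

// How long the reader needs to get through the whole file.
let readDuration = fileSizeMB / readSpeedMBps

// How much unwritten data piles up every second while reading.
let backlogGrowthMBps = readSpeedMBps - writeSpeedMBps

// Data still waiting to be written at the moment reading finishes.
let backlogMB = backlogGrowthMBps * readDuration

print("Reading takes \(readDuration) s, leaving \(backlogMB) MB still to write")
// prints "Reading takes 100.0 s, leaving 5000.0 MB still to write"
```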

Some systems might not even have that much space available. It would help if we could control the speed of the producer, i.e., read only as fast as you can write.

Take two Observables and, for laziness’ sake, name them A and B, where A is twice as fast as B, and zip them.
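
A minimal sketch of that setup, assuming RxSwift is available. Instead of timers it pushes values by hand through two subjects so the run is repeatable; the names `a`, `b`, `pairs` and the step count are made up for illustration.

```swift
import RxSwift

let disposeBag = DisposeBag()

// Hypothetical stand-ins for the two Observables.
let a = PublishSubject<Int>()   // fast producer
let b = PublishSubject<Int>()   // slow producer

var pairs: [(Int, Int)] = []

// zip can only emit once it holds one item from each side.
Observable.zip(a, b) { ($0, $1) }
    .subscribe(onNext: { pairs.append($0) })
    .disposed(by: disposeBag)

// A emits on every step, B only on every second step.
let steps = 10
for step in 0..<steps {
    a.onNext(step)
    if step % 2 == 1 { b.onNext(step / 2) }
}

// Every item from A that has not been paired yet is still held by zip.
let waitingInsideZip = steps - pairs.count
print("paired: \(pairs.count), still waiting inside zip: \(waitingInsideZip)")
```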

What do you think is going to happen here? I’ll give you a hint, RxSwift doesn’t put any bounds on the buffer that Observable A is creating.
Yes! Sooner or later you’ll get an out of memory exception.

What can we do about it?

There are a few solutions that come to mind

  • Debounce: only emit an item from an Observable if a particular timespan has passed without it emitting another item
Marble diagram for Debounce
  • Throttle: only emit the first/last item from a particular timespan of event emission
Marble diagram for Throttle
  • Sample: emit the most recent item emitted by an Observable within each periodic time interval
Marble diagram for Sample

Debounce, throttle, and sample are all lossy solutions for backpressure: each one copes with the flood by dropping events.
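
To get a feel for how the three operators thin out a stream, here is a minimal sketch, assuming RxSwift 5 or later and a playground or command-line tool. The `taps` source, the intervals, and the result arrays are all made up for illustration.

```swift
import Foundation
import RxSwift

let disposeBag = DisposeBag()
let scheduler = MainScheduler.instance

// Hypothetical source: 20 taps, one every 100 ms, shared by all three pipelines.
let taps = Observable<Int>
    .interval(.milliseconds(100), scheduler: scheduler)
    .take(20)
    .share()

var debounced: [Int] = []
var throttled: [Int] = []
var sampled: [Int] = []

// Debounce: forward a tap only once 300 ms have passed without another tap.
taps.debounce(.milliseconds(300), scheduler: scheduler)
    .subscribe(onNext: { debounced.append($0) })
    .disposed(by: disposeBag)

// Throttle: forward the first tap, then ignore the rest of each 500 ms window.
taps.throttle(.milliseconds(500), latest: false, scheduler: scheduler)
    .subscribe(onNext: { throttled.append($0) })
    .disposed(by: disposeBag)

// Sample: on every tick of a 500 ms timer, forward the most recent new tap.
let ticker = Observable<Int>.interval(.milliseconds(500), scheduler: scheduler)
taps.sample(ticker)
    .subscribe(onNext: { sampled.append($0) })
    .disposed(by: disposeBag)

// Keep the main run loop alive long enough for the timers to fire.
RunLoop.main.run(until: Date().addingTimeInterval(3))

print("debounced:", debounced)
print("throttled:", throttled)
print("sampled:", sampled)
```

All three arrays end up with far fewer than the 20 taps that went in, which is exactly the trade-off: fine for UI events, not fine for records you must not lose.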

Looking back at the record uploading example, do you need to upload all the records? YES. Do you need them all in one go? NO.
There are a couple of ways to tackle it:

  • Buffer: periodically gather items emitted by an Observable into bundles and emit these bundles rather than emitting the items one at a time
Marble diagram for Buffer
  • Window: similar to buffer, but rather than emitting bundles of items it emits Observables, each with its own independent life cycle.
Marble diagram for Window

Buffer and window are lossless solutions for backpressure: they regroup events instead of dropping them.
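
A minimal sketch of both operators, assuming RxSwift 5 or later. The ten integers stand in for database records, and the batch size of 3 and the 1 second time span are arbitrary picks for illustration.

```swift
import RxSwift

let disposeBag = DisposeBag()
let scheduler = MainScheduler.instance

// Hypothetical record IDs standing in for the database rows.
let records = Observable.from(Array(1...10))

var batches: [[Int]] = []
var windowSizes: [Int] = []

// Buffer: collect records into arrays of at most 3 items
// (or whatever arrived within 1 second) and emit each array as one event.
records
    .buffer(timeSpan: .seconds(1), count: 3, scheduler: scheduler)
    .subscribe(onNext: { batch in
        batches.append(batch)   // a real app might upload the whole batch here
    })
    .disposed(by: disposeBag)

// Window: same grouping rule, but each group arrives as its own Observable,
// so it can be processed while it is still filling up.
records
    .window(timeSpan: .seconds(1), count: 3, scheduler: scheduler)
    .flatMap { window in
        window.reduce(0) { count, _ in count + 1 }   // count the items in this window
    }
    .subscribe(onNext: { size in windowSizes.append(size) })
    .disposed(by: disposeBag)

print("batches:", batches)
print("window sizes:", windowSizes)
```

Flattening `batches` gives back every record in its original order, so nothing is dropped; the consumer simply gets to deal with a handful of records at a time.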

All of this is well and good, but if you look carefully, you’ll notice that buffer and window only help as long as they are bounded, and once your events start breaching that bound, you’ll start losing events.

Let’s look back at Lucy’s conveyor belt

She started out with the buffering technique, putting some chocolates aside to get to them later. After a while she started eating and hiding chocolates, which resembles dropping events.

But what she needed was to tell the conveyor belt operator to calm the hell down!

Now, there is no way you can do this in RxSwift 🙄

But Combine provides a nice way to achieve this.

Combine has the concept of a Subscriber, which is similar to RxSwift’s Observer. Where Observer only provides

func onNext(_ element: Element)

Combine’s Subscriber provides

func receive(_ input: Self.Input) -> Subscribers.Demand

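Do you notice the difference? Yes, what is that weird return type? Conceptually, it looks like this (the shipping Combine SDK declares Subscribers.Demand as a struct, but it exposes the same unlimited and max(_:) values, plus none):

public enum Demand {
    case unlimited
    case max(Int)
}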

That my friend is your backpressure.

When a subscriber receives an event, it can also tell the producer how many more events it can handle.

So, Lucy can now tell the conveyor belt operator to send max(5) chocolates and the world is beautiful again.
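
Here is a minimal sketch of Lucy as a custom Combine subscriber, assuming a platform where Combine is available (iOS 13 / macOS 10.15 or later). The `Lucy` class, its `wrapped` array, and the belt of 100 numbered chocolates are made up for illustration.

```swift
import Combine

// Hypothetical subscriber standing in for Lucy.
final class Lucy: Subscriber {
    typealias Input = Int          // a chocolate, identified by its number
    typealias Failure = Never

    private(set) var wrapped: [Int] = []

    func receive(subscription: Subscription) {
        // Ask the belt for at most 5 chocolates up front.
        subscription.request(.max(5))
    }

    func receive(_ input: Int) -> Subscribers.Demand {
        wrapped.append(input)
        // .none adds nothing to the demand already requested.
        // Returning .max(1) here instead would ask for one more per chocolate.
        return .none
    }

    func receive(completion: Subscribers.Completion<Never>) {
        print("belt stopped")
    }
}

let lucy = Lucy()
(1...100).publisher.subscribe(lucy)   // the belt has 100 chocolates ready

print(lucy.wrapped)   // prints "[1, 2, 3, 4, 5]"
```

The belt had 100 chocolates to give, but only the 5 that Lucy asked for ever left it. The demand returned from receive(_:) is added on top of what was already requested, which is how a subscriber can keep pulling at its own pace.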

Conclusion

So, we had a problem with an over-energetic producer and looked at ways of dealing with the backpressure it generates. We looked at how you can sometimes drop a few events, like debouncing a user tap event, and how you can buffer events and use them in one go, like uploading batches of records.

We also looked at a technique that lets you talk to the producer and ask it to slow its roll.

Solutions like these for backpressure can help you achieve smoother and bug-free user experiences.

Tokopedia Engineering

Story from people who build Tokopedia
