Backpressure in your Kotlin Flows

Nick Skelton
Google Developer Experts
6 min read · Feb 12, 2021

When should I start to worry?

Photo by CHUTTERSNAP on Unsplash

It’s not often you have to worry about backpressure in the average Android App. Events just generally don’t scale on our puny little mobile devices to levels where backpressure becomes important. Not to say that it’s not possible, it’s just not common in my experience. However, as I found in my previous article when experimenting with callbackFlow, it’s not just the scale of the events in a Flow, but also the structure and order of the events themselves and the assumptions we make about how they will arrive that can cause bugs related to backpressure.

What is backpressure?

A concept you are probably familiar with is “Buffer”. This is a good starting point when trying to conceptualise backpressure. Every developer has interacted with a File at some point: created a ByteBuffer and a FileInputStream and managed to stream the bytes of the file into a String or something. A Flow is very similar, except that it is a stream of events, and the events can hold any kind of data, including bytes. And just like with files, if you have a stream, you need a buffer, just in case data is produced faster than it is being consumed… which is exactly the definition of backpressure: when data is produced faster than it can be consumed.

However, when people talk about backpressure in the context of Reactive Streams, they are usually talking about “Backpressure Strategy”:

  • How big is the buffer? (Size = 0, 1, 64, 100000)
  • What do we do with new events when the buffer is full?

“Unfortunately nobody can be told what backpressure is. You have to see it for yourself.”

Time to download the code to follow along the rest of the article:

In the previous article, I converted some Firebase callbacks to Kotlin Flows using callbackFlow and focused on how to produce data for our Flow. It wasn’t until I started to consume these events (at the collector) that I came across a potential problem and wandered into backpressure.

Note: the Firebase implementation is not really important — everything I explore below is specific to Kotlin Flow.

Why not just collect?

collectLatest, collect or collectIndexed… which to choose?

And why is collect() all red?

When I first encountered this choice I said to myself, collectLatest sounds about right, and I was correct. Coincidence?

By choosing collectLatest, I didn’t know it, but I had just made a decision about how I intended to deal with backpressure. If we want to get deep, I was carefully led to this decision by the designers of Flow: they have designed (and limited) these collection options in a way that guided me to the option that is correct 99% of the time. In other words, they did their usability homework and found that most people are only interested in collecting the “latest” event.

collect() is red because JetBrains is strongly discouraging you from its usage, without completely forbidding it. This is to give advanced users the opportunity to suggest a new use case. From the source comments/documentation:

“Such limitation ensures that the context preservation property is not violated and prevents most of the developer mistakes related to concurrency, inconsistent flow dispatchers and cancellation.”

At first glance, collectIndexed() looks like it would only be useful if you are interested in the index of each event. However, there is another important reason you might want to choose collectIndexed(): it handles backpressure differently to collectLatest(). I’ll explain how later on.

For now, let’s see what happens with collectLatest()

Dropped packets

Let’s simulate some backpressure.
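Here is a minimal sketch of such a simulation. It makes a few assumptions: progressUpdates() is a hypothetical stand-in for the Firebase progress Flow, collectLatestSlowly() is a made-up helper name, adding to the shown list stands in for updateScreen(), and the collector delay is scaled down from 5 seconds to 500 ms so it runs quickly.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the Firebase progress Flow: five updates, 100 ms apart.
fun progressUpdates(): Flow<Int> = flow {
    for (percent in listOf(20, 40, 60, 80, 100)) {
        delay(100)
        emit(percent)
    }
}

// Collects with collectLatest and returns the values that made it "to the screen".
fun collectLatestSlowly(): List<Int> = runBlocking {
    val shown = mutableListOf<Int>()
    progressUpdates().collectLatest { percent ->
        delay(500)       // slow collector (scaled down from the 5 seconds in the text)
        shown += percent // stands in for updateScreen(percent)
    }
    shown
}

fun main() {
    println(collectLatestSlowly())
}
```

With these timings every update except the last is interrupted during its delay, so this should print [100].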

delay(5000) is simulating a collector (aka consumer or client-side depending on who you speak to) that takes 5 seconds to process each result. Recall that backpressure occurs when we consume slower than we produce. The delay is to exaggerate the sluggish consumer and create a super slow collector.

Since we chose collectLatest, however, we’re safe: when another value is “offered” (aka produced), the collectLatest routine will effectively stop what it was doing and start again with the latest result.

This has the risky side effect that updateScreen() will usually not be called until the final offering. If Firebase sends updates about progress more often than every 5 seconds, then our UI will freeze until five seconds after the very last offering. Not good.

Thus collectLatest makes an important assumption about the contents of the Flow: the final value offered to the Flow is the most important, and we really don’t want to miss it.

However, what if the ‘dropped’ results were important too? What if it is really important that updateScreen() is called every time something is offered?

Let’s try using a different collector, one that doesn’t just bail out when a new offer arrives. Let’s see what happens with collectIndexed even though we’re not really interested in the index.
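A rough sketch of that experiment, under some assumptions: startFakeUpload() is a hypothetical callback API standing in for Firebase, collectIndexedSlowly() is a made-up helper name, and the delay is shortened so the example finishes quickly. It uses trySend(), so it assumes kotlinx.coroutines 1.5 or newer.

```kotlin
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.collectIndexed
import kotlinx.coroutines.runBlocking

// Hypothetical callback API standing in for Firebase: reports progress, then completes.
fun startFakeUpload(onProgress: (Int) -> Unit, onComplete: () -> Unit) {
    listOf(20, 40, 60, 80, 100).forEach(onProgress)
    onComplete()
}

// Wraps the callback API in a Flow, in the style of the previous article.
fun progressUpdates(): Flow<Int> = callbackFlow {
    startFakeUpload(
        onProgress = { percent -> trySend(percent) },
        onComplete = { close() }
    )
    awaitClose { /* nothing to unregister in this sketch */ }
}

// Collects every value with a slow collector and returns what reached "the screen".
fun collectIndexedSlowly(): List<Int> = runBlocking {
    val shown = mutableListOf<Int>()
    progressUpdates().collectIndexed { _, percent ->
        delay(200)       // slow collector, but never interrupted by newer values
        shown += percent // stands in for updateScreen(percent)
    }
    shown
}

fun main() {
    println(collectIndexedSlowly())
}
```

This should print [20, 40, 60, 80, 100]: each value arrives late, but none goes missing.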

This works! All of our updates to the screen are delayed by 5 seconds, as we would expect, and the last value came through ok. Why?

Well, firstly, we have told the Flow that we are interested in all offerings not just the latest… this is a good start. But why didn’t something break?

The defaults just work

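Everything worked because the default buffer size for callbackFlow is 64. So Firebase can make 64 offerings before offer() will start to return false (false means the buffer is full and the offer failed). In kotlinx.coroutines 1.5 and later, offer() is deprecated in favour of trySend(), which reports the same failure through its result.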

Since Firebase doesn’t “callback” enough to fill the default buffer, I decided to simulate a flood. Each time Firebase sent a progress report, I would make 50 identical offers. Finally something broke!
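The flood can be approximated without Firebase. This is only a sketch under assumptions: floodCallbackFlow() and FloodResult are hypothetical names, the "flood" is a plain loop of 200 sends, and trySend() (kotlinx.coroutines 1.5 or newer) is used in place of offer(). The exact number of accepted values may differ by a little between library versions, so the sketch counts them rather than hard-coding a number.

```kotlin
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.collectIndexed
import kotlinx.coroutines.runBlocking

// What happened during the flood: sends accepted, sends rejected, values collected.
data class FloodResult(val accepted: Int, val rejected: Int, val collected: List<Int>)

fun floodCallbackFlow(total: Int = 200): FloodResult = runBlocking {
    var accepted = 0
    var rejected = 0

    val flooded = callbackFlow<Int> {
        // Send far more values than the default buffer holds, without suspending,
        // so the collector gets no chance to drain the buffer in between.
        for (i in 1..total) {
            if (trySend(i).isSuccess) accepted++ else rejected++
        }
        close()
        awaitClose { /* nothing to unregister in this sketch */ }
    }

    val collected = mutableListOf<Int>()
    flooded.collectIndexed { _, value ->
        delay(1) // a (mildly) slow collector
        collected += value
    }
    FloodResult(accepted, rejected, collected)
}

fun main() {
    val result = floodCallbackFlow()
    println("accepted=${result.accepted} rejected=${result.rejected}")
    println("last value collected=${result.collected.last()}")
}
```

The values that get through are the earliest ones; the sends that fail are the later ones, including the final value.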

This time, however, the offerings already in the Flow remain. When the buffer is full, it’s the “latest” item that is lost. This is an important distinction. What if the final offering in the Flow is important and can’t be missed?

Then you need some more control over your backpressure, and it’s time you were introduced to this important little Flow operator:

.buffer()

This is the magic operator for all things backpressure related and it is great fun to experiment with because it’s easy to get started and play:

buffer(capacity: Int, onBufferOverflow: BufferOverflow)

And there are three options JetBrains gives us for what to do when the Buffer overflows:

public enum class BufferOverflow {
    SUSPEND,
    DROP_OLDEST,
    DROP_LATEST
}

DROP_OLDEST and DROP_LATEST are pretty straightforward. SUSPEND is interesting though. For our FileInputStream analogy, this would probably mean something like ‘stop reading from the hard drive’. This might be ok in some cases (when streaming a video clip, once the media player buffer is full, stop requesting frames from the server) but bad in others (when recording audio from a microphone, if you stop taking data from the microphone, you are missing whatever sounds are happening and the audio will have gaps).
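To get a feel for the three strategies, here is a small experiment you can play with. It is a sketch, not a recipe: collectWith() is a hypothetical helper, the producer is a plain flow that emits 1 to 10 as fast as it can, and the buffer size of 2 and the 50 ms delay are arbitrary values picked to make the buffer overflow.

```kotlin
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.collectIndexed
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Emits 1..10 without pausing into a 2-slot buffer and collects slowly,
// returning the values that actually reached the collector.
fun collectWith(strategy: BufferOverflow): List<Int> = runBlocking {
    val seen = mutableListOf<Int>()
    flow { for (i in 1..10) emit(i) }
        .buffer(capacity = 2, onBufferOverflow = strategy)
        .collectIndexed { _, value ->
            delay(50) // slow collector
            seen += value
        }
    seen
}

fun main() {
    for (strategy in BufferOverflow.values()) {
        println("$strategy -> ${collectWith(strategy)}")
    }
}
```

With SUSPEND the producer waits and all ten values arrive. With DROP_OLDEST values go missing but the final value (10) still gets through. With DROP_LATEST the early values survive and the final value is lost, which is the same kind of loss as the failed offers described above.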

Parting advice

As if the structure of the Flow API itself isn’t idiomatic enough, the source code documentation is really second to none in the Kotlin Libraries. They really put a lot of effort into making the documentation concise and useful. If you’re not sure what an operator does — CTRL-Click and spend 5 minutes reading the half-page manual on it. Time well spent.

I hope this article has helped you choose the correct backpressure strategy for your Flow and understand a little about why backpressure is important.

Thanks heaps to Enrique López-Mañas and Florina Muntenescu for reviewing this article. Legends.

Drop me a line on Twitter — I embrace both kudos and korrections with open arms.

Freelance Android Dev. Google Developer Expert. Full Time Remote. Part Time Buzzword Hacker.