Pausable Observables in RxJS
and other backpressure techniques
There are different ways to pause event streams: we can filter, delay, buffer, or space events. Some techniques ignore events during a pause (lossy); others delay event handling until the stream is resumed (lossless). Some keep their subscription while others resubscribe, so their output will differ depending on the source. In this article I’d like to explore several pausing techniques and suggest how they can be implemented using RxJS. Let’s go!
Imagine we have an app that displays push notifications from the server. We will pause these notifications in several different ways!
Why: the user wants to mute notifications for some time. Notifications that arrive while mute is enabled are skipped.
What: when paused, we simply filter events out of the source Observable, keeping the server connection intact.
How: the windowToggle operator is a good fit for this. With it we can indicate when to pass events from the source and when to filter them out.
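source$.pipe(
  // open a window when the pause is switched off, close it when the pause is switched on
  windowToggle(offs$, () => ons$),
  // then flatten window Observables
  flatMap(x => x)
)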
Play with this example using windowToggle operator in a playground.
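Here is a minimal, self-contained sketch of the filtering approach, assuming RxJS 6 or 7 style imports. The helper name pausableByFilter and the pause$ / resume$ parameters are my own: pause$ plays the role of ons$ and resume$ the role of offs$. The startWith on the openings is also an assumption, added so that the stream starts unpaused. The sketch expects pause and resume to strictly alternate, since two resumes in a row would open two windows and duplicate values.

```typescript
import { Observable, Subject } from 'rxjs';
import { mergeAll, startWith, windowToggle } from 'rxjs/operators';

// Hypothetical helper: drops source values while paused (lossy),
// keeping a single subscription to the source the whole time.
function pausableByFilter<T>(
  source$: Observable<T>,
  pause$: Observable<unknown>,
  resume$: Observable<unknown>
): Observable<T> {
  return source$.pipe(
    // open a window on every resume (and once at the start), close it on pause
    windowToggle(resume$.pipe(startWith(null)), () => pause$),
    // flatten the window Observables back into one stream
    mergeAll()
  );
}

// Demo with Subjects, so everything runs synchronously
const source$ = new Subject<number>();
const pause$ = new Subject<void>();
const resume$ = new Subject<void>();
const shown: number[] = [];

pausableByFilter(source$, pause$, resume$).subscribe(v => shown.push(v));

source$.next(1); // delivered
pause$.next();
source$.next(2); // dropped: no window is open
resume$.next();
source$.next(3); // delivered
// shown is now [1, 3]
```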
Unsubscribe / resubscribe (lossy)
Why: when the user switches to another window, we can unsubscribe from the notifications stream, lowering network, processing and server load. Once the user switches back, we restore the connection to the notifications stream.
What: when paused, we unsubscribe from the Observable and resubscribe on resume.
How: takeUntil will complete the Observable when we pause, and repeatWhen will subscribe again when we resume
repeatWhen(() => offs$)
Play with this example using takeUntil and repeatWhen in a playground.
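A minimal sketch of the takeUntil + repeatWhen combination, assuming RxJS 6 or 7 style imports. The helper name pausableByResubscribe, the pause$ / resume$ names and the connections counter are mine and purely illustrative; defer stands in for "opening a server connection" so we can see the resubscription happen. Note that repeatWhen is marked as deprecated in recent RxJS 7 releases in favour of repeat with a delay option, so treat this as an illustration of the idea rather than a recommendation.

```typescript
import { defer, Observable, Subject } from 'rxjs';
import { repeatWhen, takeUntil } from 'rxjs/operators';

// Hypothetical helper: unsubscribes from the source on pause,
// resubscribes on resume (lossy).
function pausableByResubscribe<T>(
  source$: Observable<T>,
  pause$: Observable<unknown>,
  resume$: Observable<unknown>
): Observable<T> {
  return source$.pipe(
    takeUntil(pause$),        // complete, and so unsubscribe, when we pause
    repeatWhen(() => resume$) // subscribe to the source again when we resume
  );
}

// Demo: count how many times the "connection" is opened
let connections = 0;
const server$ = new Subject<string>();
const notifications$ = defer(() => {
  connections++; // stand-in for opening a server connection
  return server$;
});
const pause$ = new Subject<void>();
const resume$ = new Subject<void>();
const received: string[] = [];

pausableByResubscribe(notifications$, pause$, resume$).subscribe(v => received.push(v));

server$.next('a'); // delivered
pause$.next();     // unsubscribes from notifications$
server$.next('b'); // missed: nobody is subscribed
resume$.next();    // resubscribes, connections becomes 2
server$.next('c'); // delivered
// received is now ['a', 'c']
```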
- when paused we switchMap to an empty Observable; when resumed we switchMap to the source stream (this triggers a subscription)
- or manual use of
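The switchMap variant can be sketched like this, assuming RxJS 6 or 7 style imports. The helper name pausableBySwitch and the boolean paused$ stream are my own assumptions (true means "paused"), and startWith(false) is added so that the stream starts unpaused.

```typescript
import { defer, EMPTY, Observable, Subject } from 'rxjs';
import { startWith, switchMap } from 'rxjs/operators';

// Hypothetical helper: paused$ emits true to pause and false to resume.
function pausableBySwitch<T>(
  source$: Observable<T>,
  paused$: Observable<boolean>
): Observable<T> {
  return paused$.pipe(
    startWith(false),
    // paused: switch to an empty Observable (this unsubscribes from the source)
    // resumed: switch to the source stream (this triggers a new subscription)
    switchMap((paused): Observable<T> => (paused ? EMPTY : source$))
  );
}

// Demo: count subscriptions to see the resubscribe happen
let subscriptions = 0;
const server$ = new Subject<string>();
const notifications$ = defer(() => {
  subscriptions++;
  return server$;
});
const paused$ = new Subject<boolean>();
const received: string[] = [];

pausableBySwitch(notifications$, paused$).subscribe(v => received.push(v));

server$.next('a');   // delivered
paused$.next(true);  // pause
server$.next('b');   // missed
paused$.next(false); // resume, subscriptions becomes 2
server$.next('c');   // delivered
// received is now ['a', 'c']
```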
Sampling, throttling, debouncing and auditing (lossy)
Why: when you don’t want to bother your user with frequent updates, there are several ways to limit the number of messages shown in a given time period.
What: sampling takes, every N ms, the latest value emitted since the previous sampling.
Note: in the marble diagram above I keep the source marble colors, while updating their values to represent the time when they were emitted. E.g. the second yellow marble from the source was emitted at 5ms; in debounceTime(10) it was emitted at 15ms, in auditTime(10) it was emitted at 10ms, and it was ignored in
What: debouncing emits a value only if no other values were emitted for a given time after it. Throttling does the opposite: when a value is emitted, it ignores subsequent emissions for a given time period. Auditing acts similarly to throttling, though it emits the latest value in a given period, not the first.
How: for all four we have out-of-the-box operators
Play with the sample operator in a playground.
Check out this comparison of debounceTime vs throttleTime vs auditTime.
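To compare the four operators side by side, here is a small sketch that runs each of them over the same source in virtual time, using the TestScheduler from rxjs/testing (RxJS 6 or 7). The record helper, the marble string and the "value@ms" log format are my own choices for illustration. The source emits a at 0ms, b at 5ms and c at 32ms, then completes at 60ms.

```typescript
import { MonoTypeOperatorFunction } from 'rxjs';
import { auditTime, debounceTime, sampleTime, throttleTime } from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';

// Hypothetical helper: applies one operator to a fixed source in virtual time
// and records every output as "value@time-in-ms".
function record(op: MonoTypeOperatorFunction<string>): string[] {
  const log: string[] = [];
  // no marble assertions are used here, so the comparer can be a no-op
  const scheduler = new TestScheduler(() => {});
  scheduler.run(({ cold }) => {
    // a at 0ms, b at 5ms, c at 32ms, completion at 60ms
    cold<string>('a 4ms b 26ms c 27ms |')
      .pipe(op)
      .subscribe(v => log.push(`${v}@${scheduler.now()}`));
  });
  return log;
}

const sampled = record(sampleTime(10));     // ['b@10', 'c@40']: latest value at every 10ms tick
const throttled = record(throttleTime(10)); // ['a@0', 'c@32']: first value, then 10ms of silence
const audited = record(auditTime(10));      // ['b@10', 'c@42']: latest value 10ms after the first one
const debounced = record(debounceTime(10)); // ['b@15', 'c@42']: value followed by 10ms of quiet
```

The b value shows the difference nicely: it is emitted at 5ms by the source, comes out at 10ms from sampleTime and auditTime, at 15ms from debounceTime, and is dropped by throttleTime.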
Why: when push notifications come in rapidly, we’d like to ensure that the user has time to grasp them. So we pause for at least 1 second after displaying a notification.
What: keep a gap of at least N between emissions.
How: concatMap with a timer that starts with the source value and completes after N
concatMap(value => timer(N).pipe(ignoreElements(), startWith(value)))
Play with this example of spacing values on an observable using concatMap.
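A runnable sketch of the spacing technique, assuming RxJS 6 or 7 style imports. The spaceBy name is mine, and I use ignoreElements to drop the timer’s own tick so that only the source value comes out. To keep the demo instant and deterministic it runs in virtual time with the TestScheduler; in an app you would simply pipe your notifications through spaceBy(1000).

```typescript
import { MonoTypeOperatorFunction, of, timer } from 'rxjs';
import { concatMap, ignoreElements, startWith } from 'rxjs/operators';
import { TestScheduler } from 'rxjs/testing';

// Hypothetical operator: keeps a gap of at least gapMs between emissions (lossless).
function spaceBy<T>(gapMs: number): MonoTypeOperatorFunction<T> {
  return concatMap((value: T) =>
    timer(gapMs).pipe(
      ignoreElements(), // drop the timer's own tick
      startWith(value)  // emit the source value right away, then wait for the timer
    )
  );
}

// Demo in virtual time: three values arrive at once, and are shown 1s apart
const shownAt: string[] = [];
const scheduler = new TestScheduler(() => {});
scheduler.run(() => {
  of('a', 'b', 'c')
    .pipe(spaceBy<string>(1000))
    .subscribe(v => shownAt.push(`${v}@${scheduler.now()}`));
});
// shownAt is now ['a@0', 'b@1000', 'c@2000']
```

Because concatMap waits for each inner timer to complete before subscribing to the next one, values queue up instead of being dropped.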
Why: imagine that our client has lost its connection to the server and is trying to send a bunch of messages. We can delay sending those messages until we are connected again.
What: when paused, we buffer values from the stream and emit the buffer once resumed.
How: windowToggle between resume and pause, bufferToggle between pause and resume
merge(
  source$.pipe( bufferToggle(off$, () => on$) ),
  source$.pipe( windowToggle(on$, () => off$) )
).pipe(
  // then flatten buffer arrays and window Observables
  flatMap(x => x)
)
- or switchMap the pause stream onto buffer when paused, and onto the raw source on resume
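The bufferToggle / windowToggle combination can be sketched as follows, assuming RxJS 6 or 7 style imports. The helper name pausableBuffered is borrowed from the RxJS 4 operator for readability only; this is not its implementation. Here pause$ corresponds to off$ and resume$ to on$. The startWith on the window openings is my assumption, so that values arriving before the first pause are passed through rather than lost. The source is subscribed twice, so a cold source would need to be shared first.

```typescript
import { merge, Observable, Subject } from 'rxjs';
import { bufferToggle, mergeMap, startWith, windowToggle } from 'rxjs/operators';

// Hypothetical helper: buffers values while paused and releases them on resume (lossless).
function pausableBuffered<T>(
  source$: Observable<T>,
  pause$: Observable<unknown>,
  resume$: Observable<unknown>
): Observable<T> {
  return merge(
    // between pause and resume: collect values into an array
    source$.pipe(bufferToggle(pause$, () => resume$)),
    // between resume and pause (and from the start): pass values through a window
    source$.pipe(windowToggle(resume$.pipe(startWith(null)), () => pause$))
  ).pipe(
    // flatten both the buffer arrays and the window Observables
    mergeMap(x => x)
  );
}

// Demo with Subjects, so everything runs synchronously
const source$ = new Subject<number>();
const pause$ = new Subject<void>();
const resume$ = new Subject<void>();
const sent: number[] = [];

pausableBuffered(source$, pause$, resume$).subscribe(v => sent.push(v));

source$.next(1); // sent right away
pause$.next();
source$.next(2); // buffered
source$.next(3); // buffered
resume$.next();  // buffer [2, 3] is released
source$.next(4); // sent right away
// sent is now [1, 2, 3, 4]
```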
Old geek note: there used to be a pausableBuffered operator in RxJS 4 that implemented this kind of pausing.
A weird bonus!
Delaying events on mouse hover with event spacing (lossless)
When the mouse hovers over a message, we want to pause the stream to give the user time to read it. The notifications stream stays paused until the user moves the mouse away. Also, all events are spaced to give the user at least 1 second to read each message.
Notes and honorable mentions
The ability of an Observable consumer to control the producer’s emission rate, timing or amount is often called back pressure. So pausing is just a subset of back pressure.
Obviously, we could not cover all back pressure techniques in this article, though here are some I’d like to mention:
delayWhen are simple, yet useful operators that might fit your pausing needs
- controlled Observable: another operator from RxJS prior to v5. It allowed the consumer to control when the Observable should produce values on the stream by calling source$.request(Number). Read more on this operator in the RxJS 4 docs and check its source code to get insight into how this could be achieved in modern RxJS
- Pausable delayed Observable: in this case the result stream would delay event production while keeping the emission pace of the original source. Say, if we use this to delay a 1s timer, after resume it would still emit at a 1s rate. I’d suggest checking timeInterval while implementing this kind of pausing
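To give a feel for the request-based control mentioned above, here is one possible approximation in modern RxJS (6 or 7 style imports). This is not the RxJS 4 implementation: the controlled helper, its returned shape and the token idea are all my own assumptions. Each request(n) produces n tokens, and zip pairs every source value with one token, so values queue up until the consumer asks for them. Note that zip buffers without limit and that requests made before subscribing to output$ are lost in this sketch.

```typescript
import { Observable, range, Subject, zip } from 'rxjs';
import { map, mergeMap } from 'rxjs/operators';

// Hypothetical helper: the consumer pulls values by calling request(n).
function controlled<T>(source$: Observable<T>) {
  const request$ = new Subject<number>();
  // every request(n) turns into n tokens
  const tokens$ = request$.pipe(mergeMap(n => range(0, n)));
  // a source value is released only when a token is available for it
  const output$ = zip(source$, tokens$).pipe(map(([value]) => value));
  return { output$, request: (n: number) => request$.next(n) };
}

// Demo with a Subject source, so everything runs synchronously
const source$ = new Subject<number>();
const { output$, request } = controlled(source$);
const delivered: number[] = [];

output$.subscribe(v => delivered.push(v));

source$.next(1);
source$.next(2);
source$.next(3); // nothing delivered yet
request(2);      // releases 1 and 2
source$.next(4);
request(1);      // releases 3, while 4 stays queued
// delivered is now [1, 2, 3]
```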
Have another example, a different approach or a question? Please share it in the comments section! Your feedback is very valuable!
I’m proud that you’ve read so far! Congratulations!
Be sure to check this RxJS Playground — the tool I used to compile marble diagrams for this article. I’ve created it to help developers (myself included) explore, understand and explain RxJS streams. Give it a try!