Thinking in nested streams with RxJS
By: Adam Sullovey
In my first project at Rangle, I jumped into a codebase that used redux observable, a combination of RxJS and Redux for managing state and side effects like calls to our backend & 3rd party APIs. Although I was new to RxJS, I could relate many of its operators like
take to standard JS array functions, or functions included in the Ramda library. Other RxJS operators were like nothing I had seen before, such as
What is a Higher-Order Observable?
The same way a higher-order function is a function that returns another function, a higher-order observable is an observable that emits more observables in its stream of events. This can be confusing when you first see it, so let’s talk about how to identify a higher-order Observable.
Recognizing a nested stream or Higher-Order Observable
The first time I encountered a higher-order observable was a surprise. I logged out the result of a
map function that contained asynchronous behaviour in the middle of a stream, and saw this in my console:
I expected to see the result of an HTTP request, but instead saw another observable. Here’s the code that created that output:
Read on to see why this happens, and how to work with streams like this.
How to create Higher-Order Observables (without realizing it)
There are many ways to find yourself logging out a stream of streams. Here are some I’ve encountered:
Adapting other code to use Observables
from to convert single values, arrays, objects that emit events, and promises into observables. If your application converts something into an observable from inside the
map operator, the next operator in your pipe will receive an observable, and you'll have a stream of streams. That means you outer observable has become a higher-order observable.
You can do this without using the
from functions directly because libraries you rely on might use them. For example, Angular's http service uses
of to convert an
HttpRequest into an observable. That means making HTTP request within an observable's stream turns it into a higher-order observable.
Splitting one stream into many
groupBy turn one stream into many based on the result of a function. This is helpful when categorizing events to treat them in different ways. It also creates a higher-order observable - the initial stream will now emit more streams of categorized events rather than its values.
Working with nested data structures
One of my experiments with RxJS was traversing directories of files on my computer and returning a single stream of file paths. I started with one Observable that emitted names of files found one folder, but as the program discovered subdirectories, it emitted more Observables which emitted more file names and more Observables. Once the code had walked all the way down the tree of files and subdirectories, it returned an Observable with a stream that mirrored the structure of my file system (with Observables in the place of directories), rather than a simple list of files.
If Observable X emits Observable Y, anything subscribed to Observable X will receive Observable Y, not Observable Y’s values.
Visualizing Higher-Order Observables with RxViz
RxViz is a fantastic tool that can animate observables emitting streams of events, helping you determine if your code is behaving the way you expected. I found marble diagrams like the ones on https://rxmarbles.com were good for explaining some some stream concepts, but RxViz’s animation made the idea of nested streams click in my head.
How RxViz works
Provide RxViz with some JS code where the last line is an observable, press the pink “Visualize” button, and it will create an animation of what its stream is emitting.
A simple example is rendering a single stream that emits a number once a second:
That will look like this in RxViz:
Your first Higher-Order Observable with RxViz
Try out this code:
map to change the events being emitted by the first observable (created by
interval(1000)). While the stream in the previous example emitted 1, 2, 3, 4..., this stream emits new streams. Visually, that looks like this:
What does it mean?
- The top line represents the first stream that was created by the outermost observable,
- Grey circles on that line represent new observables being created inside the
mapfunction. Each one was created by
- Each line down the screen represents a new stream from one of those new observables
- Each new line across the screen with numbers represents the output of an observable created by
- Each successive internal stream starts further to the right because it begins later.
Does this look like a lot of complexity that you didn’t intend to add to your program? Don’t worry.
But I didn’t intend to create a Higher-Order Observable
A higher-order observable might be the right way to process or represent data in a computer’s memory, but it also might not be the way the way to deliver data to a consumer, such as a UI component or another part of your application. Unboxing values nested multiple observables deep can make your code complex.
Maybe you haven’t faced this complexity before when working with Promises. A magical thing about them is that they flatten themselves. Promises can be used in long chains or nested deeply within each other, but calling
.then on the outermost one will retrieve the value from innermost one when it resolves. RxJS' observables don't do this automatically, but the library provides us with ways to flatten observables when we choose.
Flattening Higher-Order Observables into regular, First-Order Observables
Let’s use this as an example of a higher-order observable in RxViz
Like the last example, this is a higher-order observable. The first observable emits 3 more observables with different content (letters, numbers, emoji)
flatMap is the easier operator to understand. When applied to one observable, it captures all the events emitted by nested observables and moves them up into the first observable's stream.
You will see all of the values appear on a single line:
The grey circles are gone. The 3 nested observables have been flattened into one, and all the values have been preserved. If you adjust the timing so that the streams overlap each other, you will still see all the events.
switchMap behaves differently. Modify the last line to look like this:
When the second stream starts emitting numbers,
switchMap only puts the numbers from the second stream into the flattened stream. When the third stream starts emitting emojis, the flattened stream only includes emojis from then on. All other incoming events from the first two streams are discarded. Compare that to
flatMap, where all the types of values were mixed together when the streams overlapped.
Why do Different Ways of Flattening Higher-Order Observables Matter?
Like I said before, nested streams might work well for processing data, but might not work for displaying data to your end users. Choosing the correct operator to flatten the higher-order observables is vital for delivering the correct user experience
Here are two very common UI elements using streams to manage their (fake) HTTP requests to a server.
- in both examples, using plain
switchMapwould result in an
Observablebeing passed to the UI to display, rather than an actual value that is useful to show the user.
flatMap(or vice-versa) results in incorrect behaviour - try it out and see
Each time you check or uncheck a box, this application fakes an HTTP request to a server to save the change. Since each request results in a new stream for the HTTP request being created inside the stream of click events, we have to flatten the result to apply it to our UI.
The internet can be slow and unreliable, so responses may come back in a different order than their requests were sent, and some of them will be errors. This means
flatMap should be used to flatten the stream so we that receive all the responses. What do you think would happen if we chose
switchMap? Edit the code and find out. See if you can explain why.
Type-ahead Search Field
After typing a few characters into this search field, it fakes an HTTP request to a server to get a list of cities you may be searching for. Like the above example, that results in a nested stream (HTTP response inside a stream of input change events), so the result needs to be flattened to be used easily in the UI.
You may type faster than the server can respond to search requests, and we only care about the most recent request’s response. If an older response from an outdated request arrives with stale data, we can discard it. This is a great place to apply
switchMap. What do you think would happen if you chose
flatMap in this use case instead? Edit the code and find out. See if you can explain why.
In this post we’ve looked at:
- The ways higher-order observables are created
- How to recognize a higher-order observable when you encounter one
- How to visualize their streams of events in RxViz
- How to visualize flattening a higher-order observable into a regular (or first-order observable) with
I hope you can take this information into your own stream experiments, and feel confident the next time you encounter a higher-order observable in a larger project.