Rx-Volution! Coding expressively with RxJS

An introduction to RxJS and examples for React

By now it should be clear that building web apps sparks increasingly time-consuming challenges, and certainly we — the community — have been dealing with these challenges by coming up with nifty solutions.

React, Redux, Webpack, Rollup, Styled Components… Pick your problem, pick your solution. And excuse the fact that “Your Favourite Library™” might be missing from this list…

I won’t talk about fatigue, but instead about everyday problems in JavaScript. The huge growth in the number of libraries in the community is in my opinion not at all a sign of over-engineering, but it tackles some genuine problems in web development. And, oh boy, web development is more complex than it seems at first glance these days. I believe, this rant by Sean Larkin sums it up best:

https://twitter.com/TheLarkInn/status/783375297248763904

The rate at which the community moves can be scary, but ultimately the Open Source Community is trying to come up with solutions, to rid us from everyday hardships while developing for the web (and beyond.)

In this post I want to talk about neither styling, nor tooling, nor even about view libraries. I’d like to talk about some hardships that exist in parallel to all the obvious paradigms and libraries that you might use every day.

Common Hardships of JS developers

I believe that there are a couple of common problems that naturally attract technical debt, prove to be more of a challenge than they should, or—simply put — cost us too much time.

Some of them are:

  • Sending and composing Requests
  • Event handling
  • Animations and transitions
  • User input
  • Data flow & state management

The problem that all of these things have in common is Concurrency. These are typical problems that need to be addressed with asynchronous code.

My favourite tweet on concurrency

If you think the answer to all of these problems is “Promises!” think again. Unfortunately only requests can nicely be wrapped in Promises, due to their singular result. Everything else doesn’t fit nicely into a Promise-world.

Instead of starting right at Reactive Programming and RxJS though, I want to shine a light on the hardships that I’m talking about first.


Let’s start with an example: A range slider

To introduce an example that suffers from the aforementioned problems with concurrency, especially in terms of readability of the resulting code, I built a small range slider.

This is a simple React component, that renders a bar with a small knob, that the user can slide horizontally. It stores the x value of the knob in the state. As we would like to handle dragging we use the knob element’s onMouseDown handler and register a mousemove and a mouseup event listener.

When the user presses down we set isDragging in the state to true and after the user releases his mouse we set it to false. In the mousemove handler we recompute the knob’s position according to the mouse position while this isDragging flag is truthy.

So to make this work we have to add and remove event listeners, add an event handler to our element and add state to know whether the user is currently dragging.

“This is fine. Why would I want improve this?”,

you might ask. While this is perfectly acceptable, the code becomes more unintuitive and the control flow harder to follow, as we’d add more code to this. And adding code becomes a definite possibility, if we wanted to add support for touch devices, or more knobs, or more complex behaviour.

First of all, this is an article about RxJS. And since you are reading it right now, I can probably assume that you have your reasons already.

But jokes aside — What if I told you that you could write this in a way so that more people could intuitively understand what it does? What if this piece of code could scale in complexity, but stay perfectly readable?

This is where Reactive Programming comes in: Reactive Programming allows you to write more expressive, declarative code.


What is Reactive Programming?

The quickest description of it would be that in Reactive Programming you code with data streams. Therefore the key to understanding it is thinking in streams.

“Reactive Programming is programming with asynchronous data streams.” — André Staltz
“Hooray for overused FRP memes!”

Streams can be almost anything, since we’re just talking about values over time. Thus a stream could consist of: input events, changing data, caches, tweets — anything you would want to react to.

You might have used streams in some form already. Maybe you have worked with Node.js’ streams, or maybe you have used Promises or Futures, which are comparable to streams with only a single value.

The key to understanding why programming with streams is beneficial is understanding how this abstraction promotes simplicity while coding. Streams are collections of data over time and can thus be generally treated like a normal collection — i.e., like arrays.

Composing, filtering, transforming values that you receive over time is easily solved with a library like RxJS.

Furthermore Reactive Programming allows us to describe our code like we would describe the problem. Thinking back to our “range slider” example, we would express our problem naturally:

“When we detect a mousedown on the knob,
 we let the knob’s position follow the mouse’s position with every mousemove,
 until we detect a mouseup.”

Now that you’ve been through such a long introduction to understand the reasoning behind RxJS and Reactive Programming, you probably want to learn what RxJS is already, right?


RxJS, Reactive Extensions for JavaScript

Primarily RxJS is an Observable library. Observables are our primitive that represents the streams that have been mentioned above. They are still asynchronous collections — i.e., values over time.

If you think about the three primitives that you use on a daily basis, there shouldn’t really be any surprises.

Single vs Multiple & Synchronous vs Asynchronous primitives

The synchronous and singular primitive is just a value; The synchronous and plural primitive is an iterable e.g. an array. This is the first column in the table of JS primitives.

In the second column we find the asynchronous and singular primitive: Promises. Due to how the table is laid out, we immediately recognise that our day-to-day work is missing something crucial:

What about a primitive that is asynchronous and plural, i.e. something capturing multiple values over time? That’s where Observables come into the picture.

It is a missing primitive that is set out to be standardised by the TC39 committee: https://tc39.github.io/proposal-observable/. It is currently a Stage 1, and might therefore not be missing for long.

How Observables work

Marble Diagram: The structure of a Promise; http://codepen.io/philpl/pen/qrdzjj

This is a diagram of an all-too-familiar promise. The x-axis represents time, the marble represents the result of the promise, and the vertical stroke represents the completion of the promise. (Read more about Marble Diagrams)

While a Promise will resolve to a single value, or reject, an Observable can emit multiple values and complete, or error at any point in time:

Marble Diagram: The structure of an Observable; http://codepen.io/philpl/pen/aJOgjw

This example diagram’s Observable emits “A”, “B”, and “C” and completes afterwards. Notice that the completion is separate.

So while a promise can only resolve to a single value or error,

promise.then(value => {
// result
}, error => {
// error
});

An Observable can emit a value, complete, or error.

observable$.subscribe(value => {
// results
}, error => {
// error
}, () => {
// completion
});

Note that the completion and error don’t hold any actual values, since we have a separate callback for them.

Creating Observables

Similarly to how any emitter can be wrapped inside a promise,

return new Promise((resolve, reject) => {
resolve('Hello World!');
});

We can just as well wrap something inside an Observable.

return new Observable(observer => {
observer.next('Hello World!');
observer.complete();
});

Luckily — like with Promise.resolve— there are numerous constructors in RxJS that often make it unnecessary to create Observables from scratch:

Observable.of('Hello World!');
Observable.from([ 'A', 'B', 'C' ]);

If you’re wondering why we’re receiving an observer when we’re creating an Observable, think of it as the consumer of the values. It is an object that holds our next, complete, and error callbacks. (Read more about observers)


Operators! The cool — functional — part of RxJS

Since Observables are essentially just collections, you’ll find that you can use methods on them as you would expect it from e.g. arrays.

RxJS comes with operators to transform, filter, combine, and alter Observables in all kinds of different ways and flavours. That’s also why it’s sometimes called the “Lodash for Async”.

So it totally comes with all the methods that you have on Arrays as well: .map.concat.filter.reduce.every.some.includes, but also .first.last, and lots more…

“Help! I can’t remember any operators!”

Fear not, if you can’t remember these operators on the spot. Websites like RxMarbles and the RxJS docs can help you find the right one. Furthermore I’d like to introduce five basic operators that will suffice for getting started with RxJS.

.map Operator

This is an interactive diagram. Try dragging the values and the completion ;)

The .map operator transforms the values of an Observable using a transformation function… Just like Array’s .map.

.filter Operator

This is an interactive diagram. Try dragging the values and the completion ;)

The .filter operator filters the values of an Observable using a predicate function… Again, just like Array’s .filter.

.concat

This is an interactive diagram. Try dragging the values and the completion ;)

The .concat operator concatenates Observables, like Array’s .concat. Since time plays a role with Observables, the concatenation will start emitting the values of the second Observable, only once the first one is completed.

.merge Operator

This is an interactive diagram. Try dragging the values and the completion ;)

The .merge operator is similar to the .concat operator, but merges the emissions together — i.e., it starts Observable concurrently. Thus in the resulting Observable all values are combined and keep their original position on the time-axis.

.mergeMap Operator

This is an interactive diagram. Try dragging the values and the completion ;)

You might know the .mergeMap operator by its alternative name .flatMap. It allows you to return a new Observable in the transformation function, whose values will be merged into the resulting Observable.

There’s different variations to this operator as well, like .switchMap, but these five should be enough for you to play around and get familiar with RxJS.

Keep in mind to learn as you go!

Remember that you don’t need to memorise all operators before you start. Feel free to look up operators as you go. Trust me, you will learn the ones you need in no time!

What does an operator do under the hood?

Before we move on, there’s a crucial thing to say about operators.

Implementation-details aside, an operator creates a new Observable that wraps around the original one.

Let’s say you’d want to filter an Observable by applying a predicate; As values come in over time the filter operator would take them in, and emit only the ones that pass the predicate on the new Observable. This is important to grok, since it means that Observables adhere to all functional paradigms, not just the methods / operators themselves.

A chain of observables can be combined. You can assign an Observable to a variable and reuse it in multiple, different chains, if you wanted to.


Let’s refactor our range slider

With the newly gained knowledge we can now go back to our above example of the range slider, and rewrite its logic with RxJS.

The code should look very familiar, but instead of directly using event listeners on window, we can use the fromEvent constructor, that wraps a listener inside an Observable.

Right off the bat we see that 22 lines were deleted and 9 added. We don’t really care about conciseness here, but it is a nice bonus.

What we do care about is expressiveness, and we indeed see that the entire behaviour of our slider ends up inside the onMouseDown method. It ends up almost explaining itself:

onMouseDown(){
mousemove$
.takeUntil(mouseup$)
.map(({ clientX }) => this.computeX(clientX))
.subscribe(x => {
this.setState({ x })
});
}

It actually resembles the description of its behaviour that was stated earlier.

“When we detect a mousedown on the knob,
 we let the knob’s position follow the mouse’s position with every mousemove,
 until we detect a mouseup.”
  • The mousedown is handled by React so it doesn’t fall into RxJS’ territory
  • We follow the mouse’s position using our mousemove$ observable, the .map operator applying the x value calculation, and the actual setState inside the subscription
  • We stop the observable, when we detect a mouseup event, using the takeUntil operator

To further explain what’s going on with the takeUntil operator, here’s another diagram:

This is an interactive diagram. Try dragging the values and the completion ;)

Again, we can describe complex behaviour using a single, expressive operator. In this case, we can interrupt and complete a source Observable, when a second Observable emits a value. Handy!

What if we bring onMouseDown into the RxJS-world as well?

You’ve probably noticed that the example is still using React’s onMouseDown hook. What would the code look like if we had an Observable for that as well?

This is an interactive diagram. Try dragging the values and the completion ;)

If we had a stream down$ that emits when the user presses his mouse down, we want to emit the move$ events, until the user releases his mouse (up$), for each of these “down-events”.

Edit: After receiving a reply, asking for what this would look like in the “Rx Range Slider” example above, I refactored the code to illustrate this behaviour; adding a stream for the mousedown events as well: https://codesandbox.io/s/ZVK8gQ15R

What is the share operator on the fromEvent observables doing?

Maybe you’ve also noticed, that the fromEvent observables, at the top of the example, is using a share operator. This is a special operator that revolves around laziness of RxJS’ Observables…


Observables are lazy! (by default)

What RxJS Observables think — probably

With RxJS, Observables are always lazy by default. They won’t do anything until they’re being observed. That means that an observable chain won’t start or do anything, until you actually subscribe to it.

This is called a “cold observable” — for some reason.

The opposite is a “hot observable”, which can start running independently from their observer. You can turn cold into hot observables with operators like .share.

In the case of the .share operator, the subscription underneath the operator is shared. The underlying observables runs as long as at least one observer is present and listening to it.

For our example this means, that we’re only attaching one event listener for the shared observable to the window, and we’re keeping it around as long as we’re using it. It’s useful to keep everything clean of unnecessary listeners.

There’s a lot more to hot observables, but it would maybe deserve an introduction of its own…


Let’s try a bigger example: Search Typeahead

The Range Slider was nice for getting started, but this article should go through an example that touches more of the hardships, I mentioned at the beginning.

This example is a search field for movies. It involves sending requests and displaying suggestions to the user. If you’ve gone through another RxJS talk or intro, something like this was probably in there. Not even kidding. It’s a very popular example.

This example has more complex behaviour than the previous one, and thus has a little bit more code, but to break it down:

  • We’re using RxJS’ Ajax constructor that is wrapping the request inside an Observable, instead of a Promise, like the fetch API
  • We’re chaining a lot of operators and achieve debouncing by 200ms; filtering for search terms, that are long enough; ignoring non-distinct values; and so on…

You don’t have to go through this, but we can see that complex code scales nicely and can still be read from top to bottom, without losing track:

const search$ = Observable
.fromEvent(this.input, 'keyup')
.map(({ target }) => target.value)
.filter(query => query.length > 2)
.debounceTime(100)
.distinctUntilChanged()
.switchMap(query => fetchMovies(query))
.map(res => res.response.results
.map(({ id, title, release_date }) => ({
id,
label: title + ', ' + release_date
})).slice(0, 5)
);

Since this observable has to start when the component was mounted, has to stop once the component unmounts, and can run indefinitely, we subscribe to it inside React’s componentDidMount lifecycle hook and store the subscription:

this.sub = search$.subscribe(...);

In the componentWillUnmount hook we use this subscription to cancel the ongoing observable:

this.sub.unsubscribe();

As expected, this tells us two things:

  • Unlike Promises an Observable can emit results indefinitely
  • Observables, unlike Promises, can be cancelled

Cancelling an ongoing Observable subscription

When we create an Observable from scratch we can actually return a function that will be called on tear down.

That means that we can safely remove an event listener, or cancel an AJAX request.

new Observable(observer => {
observer.next('Hello World!');
  return () => {
// called on tear down
};
});

For our example above, where we are using Observable.ajax this will cause the switchMap operator to cancel the ongoing AjaxObservable, before subscribing to the new one.

Cancelling requests is often overlooked and neglected in web development, but with RxJS it becomes quite easy.


Before we’re done… Why should you choose RxJS?

Apart from RxJS there are a couple of Observable libraries these days. Some of them are xstream, Bacon, most.js, and Kefir. Nothing stops you from using a different one, especially if you understand the differences and trade-offs.

Nonetheless there are some reasons why you might want to choose RxJS over some or even all of them.

Crossplatform Pattern

RxJS is an implementation of the decade-old ReactiveX. Not only does that mean that it’s proven, it means that implementations are available for a long list of other languages. This can help you tremendously, if you need to talk to developers on a different part of your company’s stack, who are using Rx as well.

Java, .NET, Scala, Clojure, Swift, C++, Lua, Ruby, JRuby, Python, Groovy, Kotlin, PHP, Elixir…

Furthermore RxJS has recently been rewritten from scratch using TypeScript by Netflix developers, with a special focus on performance and robustness. It is not as fast as most.js admittedly, but its community is a lot larger.

Used at reputable companies

Some of you might be relieved to hear this, some might not care at all, but Rx and RxJS are used at a lot of reputable companies, which should put your mind at ease.


Thanks for staying along until the end!

Dear reader, I hope this was useful and you’ve learned a great deal about RxJS and Reactive Programming.

This is a rewrite of my talk on RxJS, which you can find at talk.philpl.com.