Manage async code like a PRO with RxJS

Paweł Trysła
Callstack Engineers
6 min readJun 13, 2017

If you’ve ever struggled with managing async code or felt that Promises lacks essentials features like cancellation, better composition or in general are feature-poor, here comes the Saviour — Observables and RxJS.

What is an Observable?

Observable itself is a new type introduced by proposal-observable as a part of ECMAScript specification. Since this is a proposal, you can’t use it right now, without any Observable library.

In this article, we will focus on the most popular implementation of Observables — RxJS, so quick disclaimer:

Observables, operators and implementation details mentioned in this article, applies to RxJS only.

You should treat Observable the same as any other type like Array or Promise. You can instantiate it, modify, pass and whatever you like.

You can also treat Observables as a Set of (usually async) events, meaning Observable can emit multiple values (events) over time. This is one of the most significant differences between Promises — once Promise resolves, it’s done, at this point it’s just a wrapper around the data. You can attach then to it afterwards, but you will always get the same value. The second huge difference is that Observables are lazy/cold, whereas Promises are eager/hot. What it means, it that once Promise is created, it must either resolve with a value or throw an error. On the other hand Observables, won’t do anything unless you subscribe to them (we will talk about it later).

Despite of the differences, Observables and Promises also share some similar features. One of them are notification channels. With Promises, you have 2 channels: resolve and error. You can add handler function to resolve channel using then, which will get called, when Promise resolves with a value, and error handler to error channel using catch. Observables, also implements those 2 channels named next, error, but also have another one — complete. Next and error channels are analogous to Promise’s resolve and error. Complete channel is fired only when Observable is done emitting values. You can think of Promise’s resolve channel as a combination of next and complete — Promise can’t resolve multiple times so, when you get value it also completes.

Creation

Let’s see how we can create Observable instance:

Simple Observable creation

In this example, we use Observable.create factory provided by RxJS, which accepts a function. From within, we have access to observer object, on which we call next method passing values to be emitted by the Observable. In this case, we pass numbers from 0 to 4, each one within 1s interval.

$ is a common postfix to distinct a stream of data/events.

As I mention earlier, Observables are cold, thus you won’t see console.log being called, if you try to run this example.

Transformation and composition

RxJS gives you an extremely powerful toolset to transform or compose your Observables. Those tools are called operators. Remember, when you wanted to change every element of an array? You would use Array#map method. You can do the same with Observables!

Simple filter and map operations

The code above will first check if each emitted value from event$ fulfils the function we passed to filter operator. If so, it will map this value to a string. Every value, which doesn’t fulfil filter’s function will be discard. This highlights another useful feature — output Observable doesn’t have to emit the same amount of values as the input one does. In other words:

Input/output ratio of emitted values doesn’t have to be 1.

Note that we assigned results of those operations to a new variable. It’s because, each operator returns a new Observable — all operators are pure, same as Array#map and Array#filter.

Subscription

Up this point our Observable won’t do anything, since we haven’t yet subscribed to it. Let’s do it now:

Simple subscription with next handler only

As you can see, we need to call subscribe method on our Observable. It’ll make it hot, thus it will start emitting values. In console you will see:

0 is even
2 is even
4 is even

subscribe method also returns a subscription object, which you can use to unsubscribe (stop observable execution or free resources), by calling subscription.unsubscribe().

In this example, we’ve only passed a single function to subscribe method, however it’s worth knowing, that you can also pass error handler and complete handler:

Next, error and complete handlers

What exactly are the operators?

Operators are nothing more than a pure functions, which take input/current Observable and return a new Observable with applied modifications, thus they are chainable. Each operator can belong to 1 category out of 9. You can see all of them here, but the most commonly used are:

  • creation operators
  • transformation operators
  • filtering operators
  • combination operators
  • error handling operators

Each operator regardless of it’s category can be:

  • instance operator — uses Observable instance to which they’re bound, in other words, they use this keyword to refer to input Observable
  • static operators — aren’t bound to any specific Observable instance, instead of using this, they use first argument as an input Observable

Almost all creation operators are static, whereas the rest are instance operators. However, some of them such as concat or merge have both instance and static versions. You can read more on that matter here.

RxJS gives you ~120 operators to use, which is a overwhelming amount, but don’t worry most of them are just more specific variants of another or are used in very specific cases. Take for example map operator, if you know what it does, you also know what mapTo does — same as map, it projects value to another, but instead of accepting a function to get the new value, it uses the same single one every time.

Here’s the list of the most common operators, that you should get familiar with before diving deeper:

If you ever stuck, wondering which operator to use, here is extremely useful Q&A based guide to help you find the exact one.

Error handling and chains isolation

Let’s talk a bit more about error handling. Usually, while developing your app, you have to deal with 3rd party modules or need to fetch some data from remote server. Regardless of what you need to accomplish, the code can always throw an error, which will cause an Observable to close itself and run some teardown logic, thus no events will be able to go through it! Once the error is throw, the Observable is done aka dead.

Sometimes, this kind of behaviour is desired. In other cases, you need to handle errors properly. So how do you do that? Let’s first look at the example:

Naive implementation of error handling

As we can see, this Observable should make an Ajax call on specific event — buttonClicked. If the request fails, the thrown error will be handled by catch operator. Even though, the error’ed Observable (userInput$) is replaced by a new Observable.of('ajaxFailed'), the error will still cause it to close. No events will go through it anymore.

To fix this problem, we need chains isolation:

Proper implementation of error handling

We just moved catch into the inner chain created by ajax.getJSON. Now, if the error is raised, we replace it with a new Observable (in the inner chain), so in every case, the final output of the inner Observable chain will be a non-error’ed Observable.

Most of the time, you don’t want your Observable to die, so just isolate the error-prone chain from the main one, and add a catch to it.

Observables can be also synchronous

Although, Observables are extremely useful at managing asynchronous code, like WebSockets, events or user input, they can also be beneficial for synchronous tasks. Suppose, you have some big data set like an array. Observables might come in handy, where you need multiple Array#filter and Array#map operations to be chained together.

Consider the following example:

Modify large data set using array methods vs RxJS operators

Both variants will produce the same result, however Observables will be more performant and won’t create intermediate arrays. It’s due to the fact that, all RxJS operators will be applied in single iteration, whereas Array implementation will go through whole array to filter elements, then again over the intermediate array created by Array#filter method.

What’s next?

In case, you want to broaden your knowledge about RxJS and Observables, I strongly suggest to watch RxJS 5 Thinking Reactively and Advanced RxJS: State Management and Animations, both done by Ben Lesh or to read a RxJS documentation.

In the next article, we will talk how to use RxJS to manage side effects in Redux app.

--

--