RxJS: A Simple Introduction

Ross Bulat
Jun 22 · 12 min read

ReactiveX: Asynchronous, Event-Based Programming

Modern web apps are heavily event-driven: they react to (and rely on) a range of external services and data sources to make them tick. One of the core libraries to make this happen is ReactiveX, also known as Reactive Extensions.

The ReactiveX library has been implemented in a range of languages, the most widely adopted of which is RxJS, the JavaScript implementation. RxJS is particularly useful because it can be integrated into any JavaScript application, whether server or client side, which has contributed to its huge growth in adoption.

This article aims to explain how RxJS works and how to leverage it in your projects in around a 10 minute read, covering the key concepts and terminology used along the way. Let’s get started.

The state of RxJS

The rxjs package on npm, currently on version 6, is hugely popular, with 10.5 million weekly downloads at the time of writing. This popularity can be attributed to a range of factors:

  • The package only has one dependency, tslib, the runtime helper library for TypeScript. The lack of dependencies means it is extremely easy to adopt RxJS as a dependency itself for other packages, allowing for a whole host of use cases including utility packages, API SDKs, webhook implementations and server side programs, as well as front-end event managers
  • The package is well maintained and is regularly updated, making it a reliable codebase. Version 6 (released in April 2018) was a huge update from the previous version that strengthened the library’s modularity and performance, showing the dev team are committed to long term success while not being averse to refactoring where it makes sense
  • Interestingly, RxJS makes a lot of sense for blockchain applications and the FinTech sector in general. Dapps in particular rely on continuous feedback from blockchains to process transaction data and events. RxJS is also a great fit with centralised services for managing transaction processing
  • Live chat and real-time communication apps, real-time collaboration apps, project management tools, AI tools with Tensorflow JS, and analytics frameworks are all great use cases for RxJS — these are critical tools for businesses that have one thing in common: asynchronous event driven interfaces that RxJS is very well suited to deliver

Needless to say, RxJS is in its prime, and there is high demand for an efficient reactive programming solution.

Let’s now explore the package in more detail and find out exactly how it works.


RxJS and the Observer Design Pattern

RxJS facilitates the observer pattern: a software design pattern that allows components of an application to react efficiently to incoming events or streams of data. This style is also known as callback-based or event-driven code, where functions are triggered in response to incoming data.

In essence, RxJS gives us an efficient, asynchronous means of doing just this, allowing components to subscribe to these data streams and react to them via callback functions. This leads us on to the main concept in RxJS: Observables.

Observables are attached to streams of data, and are responsible for processing that data and delivering it to observers. Thus far we have used the term “component” for an object that subscribes to a data stream. The official RxJS term is an observer, or a subscriber to an Observable.

Note: If you are implementing RxJS in a component-based architecture such as a React or Angular app, you could indeed create observers for various components and have those components update their state as the Observable delivers data, all in an asynchronous manner that does not depend on other packages or external APIs. Very useful!

Observables are very flexible in the types of data they can process: a continuous stream of data from an API or websocket, a fixed number of items from an array or object, arbitrary events such as click events, and more. The important point is that Observables can wrap just about any form of event or data stream, and whatever they emit is delivered as “items” of data.

Subscribing observers to Observables

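Observers, on the other hand, are simply JavaScript objects that subscribe to Observables via a subscribe method, and supply three functions with which to react to the Observable at certain events. The names below are the ones used in the ReactiveX documentation and in RxJS 4; from RxJS 5 onwards the same callbacks are called next, error and complete: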

  • onNext: Called when the Observable emits a new data item
  • onError: Called when an Observable fails to generate some expected data, or when another error occurs causing the Observable to terminate
  • onCompleted: Called once onNext has been called for the final time, and if no errors occurred

Given source as an Observable, an observer can be defined along with these three callback functions in the following way:

// subscribing to an Observable and defining callback functions
const subscription = source.subscribe(
  (x) => { console.log('Next: ' + x); },
  (err) => { console.log('Error: ' + err); },
  () => { console.log('Completed'); }
);

Note: It is required to adhere to the order of onNext, onError and onCompleted when defining these callback functions.
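
To make the contract between an Observable and its observer a little more concrete, here is a minimal sketch in plain JavaScript. ToyObservable is a hypothetical stand-in written purely for this illustration (it is not the RxJS class), and it only mimics the positional callback form of subscribe described above: items go to the first callback, and once the stream has completed or errored nothing further is delivered.

```javascript
// A toy model of the Observable/observer contract. ToyObservable is a
// hypothetical name for this sketch and is NOT the RxJS implementation.
class ToyObservable {
  // producer is a function that receives an observer and pushes items into it
  constructor(producer) {
    this.producer = producer;
  }

  subscribe(onNext, onError, onCompleted) {
    let done = false; // after an error or completion nothing more is delivered
    const observer = {
      next(x) { if (!done) onNext(x); },
      error(err) { if (!done) { done = true; onError(err); } },
      complete() { if (!done) { done = true; onCompleted(); } },
    };
    try {
      this.producer(observer);
    } catch (err) {
      observer.error(err);
    }
  }
}

const log = [];
const source = new ToyObservable((observer) => {
  observer.next(1);
  observer.next(2);
  observer.complete();
  observer.next(3); // ignored: the stream has already completed
});

source.subscribe(
  (x) => log.push('Next: ' + x),
  (err) => log.push('Error: ' + err),
  () => log.push('Completed')
);

console.log(log); // [ 'Next: 1', 'Next: 2', 'Completed' ]
```

The real library does a great deal more (teardown, schedulers, operators), but the order of events an observer sees follows the same shape.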

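To avoid memory leaks, remember to unsubscribe from Observables too. The subscribe method returns a subscription object, and it is this object (not the Observable itself) that exposes the unsubscribe method. In a React context, this can be done in a componentWillUnmount lifecycle method:

// remember to unsubscribe observers
componentWillUnmount() {
  // subscription is the value returned by source.subscribe(...)
  subscription.unsubscribe();
}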
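
As a rough illustration of why unsubscribing matters, the sketch below models a source that keeps a set of listeners. createTicker, emit and listenerCount are hypothetical helpers invented for this example, not RxJS APIs; the point is only that the object returned by subscribe is what detaches the listener, and that a listener which is never detached stays referenced by the source.

```javascript
// Toy sketch: subscribe() returns a subscription object whose unsubscribe()
// detaches the listener. createTicker is a hypothetical helper, not an RxJS API.
function createTicker() {
  const listeners = new Set();
  return {
    subscribe(onNext) {
      listeners.add(onNext);
      return {
        unsubscribe() { listeners.delete(onNext); },
      };
    },
    // stand-in for an external event arriving (a click, a socket message, ...)
    emit(value) {
      listeners.forEach((listener) => listener(value));
    },
    listenerCount() { return listeners.size; },
  };
}

const ticker = createTicker();
const received = [];
const subscription = ticker.subscribe((x) => received.push(x));

ticker.emit('a');
ticker.emit('b');
subscription.unsubscribe(); // e.g. called from componentWillUnmount
ticker.emit('c');           // nobody is listening any more

console.log(received);               // [ 'a', 'b' ]
console.log(ticker.listenerCount()); // 0
```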

This event-driven pattern is a form of concurrent programming: observers do not block execution while they wait for Observables to emit items. Instead, an observer acts as a listener (or a “sentry”, a term the official docs like to use) that reacts to incoming data items as they are emitted.

Note: If several observers subscribe to the same Observable, each subscription typically triggers its own independent execution of that Observable, which is often not what you expect when you want to share a single stream. For such a scenario, Subjects are a better fit. Subjects are introduced further down.

The concept of cold and hot Observables

The distinction between cold and hot Observables determines when an Observable starts emitting items. A cold Observable only starts emitting items once it has gained a subscriber, whereas a hot Observable starts emitting items even if no observers have subscribed to it.

With the former approach (cold), every observer is guaranteed to see the whole sequence from the beginning, because emission starts afresh for each subscription. With the latter approach (hot), observers jump into a stream that is already running, so a late subscriber may start somewhere in the middle and miss the items emitted before it subscribed. If your components need to “catch up” with some or all of the data emitted before the subscription, that replay behaviour has to be added on top, for example with the ReplaySubject covered later.
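
The difference is easier to see with two toy sources side by side. The helpers coldFrom and createHot below are hypothetical and written only for this sketch. In this simplified model, each subscriber to the cold source receives the full sequence, while a subscriber that joins the hot source late only sees what is emitted after it joined.

```javascript
// Toy sketch of cold vs hot sources; both helpers are hypothetical and
// only model when subscribers receive items.

// Cold: the producer runs once per subscriber, from the beginning.
function coldFrom(items) {
  return {
    subscribe(onNext) { items.forEach((item) => onNext(item)); },
  };
}

// Hot: one shared producer; items are pushed to whoever is subscribed now.
function createHot() {
  const listeners = [];
  return {
    subscribe(onNext) { listeners.push(onNext); },
    emit(item) { listeners.forEach((listener) => listener(item)); },
  };
}

const cold = coldFrom([1, 2, 3]);
const coldA = [];
const coldB = [];
cold.subscribe((x) => coldA.push(x));
cold.subscribe((x) => coldB.push(x)); // subscribes later, still sees everything

const hot = createHot();
const hotA = [];
const hotB = [];
hot.subscribe((x) => hotA.push(x));
hot.emit(1);
hot.emit(2);
hot.subscribe((x) => hotB.push(x)); // joins in the middle of the stream
hot.emit(3);

console.log(coldA, coldB); // [ 1, 2, 3 ] [ 1, 2, 3 ]
console.log(hotA, hotB);   // [ 1, 2, 3 ] [ 3 ]
```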

Keep this concept in mind when we talk about Operators next.


Working with Operators

The observer-to-Observable framework is efficient and concurrent in nature, which is just what modern JavaScript apps need. However, if we stopped here, RxJS would simply be a good, slightly extended implementation of the observer pattern. Its main strength arguably lies in how Observables can manipulate the data, and the delivery of that data, to observers. This is done via operators, the next major component of RxJS we will cover.

The X in ReactiveX: Operators

Operators are what make Observables interesting, and they account for the X (Extensions) in the ReactiveX name. Operators carry out transformations that manipulate either the data of an Observable or the delivery of that data. This section will not document every operator (that would be a tedious read, as there is a whole library of them) but aims to give the reader an idea of what is possible with them, and how to efficiently study and implement them.

There is a wealth of operators documented on the official website; these are considered core operators, but you are free to develop your own too. The majority of these core operators have been implemented in JavaScript for use with RxJS, giving us JavaScript developers a lot to play with out of the box. It is unlikely you will require a custom operator unless you stumble across an edge case specific to your application.

Operators address the following question: How do you need to manipulate the data being emitted from your Observables?

Operators create and manipulate Observables. We have operators such as defer, which does not create the Observable until an observer subscribes; interval, which emits a sequence of integers spaced by a set time interval; merge, which combines multiple Observables into one data stream; and many more.

It is very common to use multiple operators together. As such, they can be chained into one expression. The following example chains 3 operators together to emit an item every 500 milliseconds and stop after the first 3 items:

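// emit an item every 500 milliseconds, and stop after 3 items
// (dot-chained style of earlier RxJS versions; RxJS 6 applies operators through pipe() instead)
var source = Rx.Observable
  .interval(500)
  .timeInterval()
  .take(3);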

Note: Each operator has a separate page in the documentation with detailed explanations of what they do, with code examples for each of the implementations. Browsing through this valuable resource is the best way to familiarise yourself with the range of operators at your disposal.
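
To give a feel for what chaining operators amounts to, here is a small sketch that treats an operator as a function taking one source and returning another. Everything in it (fromArray, map, filter, take and the free-standing pipe helper) is a simplified stand-in written for this example and is not the RxJS code; the sources are synchronous so the result is easy to follow.

```javascript
// Toy sketch of operator chaining. fromArray, map, filter, take and pipe are
// simplified stand-ins written for this example, not the RxJS functions.
const fromArray = (items) => ({
  subscribe(next, complete) {
    items.forEach((item) => next(item));
    complete();
  },
});

// Transform every item with fn.
const map = (fn) => (source) => ({
  subscribe(next, complete) {
    source.subscribe((x) => next(fn(x)), complete);
  },
});

// Forward only the items that pass the test.
const filter = (test) => (source) => ({
  subscribe(next, complete) {
    source.subscribe((x) => { if (test(x)) next(x); }, complete);
  },
});

// Forward the first `count` items, then complete (assumes count >= 1).
const take = (count) => (source) => ({
  subscribe(next, complete) {
    let seen = 0;
    let done = false;
    const finish = () => { if (!done) { done = true; complete(); } };
    source.subscribe(
      (x) => {
        if (done) return; // ignore anything after the limit
        seen += 1;
        next(x);
        if (seen === count) finish();
      },
      finish
    );
  },
});

// Apply operators left to right, each wrapping the previous source.
const pipe = (source, ...operators) =>
  operators.reduce((current, operator) => operator(current), source);

const results = [];
let completions = 0;
pipe(
  fromArray([1, 2, 3, 4, 5, 6, 7, 8]),
  filter((x) => x % 2 === 0), // 2, 4, 6, 8
  map((x) => x * 10),         // 20, 40, 60, 80
  take(3)                     // 20, 40, 60
).subscribe(
  (x) => results.push(x),
  () => { completions += 1; }
);

console.log(results);     // [ 20, 40, 60 ]
console.log(completions); // 1
```

In RxJS 6 itself the same idea is expressed through the pipe method on an Observable, with the operators imported from rxjs/operators.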

Understanding Operators with marble diagrams

A helpful means of understanding how operators work is by referring to the corresponding marble diagrams that are used heavily throughout the documentation; they describe the behaviour of an Observable with operators applied.

In fact, a dedicated website, RX Marbles, has been set up where visitors can play with the interactive marble diagrams of each core operator to study the corresponding output.

A marble diagram essentially represents Observable timelines, the data items that are emitted onto them, and how those items are transformed through operators.

In its simplest form, a timeline is represented via a straight line with data items being emitted onto it. The following diagram represents items being emitted onto a timeline, with no operators being applied:

When the same item is manipulated or emitted a range of times, different shapes are used to represent that item in a timeline. A vertical straight line represents a successful completion of an Observable, whereas a cross represents an early termination due to an error:

When applying operators to an Observable, the timeline is either wrapped with that operator or linked below the timeline, depending on the nature of the operator:

The toArray operator shown above is documented in the official operator reference. This example takes the first 5 data items emitted (via the take operator) and groups them into a single array. This is what it looks like in JavaScript:

const source = Rx.Observable.timer(0, 1000)
.take(5)
.toArray();

const subscription = source.subscribe(
function (x) { console.log('Next: ' + x); },
function (err) { console.log('Error: ' + err); },
function () { console.log('Completed'); });
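
If the behaviour of toArray is still a little abstract, the following sketch shows the idea in plain JavaScript: buffer every item, and emit one array only when the source completes. fromRange and this toArray are hypothetical helpers for illustration, not the RxJS operators, and the source here is synchronous rather than timer-based.

```javascript
// Toy sketch of what toArray does: buffer every item, then emit one array
// when the source completes. These helpers are illustrative, not RxJS code.
const fromRange = (start, count) => ({
  subscribe(next, complete) {
    for (let i = 0; i < count; i += 1) next(start + i);
    complete();
  },
});

const toArray = (source) => ({
  subscribe(next, complete) {
    const buffer = [];
    source.subscribe(
      (x) => buffer.push(x), // nothing is forwarded yet
      () => {
        next(buffer);        // a single emission containing all items
        complete();
      }
    );
  },
});

const output = [];
toArray(fromRange(0, 5)).subscribe(
  (x) => output.push('Next: ' + x),
  () => output.push('Completed')
);

console.log(output); // [ 'Next: 0,1,2,3,4', 'Completed' ]
```

Note that the observer receives exactly one item, the array, followed by the completion callback.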

Operators vastly expand the capabilities of the observer pattern, including but not limited to:

  • Merging multiple Observable streams into one stream for observers to listen to (see the merge operator)
  • Being able to emit just a subset of data items at subsequent intervals. If a large number of items are being fed to an Observable and you only need a subset, or a sample of them, you can choose to emit only the latest item every Y milliseconds, or only the first X items. This can be achieved via the sample and take operators
  • Using the map operator to transform items being emitted by applying a function to them
  • Filtering items by emitting only those that pass a predetermined test. Take a look at the filter operator to achieve this.
    Note: Filtering observables is an entire category of operators that are listed on the official docs
  • A range of aggregation options for grouping items, as well as conditional and boolean operators (see takeWhile, all and contains as examples of these)
  • And much more

Familiarisation is the best tool for choosing the best operators for your use case; fully understanding the documented marble diagrams and Observable timelines greatly helps in making those decisions.

How to choose operators

Operators are split into various categories in the official Operators documentation; I recommend taking some time to browse through what is available, referring to both the marble diagrams and the written examples in RxJS for each one.

Note: Scroll further down the Operators page to find a “Decision Tree”, a thought trail you can apply to your use case to determine which operators to use. Although this may be quite useful for familiarisation purposes, I would opt more for browsing the available operators and getting a feel for where they would be used within your own projects.

We are making great progress with our understanding of RxJS. Next, let’s briefly look at the concept of a Subject.

Working with Subjects

To unlock more of RxJS’s potential, Subjects are another important concept to understand. As we mentioned earlier in the article, Subjects can allow multiple observers to subscribe to them, and can act as an Observable themselves. This is just one characteristic of Subjects; in fact, they combine Observable and observer characteristics, which makes them a sort of hybrid between the two. Let’s examine them in more detail.

Subjects act as an Observable and observer

Subjects act as a proxy between Observables and observers. Imagine them being slotted between the Observable layer and observers layer to facilitate communication and delivery to observers.

Subjects can subscribe to one or more Observables and inherit their emissions, making them observers. As such, Subjects have their own timeline, and the emissions they inherit then become available for observers to subscribe to, making Subjects Observables too. This allows a Subject to emit its timeline to a multitude of observers interested in subscribing.

Learn-rxjs has a good metaphor for Subjects: you can think of a Subject as a single speaker talking into a microphone in a room full of people. Whereas a standard Observable holds a 1 on 1 conversation with each observer, a Subject can multicast, or deliver messages to many people at once.

Note: Subscribing a Subject to a cold Observable will trigger it to start emitting items.
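
The proxy role of a Subject can be sketched in a few lines. ToySubject and coldSource below are hypothetical names for this illustration, not RxJS objects. The Subject exposes next (so it can be handed to a source as an observer) and subscribe (so observers can listen to it), and the counter shows that the source’s producer runs once even though two observers receive the items.

```javascript
// Toy sketch of a Subject: it is an observer (it has next) and an
// observable (it has subscribe). ToySubject is a hypothetical name.
class ToySubject {
  constructor() {
    this.observers = [];
  }
  subscribe(onNext) {
    this.observers.push(onNext);
  }
  next(value) {
    this.observers.forEach((onNext) => onNext(value));
  }
}

// A cold source: each subscribe call re-runs the producer.
let producerRuns = 0;
const coldSource = {
  subscribe(observer) {
    producerRuns += 1;
    [1, 2, 3].forEach((x) => observer.next(x));
  },
};

const subject = new ToySubject();
const first = [];
const second = [];
subject.subscribe((x) => first.push(x));
subject.subscribe((x) => second.push(x));

// Subscribing the Subject to the source starts the emissions, and the single
// run of the producer is shared by both observers.
coldSource.subscribe(subject);

console.log(first, second); // [ 1, 2, 3 ] [ 1, 2, 3 ]
console.log(producerRuns);  // 1
```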

In RxJS we have 4 types of Subject to play with. Let’s briefly cover them. Visit the corresponding xgrommx.github.io (great resource for RxJS example documentation) links to see examples of how they are used in RxJS:

  • AsyncSubject: An Async Subject emits only the last item from the source Observable, and only once that source has completed. Subscribers that arrive after completion receive that same final item
  • BehaviorSubject: A Behavior Subject emits the most recently emitted item to any new subscriber, before emitting any subsequent items. It is created with an initial value, which is what a new subscriber receives if the Subject’s Observable(s) have not yet emitted an item
  • ReplaySubject: A Replay Subject will emit all previous source Observable emissions to new subscribers, and also pass newly emitted items to them
  • Subject: A standard Subject with no replay behaviour and no initial value. New subscribers will receive emissions from the Subject’s source Observable(s)

Subjects are a great way to implement a multicasting solution for observers, with enough flexibility for just about all use cases when source Observables are used in conjunction with operators.
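
What separates the Subject variants is mostly what a late subscriber receives. The sketch below models two of them; ToyBehaviorSubject and ToyReplaySubject are hypothetical simplifications written for this example (no completion, no errors, unlimited replay buffer), not the RxJS classes.

```javascript
// Toy sketches of two Subject variants. Both classes are hypothetical
// simplifications that only model what a late subscriber receives.
class ToyBehaviorSubject {
  constructor(initialValue) {
    this.current = initialValue;
    this.observers = [];
  }
  subscribe(onNext) {
    onNext(this.current); // new subscribers get the latest value first
    this.observers.push(onNext);
  }
  next(value) {
    this.current = value;
    this.observers.forEach((onNext) => onNext(value));
  }
}

class ToyReplaySubject {
  constructor() {
    this.history = [];
    this.observers = [];
  }
  subscribe(onNext) {
    this.history.forEach((value) => onNext(value)); // replay everything so far
    this.observers.push(onNext);
  }
  next(value) {
    this.history.push(value);
    this.observers.forEach((onNext) => onNext(value));
  }
}

const behavior = new ToyBehaviorSubject('init');
const early = [];
behavior.subscribe((x) => early.push(x));
behavior.next('a');
behavior.next('b');
const lateBehavior = [];
behavior.subscribe((x) => lateBehavior.push(x));
behavior.next('c');

const replay = new ToyReplaySubject();
replay.next(1);
replay.next(2);
const lateReplay = [];
replay.subscribe((x) => lateReplay.push(x));
replay.next(3);

console.log(early);        // [ 'init', 'a', 'b', 'c' ]
console.log(lateBehavior); // [ 'b', 'c' ]
console.log(lateReplay);   // [ 1, 2, 3 ]
```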

A note on Schedulers

Before concluding, it is worth mentioning the concept of Scheduling in ReactiveX. Although they do not play a huge role in the Javascript implementation, it’s definitely advisable to be aware of them.

Schedulers give us a means of defining when work is carried out, in terms of emitting Observable and Subject items, and are commonly passed as an Operator argument where they are supported. Essentially, Schedulers influence the timing on which tasks get executed.

Because of JavaScript’s limited multi-threading support, there is a very limited scope for using Schedulers in RxJS. However, in other implementations, especially server-side (.NET, Java), Schedulers can be used to schedule workloads on dedicated threads so as to optimise concurrency and efficiency. They are well suited for applications of a heavy workload in these cases.

Nonetheless, RxJS provides some Scheduler functionality. In RxJS 4 the basic implementations hang off the Rx.Scheduler class, as listed below; RxJS 6 exposes comparable schedulers under different names, such as queueScheduler, asapScheduler and asyncScheduler:

  • Schedule work immediately, but not recursively on the current thread with Rx.Scheduler.currentThread. This synchronous nature forces work to be queued if more is scheduled from the current task
  • Rx.Scheduler.immediate forces work to be carried out immediately and recursively on the current thread
  • Rx.Scheduler.timeout allows work to be scheduled via a timed callback — and is the only scheduler that allows workloads to be run at a future date
  • Rx.TestScheduler allows us to schedule workloads over time as your application is simulated in a test environment. TestScheduler is specifically designed for unit testing workloads from Observables and Subjects
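
The difference between running work immediately and queuing it on the current thread is subtle, so here is a rough sketch of the two strategies. The immediateLike object and createQueueLike function are hypothetical and are not RxJS schedulers; they only model the order in which nested tasks run.

```javascript
// Toy sketch contrasting two scheduling strategies on a single thread.
// Both scheduler objects are hypothetical and are not RxJS schedulers.

// Runs each task the moment it is scheduled, even from inside another task.
const immediateLike = {
  schedule(task) { task(); },
};

// Queues tasks scheduled from inside a running task and runs them afterwards
// (often called a trampoline).
function createQueueLike() {
  const queue = [];
  let draining = false;
  return {
    schedule(task) {
      queue.push(task);
      if (draining) return; // an outer call is already working through the queue
      draining = true;
      try {
        while (queue.length > 0) queue.shift()();
      } finally {
        draining = false;
      }
    },
  };
}

// Schedule a task that schedules another task, and record the order.
function runWith(scheduler) {
  const order = [];
  scheduler.schedule(() => {
    order.push('outer start');
    scheduler.schedule(() => order.push('inner'));
    order.push('outer end');
  });
  return order;
}

const immediateOrder = runWith(immediateLike);
const queuedOrder = runWith(createQueueLike());
console.log(immediateOrder); // [ 'outer start', 'inner', 'outer end' ]
console.log(queuedOrder);    // [ 'outer start', 'outer end', 'inner' ]
```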

Note: To learn more on RxJS Schedulers, this Stack Overflow post summarises RxJS Scheduler capabilities extremely concisely. The official documentation collates documentation for all Scheduler implementations.

Minimising Backpressure

On the subject of managing workloads, the concept of backpressure is also covered on the ReactiveX website, which lists a range of operators that can be used to keep it under control.

There may be times when items are being emitted faster than observers can process, or consume, them. This is backpressure building up, and it is usually detrimental to the performance and experience of an application.

This is a common problem in general programming, but can be minimised at the Operator level in RxJS.
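
One simple, lossy way of relieving that pressure is to drop items that arrive too soon after the last one that was handled. The sketch below illustrates the idea with a hypothetical throttleByTime helper (not an RxJS operator); the timestamps are supplied by hand so the example runs synchronously. Operators such as sample, debounce and buffer offer other trade-offs along the same lines.

```javascript
// Toy sketch of a lossy backpressure strategy: forward an item only if at
// least windowMs has passed since the last forwarded item. The timestamps
// are supplied by hand so the example runs synchronously; throttleByTime is
// a hypothetical helper, not an RxJS operator.
function throttleByTime(windowMs, onNext) {
  let lastForwarded = -Infinity;
  return (event) => {
    if (event.time - lastForwarded >= windowMs) {
      lastForwarded = event.time;
      onNext(event);
    }
    // otherwise the event is dropped so the consumer is not overwhelmed
  };
}

const handled = [];
const slowConsumer = (event) => handled.push(event.value);
const throttled = throttleByTime(100, slowConsumer);

// A burst of events arriving every 30 ms.
const burst = [0, 30, 60, 90, 120, 150, 180, 210, 240];
burst.forEach((time, index) => throttled({ time, value: 'e' + index }));

console.log(handled); // [ 'e0', 'e4', 'e8' ]
```

Whether dropping items is acceptable depends entirely on the application; where every item matters, buffering or batching is the safer choice.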


In Summary

Reactive programming has taken off in a big way, as the internet has evolved into a huge ecosystem of dynamic, event driven applications.

The tooling and infrastructure built around web application development now allow apps to easily respond to real-time events, without having to know anything about the services they receive those events from. RxJS is at the forefront of this movement.

This article served as an introduction to this evolving design paradigm, taking your applications to the next level of asynchronous, event-based behaviour, with the observer pattern as the foundational mechanic. We have by no means covered all that RxJS has to offer, but the reader should now be in a great position to comprehend all subject matters in the official documentation, and be able to start implementing their own RxJS solutions.

Written by Ross Bulat, Director @ JKRB Investments
