Reactive Programming in JavaScript— RxJS

Kingsley Tan
Mar 15 · 6 min read

As Javascript developers, we implement asynchronous programming (async/await) in our codes. Constantly processing user inputs, call APIs to get data, or simultaneously run heavy-loaded computations might cause our servers go down. However, these tasks are essential to programming and required in real life.

To a simple code, it is common to use events in Javascript or jQuery libraries. When there is no appropriate way to expand the simple code and to solve asynchronous issues, in order to realise modern web users, the ultimate code will become very messy and difficult to maintain. The main issue here is because asynchronous programming is hard to manage, but RxJS can help on this.

Reactive Programming in JavaScript — RxJS

Problems that RxJS solves

Constantly having response is important for any application in modern programming, which means when services are down during processing user inputs or getting external data from AJAX are totally not acceptable. The main problem is due to I/O operations are much slower than CPU, and this happens in both front-end and back-end systems.

When using Javascript in front-end, it is common to spawn a lot of connections in web browser to do processing, at the same time, a bunch of callbacks to handle the promises. This way is actually going against the programming practices because you do know when the processes will complete, and you can’t control them. Although callbacks are good in small-scale applications, but not in complex situations. Callback needs to handle user data and remote HTTP calls simultaneously, which might eventually lead to “callback hell”.


Promises in Javascript ES6

makeHttpCall('/items',   items => {     for (itemId of items) {       makeHttpCall(`/items/${itemId}/info`,         itemInfo => {           makeHttpCall(`/items/${itemInfo.pic}`,             img => {               showImg(img);           });        });      }});beginUiRendering();

The code above has following issue:-

  1. Difficult to understand
  2. For-loop cannot handle asynchronous callbacks nicely, which may cause some unknown bugs.

To solve these, the concept of “Promises” is introduced in Javascript ES6. Promises help developers to provide a smooth way to catch the timing, and handle it with a callback, i.e. then().

To improve the code above, we can rewrite as below:-

makeHttpCall('/items')   .then(itemId => makeHttpCall(`/items/${itemId}/info`))   .then(itemInfo => makeHttpCall(`/items/${itemInfo}.pic}`))   .then(showImg);

The second snippet has obviously improved and much easier to understand. Despite “promises” is efficient in processing single data (or single error), but what if users keep triggering the same function non-stop? “Promises” does not delete, distribute, retry events, hence the efficiency will drop in this case. Let see what RxJS can do for us.


So, what is RxJS?

RxJS is a Javascript library to solve asynchronous problems, which was extended from Reactive Extension library, optimised the application of observer pattern into functional programming. Observer pattern is a proven pattern where an object, called the subject, maintains a list of its dependents, called observers, and notifies them automatically of any state changes.

With functional programming, for instance declarative programming, immutable data structure, method chaining etc., you may say goodbye to callback functions now.

If you are familiar with functional programming, RxJS is similar like asynchronous version of Underscore.js.

Streams in RxJS

Stream is a series of events happened in a period of time. Stream can be used to handle any type of events, including mouse clicks, keyboard inputs, internet byte data etc. If you deem Stream as a variable, it will react to changes from the perspective of data.

Variable and stream are dynamic, but with some differences. Let’s look at a simple example below here:-

var a = 2;var b = 4;var c = a + b;console.log(c); //-> 6a = 10;  // reassign aconsole.log(c); //-> still 6

Despite variable a is reassigned as 10, but this method is forcing the dependent variable to be immutable. This is the main difference. When an event is called, change is initiated from event publisher(producer) and pass on to event subscriber (consumer). If we assume variables as streams, the example will become:-

var A$ = 2;var B$ = 4;var C$ = A$ + B$;console.log(C$); //-> 6A$ = 10;console.log(C$); //->  16

From the example above, we can redefine the dynamic behaviour of variables by introducing streams. (‘$’ is just to differentiate the 2 examples above). Variable C$ applies addition operation (+) onto A$ and B$, when a new value is assigned to variable A$, variable C$ immediately responded to become 16. Current example is just a very simple one, the main idea here is to explain how a variable changes in event streams.

Now, we have a brief picture of RxJS.

Observable in RxJS

The definition of observable is probably the most important part in RxJS. It is used in handling pieces of events like clicking event, keyboard event, cursor event, integer, string or array, which it helps you control how the events flow through. The simplest form of observable object is single variable, for example:

var streamA$ = Rx.Observable.of(2);

Reusing the example above with real RxJS commands and new API:-

const streamA$ = Rx.Observable.of(2);const streamB$ = Rx.Observable.of(4);const streamC$ = Rx.Observable.concat(streamA$, streamB$)     .reduce((x, y) => x  + y);streamC$.subscribe(console.log); //prints 6

The final result gives the value of “6”. In fact, once a stream variable is declared can never be reassigned with new value (like the example in previous section). Hence, you need to declare new stream variable in this case. Since stream variable is an immutable data type, so to make it clearer, we use const in ES6 in our codes:-

const streamA$ = Rx.Observable.of(2, 10)...streamC$.subscribe(console.log); //prints 1

Now, subscribing to streamC$ will get value of “16”. Like mentioned before, stream is a series of event in a period of time.

Observables to allow handling asynchronous events as collections

Observable Methods

There are plenty of methods, below are some examples of observable methods:-

Stream programming has different way in handling event trigger, where observable is lazy computation, i.e. “calling” or “subscribing” is an isolated operation: two function calls trigger two separate side effects, and two Observable subscribes trigger two separate side effects.

Observer

Observer is the consumer of data, it processes and responds to values passed by observables. The API for observer is simple, next function is used to iterate the sequence. When an event is pushed to observable, this function will be called.

In the previous example, streamC$.subscribe(console.log) is the simplified version which actually using the underlying concept of observer. So, how to create observer?

const observer = Rx.Observer.create(    function next(val) {       console.log(val);    },
function error(err) { ; // Executed when error occurs }, function complete() { ; // Executed after event completes });

Observer also has error API which will send signal when error occurs during execution. All methods in observer are optional and available to subscribe (we will cover subscribe method in next section). In next method, you need to put the corresponding logics into it, for example: write an article, print out the article, reflect into DOM etc.


Now that we have covered the basic concept of reactive programming including stream, observable and observer. Generally speaking, reactive programming is mainly to generate event-based asynchronous application via observable sequences. This concept is actually similar to the application that most of us familiar with — Microsoft Excel. When a sum is referring to, say, E1:E5; by changing the value in any of the cell, the sum will be updated after you press enter. Sounds simple?

Hope you enjoy reading this and we shall continue with more concepts in next part. Thank you!

Kingsley Tan

Written by

Head Of Tech Consulting at Revenue Monster Sdn. Bhd.

More From Medium

More from Kingsley Tan

More from Kingsley Tan

Understand reflect in Go

Also tagged ES6

Also tagged Rxjs

Welcome to a place where words matter. On Medium, smart voices and original ideas take center stage - with no ads in sight. Watch
Follow all the topics you care about, and we’ll deliver the best stories for you to your homepage and inbox. Explore
Get unlimited access to the best stories on Medium — and support writers while you’re at it. Just $5/month. Upgrade