RxJS concepts for dummies

Naveen Chandupatla
5 min readJan 4, 2019

--

Photo by Chris Ried on Unsplash

Reactive programming

Reactive programming is a declarative programming paradigm concerned with data streams and the propagation of change. Consider below example.

b = 10, c = 20;
a$ = b+c;
b = 20, c = 30;
a$ = ?

In an imperative programming world, when this program executes, the value of a$ will always be 30. Even though the values of b and c are updated, there will be no effect on the value of a$. In the case of reactive programming, the value of a$ will be 50. Here, a$ is an always-on variable which is automatically updated whenever the values of b or c changes, without the program having to re-execute the statement. As you have noticed, Reactive programming entails a mental shift in the way you reason about your program’s behavior.

RxJS

RxJS is a JavaScript library for reactive programming using Observables, to make it easier to compose asynchronous or callback-based code. First started as an open source project, RxJS is currently evolved as a community-driven project owned by Ben Lesh from Netflix, sanctioned by Microsoft as RxJS 5.

Below are the main concepts to understand when learning RxJS
Observable, Observer, Subscription, Operator, Subject, Push vs Pull protocol.
In order to understand these concepts in an easy way, consider below grocery delivery service case as a metaphoric example.

Let's say that you (consumer) are interested to register for some of the kind of grocery delivery services (producer) like Amazon Fresh, HelloFresh or Instacart as you don’t want to go to grocery store manually to pick up items. The first step is to register in some kind of subscription for these services in order to automatically receive a stream of grocery boxes over time (weekly or biweekly). Upon receiving the grocery box, you usually perform an action like cooking or keeping them in the freezer for next day use. You can think of this process as reactive because you’re reacting to receiving a grocery box.

Now let's relate the same scenario to RxJS concepts.

In the above example, think of grocery delivery service as an observable (Producer). Observable is nothing but a stream of data separated over time.

Think of yourself as an Observer, who will be performing some action once you receive the package (data) from observable. Without a subscription, Observer cannot receive any packages(data) from Observable. This subscription is more like a contract between Observable and Observer. Once the subscription is canceled, Observer will stop receiving the data.

Now imagine, Operator is some kind of way to customize your Observable subscription. Using the operator, you can control how often you want to receive your packages, hold the subscription, add or remove items etc.

Think Subject as some kind of Family plan (Multiple Observers) where you can add family members to the subscription and they will also receive the packages (data) from same Observable.

Now, let's understand what are Pull and Push protocols using the same example. In Pull systems, the Consumer determines when it receives data from the Producer. The Producer itself is unaware of when the data will be delivered to the Consumer. It’s like you going to store to pick up the items.
Every Function or an Iterator in JavaScript is a Pull system. Whereas In Push systems, the Producer (Grocery Services) determines when to send data to the Consumer (You). The Consumer is unaware of when it will receive that data. Promises and Observables are the most common type of Push systems. But unlike Observables, Promises are unable to handle data sources that produce multiple values, like mouse movements or sequences of bytes in a file stream.

Hope you now have some high-level understanding of Rxjs key concepts. Let's dig into technical definitions and code.

Observable is a wrapper object on a data source with a stream.

import { Observable } from ‘rxjs’;
const observable = Observable.create(function (observer) {
observer.next(1);
observer.next(2);
setTimeout(() => {
observer.next(3);
observer.complete();
}, 1000);
});

The above program creates a custom Observable imported from the rxjs library which pushes the values 1 and 2 immediately (synchronously) when subscribed, and the value 3 after one second has passed since the subscribe call, then completes. When you run this program, no output will be displayed as no one is listening to this observer. As explained above, to invoke the Observable and see the values, we need a subscription to an observer.

Technically, Observer is just an object with the methods next(v), error(e), and complete(). Observable will call the Observer’s next(value) method to provide data. And similarly, calls the Observer’s error(err) to throw any error or Observer’s complete() method when the stream is done.

const subscription =observable.subscribe({
next: x => console.log(‘got value ‘ + x),
error: err => console.error(‘something wrong occurred: ‘ + err),
complete: () => console.log(‘done’),
});
console.log(‘just after subscribe’);
// got value 1
// got value 2
// just after subscribe
// got value 3

When you subscribe to an Observable, you get back a Subscription, which represents the ongoing execution. We need to explicitly call unsubscribe() to cancel the execution.

subscription.unsubscribe();

So far we have seen custom Observable using Rx.Observable.create. But, Rx.Observable class has many functions to create observable from different kind of data/streams such as events, event patterns, arrays, promises, single/multiple values, any kind of data structure/primitive data types and so on. Few examples of frequently used Observable.from and fromEvent are shown below.

import { Observable } from ‘rxjs’;Observable.from([10, 20, 30]).subscribe(x => console.log(x));const link = document.querySelector('#google');                      
Observable.fromEvent(link, 'click')
.map(event => event.currentTarget.getAttribute('href'))
.subscribe(console.log); //-> http://www.google.com

In the above example, map is called an operator. RxJS is mostly useful for its operators. An Operator is a function which creates a new Observable based on the current Observable. Operators can transform/customize the original data and pass it to the observer without any side effects.

Rx.Observable.from(<data-source>) 1
.operator1(…) 2
.operator2(…)
.operator3(…)
.subscribe(<process-output>); 3
• 1 Wraps a data source with a stream
• 2 Invokes a sequence of operations chained by the dot operator.
• 3 Processes the results

Check out this link to see most commonly used operators.

Finally, RxJS Subject is a special type of Observable that allows values to be multicasted to many Observers, whereas a plain “unicast Observable” only sends notifications to a single Observer. Consider below example with multiple observers subscribed to the same Subject.

import { Subject } from ‘rxjs’;
const subject = new Subject<number>();
subject.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});
subject.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});

const observable = Observable.from([1, 2]);
observable.subscribe(subject);

// Logs:
// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2

This behavior is different when multiple observers are subscribed to the same Observable as shown below.

const observable = Observable.from([1, 2]);
observable.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});
observable.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});
// Logs:
// observerA: 1
// observerA: 2
// observerB: 1
// observerB: 2

That’s it!. Hope this article helped you in understanding some basic concepts in Reactive programming and RxJS

Cheers

Naveen Chandupatla

--

--