Being RxJS Subjects Savvy

Param Singh
Frontend Weekly
Published in
9 min readMar 22, 2017

With Reactive Programming being a part and parcel of apps development these days, RxJS Observables finds a great a place in our apps. But due to the dearth of good documentation on the internet, I found it quite difficult to understand it in the first go. And not to mention the colossal collection of RxJS Operators. Adding to my bewilderment, there was the concept of Rx Subjects with very little documentation on github made it pretty hard to grasp. But now, since I think I have got a good hold on ’em, lemme share my cognizance with you.

Observables & Observers

Before learning what Subjects are, we gotta learn about their constituents namely Observables and Observers.

The Observer and Objects interfaces provide a generalized mechanism for push-based notification, also known as the observer design pattern. The Observable object represents the object that sends notifications (the provider); the Observer object represents the class that receives them (the observer).

Observables: Simply put, an observable is a stream of events than can be observed or listened to.

Analogy of Source & Listener Concept

Think of it as a person A calling you at one end called as “Source” and then when you pickup the phone or “Subscribe” to it, you are kinda “Observing” his talks on the other end called “Sink”. So, you are acting as an observer in this one way communication over a stream of media (a wire in this case). The communication can stop two ways. First, if the source person A stops talking and goes away or the subscriber/observer puts down the phone on the other end.

So, the person A is our “Observable” in this case and person B on the other end (sink) is the Observer.

Observer: Observer are the objects that gets subscribed to an Observable. Talking in RxJS context, an observable can have multiple observers at a time.

La Problemo

The problem (sometimes) with using plain Observables and Observers is that whenever a new observer subscribes to an Observable, it triggers or invokes an entirely new execution of the Observable which is unsolicited or undesirable in some cases.

Problem with normal observables

Note: Please note that the Observer has been transformed to an interface. Subscriber is the class that implements the Observer interface.

As we see from this fiddle, the events for ObserverB is entirely alienated from the one got by ObserverA. Both of them are invoking the Observable to execute specially for each of them.

What if we want to share the same execution of an already running instance of an Observable?

One approach that could be done looks like this

Creating a bridge/proxy Observable

What we have done here is the creation of a proxy or a middleman kinda thing that keeps track of all the subscribers or observers. Looks pretty much like a pub-sub design pattern implementation to me wherein all the registered listeners are informed or broadcasted whenever a piece of information is published by the source.

If you carefully notice the addObserver method or BridgeObserver class. All that it’s doing is adding or subscribing a new observer with it. So we can pretty much replace it with the `subscribe`. Right? Like this.

bridgeObserver.subscribe(observerA);

While at the same time, we’re also doing

observable.subscribe(bridgeObserver);

In the former snippet, we’re subscribing to our bridgeObserver like it’s an Observable and in the latter one, we’re passing it to the subscription handler of the Observable.

Hence, we can conclude that bridgeObserver is both Observer and Observable.

It’s a pretty common use case. So, instead of writing this much verbose code every time we needed it, RxJS provides us with something called as Subjects.

Subject

The Subject class inherits both Observable and Observer, in the sense that it is both an observer and an observable.

We can create a subject by simply instantiating it like this

let subject = new Rx.Subject();

By creating a subject, we can subscribe to it to listen the events and also on the other hand, publish the events from the same object.

Subject Api Usage

As you can see we’re imperatively emitting events on our Subject stream by calling next or error methods on the subject object itself.

In the following example, we’re passing subject inside the subscription method of an inbuilt interval observable Rx.Observable.interval since it can be treated as an observer. The events are re-emitted from it and when we subscribe to it one after the other, we get the desired results.

Converting an Observable to Subject

Types of Subjects

There exists three kinds of Rx Subjects as follows

ReplaySubject

ReplaySubject stores the history of values it has published so far ever since the first subscriber has joined it and when a new subscriber joins it at a later stage of time, it “replays” all the past values for that subscriber. Hence the name, “ReplaySubject”.

ReplaySubject Demo

BehaviorSubject

This is a special kind of Subject and finds wide use maybe because of its structure and handy method getValue. It’s quite similar to ReplaySubject but instead of returning the entire history of past values to a later subscribed observer, it gives only the last value on the stream. Hence, it becomes mandatory to provide a seed or initial value to it while instantiating it like this.

let subject = new Rx.BehaviorSubject(0);

It also provides a nice function getValue to get the existing value on the stream. If none of the observers have subscribed, it returns the initial value as passed above, otherwise it returns the latest value available on the stream at that time.

BehaviorSubject Demo

AsyncSubject

Pretty similar to above discussed subjects. It also provides the last value to the later subscribed observer but only does it when the Observable is complete or done emitting events. Unlike the BehaviorSubject which doesn’t provide any value to the observer subscribed post complete, AsyncSubject does the needful.

AsyncSubject Demo

Cold vs Hot Observable

There exists two kinds of Observables:

Cold Observables

These are the normal ones that we’ve discussed in the very beginning. Every new subscriber that joins in invokes the creation of new instance of the Observable and gets new set of separate events exclusively for itself. All the subscribers have the observable emitting events for each one of them individually. Hence, there’s no events sharing among them.

Eg: Videos uploaded on Youtube can be accessed by any subscriber any time. It’s execution will be controlled or dictated by the subscriber.

Hot Observables

Unlike cold observables, hot ones share a single execution of the Observable and all the subscribers get to observe the same events through the execution of the observable.

Eg: A live match being broadcasted on Youtube. It’ll be same for every one watching it.

So, you can say the problem that we tried to tackle with Subjects was the concept of hot observable, then why there’s a need of Subjects.

One difference between observables and subjects is that Observables are State Agnostic while subjects are State Aware.

The Multicast Operator

While demonstrating subjects, in order to convert an inbuilt observable like interval to Subject, we did

let subject = Rx.Subject();
observable.subscribe(subject);

and then

subject.subscribe(observerA);

We can accomplish the same behaviour using observable only by using the multicast operator.

let connectableObservable = Rx.Observable.interval(1000)
.take(5)
.multicast(new Rx.Subject())
// or simply .publish()

It returns a special kind of observable called ConnectableObservable.

A Connectable Observable resembles an ordinary Observable, except that it does not begin emitting items when it is subscribed to, but only when its connect() method is called. In this way you can wait for all intended Subscribers to subscribe to the Observable before the Observable begins emitting items.

So, continuing our previous example snippet, if we do

connectableObservable.subscribe(observerA);

It won’t trigger the execution of the observable like it happens normally. The execution of a connectableObservable will happen no sooner than connect() method is called on it.

let subscription = connectableObservable.connect()

Now, it’ll start the execution of the observable. Also, the connect() method returns a subscription object which we can use to unsubscribe it.

Hot Observable using publish()

refCount

If we want to automatically connect when there’s at least one subscriber connected to the observable and unsubscribe when there’s no subscriber. We can use the refCount() method of connectable observable. It basically helps you convert a connectable observable to a normal one

The RefCount operator automates the process of connecting to and disconnecting from a connectable Observable.

Below is how a proper “hot” observable looks like.

let connectableObservable = Rx.Observable.interval(1000)
.take(5)
.publish()
.refCount()

You can relate it with a scenario wherein a band is playing music when there’s some audience listening to it. And it makes sense for the orchestra or band to stop playing when there’s no one in the audience. This is exactly what refCount() does automatically for us.

Using multicast or publish operator, so far we’re still not able to solve that problem that Subjects did, i.e being Stateful. The newly adding observers have no information of the events emitted in the past before their subscription. RxJS provides very handy methods for that too discussed as follows.

Publish Functions

The normal publish() function we saw above was just an alias for .multicast(new Rx.Subject()). If we want to achieve more control over the type of subject being passed, we can use the following alias methods to achieve the same.

publishReplay

Short for .multicast(new Rx.ReplaySubject()), it can be used to create a replay subject nature out of a given observable.

let observable = source.publishReplay().refCount();

publishBehavior

Alias for publishValue() in RxJS 4 and short for .multicast(new Rx.BehaviorSubject(<seedValue>))

let observable = source.publishBehavior(-1).refCount();

publishLast

Short for .multicast(new Rx.AsyncSubject())

let observable = source.publishLast().refCount();

Please have a look at their detailed docs on github for more depth.

When to use Subjects over Observables?

The most awaited and confusing questions to all the Rx devs. As we have seen above that we can pretty much achieve the same functionality of a Subject by using a hot Observable in its place with some operators like publish, refCount applied on it. So, which one to choose over the other. Both seem to accomplish the same functionality. Right?

Here’s a depth discussion on this debate, but let me condense it down for you.

You need to consider two things before deciding between them.

  1. Check whether your source of events (observable as we call) is “external” or “internal”. External is the one which is inbuilt or provided by the library itself like interval as we have been using throughout. Internal is the one that you create like maybe some custom event, listening to some UI interaction by the user, file download or progress event etc. For the latter case use Subjects and for the former one use Observables. Simple!
  2. The second criteria is the “temperature” of your observable, whether hot or cold. If you want all your subscribers to observe the same events, use Subject or a hot observable (publish). Please note that the publish operators on Observables also uses subjects internally. And the other case where, you want a separate observable invocation for every new subscribing observer, use simply a cold or normal observable.

Conclusion

So, now you know better about that “fine line” that’s there between Rx Observables and Subjects and why you should favour one over the other based on the scenario. It’s no silver bullet that can be used in all the cases. Your decision should be biased enough towards your scenario and use case. The one thing I like the most about Subjects is that it helps you imperatively control the stream by publishing values to it from even distant places in your code. But most of the time, people seem to misuse it. So, next time you use Subjects, do decide judiciously based on the criteria discussed above.

Kindly leave your comments and feedback.

Thanks!

--

--

Param Singh
Frontend Weekly

Senior Front-end Engineer at AWS, Ex-Revolut, Flipkart. JS enthusiast. Cynophile. Environmentalist