On The Subject Of Subjects (in RxJS)

Subjects in RxJS are often misunderstood. Because they allow you to imperatively push values into an observable stream, people tend to abuse Subjects when they’re not quite sure how to make an Observable out of something. The pattern looks a little like this…

// What people usually first do with Subjects when they find them
// (TRY NOT TO DO THIS)
const subject = new Subject();
button.addEventListener('click', () => subject.next('click'));
subject.subscribe(x => console.log(x));

While this is helpful for people new to RxJS (and in that capacity totally fine), it’s not really the “Rx way” to handle things like this. Ideally you’d wrap your event registration in an Observable that can set it up and tear it down. Which looks more like this:

// This is better, but use Observable.fromEvent(button, 'click')
// instead, please.
const clicks = new Observable(observer => {
  const handler = (e) => observer.next(e);
  button.addEventListener('click', handler);
  return () => button.removeEventListener('click', handler);
});

Why show this when it has nothing to do with Subjects? Well, for one thing, it shows why you don’t always need to use a Subject, and for another thing, there’s a subject hidden in here… sort of. The thing to note here is that the Observable wraps the registration of the handler on the button via addEventListener, and that event target is itself a subject… at least per the “Gang Of Four” Observer Pattern.

The Observer Pattern

As you may know, RxJS is mostly about Observables and Observers… but it’s also about Subjects. While observables aren’t something you’ll find in the GoF’s Design Patterns, Subjects and Observers are the meat-and-potatoes of the Observer Pattern.

(Source: Wikipedia)

The pattern is pretty straightforward. An Observer is a class with a notification method on it, and a Subject is a class with a means to add or remove an observer to/from an internal list of observers, plus a method to notify that list of observers.
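To make that description concrete, here is a minimal sketch of the pattern in plain JavaScript. It is not RxJS code; the names GoFSubject, LoggingObserver, attach, detach, notify, and update are assumptions chosen for this illustration.

```javascript
// A minimal sketch of the GoF Observer Pattern (illustrative only, not RxJS).
// All class and method names here are hypothetical.
class GoFSubject {
  constructor() {
    this.observers = []; // the internal list of observers
  }
  attach(observer) {
    this.observers.push(observer);
  }
  detach(observer) {
    this.observers = this.observers.filter(o => o !== observer);
  }
  notify(value) {
    for (const observer of this.observers) {
      observer.update(value);
    }
  }
}

class LoggingObserver {
  constructor(name) {
    this.name = name;
    this.received = [];
  }
  // the single notification method the pattern asks for
  update(value) {
    this.received.push(value);
    console.log(this.name, value);
  }
}

const gofSubject = new GoFSubject();
const first = new LoggingObserver('first');
const second = new LoggingObserver('second');
gofSubject.attach(first);
gofSubject.attach(second);
gofSubject.notify('hello');   // both observers are notified
gofSubject.detach(first);
gofSubject.notify('goodbye'); // only `second` is notified
```

Notice that the subject owns the list and the observers know nothing about each other.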

Subjects in RxJS aren’t much different. When you call subscribe with an observer on an Rx Subject, it will add that observer to an internal list of observers. Likewise, if you call subscribe with one to three functions, it wraps them in an observer, and adds it to its list of observers. When you call next(value) on the Subject, it will loop through its list of observers and forward that value along to their next methods. It does the same thing for error and complete. To remove your observer from the subject’s list of observers, you simply call unsubscribe on the subscription returned when you added the observer to the list.

const subject = new Subject();
// add observer1 to the list of observers
const sub1 = subject.subscribe(observer1);
// add observer2 to the list of observers
const sub2 = subject.subscribe(observer2);
// notify all observers in the list with "hi there"
subject.next('hi there');
// remove observer1 from the list
sub1.unsubscribe();
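The bookkeeping described above can be sketched in a few lines of plain JavaScript. This is a simplified stand-in and not the RxJS source: TinySubject and toObserver are hypothetical names, and missing handlers are replaced with no-ops here, which is a simplification (as discussed later, RxJS re-throws unhandled errors).

```javascript
// Sketch only: a simplified Subject-like class, NOT the RxJS implementation.
// toObserver wraps one to three functions (or a partial observer) in an observer.
function toObserver(nextOrObserver, error, complete) {
  const noop = () => {};
  if (nextOrObserver && typeof nextOrObserver === 'object') {
    const o = nextOrObserver;
    return {
      next: o.next ? v => o.next(v) : noop,
      error: o.error ? e => o.error(e) : noop,
      complete: o.complete ? () => o.complete() : noop
    };
  }
  return {
    next: nextOrObserver || noop,
    error: error || noop,
    complete: complete || noop
  };
}

class TinySubject {
  constructor() {
    this.observers = [];
  }
  subscribe(nextOrObserver, error, complete) {
    const observer = toObserver(nextOrObserver, error, complete);
    this.observers.push(observer);
    // the returned "subscription" removes the observer from the list
    return {
      unsubscribe: () => {
        const index = this.observers.indexOf(observer);
        if (index !== -1) this.observers.splice(index, 1);
      }
    };
  }
  // iterate over a copy so unsubscribing mid-notification is safe
  next(value) { this.observers.slice().forEach(o => o.next(value)); }
  error(err) { this.observers.slice().forEach(o => o.error(err)); }
  complete() { this.observers.slice().forEach(o => o.complete()); }
}

const seenByA = [];
const seenByB = [];
const tiny = new TinySubject();
const subA = tiny.subscribe(x => seenByA.push(x)); // function form
tiny.subscribe({ next: x => seenByB.push(x) });    // observer form
tiny.next('hi there');
subA.unsubscribe();
tiny.next('still here');
console.log(seenByA); // [ 'hi there' ]
console.log(seenByB); // [ 'hi there', 'still here' ]
```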

Subject Compared To Observable

Effectively, RxJS Subjects are a different take on the GoF Observer Pattern’s Subjects, but their API duck-types as an Observable. In fact, in RxJS, Subjects even inherit from Observable. The advantage here is that all Subjects then have the same operators and methods available to them as Observables do.

Probably a more important distinction between Subject and Observable is that a Subject has state: it keeps a list of observers. On the other hand, an Observable is really just a function that sets up observation.
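One way to picture the “just a function” side of that distinction is the sketch below. It is plain JavaScript with made-up names (numbers, setupCount), not the RxJS Observable class: the function keeps no list of observers, so every call sets up observation from scratch.

```javascript
// Sketch only: an "observable" reduced to a plain function that takes an
// observer, sets up a producer, and returns a teardown function.
let setupCount = 0;

function numbers(observer) {
  setupCount++; // the setup runs once PER subscriber
  observer.next(1);
  observer.next(2);
  observer.complete();
  return () => {}; // nothing to tear down in this tiny example
}

const gotA = [];
const gotB = [];
numbers({ next: x => gotA.push(x), complete: () => {} });
numbers({ next: x => gotB.push(x), complete: () => {} });

console.log(setupCount); // 2
console.log(gotA, gotB); // [ 1, 2 ] [ 1, 2 ]
```

Each observer gets its own independent run of the producer, which is exactly what a Subject’s shared list of observers avoids.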

While Subjects are Observables, Subjects also implement an Observer interface. That is to say, they have next, error, and complete methods. These methods are used to notify their counterparts on observers in the subject’s internal observers list. This means a subject can be used as an observer to subscribe to any observable.

// To "share" the observable tick$ with two observers,
// observer1 and observer2, we can pipe all notifications
// through a Subject, like so
const tick$ = Observable.interval(1000);
const subject = new Subject();
subject.subscribe(observer1);
subject.subscribe(observer2);
tick$.subscribe(subject);

The example above is “multicasting” the observable tick$ to two observers: observer1 and observer2. This is actually what almost all multicasting operators in RxJS do internally, for example publish, publishReplay, multicast, share, etc. Really, this is the primary use case for Subjects in RxJS.
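The same idea can be shown without RxJS at all. In the sketch below, source and relay are hypothetical stand-ins for a cold observable and a Subject; the point is only that the producer starts once, no matter how many observers sit behind the relay.

```javascript
// Sketch only: hand-rolled stand-ins, not RxJS itself.
let producerStarts = 0;

// a "cold" source: every call sets up its own producer
function source(observer) {
  producerStarts++;
  [0, 1, 2].forEach(x => observer.next(x));
}

// a minimal subject: an observer that forwards to a list of observers
const relay = {
  observers: [],
  subscribe(observer) { this.observers.push(observer); },
  next(value) { this.observers.forEach(o => o.next(value)); }
};

const logA = [];
const logB = [];
relay.subscribe({ next: x => logA.push('A' + x) });
relay.subscribe({ next: x => logB.push('B' + x) });
source(relay); // subscribe the relay to the source exactly once

console.log(producerStarts); // 1
console.log(logA, logB);     // [ 'A0', 'A1', 'A2' ] [ 'B0', 'B1', 'B2' ]
```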

Subjects Are Not Reusable

In RxJS, Subjects cannot be reused. That is to say, when a Subject completes or errors, it can no longer be used. If you try to next on a Subject that is closed due to its complete or error method being called, it will silently ignore the notification. If you want the Subject to loudly and angrily error when you next to it after it’s done being useful, you can call unsubscribe directly on the subject instance itself.

// The death of a Subject
const subject = new Subject();
subject.subscribe(x => console.log(x));
subject.next(1); // 1
subject.next(2); // 2
subject.complete();
subject.next(3); // silently ignored
subject.unsubscribe();
subject.next(4); // Unhandled ObjectUnsubscribedError
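If it helps to see how those two states could be tracked, here is a rough sketch. ClosableSubject is a hypothetical class, and the isStopped and closed flags plus the plain Error are assumptions made for this illustration rather than a copy of the RxJS internals.

```javascript
// Sketch only: a simplified stand-in, not the RxJS Subject source.
class ClosableSubject {
  constructor() {
    this.observers = [];
    this.isStopped = false; // set by complete()
    this.closed = false;    // set by unsubscribe()
  }
  subscribe(observer) {
    this.observers.push(observer);
  }
  next(value) {
    if (this.closed) throw new Error('object unsubscribed');
    if (this.isStopped) return; // silently ignore late values
    this.observers.slice().forEach(o => o.next(value));
  }
  complete() {
    if (this.closed) throw new Error('object unsubscribed');
    if (this.isStopped) return;
    this.isStopped = true;
    this.observers.slice().forEach(o => o.complete && o.complete());
    this.observers.length = 0; // nobody left to notify
  }
  unsubscribe() {
    this.isStopped = true;
    this.closed = true;
    this.observers = [];
  }
}

const values = [];
const dying = new ClosableSubject();
dying.subscribe({ next: x => values.push(x) });
dying.next(1);
dying.next(2);
dying.complete();
dying.next(3); // ignored: the subject is stopped
dying.unsubscribe();

let thrown = null;
try {
  dying.next(4); // loud failure: the subject is closed
} catch (err) {
  thrown = err;
}
console.log(values);         // [ 1, 2 ]
console.log(thrown.message); // object unsubscribed
```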

Gotchas in RxJS

Multicasting with Subjects comes with some confusing pain points in current versions of RxJS, though. Since Rx observables do not “trap” errors, we can run into some strange behavior here. Error “trapping” is a behavior I myself have derided Promises for implementing, but in multicast scenarios it may be the right move. What I mean when I say an Rx observable does not “trap” errors is basically that when an error percolates to the end of the observer chain and is unhandled, it will be re-thrown.

// Demonstrating re-throwing for lack of an error handler
const badObservable = Observable.throw(new Error('haha'));
try {
  badObservable.subscribe({
    next: x => console.log(x),
    error: null,
    complete: () => console.log('done')
  });
} catch (err) {
  console.error(err); // logs our Error: "haha"
}
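Stripped of everything else, the re-throwing rule amounts to something like the sketch below. deliverError is a hypothetical helper written for this article, not an RxJS API, and it only illustrates the decision being made at the end of the observer chain.

```javascript
// Sketch only: deliverError is a made-up helper, not part of RxJS.
function deliverError(observer, err) {
  if (observer && typeof observer.error === 'function') {
    observer.error(err); // handled by the consumer
  } else {
    throw err;           // no handler: re-throw to the caller
  }
}

const handled = [];
deliverError({ error: e => handled.push(e.message) }, new Error('handled'));

let rethrown = null;
try {
  deliverError({ error: null }, new Error('haha'));
} catch (err) {
  rethrown = err;
}
console.log(handled);          // [ 'handled' ]
console.log(rethrown.message); // haha
```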

Now let’s think of that behavior in terms of what happens when you’re looping over a list of observers and notifying them (as a Subject does):

for (let observer of observers) {
  observer.next('notify'); // What happens if this call throws??
}
// HINT: It's going to error and break the loop
// NOTE: Okay, that was more than a HINT.
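Here is a runnable version of that loop, using three hand-written observers (the names and the notified array are just for this demo). The second observer throws, so the third is never reached.

```javascript
// Demo: one throwing observer breaks the notification loop for its siblings.
const notified = [];
const observers = [
  { next: () => notified.push('A') },
  { next: () => { throw new Error('oops'); } },
  { next: () => notified.push('C') }
];

let caught = null;
try {
  for (const observer of observers) {
    observer.next('notify');
  }
} catch (err) {
  caught = err;
}
console.log(notified);       // [ 'A' ]
console.log(caught.message); // oops
```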

Given that a number of operators are processed synchronously (map, filter, scan, et al.), if an error is thrown in one of those, or in any other synchronous operation, downstream from a multicast (which uses a Subject to loop over a list of observers and notify them), you can get some spooky behavior:

// This is going to behave strangely
const source$ = Observable.interval(1000).share();
const mapped$ = source$.map(x => {
  if (x === 1) {
    throw new Error('oops');
  }
  return x;
});
source$.subscribe(x => console.log('A', x));
mapped$.subscribe(x => console.log('B', x));
source$.subscribe(x => console.log('C', x));
// "A" 0
// "B" 0
// "C" 0
// "A" 1
// Uncaught Error: "oops"

In the example above, most users would expect A’s and C’s to keep notifying. It’s understandable that B’s observable dies, since it had an error, but it’s rather confusing that the other streams and the source stream die too. It shouldn’t be the case that an arbitrary third party can kill your shared observable stream and therefore an unknown number of sibling streams. This is a leaky abstraction, and we need to fix it in upcoming versions of RxJS.

Working around the above scenario in the interim is easy thanks to schedulers. You can use observeOn after your multicast and you’ll get around this problem because errors will no longer be thrown synchronously.

const source$ = Observable.interval(1000)
  .share()
  .observeOn(Rx.Scheduler.asap); // magic here
const mapped$ = source$.map(x => {
  if (x === 1) {
    throw new Error('oops');
  }
  return x;
});
source$.subscribe(x => console.log('A', x));
mapped$.subscribe(x => console.log('B', x));
source$.subscribe(x => console.log('C', x));
// "A" 0
// "B" 0
// "C" 0
// "A" 1
// Uncaught Error: "oops"
// "C" 1
// "A" 2
// "C" 2
// "A" 3
// "C" 3
// ... etc

Another workaround that’s a little more performant, if you can manage it, is to simply add an error handler to all of your subscriptions.

const source$ = Observable.interval(1000)
  .share(); // no observeOn needed here
const mapped$ = source$.map(x => {
  if (x === 1) {
    throw new Error('oops');
  }
  return x;
});
source$.subscribe(x => console.log('A', x));
mapped$.subscribe(
  x => console.log('B', x),
  err => console.log('Error handled: ' + err.message)
);
source$.subscribe(x => console.log('C', x));
// "A" 0
// "B" 0
// "C" 0
// "A" 1
// "Error handled: oops"
// "C" 1
// "A" 2
// "C" 2
// "A" 3
// "C" 3
// ... etc

The Future Of Observable

The newer incarnation of the TC39 Observable proposal, not including the CancelToken business, which is an entire article by itself, is likely going to get around this behavior by “trapping” the error if there is no error handler. That is to say, it’s not going to re-throw errors that make it to the end of the observer chain. In future versions of RxJS I think we’re going to do the same thing, because it’s the right thing to do. The issue is open for debate, of course, but it’s unlikely to meet much resistance in my opinion.
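To give a feel for what “trapping” could look like in a notification loop, here is one possible shape. This is purely my sketch under assumptions: notifyAll and reportError are hypothetical names, and neither the TC39 proposal nor any RxJS release is claimed to work exactly this way.

```javascript
// Sketch only: one possible way to trap per-observer errors so that a
// throwing observer cannot break the loop for its siblings.
const reported = [];

// hypothetical hook: a real host might log or report the error elsewhere
function reportError(err) {
  reported.push(err.message);
}

function notifyAll(observers, value) {
  for (const observer of observers) {
    try {
      observer.next(value);
    } catch (err) {
      reportError(err); // trapped: not re-thrown into the loop
    }
  }
}

const delivered = [];
notifyAll([
  { next: () => delivered.push('A') },
  { next: () => { throw new Error('oops'); } },
  { next: () => delivered.push('C') }
], 'notify');

console.log(delivered); // [ 'A', 'C' ]
console.log(reported);  // [ 'oops' ]
```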

Summary

  1. Subjects are both observer and observable
  2. Subjects “multicast” to an internal list of observers
  3. Observables are just functions that set up observation
  4. Observables currently don’t trap errors but they should
  5. Errors thrown synchronously downstream from a Subject can kill the Subject
  6. You can use an error handler or observeOn to fix the problem in #5
  7. I was wrong about Promise error trapping. It’s a good idea, because promises are multicast.*
  8. Future versions of RxJS are likely to trap errors.
  * Although maybe not totally necessary, as promises are always async. (shrug)
A little about me: I am the lead author of RxJS 5 and I run workshops on reactive programming with RxJS at RxWorkshop.com