Understanding RxJS Multicast Operators
RxJS multicast operators, better known as sharing operators, are probably the most complicated topic to understand in the jungle that is RxJS. In this article, I’ll try to clarify the subject by looking at it in a different way.
The key to truly comprehending them is understanding the mechanism behind them and the problem they solve.
Let’s start from the beginning with a high-level explanation of the RxJS building blocks:
Observables 101
In RxJS, observables are cold by default. This means that each time we call subscribe(), we execute the underlying producer function.
To understand this better, let’s create a basic Observable implementation:
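Here is a minimal sketch of such an Observable class. It is not RxJS's actual implementation. It assumes a plain observer object with optional next(), error(), and complete() methods, and a subscription function that returns a teardown function. The names Observer, SubscriptionFn, and producerRuns are illustrative. Subscribing twice runs the producer twice, which is what "cold" means.

```typescript
// Illustrative sketch only: a hand-rolled Observable, not RxJS's implementation.
type Observer<T> = {
  next?: (value: T) => void;
  error?: (err: unknown) => void;
  complete?: () => void;
};

// The subscription ("producer") function gets an observer and returns a teardown function.
type SubscriptionFn<T> = (observer: Observer<T>) => () => void;

class Observable<T> {
  private readonly subscriptionFn: SubscriptionFn<T>;

  constructor(subscriptionFn: SubscriptionFn<T>) {
    this.subscriptionFn = subscriptionFn;
  }

  // Each call runs the subscription function again: this is what makes the observable cold.
  subscribe(observer: Observer<T>): () => void {
    return this.subscriptionFn(observer);
  }
}

// Usage: count how often the producer runs.
let producerRuns = 0;
const numbers$ = new Observable<number>((observer) => {
  producerRuns++;
  observer.next?.(1);
  observer.next?.(2);
  observer.complete?.();
  return () => {}; // nothing to clean up in this synchronous example
});

const received: number[] = [];
numbers$.subscribe({ next: (value) => received.push(value) });
numbers$.subscribe({ next: (value) => received.push(value) });
// producerRuns is 2 and received is [1, 2, 1, 2]
```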
The Observable class takes a single parameter: a subscription function. The subscription function is named as such because our Observable invokes it whenever someone calls subscribe().
Sometimes people refer to the subscription function as the “producer” function, because it also produces the values for the observer.
The subscription function receives an observer. It’s simply a plain object with three optional methods: next(), error(), and complete().
The subscribe() method takes an observer and calls the subscription function with it. Note that we’re not going to discuss the unsubscribe process. In a nutshell, each subscription should be disposable: the subscription function returns a function that is responsible for any cleanup that should be performed once the subscription is no longer needed.
It’s as simple as that. Now, let’s create an observable that wraps the native XHR API:
The http implementation is pretty straightforward. We create a function that takes a URL and returns a new Observable. When subscribed to, that Observable sends an HTTP request and emits the response when the request succeeds.
Now, based on our observable implementation, what do you think will happen when we subscribe to our http observable twice?
If you said we’ll see two outgoing requests in the network panel, you got it right. When we review the implementation of our Observable class, we can clearly understand why this happens. Each subscriber invokes the subscription function, which in this case causes the XHR call to run twice, once for each subscriber.
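To reproduce the double request outside a browser, this sketch swaps XHR for a hypothetical fakeRequest() helper. The helper only counts calls and delivers a canned response on a later microtask. The endpoint, the response text, and fakeRequest itself are assumptions made for illustration. The point is that calling http() sends nothing, and each subscribe() starts its own request.

```typescript
// Sketch: the http() observable with XHR swapped for a hypothetical fakeRequest().
type Observer<T> = {
  next?: (value: T) => void;
  error?: (err: unknown) => void;
  complete?: () => void;
};

class Observable<T> {
  private readonly subscriptionFn: (observer: Observer<T>) => () => void;
  constructor(subscriptionFn: (observer: Observer<T>) => () => void) {
    this.subscriptionFn = subscriptionFn;
  }
  subscribe(observer: Observer<T>): () => void {
    return this.subscriptionFn(observer);
  }
}

let requestCount = 0;

// Hypothetical transport: counts each "request" and answers on a later microtask, like XHR would.
function fakeRequest(url: string, onLoad: (body: string) => void): () => void {
  requestCount++;
  let cancelled = false;
  Promise.resolve().then(() => {
    if (!cancelled) onLoad(`response from ${url}`);
  });
  return () => {
    cancelled = true; // plays the role of xhr.abort()
  };
}

function http(url: string): Observable<string> {
  // Calling http() sends nothing; the request starts inside the subscription function.
  return new Observable<string>((observer) =>
    fakeRequest(url, (body) => {
      observer.next?.(body);
      observer.complete?.();
    })
  );
}

const bodies: string[] = [];
const users$ = http("/api/users"); // hypothetical endpoint
users$.subscribe({ next: (body) => bodies.push(body) });
users$.subscribe({ next: (body) => bodies.push(body) });
// requestCount is already 2: each subscribe() started its own request.
```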
Operators 101
Operators are essentially observables. The only difference is that, as the name suggests, operators operate on a source observable.
Building your own operator is as simple as writing a function:
The map() function returns a function that takes a source observable and returns a new observable, which subscribes to that source.
Before we can use our operator, we need to create the pipe() method:
Yes, it’s as easy as that: only one line of code! The pipe method takes an array of operators and loops over it. Each operator is invoked in turn with the result of the previous one as its source, starting from the observable that pipe() was called on.
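Here is one way map() and a one-line pipe() could look on top of a minimal hand-rolled Observable. This is a sketch. The loose any types in pipe() are a simplification; RxJS types pipe() with a series of overloads instead. The Operator type name is an assumption.

```typescript
// Sketch: a hand-rolled Observable with pipe(), plus a map() operator.
type Observer<T> = {
  next?: (value: T) => void;
  error?: (err: unknown) => void;
  complete?: () => void;
};

type Operator = (source: Observable<any>) => Observable<any>;

class Observable<T> {
  private readonly subscriptionFn: (observer: Observer<T>) => () => void;
  constructor(subscriptionFn: (observer: Observer<T>) => () => void) {
    this.subscriptionFn = subscriptionFn;
  }
  subscribe(observer: Observer<T>): () => void {
    return this.subscriptionFn(observer);
  }
  // pipe(op1, op2) is op2(op1(this)): each operator receives the previous result as its source.
  pipe(...operators: Operator[]): Observable<any> {
    return operators.reduce<Observable<any>>((source, operator) => operator(source), this);
  }
}

// map() returns a function that takes a source and returns a new observable subscribed to it.
function map<T, R>(project: (value: T) => R) {
  return (source: Observable<T>): Observable<R> =>
    new Observable<R>((observer) =>
      source.subscribe({
        next: (value) => observer.next?.(project(value)),
        error: (err) => observer.error?.(err),
        complete: () => observer.complete?.(),
      })
    );
}

const source$ = new Observable<number>((observer) => {
  [1, 2, 3].forEach((n) => observer.next?.(n));
  observer.complete?.();
  return () => {};
});

const result: number[] = [];
source$
  .pipe(map((n: number) => n + 1), map((n: number) => n * 10))
  .subscribe({ next: (value) => result.push(value) });
// result is [20, 30, 40]
```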
If we use this in our example, we get the following expression:
Let’s use our operator:
When we call subscribe(), we’re executing the map Observable, which itself subscribes to the http observable. When the http source observable emits a new value, the value first reaches the map subscription function. Then, after applying the projection function to the value, the map Observable emits the result to the final subscriber. This is called the observable chain.
If we subscribe to our observable twice this time, we’ll invoke each one of the observables in the chain twice:
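This sketch counts how often each link of the chain runs. It uses a hypothetical synchronous producer standing in for the http observable. To stay short, map is applied directly rather than through pipe(); both build the same chain.

```typescript
// Sketch: an observable chain (producer -> map) subscribed twice.
type Observer<T> = {
  next?: (value: T) => void;
  error?: (err: unknown) => void;
  complete?: () => void;
};

class Observable<T> {
  private readonly subscriptionFn: (observer: Observer<T>) => () => void;
  constructor(subscriptionFn: (observer: Observer<T>) => () => void) {
    this.subscriptionFn = subscriptionFn;
  }
  subscribe(observer: Observer<T>): () => void {
    return this.subscriptionFn(observer);
  }
}

function map<T, R>(project: (value: T) => R) {
  return (source: Observable<T>): Observable<R> =>
    new Observable<R>((observer) =>
      source.subscribe({
        next: (value) => observer.next?.(project(value)),
        error: (err) => observer.error?.(err),
        complete: () => observer.complete?.(),
      })
    );
}

let producerRuns = 0;
let projections = 0;

// Hypothetical stand-in for http(): "responds" synchronously so the counts are easy to check.
const response$ = new Observable<string>((observer) => {
  producerRuns++;
  observer.next?.("users payload");
  observer.complete?.();
  return () => {};
});

// Applied directly; response$.pipe(map(...)) would build the same chain.
const upper$ = map((body: string) => {
  projections++;
  return body.toUpperCase();
})(response$);

const seen: string[] = [];
upper$.subscribe({ next: (value) => seen.push(value) });
upper$.subscribe({ next: (value) => seen.push(value) });
// producerRuns === 2 and projections === 2: the whole chain ran once per subscriber.
```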
However, what if we don’t want this behavior? What if we want to run the subscription function only once, regardless of how many subscribers it has?
For instance, what if we want to make a single HTTP call and share (i.e., multicast) the result with all the subscribers? That’s one case where we need a Subject.
Subjects 101
A Subject might seem like an intimidating entity in RxJS, but the truth is that it’s a fairly simple concept: a Subject is both an observable and an observer.
It’s an observable because it implements the subscribe() method. It’s also an observer because it implements the observer interface: next(), error(), and complete().
Let’s create a simple implementation of a Subject:
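A naive Subject could look like the following sketch. It keeps an array of observers, and subscribe() pushes into it. next(), error(), and complete() forward each notification to every registered observer. The sketch deliberately skips details that a real RxJS Subject handles, such as ignoring notifications after completion.

```typescript
// Sketch: a naive Subject, both an observable (subscribe) and an observer (next/error/complete).
type Observer<T> = {
  next?: (value: T) => void;
  error?: (err: unknown) => void;
  complete?: () => void;
};

class Subject<T> {
  private observers: Observer<T>[] = [];

  // Observable side: registering an observer does not start any producer.
  subscribe(observer: Observer<T>): () => void {
    this.observers.push(observer);
    return () => {
      this.observers = this.observers.filter((o) => o !== observer);
    };
  }

  // Observer side: forward every notification to all registered observers.
  next(value: T): void {
    this.observers.forEach((o) => o.next?.(value));
  }
  error(err: unknown): void {
    this.observers.forEach((o) => o.error?.(err));
  }
  complete(): void {
    this.observers.forEach((o) => o.complete?.());
  }
}

const subject = new Subject<string>();
const first: string[] = [];
const second: string[] = [];
subject.subscribe({ next: (v) => first.push(v) });
const unsubscribeSecond = subject.subscribe({ next: (v) => second.push(v) });
subject.next("hello"); // both observers receive "hello"
unsubscribeSecond();
subject.next("again"); // only the still-registered observer receives "again"
// first is ["hello", "again"], second is ["hello"]
```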
A subject can act as a proxy between the source observable and many observers, making it possible for multiple observers to share the same observable execution.
Let’s modify our example to use a Subject and see how it works:
When we call the subject’s subscribe() method, it performs one simple operation: it pushes our observer into the observers array.
Then, because a Subject also implements the observer interface, it can subscribe to the source observable. When the source observable emits, it calls the subject’s next() method, which results in a notification for each of the Subject’s subscribers.
So now we’ll have a single execution of the original subscription function, and therefore we’ll see only one HTTP request in the network panel.
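The modified example can be sketched as follows. To keep it runnable outside a browser, a hypothetical synchronous fetchUsers$ producer stands in for the http observable and counts its executions. The two observers register with the subject first, and only then does the subject subscribe to the source.

```typescript
// Sketch: one source execution shared through a naive Subject.
type Observer<T> = {
  next?: (value: T) => void;
  error?: (err: unknown) => void;
  complete?: () => void;
};

class Observable<T> {
  private readonly subscriptionFn: (observer: Observer<T>) => () => void;
  constructor(subscriptionFn: (observer: Observer<T>) => () => void) {
    this.subscriptionFn = subscriptionFn;
  }
  subscribe(observer: Observer<T>): () => void {
    return this.subscriptionFn(observer);
  }
}

class Subject<T> {
  private observers: Observer<T>[] = [];
  subscribe(observer: Observer<T>): () => void {
    this.observers.push(observer);
    return () => {
      this.observers = this.observers.filter((o) => o !== observer);
    };
  }
  next(value: T): void {
    this.observers.forEach((o) => o.next?.(value));
  }
  error(err: unknown): void {
    this.observers.forEach((o) => o.error?.(err));
  }
  complete(): void {
    this.observers.forEach((o) => o.complete?.());
  }
}

let executions = 0;
// Hypothetical stand-in for http(): one "request" per execution.
const fetchUsers$ = new Observable<string[]>((observer) => {
  executions++;
  observer.next?.(["Ada", "Linus"]);
  observer.complete?.();
  return () => {};
});

const subject = new Subject<string[]>();
const received: string[][] = [];
subject.subscribe({ next: (users) => received.push(users) });
subject.subscribe({ next: (users) => received.push(users) });

// The subject is also an observer, so it can subscribe to the source itself.
fetchUsers$.subscribe(subject);
// executions === 1, and both observers received the very same array
```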
You Were Late to the Party: Late Subscribers
What happens if the source observable has already emitted a value before you subscribe?
We can’t demonstrate this with our previous example, as the http observable is async, and therefore we’re always subscribed to it before it emits the value.
Let’s quickly create the of observable to help us show this case:
The of observable takes an array of values and emits them synchronously, one by one. Now let's subscribe to the subject after it subscribes to the of observable:
Our subscribers don’t receive any value whatsoever. The reason is that our Subject implementation doesn’t support late subscribers. When the of observable emitted the values, the subscribers hadn't registered yet, so the values were missed.
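The late-subscriber problem can be reproduced with a sketch like this one. It assumes the naive, non-buffering Subject described earlier. It also assumes an of() helper that takes its values as rest parameters rather than as an explicit array.

```typescript
// Sketch: a synchronous of() feeding a naive Subject before anyone subscribes.
type Observer<T> = {
  next?: (value: T) => void;
  error?: (err: unknown) => void;
  complete?: () => void;
};

class Observable<T> {
  private readonly subscriptionFn: (observer: Observer<T>) => () => void;
  constructor(subscriptionFn: (observer: Observer<T>) => () => void) {
    this.subscriptionFn = subscriptionFn;
  }
  subscribe(observer: Observer<T>): () => void {
    return this.subscriptionFn(observer);
  }
}

class Subject<T> {
  private observers: Observer<T>[] = [];
  subscribe(observer: Observer<T>): () => void {
    this.observers.push(observer);
    return () => {
      this.observers = this.observers.filter((o) => o !== observer);
    };
  }
  next(value: T): void {
    this.observers.forEach((o) => o.next?.(value));
  }
  complete(): void {
    this.observers.forEach((o) => o.complete?.());
  }
}

// of(): emits the given values synchronously, one by one, then completes.
function of<T>(...values: T[]): Observable<T> {
  return new Observable<T>((observer) => {
    values.forEach((v) => observer.next?.(v));
    observer.complete?.();
    return () => {};
  });
}

const subject = new Subject<number>();
of(1, 2, 3).subscribe(subject); // emits immediately, while the subject has no observers

const late: number[] = [];
subject.subscribe({ next: (v) => late.push(v) });
// late is still []: the values were emitted before anyone registered
```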
A real-world Angular example is a source observable that emits while a component isn’t on the page yet because of a falsy ngIf. When the component is eventually added to the page, it subscribes to the source, but it won't get any previously emitted value.
One way to solve this is by using a ReplaySubject. Let’s create a naive implementation of it and see how it works:
The concept is relatively simple. As the name suggests, ReplaySubject is a special subject that “replays” old values, i.e., emits them again to any new subscribers.
Each notification is broadcast to all subscribers and saved for any future observers, subject to the buffer size policy.
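Here is a minimal ReplaySubject sketch in the same hand-rolled style. It assumes a bufferSize constructor argument that defaults to Infinity (keep everything). For brevity, it does not replay completion or errors to late subscribers, which a real RxJS ReplaySubject does. The of() helper is written with rest parameters.

```typescript
// Sketch: a naive ReplaySubject with a buffer size policy.
type Observer<T> = {
  next?: (value: T) => void;
  error?: (err: unknown) => void;
  complete?: () => void;
};

class Observable<T> {
  private readonly subscriptionFn: (observer: Observer<T>) => () => void;
  constructor(subscriptionFn: (observer: Observer<T>) => () => void) {
    this.subscriptionFn = subscriptionFn;
  }
  subscribe(observer: Observer<T>): () => void {
    return this.subscriptionFn(observer);
  }
}

function of<T>(...values: T[]): Observable<T> {
  return new Observable<T>((observer) => {
    values.forEach((v) => observer.next?.(v));
    observer.complete?.();
    return () => {};
  });
}

// Simplification: completion and errors are forwarded but not replayed to late subscribers.
class ReplaySubject<T> {
  private observers: Observer<T>[] = [];
  private buffer: T[] = [];
  private readonly bufferSize: number;

  constructor(bufferSize: number = Infinity) {
    this.bufferSize = bufferSize;
  }

  subscribe(observer: Observer<T>): () => void {
    this.buffer.forEach((v) => observer.next?.(v)); // replay buffered values first
    this.observers.push(observer);
    return () => {
      this.observers = this.observers.filter((o) => o !== observer);
    };
  }

  next(value: T): void {
    this.buffer.push(value);
    if (this.buffer.length > this.bufferSize) this.buffer.shift(); // enforce the buffer size
    this.observers.forEach((o) => o.next?.(value));
  }
  error(err: unknown): void {
    this.observers.forEach((o) => o.error?.(err));
  }
  complete(): void {
    this.observers.forEach((o) => o.complete?.());
  }
}

const replayAll = new ReplaySubject<number>();
const replayLastTwo = new ReplaySubject<number>(2);
of(1, 2, 3).subscribe(replayAll);
of(1, 2, 3).subscribe(replayLastTwo);

const fromAll: number[] = [];
const fromLastTwo: number[] = [];
replayAll.subscribe({ next: (v) => fromAll.push(v) }); // late, but receives [1, 2, 3]
replayLastTwo.subscribe({ next: (v) => fromLastTwo.push(v) }); // receives [2, 3]
```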
Let’s refactor our previous example and use a ReplaySubject:
Now the result is different. Despite subscribing after the source emitted the values, we still get all of them.
To summarize, a ReplaySubject multicasts the source values to all of its subscribers and caches them (based on the buffer size), so that late subscribers can still receive them.
Before you continue, I recommend trying to create a BehaviorSubject yourself. I’ve included the solution in the complete code gist.
Now it’s time to introduce the multicast RxJS operators, and hopefully, the examples we’ve looked at will help you understand them more easily.
Multicast Operators
Let’s explain them one by one:
Multicast and Connect
The multicast() operator takes a Subject and uses it to share the source execution:
The multicast operator returns what’s known as a ConnectableObservable, which has a connect() method. That method has one simple job: subscribing to the source with the provided subject:
The connect() method makes it possible to control when the execution of the source Observable starts. One crucial thing to remember is that in this case, to unsubscribe from the source, we need to unsubscribe from the subscription returned by connect():
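Under the same hand-rolled setup, multicast() and connect() can be sketched as below. This is a simplification, not RxJS's ConnectableObservable. Here multicast() returns a plain object. Its subscribe() only registers observers with the subject, while connect() subscribes the subject to the source and returns the teardown you call to unsubscribe from the source. The manually driven source and its emit() hook are hypothetical; they let the example run synchronously.

```typescript
// Sketch: multicast() + connect() over a naive Subject.
type Observer<T> = {
  next?: (value: T) => void;
  error?: (err: unknown) => void;
  complete?: () => void;
};

class Observable<T> {
  private readonly subscriptionFn: (observer: Observer<T>) => () => void;
  constructor(subscriptionFn: (observer: Observer<T>) => () => void) {
    this.subscriptionFn = subscriptionFn;
  }
  subscribe(observer: Observer<T>): () => void {
    return this.subscriptionFn(observer);
  }
}

class Subject<T> {
  private observers: Observer<T>[] = [];
  subscribe(observer: Observer<T>): () => void {
    this.observers.push(observer);
    return () => {
      this.observers = this.observers.filter((o) => o !== observer);
    };
  }
  next(value: T): void {
    this.observers.forEach((o) => o.next?.(value));
  }
}

type Connectable<T> = {
  subscribe(observer: Observer<T>): () => void;
  connect(): () => void;
};

function multicast<T>(subject: Subject<T>) {
  return (source: Observable<T>): Connectable<T> => ({
    // Subscribers only register with the subject; the source does not run yet.
    subscribe: (observer) => subject.subscribe(observer),
    // connect() starts the shared execution and returns its teardown.
    connect: () => source.subscribe(subject),
  });
}

let executions = 0;
let teardowns = 0;
let emit: (value: number) => void = () => {};
// Hypothetical source that stays open and emits whenever emit() is called.
const source$ = new Observable<number>((observer) => {
  executions++;
  emit = (value) => observer.next?.(value);
  return () => {
    teardowns++;
    emit = () => {};
  };
});

const connectable = multicast(new Subject<number>())(source$);
const a: number[] = [];
const b: number[] = [];
connectable.subscribe({ next: (v) => a.push(v) });
connectable.subscribe({ next: (v) => b.push(v) });
// executions is still 0: nothing runs until connect()

const connection = connectable.connect();
emit(1); // a and b both receive 1 from a single execution
connection(); // unsubscribing the connection tears down the source
emit(2); // ignored: the source is no longer subscribed
```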
We’re not limited to a plain Subject. We can replace it with any other type of subject, such as a ReplaySubject:
Based on the previous example, you can infer what’s happening here under the hood.
Keep in mind that when using multicast(), we can pass either a Subject or a factory function that returns a new Subject. We can’t reuse a subject once it has completed. A factory function therefore makes the connectable reusable: if the source has completed and we need to subscribe again, a fresh subject is created.
RefCount
When we use the multicast() operator, we’re responsible for calling connect() to start the source observable execution. In addition, we’re responsible for avoiding memory leaks by manually unsubscribing from the connectable subscription.
Making this process automatic would be more convenient and less error-prone. Luckily, the good people behind RxJS have already thought of this and created the refCount operator.
The refCount operator is based on reference counting: it tracks the number of current subscribers. If that number changes from zero to one, it calls connect(), i.e., subscribes to the source. If that number changes back to zero, it unsubscribes.
Note that when using refCount, we get back a normal observable instead of a ConnectableObservable.
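Reference counting can be sketched on top of the simplified multicast() idea. In this version, refCount() is a plain function that wraps a connectable object, rather than an operator you pass to pipe(). It connects when the subscriber count goes from zero to one, and calls the connection's teardown when the count drops back to zero. The emit() hook on the source is a hypothetical test aid.

```typescript
// Sketch: refCount() connecting on 0 -> 1 and disconnecting on 1 -> 0.
type Observer<T> = {
  next?: (value: T) => void;
  error?: (err: unknown) => void;
  complete?: () => void;
};

class Observable<T> {
  private readonly fn: (observer: Observer<T>) => () => void;
  constructor(fn: (observer: Observer<T>) => () => void) {
    this.fn = fn;
  }
  subscribe(observer: Observer<T>): () => void {
    return this.fn(observer);
  }
}

class Subject<T> {
  private observers: Observer<T>[] = [];
  subscribe(observer: Observer<T>): () => void {
    this.observers.push(observer);
    return () => {
      this.observers = this.observers.filter((o) => o !== observer);
    };
  }
  next(value: T): void {
    this.observers.forEach((o) => o.next?.(value));
  }
}

type Connectable<T> = { subscribe(observer: Observer<T>): () => void; connect(): () => void };

function multicast<T>(subject: Subject<T>) {
  return (source: Observable<T>): Connectable<T> => ({
    subscribe: (observer) => subject.subscribe(observer),
    connect: () => source.subscribe(subject),
  });
}

function refCount<T>(connectable: Connectable<T>): Observable<T> {
  let count = 0;
  let connection: (() => void) | undefined;
  return new Observable<T>((observer) => {
    count++;
    const inner = connectable.subscribe(observer);
    if (count === 1) connection = connectable.connect(); // first subscriber: connect
    return () => {
      inner();
      count--;
      if (count === 0 && connection) {
        connection(); // last subscriber left: unsubscribe from the source
        connection = undefined;
      }
    };
  });
}

let executions = 0;
let teardowns = 0;
let emit: (value: number) => void = () => {};
const source$ = new Observable<number>((observer) => {
  executions++;
  emit = (value) => observer.next?.(value);
  return () => {
    teardowns++;
  };
});

const shared$ = refCount(multicast(new Subject<number>())(source$));
const a: number[] = [];
const b: number[] = [];
const unsubscribeA = shared$.subscribe({ next: (v) => a.push(v) }); // 0 -> 1: connects
const unsubscribeB = shared$.subscribe({ next: (v) => b.push(v) }); // 1 -> 2: no new execution
emit(42);
unsubscribeA(); // 2 -> 1: still connected
unsubscribeB(); // 1 -> 0: the source is torn down
// executions === 1, teardowns === 1, a and b are both [42]
```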
Publish and Its Variants
Because combining multicast() with a Subject (and often refCount()) is a common pattern in RxJS, the RxJS team created shortcut operators to make it easier for us. Let’s go over the different variations:
publish() is a shortcut for multicast(new Subject()):
publishBehavior(initialValue) is a shortcut for multicast(new BehaviorSubject(initialValue)):
publishReplay(x) is a shortcut for multicast(new ReplaySubject(x)), so the same ReplaySubject instance is reused across connections:
publishLast() is a shortcut for multicast(new AsyncSubject()):
share() is a shortcut for multicast(() => new Subject()) + refCount():
shareReplay(bufferSize) is a multicasting operator that uses a ReplaySubject. It doesn’t use the multicast operator internally, and as a result it always returns a normal observable rather than a ConnectableObservable. It can be used either with refCount or without it. Here are both variations:
When shareReplay is called with { bufferSize: x, refCount: false }, it's the same as calling shareReplay(x).
In that case, reference counting isn’t activated. As long as the source hasn’t completed or errored, the operator stays subscribed to it, regardless of whether there are any active subscribers. So every new subscriber will receive the last X buffered values.
shareReplay vs publishReplay + refCount
At first glance, shareReplay({ refCount: true, bufferSize: X }) looks identical to publishReplay(X) + refCount(), but that’s not entirely accurate.
Let’s see what they share and where they differ:
They share the same refCount behavior: they subscribe to and unsubscribe from the source as the number of subscribers changes. They also behave the same when the source completes: any new subscriber will get the last X emitted values.
However, they behave differently when the subscriber count drops to zero before the source has completed. In that case, when we use publishReplay(X) + refCount(), any new subscriber will get the last X emitted values from the ReplaySubject’s buffer, and the source will be resubscribed using the same ReplaySubject.
Conversely, in the same situation with shareReplay({ refCount: true, bufferSize: X }), we won’t get the last X emitted values. Under the hood, it creates a new ReplaySubject instance and uses that to resubscribe to the source. You can see it in the following examples:
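The difference can be modeled with minimal hand-rolled versions of the pieces built in this article. In the sketch below, replayRefCount() is a hypothetical helper, not an RxJS API. With resetOnZero set to false, it keeps reusing one ReplaySubject, like publishReplay(X) + refCount(). With resetOnZero set to true, it drops the subject when the count reaches zero, like shareReplay({ refCount: true, bufferSize: X }). It ignores completion and only models the case where the source has not completed yet.

```typescript
// Sketch: reusing vs. recreating the ReplaySubject when the subscriber count hits zero.
type Observer<T> = {
  next?: (value: T) => void;
  error?: (err: unknown) => void;
  complete?: () => void;
};

class Observable<T> {
  private readonly fn: (observer: Observer<T>) => () => void;
  constructor(fn: (observer: Observer<T>) => () => void) {
    this.fn = fn;
  }
  subscribe(observer: Observer<T>): () => void {
    return this.fn(observer);
  }
}

// Naive ReplaySubject (completion and errors omitted).
class ReplaySubject<T> {
  private observers: Observer<T>[] = [];
  private buffer: T[] = [];
  private readonly bufferSize: number;
  constructor(bufferSize: number) {
    this.bufferSize = bufferSize;
  }
  subscribe(observer: Observer<T>): () => void {
    this.buffer.forEach((v) => observer.next?.(v));
    this.observers.push(observer);
    return () => {
      this.observers = this.observers.filter((o) => o !== observer);
    };
  }
  next(value: T): void {
    this.buffer.push(value);
    if (this.buffer.length > this.bufferSize) this.buffer.shift();
    this.observers.forEach((o) => o.next?.(value));
  }
}

// Hypothetical helper modeling both strategies while the source has not completed.
function replayRefCount<T>(source: Observable<T>, bufferSize: number, resetOnZero: boolean): Observable<T> {
  let subject: ReplaySubject<T> | undefined;
  let connection: (() => void) | undefined;
  let count = 0;
  return new Observable<T>((observer) => {
    const current = subject ?? (subject = new ReplaySubject<T>(bufferSize));
    count++;
    const inner = current.subscribe(observer); // replays whatever is buffered
    if (count === 1) connection = source.subscribe(current); // (re)connect to the source
    return () => {
      inner();
      count--;
      if (count === 0) {
        connection?.();
        connection = undefined;
        // shareReplay-like: forget the old ReplaySubject and its buffer.
        // publishReplay + refCount-like: keep it for the next connection.
        if (resetOnZero) subject = undefined;
      }
    };
  });
}

let executions = 0;
// Hypothetical source that never completes; execution n emits n.
const source$ = new Observable<number>((observer) => {
  executions++;
  observer.next?.(executions);
  return () => {};
});

function lateSubscriberSees(resetOnZero: boolean): number[] {
  executions = 0;
  const shared$ = replayRefCount(source$, 1, resetOnZero);
  const unsubscribe = shared$.subscribe({}); // connects; the source emits 1
  unsubscribe(); // count drops to 0 before the source completes
  const seen: number[] = [];
  shared$.subscribe({ next: (v) => seen.push(v) }); // reconnects; the source emits 2
  return seen;
}

const withSameSubject = lateSubscriberSees(false); // [1, 2]: old buffer replayed, then the new value
const withFreshSubject = lateSubscriberSees(true); // [2]: nothing old to replay
```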
Real-World Angular Examples
As we’ve learned, RxJS offers various sharing operators for us to use. Let’s look at a couple of examples where we can employ them.
Using Share
Let’s say that we have a component that needs to get some data from a source observable. It can be an HTTP call, a store, or any other implementation you have. Additionally, it needs to perform manipulations on the data, such as filtering, sorting, and so forth:
Now let’s say we need to add another component, one which only displays the first user. If we subscribe to the existing source as-is:
We’ll have two HTTP requests, and our operations, such as sorting and filtering, will run twice. To avoid this, we need to share the observable execution:
As we’ve learned, this creates a new Subject that internally subscribes to the source. When the source emits, the subject notifies each of its subscribers.
This works because, when we subscribe to firstUser$, we’re now subscribing to the internal subject rather than directly to the source.
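Stripped of Angular, the effect of share() in this scenario can be sketched with minimal hand-rolled pieces. The sketch has three parts:

- A hypothetical fetchUsers$ source whose response arrives only when respond() is called, standing in for an in-flight HTTP request.
- A filter-and-sort step.
- firstUser$, derived from the shared stream.

The share() helper here is a simplified model of multicast(() => new Subject()) + refCount(), applied as a plain function rather than through pipe(). The User shape is an assumption.

```typescript
// Sketch: one request and one sort shared by two consumers via a simplified share().
type Observer<T> = { next?: (value: T) => void; error?: (err: unknown) => void; complete?: () => void };

class Observable<T> {
  private readonly fn: (observer: Observer<T>) => () => void;
  constructor(fn: (observer: Observer<T>) => () => void) { this.fn = fn; }
  subscribe(observer: Observer<T>): () => void { return this.fn(observer); }
}

class Subject<T> {
  private observers: Observer<T>[] = [];
  subscribe(observer: Observer<T>): () => void {
    this.observers.push(observer);
    return () => { this.observers = this.observers.filter((o) => o !== observer); };
  }
  next(value: T): void { this.observers.forEach((o) => o.next?.(value)); }
  complete(): void { this.observers.forEach((o) => o.complete?.()); }
}

function map<T, R>(project: (value: T) => R) {
  return (source: Observable<T>): Observable<R> =>
    new Observable<R>((observer) =>
      source.subscribe({
        next: (value) => observer.next?.(project(value)),
        error: (err) => observer.error?.(err),
        complete: () => observer.complete?.(),
      })
    );
}

// Simplified share(): a new Subject per connection plus reference counting.
function share<T>(source: Observable<T>): Observable<T> {
  let subject: Subject<T> | undefined;
  let connection: (() => void) | undefined;
  let count = 0;
  return new Observable<T>((observer) => {
    const current = subject ?? (subject = new Subject<T>());
    count++;
    const inner = current.subscribe(observer);
    if (count === 1) connection = source.subscribe(current);
    return () => {
      inner();
      count--;
      if (count === 0) { connection?.(); connection = undefined; subject = undefined; }
    };
  });
}

type User = { name: string; active: boolean }; // hypothetical shape

let requests = 0;
let sorts = 0;
let respond: (users: User[]) => void = () => {};

// Hypothetical stand-in for an HTTP call that stays pending until respond() is called.
const fetchUsers$ = new Observable<User[]>((observer) => {
  requests++;
  respond = (users) => { observer.next?.(users); observer.complete?.(); };
  return () => {};
});

// Filtering and sorting happen once, upstream of the shared Subject.
const users$ = share(
  map((users: User[]) => {
    sorts++;
    return users.filter((u) => u.active).sort((x, y) => x.name.localeCompare(y.name));
  })(fetchUsers$)
);
const firstUser$ = map((users: User[]) => users[0])(users$);

const lists: User[][] = [];
const firsts: User[] = [];
users$.subscribe({ next: (users) => lists.push(users) });
firstUser$.subscribe({ next: (user) => firsts.push(user) });

respond([
  { name: "Zoe", active: true },
  { name: "Adam", active: true },
  { name: "Bob", active: false },
]);
// requests === 1, sorts === 1, and firsts[0].name === "Adam"
```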
Using ShareReplay
shareReplay() is mostly used when we need to share, cache, and replay the last X emitted values. A typical example is a singleton service that performs an HTTP request:
Here, it doesn’t matter how many current or future components are interested in the data from the posts call. It will always perform a single HTTP request and return the data from the internal ReplaySubject’s buffer.
You might encounter a case where you want to cancel a request that hasn’t completed yet once there are no active subscribers. In this case, you’ll need to enable refCount.
You can find the complete code here.