RxJS: Managing Operator State
When pipeable operators were introduced in RxJS version 5.5, writing user-land operators became much simpler.
A pipeable operator is a higher-order function: a function that returns another function. And the function that is returned takes an observable and returns an observable. So, to create an operator, you don’t have to subclass `Subscriber`. You just write a function.
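To make the shape concrete, here is a minimal sketch of a pipeable operator written as a plain function. The `multiplyBy` name and its behaviour are hypothetical, chosen only for illustration, and the import paths assume RxJS 6 or later.

```typescript
import { Observable, OperatorFunction, of } from "rxjs";
import { map } from "rxjs/operators";

// Hypothetical operator: calling multiplyBy(factor) returns a function
// that takes an observable and returns an observable.
function multiplyBy(factor: number): OperatorFunction<number, number> {
  return (source: Observable<number>) =>
    source.pipe(map((value) => value * factor));
}

// Collect the emitted values so the result is easy to inspect.
const results: number[] = [];
of(1, 2, 3)
  .pipe(multiplyBy(10))
  .subscribe((value) => results.push(value));

console.log(results); // [10, 20, 30]
```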
However, there are situations in which you need to take some extra care. In particular, you need to be careful whenever your operator stores internal state.
Let’s look at an example: a `debug` operator that logs received values and their indices to the console.
Our operator will need to maintain some internal state: the index, which will be incremented each time a `next` notification is received. A naive approach would be to store that state within the operator function. Like this:
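A minimal sketch of such a naive operator follows. The `[index]: value` log format and the use of `tap` are assumptions made for illustration, and the import paths assume RxJS 6 or later.

```typescript
import { MonoTypeOperatorFunction, of } from "rxjs";
import { tap } from "rxjs/operators";

// Naive version: `index` lives in the closure created by calling debug(),
// so every use of the returned operator function shares the same counter.
function debug<T>(): MonoTypeOperatorFunction<T> {
  let index = 0;
  return tap<T>((value) => console.log(`[${index++}]: ${value}`));
}

// With a single use and a single subscription, nothing looks wrong.
of("a", "b").pipe(debug()).subscribe();
```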
However, this approach has a couple of problems that will cause surprising behaviours and hard-to-find bugs.
The first problem is that our operator is not referentially transparent. A function is referentially transparent when it’s possible to replace the function call with the value that would be returned without changing the program’s behaviour.
Let’s look at what happens when our operator’s returned value is used to compose several observables:
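A sketch of such a program, assuming a naive `debug` implementation that keeps its counter in the operator function's closure (the emitted values are illustrative):

```typescript
import { MonoTypeOperatorFunction, of } from "rxjs";
import { tap } from "rxjs/operators";

// The naive operator (an assumed implementation), included so the sketch
// is self-contained.
function debug<T>(): MonoTypeOperatorFunction<T> {
  let index = 0;
  return tap<T>((value) => console.log(`[${index++}]: ${value}`));
}

// Call debug() once and reuse the returned operator function
// to compose two separate observables.
const op = debug<string>();
of("a", "b").pipe(op).subscribe();
of("a", "b").pipe(op).subscribe();
```

If the operator were referentially transparent, replacing `op` with two separate `debug()` calls would not change what the program logs. Here it does.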
The program’s output is:
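For a naive operator that logs each value as `[index]: value`, reused on two observables that each emit `a` then `b` (illustrative choices, not taken from the original listing), the console would show something like:

```
[0]: a
[1]: b
[2]: a
[3]: b
```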
Well, that’s surprising. The index didn’t start at zero for the second observable.
The second problem is that our operator will behave sensibly only when the observable it returns is subscribed to once.
Let’s look at what happens when multiple subscriptions are made to an observable composed from our `debug` operator:
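A sketch of such a program, again assuming a naive `debug` implementation that keeps its counter in the operator function's closure (the emitted values are illustrative):

```typescript
import { MonoTypeOperatorFunction, of } from "rxjs";
import { tap } from "rxjs/operators";

// The naive operator (an assumed implementation), included so the sketch
// is self-contained.
function debug<T>(): MonoTypeOperatorFunction<T> {
  let index = 0;
  return tap<T>((value) => console.log(`[${index++}]: ${value}`));
}

// Compose a single observable, then subscribe to it twice.
const source = of("a", "b").pipe(debug());
source.subscribe();
source.subscribe();
```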
The program’s output is:
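For a naive operator that logs each value as `[index]: value`, applied to a source that emits `a` then `b` and is subscribed to twice (illustrative choices, not taken from the original listing), the console would show something like:

```
[0]: a
[1]: b
[2]: a
[3]: b
```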
Again, it’s the same surprising behaviour: the index didn’t start at zero for the second subscription.
So how can these problems be fixed?
Both of the problems can be solved by storing the state on a per-subscription basis. And there are a number of ways this can be achieved.
The first is to use the `Observable` constructor to create the observable that our operator will return. If the `index` variable is moved into the function passed to the constructor, the index will be stored on a per-subscription basis. Like this:
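A minimal sketch of the constructor-based approach, assuming the same `[index]: value` log format and RxJS 6 or later import paths:

```typescript
import { MonoTypeOperatorFunction, Observable, of } from "rxjs";
import { tap } from "rxjs/operators";

function debug<T>(): MonoTypeOperatorFunction<T> {
  return (source) =>
    new Observable<T>((subscriber) => {
      // Declared inside the subscribe function, so each subscription
      // gets its own counter.
      let index = 0;
      // Returning the inner subscription lets unsubscription reach the source.
      return source
        .pipe(tap((value) => console.log(`[${index++}]: ${value}`)))
        .subscribe(subscriber);
    });
}

// Each subscription starts counting from zero.
const source = of("a", "b").pipe(debug());
source.subscribe();
source.subscribe();
```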
The second way of implementing per-subscription state, and my preferred one, is to use the `defer` observable creator. If the `index` variable is moved into the factory function passed to `defer`, it will be stored on a per-subscription basis. Like this:
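A minimal sketch of the `defer`-based approach, assuming the same `[index]: value` log format and RxJS 6 or later import paths:

```typescript
import { MonoTypeOperatorFunction, defer, of } from "rxjs";
import { tap } from "rxjs/operators";

function debug<T>(): MonoTypeOperatorFunction<T> {
  return (source) =>
    // defer calls the factory once per subscription, so `index`
    // is created afresh for every subscriber.
    defer(() => {
      let index = 0;
      return source.pipe(
        tap((value) => console.log(`[${index++}]: ${value}`))
      );
    });
}

// Each subscription starts counting from zero.
const source = of("a", "b").pipe(debug());
source.subscribe();
source.subscribe();
```

Compared with the constructor-based version, there is no subscriber to wire up by hand: `defer` subscribes to whatever observable the factory returns.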
Another, more complicated, way of implementing per-subscription state is to use the `scan` operator. `scan` maintains its own per-subscription state, which is initialised with the `seed` and made available via the `accumulator`. The index can be stored within `scan` like this:
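A minimal sketch of the `scan`-based approach. Pairing each value with its index in an object, and unwrapping it again with `map`, is one possible arrangement chosen for illustration; the log format and RxJS 6 or later import paths are assumptions as well.

```typescript
import { MonoTypeOperatorFunction, of } from "rxjs";
import { map, scan, tap } from "rxjs/operators";

function debug<T>(): MonoTypeOperatorFunction<T> {
  return (source) =>
    source.pipe(
      // Pair each value with its index. The seed holds index -1 and no value,
      // and is never mutated: the accumulator returns a new object each time.
      scan<T, { index: number; value?: T }>(
        (state, value) => ({ index: state.index + 1, value }),
        { index: -1 }
      ),
      tap(({ index, value }) => console.log(`[${index}]: ${value}`)),
      // Unwrap the pair so that downstream sees the original values.
      map(({ value }) => value as T)
    );
}

// Each subscription starts counting from zero.
const source = of("a", "b").pipe(debug());
source.subscribe();
source.subscribe();
```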
If the programs used to demonstrate the problems with the naive implementation are run using any of the implementations in which per-subscription state is maintained, the following output will be produced:
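Assuming the operator logs each value as `[index]: value` and each source emits `a` then `b` (illustrative choices, not taken from the original listing), either program would print something like:

```
[0]: a
[1]: b
[0]: a
[1]: b
```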
Which is just what you’d expect: no surprises.