Creating Custom Operators in RxJS
Operators are one of the building blocks of RxJS. The library comes with many operators, which can be used to deal with almost every situation we may encounter, but there are times when it can be helpful to create our own.
In this article, we’re going to learn different ways to create our own operators. But before we start, let’s explain what an operator actually is.
This may surprise you, but an operator is just a function that takes an observable and returns an observable. What distinguishes the observables that operators return from other observables? As the name suggests, they operate on a source observable.
Let’s take the following example:
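A minimal sketch of such an example. The one-second period, the doubling projection, and the take(3) that lets the snippet finish on its own are illustrative assumptions:

```typescript
import { interval } from 'rxjs';
import { map, take } from 'rxjs/operators';

// interval(1000) is the source observable: it emits 0, 1, 2, ... once per second.
// map(...) returns an operator that pipe() applies to that source.
// take(3) is added only so that the example completes by itself.
const doubled$ = interval(1000).pipe(
  map((value) => value * 2),
  take(3)
);

doubled$.subscribe((value) => console.log(value)); // logs 0, 2, 4
```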
We have the interval function, which returns an observable. Using that observable as our source, we call the pipe() method, passing it the result of calling map, which returns an operator. Let’s look at the implementation of the pipe() method so that we can get a better understanding of how it works:
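The following is a simplified sketch of the idea, not the library’s actual source. TinySource and wrap are hypothetical stand-ins that only exist to show the order in which the operators are invoked:

```typescript
// A unary function: takes one value and returns another.
type UnaryFn<A, B> = (input: A) => B;

// Reduce over the operators, feeding each one the result of the previous one.
function pipeFromArray<T>(fns: Array<UnaryFn<any, any>>): UnaryFn<T, any> {
  return (input: T) => fns.reduce((prev, fn) => fn(prev), input as any);
}

// Hypothetical stand-in for Observable, reduced to what pipe() needs.
class TinySource {
  constructor(public readonly label: string) {}

  pipe(...operators: Array<UnaryFn<any, any>>): any {
    // No operators: return the source untouched.
    if (operators.length === 0) {
      return this;
    }
    return pipeFromArray<TinySource>(operators)(this);
  }
}

// Each fake "operator" wraps its source, so the nesting reveals the call order.
const wrap = (name: string) => (source: TinySource) =>
  new TinySource(`${name}(${source.label})`);

const result = new TinySource('interval').pipe(wrap('map'), wrap('filter'));
console.log(result.label); // filter(map(interval))
```

Each operator receives whatever the previous one returned, which is why the first operator ends up innermost.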
The pipe method takes an array of operators, loops over it, and each time invokes the next operator, passing it the result of the previous one as its source. If we use this in our example, we’ll get the following expression:
map(interval): Observable
From examining this code, we can learn that building an operator is as simple as writing a function which takes a source observable as input, and returns an observable:
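In its simplest form, that function could look like the sketch below. The generic type parameter is an assumption about how one would type it:

```typescript
import { Observable, of } from 'rxjs';

// The simplest possible operator: it takes a source observable
// and returns an observable (here, the very same one).
function myOperator<T>(source: Observable<T>): Observable<T> {
  return source;
}

const source$ = of(1, 2, 3);
console.log(myOperator(source$) === source$); // true
```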
Congratulations, we’ve just created an operator! Yes, it’s useless, as it returns the same observable that it receives, but hey, it’s an operator nonetheless:
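A sketch of how it plugs into pipe() like any built-in operator. The setTimeout that unsubscribes after 3.5 seconds is an addition so that the snippet terminates:

```typescript
import { Observable, interval } from 'rxjs';

function myOperator<T>(source: Observable<T>): Observable<T> {
  return source;
}

// pipe() only needs a function from observable to observable.
const subscription = interval(1000)
  .pipe(myOperator)
  .subscribe((value) => console.log(value)); // logs 0, 1, 2 before we unsubscribe

// Stop the interval so the process can exit.
setTimeout(() => subscription.unsubscribe(), 3500);
```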
Now, let’s stop for a second and talk about a common misconception about this topic. Looking at our example, you may hear people describing it as being “subscribed to the interval observable”. That’s not accurate. You’re always subscribed to the observable returned by the last operator in the chain (i.e., the last item on the operators’ list).
So in this example, we’re subscribed to the observable that the myOperator function returns. Let me demonstrate this. Let’s return a different observable from our operator:
If we re-run our example, all we’ll ever get is one 🦄. That’s because we receive the source observable (in our case, the one generated by interval), but we don’t do anything with it (e.g., subscribe to it).
This proves that we’re subscribed to the observable returned from myOperator, and not the one returned from interval.
That leads me to the next subject — the observable chain. Let’s take our first example, an interval with a map:
If, as we’ve noted, we’re actually subscribed to the observable that returns from the map() operator, how do we reach the observable that returns from interval()?
The answer is simple — we don’t! Each operator receives its source observable, which is the one preceding it, and that operator is the one that (in most cases) subscribes to it.
When we call subscribe(), we’re executing the map observable, which itself subscribes to the interval observable. Whenever the interval source observable emits a new value, the value reaches the map subscription function.
Then, after applying map’s projection function to the value, the map observable emits the result of that function to the last subscription in the observable chain.
Now that we understand how that mechanism works, let’s build some operators.
Creating a filterNil Operator
A common use case is one where we have a source observable that can emit a null or undefined value, and we want to ignore those values. Let’s create an operator that manages this task for us, which we’ll be able to apply to any source:
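A first version might look like the sketch below. It deliberately contains no cleanup logic yet, and the typing is an assumption:

```typescript
import { Observable, of } from 'rxjs';

function filterNil() {
  return function <T>(source: Observable<T>): Observable<T> {
    return new Observable<T>((subscriber) => {
      // Note: nothing is returned here, so the source subscription is never cleaned up.
      source.subscribe({
        next(value) {
          // Forward only values that are neither undefined nor null.
          if (value !== undefined && value !== null) {
            subscriber.next(value);
          }
        },
        error(error) {
          subscriber.error(error);
        },
        complete() {
          subscriber.complete();
        },
      });
    });
  };
}

const emitted: unknown[] = [];
of(1, null, 2, undefined, 3)
  .pipe(filterNil())
  .subscribe((value) => emitted.push(value));
console.log(emitted); // [1, 2, 3]
```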
In this case, unlike the one before, we create a function which returns an operator. The reason is that this makes our code scalable, since it can be easily extended to receive arguments in the future 🦊
Once we receive a source, we subscribe to it. When the source emits, we check whether the value is undefined or null. If that’s the case, we ignore it; otherwise, we pass the value on to the next subscriber.
The same goes for the error and complete notifications. We pass them through the chain; otherwise, they’ll not be handled properly by the rest of the chain. If we run the following example, we’ll see that we don’t get a notification for the initial emission:
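One way to observe this, assuming a BehaviorSubject whose initial value is null (the user$ name and the 'Ada' value are illustrative). The operator is written out again so the snippet runs on its own:

```typescript
import { BehaviorSubject, Observable } from 'rxjs';

function filterNil() {
  return function <T>(source: Observable<T>): Observable<T> {
    return new Observable<T>((subscriber) => {
      source.subscribe({
        next(value) {
          if (value !== undefined && value !== null) {
            subscriber.next(value);
          }
        },
        error(error) {
          subscriber.error(error);
        },
        complete() {
          subscriber.complete();
        },
      });
    });
  };
}

// A BehaviorSubject replays its current value (null here) to every new subscriber.
const user$ = new BehaviorSubject<string | null>(null);

const emitted: unknown[] = [];
user$.pipe(filterNil()).subscribe((value) => emitted.push(value));

user$.next('Ada');
console.log(emitted); // ['Ada'], the initial null was ignored
```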
The more observant among you may have noticed an important issue: we’ve created a memory leak. Remember that the function we pass to an observable can return an unsubscription (teardown) function, responsible for performing any required cleanup. This function is called whenever someone calls unsubscribe() on the resulting subscription. In our example, we’ve subscribed to the source, but we’ve never unsubscribed from it.
This means that the interval source will keep running even after we’ve called the unsubscribe method for the resulting observable.
So, how can we fix this? The solution is as simple as calling the unsubscribe method of the source subscription in our teardown function:
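A sketch of that fix. The small source$ with a sourceTornDown flag is a hypothetical probe used only to show that the cleanup reaches the source:

```typescript
import { Observable } from 'rxjs';

function filterNil() {
  return function <T>(source: Observable<T>): Observable<T> {
    return new Observable<T>((subscriber) => {
      const subscription = source.subscribe({
        next(value) {
          if (value !== undefined && value !== null) {
            subscriber.next(value);
          }
        },
        error(error) {
          subscriber.error(error);
        },
        complete() {
          subscriber.complete();
        },
      });

      // Teardown: runs when the consumer unsubscribes.
      return () => subscription.unsubscribe();
    });
  };
}

// Hypothetical source that records when its own teardown runs.
let sourceTornDown = false;
const source$ = new Observable<number | null>((subscriber) => {
  subscriber.next(1);
  return () => {
    sourceTornDown = true;
  };
});

const sub = source$.pipe(filterNil()).subscribe();
sub.unsubscribe();
console.log(sourceTornDown); // true
```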
There is a shorter way in our case — we can directly return the result of the source subscription call:
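A sketch of the shorter form, which relies on the fact that a Subscription is itself valid teardown logic. As before, source$ and sourceTornDown are a hypothetical probe:

```typescript
import { Observable } from 'rxjs';

function filterNil() {
  return function <T>(source: Observable<T>): Observable<T> {
    return new Observable<T>((subscriber) => {
      // Returning the subscription lets RxJS unsubscribe from the source for us.
      return source.subscribe({
        next(value) {
          if (value !== undefined && value !== null) {
            subscriber.next(value);
          }
        },
        error(error) {
          subscriber.error(error);
        },
        complete() {
          subscriber.complete();
        },
      });
    });
  };
}

let sourceTornDown = false;
const source$ = new Observable<number | null>((subscriber) => {
  subscriber.next(1);
  return () => {
    sourceTornDown = true;
  };
});

const sub = source$.pipe(filterNil()).subscribe();
sub.unsubscribe();
console.log(sourceTornDown); // true
```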
Now when we call unsubscribe() on the result, it actually calls unsubscribe() on the source subscription.
Creating Operators from Existing Operators
Now that we’ve learned the fundamentals of creating an operator and how easy it is, we can use the shorter and, in most cases, recommended way of creating them: building them from existing ones.
Let’s re-create the filterNil operator using, as you might have guessed, the filter operator:
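A sketch of that version, where subscription handling and cleanup are delegated to filter:

```typescript
import { Observable, of } from 'rxjs';
import { filter } from 'rxjs/operators';

function filterNil() {
  return function <T>(source: Observable<T>): Observable<T> {
    // filter takes care of subscribing to and unsubscribing from the source.
    return source.pipe(
      filter((value) => value !== undefined && value !== null)
    );
  };
}

const emitted: unknown[] = [];
of(1, null, 2, undefined, 3)
  .pipe(filterNil())
  .subscribe((value) => emitted.push(value));
console.log(emitted); // [1, 2, 3]
```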
That’s neat, but we can take it one step further. Since pipeable operators return functions which, as we’ve learned, receive a source observable, we can make filterNil even shorter:
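A sketch of the shortened form. The type parameter T is an assumption added so that the value type flows through pipe():

```typescript
import { of } from 'rxjs';
import { filter } from 'rxjs/operators';

// filter(...) already returns a function that takes a source observable,
// so we can return it directly.
function filterNil<T>() {
  return filter((value: T) => value !== undefined && value !== null);
}

const emitted: unknown[] = [];
of(1, null, 2, undefined, 3)
  .pipe(filterNil())
  .subscribe((value) => emitted.push(value));
console.log(emitted); // [1, 2, 3]
```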
Let’s wrap things up with some useful operators that we can use on a daily basis:
debug — Creating Beautiful Logs for debugging
Let’s create an operator that uses the console log API and logs each one of the notifications with nice colors:
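A possible implementation built on tap. The colors and label format are illustrative choices, and the %c styling is only rendered by consoles that support it (such as browser dev tools):

```typescript
import { tap } from 'rxjs/operators';

// Builds the CSS string used by the %c console directive.
const badge = (background: string) =>
  `background: ${background}; color: #fff; padding: 3px; font-size: 9px;`;

function debug<T>(tag: string) {
  // tap observes every notification without changing the stream.
  return tap<T>({
    next(value) {
      console.log(`%c[${tag}: Next]`, badge('#009688'), value);
    },
    error(error) {
      console.log(`%c[${tag}: Error]`, badge('#E91E63'), error);
    },
    complete() {
      console.log(`%c[${tag}]: Complete`, badge('#00BCD4'));
    },
  });
}
```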
Let’s see it in action:
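A short usage sketch with a synchronous source. It includes a condensed, unstyled variant of debug so that it runs on its own; the 'letters' tag and the values are illustrative:

```typescript
import { of } from 'rxjs';
import { tap } from 'rxjs/operators';

// Condensed variant of the debug operator, without console styling.
function debug<T>(tag: string) {
  return tap<T>({
    next: (value) => console.log(`[${tag}: Next]`, value),
    error: (error) => console.log(`[${tag}: Error]`, error),
    complete: () => console.log(`[${tag}]: Complete`),
  });
}

const passedThrough: string[] = [];
of('a', 'b')
  .pipe(debug<string>('letters'))
  .subscribe((value) => passedThrough.push(value));

// The operator only logs; the values reach the subscriber unchanged.
console.log(passedThrough); // ['a', 'b']
```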
optionalDebounce — to Debounce or Not to Debounce
When building components, there are times when we expose an input, allowing the user to pass an optional debounce value to be used for some action. If the user doesn’t pass a value, we can skip using the debounceTime operator:
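One way to write it, assuming that “no value” means undefined:

```typescript
import { Observable, of } from 'rxjs';
import { debounceTime } from 'rxjs/operators';

function optionalDebounce<T>(time?: number) {
  return (source: Observable<T>): Observable<T> =>
    // Without a time, hand back the source untouched; otherwise debounce it.
    time === undefined ? source : source.pipe(debounceTime(time));
}

const source$ = of(1, 2, 3);
console.log(optionalDebounce<number>()(source$) === source$); // true, no debounce applied
console.log(optionalDebounce<number>(300)(source$) === source$); // false, wrapped by debounceTime
```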
filterKey — Get Just the Keyboard Events You Want
This operator is for times when we need to filter specific keys while listening to keyboard events:
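A sketch of such an operator. KeyLike is a hypothetical structural stand-in for the DOM KeyboardEvent so that the snippet also type-checks outside the browser:

```typescript
import { of } from 'rxjs';
import { filter } from 'rxjs/operators';

// Anything with a `key` property, e.g. a KeyboardEvent.
interface KeyLike {
  key: string;
}

function filterKey<T extends KeyLike>(key: string) {
  return filter((event: T) => event.key === key);
}

// In a browser this would typically follow
// fromEvent<KeyboardEvent>(document, 'keyup').
const pressed: string[] = [];
of({ key: 'a' }, { key: 'Enter' }, { key: 'Escape' })
  .pipe(filterKey('Enter'))
  .subscribe((event) => pressed.push(event.key));
console.log(pressed); // ['Enter']
```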
polling — Time to Ask the Server for Data Again!
We sometimes need to perform polling from our backend. We can create a custom operator for this that’ll make it more readable and seamless for those who aren’t super familiar with RxJS:
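A sketch built on timer and concatMap. The choice of concatMap is an assumption, and request$ is a hypothetical stand-in for an HTTP call:

```typescript
import { Observable, of, timer } from 'rxjs';
import { concatMap, take } from 'rxjs/operators';

function polling<T>(
  stream: Observable<T>,
  period: number,
  initialDelay = 0
): Observable<T> {
  // timer ticks after initialDelay and then every `period` ms;
  // each tick re-subscribes to the stream, one request at a time.
  return timer(initialDelay, period).pipe(concatMap(() => stream));
}

// Hypothetical request; take(3) stops the polling so the example ends.
const request$ = of({ status: 'ok' });

polling(request$, 50)
  .pipe(take(3))
  .subscribe((response) => console.log(response.status)); // logs 'ok' three times
```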
The polling operator takes a stream, a period that represents the polling interval, and an optional initial delay time, which by default is set to 0.