Crunching RxAndroid — Part 7

In the last episode of Crunching RxAndroid, we focused on understanding and implementing RxBinding in order to capture events from the Views in our layouts. Before that, we had the pleasure of diving into RxLifecycle, where we met the compose() operator for the first time; it will be one of the main focuses of this part.

Are there custom operators in RxJava?

OK, we have already seen the potential of the standard operators. What if we wanted to create something custom? There are two distinct families of custom operators. The first is the group of operators that affect, in some way, the sequence of items emitted by the source Observable. We call these Sequence Operators and we use them through the lift() function. The second family is the one we already saw in RxLifecycle: it covers all those objects whose duty is to change the Observable itself. We call this family Transformational Operators and we use them through the compose() method.

Do we really need to use them?

Usually, you do not need to create your own operators: those provided with the framework are more than enough, and creating custom operators is actually pretty risky, as Dávid Karnok explains in his 3 articles (part 1, part 2 and part 3). Still, it is nice to understand how things work, so here we are!

Sequence Operators

If we take a look at how the Operator<R, T> interface is defined, we can see that it relies on a function that takes the downstream Subscriber<? super R> and returns a Subscriber<? super T> for the source: that means we can change the type of the emitted items pretty easily. There is a catch, though: by creating our custom Operators, we risk breaking the chain of subscription and backpressure.

As mentioned above, these are the operators we use to affect the items emitted by the Observable: imagine that we want to create an operator that emits, for each item it receives, its square root.

The first thing we do is implement the Observable.Operator interface and return a new Subscriber from the call() method of our operator.

We might also want to check whether the subscriber is still subscribed before forwarding the events of our operator to it.
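To make the idea concrete without pulling in the library, here is a minimal sketch in plain Java. The Subscriber and Operator interfaces below are simplified stand-ins that only mirror the shape of the RxJava 1.x types; they are assumptions made for illustration, not the real classes, and they ignore backpressure entirely. SqrtOperator, Collector and sqrtAll are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified stand-ins that mirror the shape of the RxJava 1.x types.
// They are assumptions for illustration, not the library's real classes.
final class SqrtOperatorSketch {

    /** Stand-in for Subscriber: receives events and reports whether it has unsubscribed. */
    interface Subscriber<T> {
        void onNext(T item);
        void onError(Throwable error);
        void onCompleted();
        boolean isUnsubscribed();
    }

    /** Stand-in for Operator<R, T>: wraps the downstream Subscriber<R> in a Subscriber<T>. */
    interface Operator<R, T> {
        Subscriber<? super T> call(Subscriber<? super R> child);
    }

    /** Hypothetical operator: for each Integer it receives, it emits the square root. */
    static final class SqrtOperator implements Operator<Double, Integer> {
        @Override
        public Subscriber<? super Integer> call(final Subscriber<? super Double> child) {
            return new Subscriber<Integer>() {
                @Override public void onNext(Integer item) {
                    // Forward only while the downstream subscriber is still subscribed.
                    if (!child.isUnsubscribed()) {
                        child.onNext(Math.sqrt(item));
                    }
                }
                @Override public void onError(Throwable error) {
                    if (!child.isUnsubscribed()) {
                        child.onError(error);
                    }
                }
                @Override public void onCompleted() {
                    if (!child.isUnsubscribed()) {
                        child.onCompleted();
                    }
                }
                @Override public boolean isUnsubscribed() {
                    return child.isUnsubscribed();
                }
            };
        }
    }

    /** Hypothetical subscriber: collects items and counts as unsubscribed once it holds `limit` of them. */
    static final class Collector implements Subscriber<Double> {
        final List<Double> items = new ArrayList<>();
        private final int limit;
        Collector(int limit) { this.limit = limit; }
        @Override public void onNext(Double item) { items.add(item); }
        @Override public void onError(Throwable error) { throw new IllegalStateException(error); }
        @Override public void onCompleted() { }
        @Override public boolean isUnsubscribed() { return items.size() >= limit; }
    }

    /** Pushes every source item through the operator, roughly what lift() arranges on subscription. */
    static List<Double> sqrtAll(List<Integer> source, int limit) {
        Collector child = new Collector(limit);
        Subscriber<? super Integer> upstream = new SqrtOperator().call(child);
        for (Integer item : source) {
            upstream.onNext(item);
        }
        upstream.onCompleted();
        return child.items;
    }

    public static void main(String[] args) {
        System.out.println(sqrtAll(Arrays.asList(1, 4, 9), 10)); // every item is forwarded
        System.out.println(sqrtAll(Arrays.asList(1, 4, 9), 2));  // the third item is dropped
    }
}
```

The second call shows why the check matters: once the collector reports that it has unsubscribed, the operator stops forwarding and the remaining items are simply dropped.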

Transformational Operators

These operators are less risky than the Sequence ones, as we are affecting the Observable as a whole and not the single items: for instance, RxLifecycle uses a custom operator of this kind to make the Subscription end on certain events of the Activity or Fragment that it’s bound to.

Now, suppose we need most of our Observables to run on a new thread and return the result on the main thread: we would have to call subscribeOn() and observeOn() on each and every one of them! Alternatively, we could create a method that takes a given Observable and applies the schedulers to it, but that would be even worse, because wrapping the Observable in a method call breaks the operator chain, as Dan Lew clearly explains in his blog post.

This can be boring, tricky and risky: what if we forget to do it on a very long

Once more, we need an interface declared in the Observable class, but this time it is Transformer, which relies on a function that takes an Observable<T> as input and returns an Observable<R>.

At this point, all we have to do is implement the Transformer interface, apply the Schedulers we want and return the result.
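As a rough illustration of that shape, the sketch below models it in plain Java. MiniObservable and Transformer are simplified stand-ins (assumptions, not the RxJava classes), and plain Executors play the role of the Schedulers; applySchedulers and threadsUsed are hypothetical names. With the real library, the body of such a Transformer would typically be a call like subscribeOn(Schedulers.newThread()) followed by observeOn(AndroidSchedulers.mainThread()).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Simplified stand-ins for illustration; NOT the RxJava classes.
final class SchedulerTransformerSketch {

    /** Stand-in for Observable: it only knows how to push items to a consumer. */
    static final class MiniObservable<T> {
        private final Consumer<Consumer<? super T>> onSubscribe;

        MiniObservable(Consumer<Consumer<? super T>> onSubscribe) {
            this.onSubscribe = onSubscribe;
        }

        /** Runs the source (the subscription side) on the given executor. */
        MiniObservable<T> subscribeOn(Executor executor) {
            return new MiniObservable<T>(consumer ->
                    executor.execute(() -> onSubscribe.accept(consumer)));
        }

        /** Delivers every item to the consumer on the given executor. */
        MiniObservable<T> observeOn(Executor executor) {
            return new MiniObservable<T>(consumer ->
                    onSubscribe.accept(item -> executor.execute(() -> consumer.accept(item))));
        }

        /** Like compose(): hands the whole observable to the transformer and returns its result. */
        <R> MiniObservable<R> compose(Transformer<T, R> transformer) {
            return transformer.call(this);
        }

        void subscribe(Consumer<? super T> consumer) {
            onSubscribe.accept(consumer);
        }
    }

    /** Stand-in for Transformer<T, R>: a function from one observable to another. */
    interface Transformer<T, R> {
        MiniObservable<R> call(MiniObservable<T> source);
    }

    /** Hypothetical reusable transformer: work on `worker`, results on `main`. */
    static <T> Transformer<T, T> applySchedulers(Executor worker, Executor main) {
        return source -> source.subscribeOn(worker).observeOn(main);
    }

    /** Subscribes through the transformer and returns [source thread name, delivery thread name]. */
    static List<String> threadsUsed() {
        ExecutorService worker = Executors.newSingleThreadExecutor(r -> new Thread(r, "worker"));
        ExecutorService main = Executors.newSingleThreadExecutor(r -> new Thread(r, "main-stand-in"));
        List<String> names = new CopyOnWriteArrayList<>();
        CountDownLatch delivered = new CountDownLatch(1);
        try {
            MiniObservable<Integer> source = new MiniObservable<Integer>(consumer -> {
                names.add(Thread.currentThread().getName()); // where the work runs
                consumer.accept(42);
            });
            Transformer<Integer, Integer> schedulers = applySchedulers(worker, main);
            source.compose(schedulers).subscribe(item -> {
                names.add(Thread.currentThread().getName()); // where the result arrives
                delivered.countDown();
            });
            if (!delivered.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for the item");
            }
            return new ArrayList<>(names);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            worker.shutdown();
            main.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(threadsUsed());
    }
}
```

The point of the design is that the scheduling logic lives in one place and is applied with a single compose() call, so the fluent chain on the Observable stays intact.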

Wrap up

First of all, let’s check how we would use the two operators we just created. We chain both of them on the source Observable: the lift() call is the one that executes our Sequence operator, which calculates the square root of a given integer, while the compose() method is the one providing the Transformer that applies the Schedulers.

That would be all, but remember that, when creating custom Sequence operators, you need to worry about a wide variety of issues (like backpressure) and be prepared to face them. Transformers, on the other hand, can come in handy much more frequently: they can change the source Observable only through existing operators, so they carry far fewer risks.

As usual, you can find the source code for this article in the GitHub repo.
Stay tuned and happy coding!