RxJava: RxReplayingShare, Emit Only Once
Following up on my previous article about sharing an Observable with multiple Subscribers while receiving the emitted data only once, a Reddit user pointed me to one of Jake Wharton's small libraries.
The library is called ReplayingShare and, as one would expect, it replays the data emitted by the Observable to our Subscribers.
But let’s have a deeper look at the code, in order to see what exactly its behaviour is.
What we can see here is a class that implements the Transformer interface, in order to produce a new Observable out of the one we give it.
This class also contains a private static final class named LastSeen. LastSeen holds three things: a variable named last of the generic type T, a consumer variable of type Action1<T>, and a producer variable which is a simple Observable<T>.
All of this looks great, if a little tricky or mysterious when you are just starting out with RxJava. So where does all the magic happen, you might ask?
All the magic happens on line 48. Let’s analyze and see what exactly is taking place on that line.
First of all, by implementing the Transformer interface we get to implement the Observable<T> call(Observable<T> observable) method, which is where our transformation should occur.
On line 48, Jake takes the Observable that was passed to our Transformer through .compose() and applies the .doOnNext() operator. That operator needs an argument of type Action1, so Jake feeds it the consumer variable of the LastSeen class.
What is the responsibility of the consumer variable though?
As we can see on line 62, the only thing consumer is responsible for is storing the value passed to the Action1 in the last variable of the LastSeen class.
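To make the roles of last, consumer and producer concrete, here is a minimal plain-Java model of the idea. It is a sketch, not the library's source: LastSeenModel is a hypothetical name, java.util.function.Consumer stands in for Action1<T>, and a Supplier<Optional<T>> stands in for the producer Observable. It is single-threaded and assumes non-null values.

```java
import java.util.Optional;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical, simplified model of LastSeen (not the library's actual code).
final class LastSeenModel<T> {
    // Sentinel meaning "nothing has been emitted yet".
    private static final Object NONE = new Object();

    private Object last = NONE;

    // Stand-in for the Action1<T> given to doOnNext(): overwrites `last` on every emission.
    final Consumer<T> consumer = value -> last = value;

    // Stand-in for the producer Observable<T>: yields the cached value, or nothing if none was seen.
    final Supplier<Optional<T>> producer = () -> {
        if (last == NONE) {
            return Optional.empty();
        }
        @SuppressWarnings("unchecked")
        T cached = (T) last;
        return Optional.of(cached);
    };

    public static void main(String[] args) {
        LastSeenModel<String> seen = new LastSeenModel<>();
        System.out.println(seen.producer.get()); // prints "Optional.empty"
        seen.consumer.accept("first");
        seen.consumer.accept("second");
        System.out.println(seen.producer.get()); // prints "Optional[second]"
    }
}
```

Note that in this model every call to consumer overwrites the cached value, so the producer always hands out the most recent emission.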
After that, we can move on to the next operator on line 48. It is the .share() operator, which according to RxJava's docs does the following:
Returns a new Observable that multicasts (shares) the original Observable. As long as there is at least one Subscriber this Observable will be subscribed and emitting data. When all subscribers have unsubscribed it will unsubscribe from the source Observable.
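The reference-counting behaviour in that description can be sketched with a small plain-Java model. This is only an illustration under stated assumptions: SharedSourceModel and its methods are hypothetical names, a Consumer<T> stands in for a Subscriber, and a Runnable stands in for the handle used to unsubscribe. It does not use RxJava and ignores threading, errors and completion.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of reference-counted sharing; names are illustrative only.
final class SharedSourceModel<T> {
    private final List<Consumer<T>> subscribers = new ArrayList<>();
    private boolean connected = false;
    private int upstreamSubscriptions = 0;

    // Registers a subscriber and returns a handle that unsubscribes it.
    Runnable subscribe(Consumer<T> subscriber) {
        subscribers.add(subscriber);
        if (subscribers.size() == 1) {
            // First subscriber: connect to the source.
            connected = true;
            upstreamSubscriptions++;
        }
        return () -> {
            if (subscribers.remove(subscriber) && subscribers.isEmpty()) {
                // Last subscriber left: disconnect from the source.
                connected = false;
            }
        };
    }

    // Simulates the source emitting one value to every current subscriber.
    void emit(T value) {
        if (!connected) {
            return;
        }
        for (Consumer<T> subscriber : new ArrayList<>(subscribers)) {
            subscriber.accept(value);
        }
    }

    boolean isConnected() {
        return connected;
    }

    // How many times the model has connected to the source so far.
    int upstreamSubscriptions() {
        return upstreamSubscriptions;
    }
}
```

In this model two simultaneous subscribers cause only one connection to the source, while a subscriber that arrives after everyone has left triggers a fresh connection.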
So now our Observable is being shared. After the .share() operator comes the .startWith() operator, which takes an Observable as its argument, so Jake passes it the producer variable from the LastSeen class.
Our Observable now emits data in the following way: it assigns the value it received to the last variable of the LastSeen class; because of the .startWith() operator, it begins by emitting the previous value of last; and then it emits the value received after executing the original Observable's computation again.
But wait. If you step through this flow with a debugger when you have, for example, 3 Subscribers, here is what you will see. The onNext() of the 1st Subscriber receives your initial value. The onNext() of the 2nd Subscriber then receives the value from the initial execution of the Observable, the same value the 1st Subscriber got, but after that it is called again with a new value, produced by the re-execution of your Observable. The same applies to the 3rd Subscriber.
Obviously this kind of flow is not what we originally wanted and not what my previous article was trying to achieve. But come on, that’s why we are called Engineers. We need to find a way to solve it using the Transformer.
After carefully examining the flow, my first thought was that we need to only keep the first value emitted by the first execution of our Observable, just because we do not wish to force re-executions of the Observable and we need to share the exact same data from the exact same Memory Address to all of our Subscribers.
So we can start by changing the behaviour of the consumer variable. We only need to keep the first value, which is equivalent to assigning a value to the last variable only when it is still equal to NONE.
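As a rough illustration of that change, here is a plain-Java sketch of a consumer that only stores the first value it sees. FirstSeenModel and cached() are hypothetical names, Consumer<T> again stands in for Action1<T>, and the sketch is single-threaded and assumes non-null values.

```java
import java.util.Optional;
import java.util.function.Consumer;

// Hypothetical model of a "keep only the first value" consumer (not the library's code).
final class FirstSeenModel<T> {
    // Sentinel meaning "nothing has been stored yet".
    private static final Object NONE = new Object();

    private Object last = NONE;

    // Stores a value only while `last` is still NONE, so later emissions are ignored.
    final Consumer<T> consumer = value -> {
        if (last == NONE) {
            last = value;
        }
    };

    // Returns the stored first value, or an empty Optional if nothing was seen yet.
    Optional<T> cached() {
        if (last == NONE) {
            return Optional.empty();
        }
        @SuppressWarnings("unchecked")
        T first = (T) last;
        return Optional.of(first);
    }
}
```

Calling consumer.accept("first") and then consumer.accept("second") leaves "first" in the cache, which is the behaviour described above.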
Ok great, the first problem is solved. But how do we proceed with the .startWith() operator?
To solve this, we can apply the .flatMap() operator to our Observable. Every time it is called, it returns the producer variable from the LastSeen class, which as described above is an Observable. This Observable's sole responsibility is to emit the value saved in the last variable, and since the value saved there is the initially retrieved value, we have partially solved our second problem. The .flatMap() operator also does not cause our initial Observable to re-execute. It just transforms our Observable into the producer variable of the LastSeen class, which acts as described before. By doing this, we have fully solved the second problem as well :)
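The combined effect of the first-value consumer and the .flatMap() step can be sketched in plain Java. This is a model under assumptions: the exact operator chain is not shown in this article, so the chain named in the comment is my reading of the description; ReplayFirstModel and apply() are hypothetical names; and a finite List stands in for the stream of emissions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of: upstream.doOnNext(consumer).flatMap(ignored -> producer)
// where consumer keeps only the first value and producer emits the cached value.
final class ReplayFirstModel<T> {
    private static final Object NONE = new Object();
    private Object first = NONE;

    // Maps every upstream emission to the first value ever seen.
    List<T> apply(List<T> upstreamEmissions) {
        List<T> downstream = new ArrayList<>();
        for (T value : upstreamEmissions) {
            if (first == NONE) {
                first = value; // doOnNext step: remember only the first value
            }
            @SuppressWarnings("unchecked")
            T cached = (T) first; // flatMap step: replace the emission with the cached value
            downstream.add(cached);
        }
        return downstream;
    }

    public static void main(String[] args) {
        ReplayFirstModel<String> model = new ReplayFirstModel<>();
        System.out.println(model.apply(Arrays.asList("a", "b", "c"))); // prints "[a, a, a]"
    }
}
```

Every element that comes out is the same object reference as the first emission, which matches the goal of handing the exact same data to all Subscribers.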
All of the above changes result in the same behaviour that my previous article described, but in a cleaner way: we just call the .compose() operator and pass it an instance of the ReplayingShare class, as follows:
ourObservable.compose(ReplayingShare.instance());
Most of the time the above statement will be something like this in a real life scenario:
Observable<List<Contributor>> testObservable = retrofit
    .create(GitHub.class)
    .contributors("square", "retrofit")
    .compose(ReplayingShare.<List<Contributor>>instance());
Happy Rx-ifying :)