RxJava — One Observable, Multiple Subscribers, Same Data
So we had to tackle a problem on the office the other day. The problem was fairly simple: Be able to share a Retrofit response as an Observable to multiple subscribers without the side effect of re-executing the Network Call that this Observable was bound to.
If you want to see that a Network Call is re-executed when you are adding a new Subscriber to an already existing Observable, you can use the Android Studio’s Debugger and check the Memory Address that your data is getting saved on, or just print out the received, let’s say ArrayList and check the Memory Address from the Log.
Supposedly the .share() operator which basically is a shorthand for .publish().refCount() should provide that kind of behaviour, but this was only working for a simple Cold State Observable like Observable.just(1,2,3,4,5) where both the Subscribers that I have added to that Observable were getting their data on the same Memory Address.
Unfortunately though, when we had to deal with Observables that were executing Retrofit API calls, each subscriber was causing the original Observable to re-execute the API call, which was not our desired result, because on the case that you wanted to share these data between a View and a Service that would act upon these data, there could always be a chance that you would receive slightly different results, but certainly not the same results, saved on the same Memory Address.
One way of solving this, was changing from Observables to ReplaySubject, but that would automatically mean, that we should change our entire Service class to return ReplaySubject, in the form of Observable, but still a huge change.
Another possible way of solving this, would be to use the .cache() operator, in order to deal with the exact same data on the exact same Memory Address, but as always this would also require us to implement a Cache Invalidation logic, in order to be able to refresh our data when needed.
The .share() operator, as we said before, includes the .publish() operator, which automatically means that it transforms your Observable to a ConnectableObservable (an Observable that would only start emitting data once its .connect() method is called). So that would mean, that we could add our initial Subscribers and then call .connect(), in order for the initial Observable to start emitting data.
What would we do though in the case that we wanted to add more Subscribers to it, but we did not know when?
Apparently, this is solved, because there is no need to call .connect() again, which would automatically meant that you would be carrying an Observable around in order to call .connect() on it after you add a new Subscriber, so that you can get your data.
What about the need to execute the Retrofit API call only once though? This is easily solvable with the help of .replay() operator, which makes sure that all our Subscribers, no matter when they have subscribed to our Observable will receive the same data that were produced from the initial execution of our Retrofit API call.
So there we had it. We now have the ability to add multiple Subscribers to our Observable, whenever and wherever we need them, without the need to call .connect() each time. Also, the .replay() operator ensures us that all of our Subscribers will receive the same data :)
Happy Rx-ifying :)