RxAndroid: An event bus on steroids
In this article I will try to describe how to build a Event bus using RxJava through an example: A very simple app that displays random Chuck Norris jokes (or are they facts?). When the user requests a new joke (
trigger), we will fetch a joke from the server (
action), save the joke to the DB (
side effect) and notify interested observers , if any (
notification). A more generic version of the above would be:
trigger requests an
action to be performed,
side effects take place, and interested parties are
notified, if any.
If any? Yes, if the
trigger is pulled, the
action will be performed even if no one is listening.
We are going to be using
retrofit for this. With the above in mind, let’s start off by creating a singleton (preferably managed by dagger) with the following methods:
execute() method itself will be our
.doOnNext(joke -> ...) our
side effect. We are almost there. The tricky part turns out to be
notifying our observers.
We need to multicast our results to anyone listening and there are two ways of multicasting in
I explain why using
Connectable Observables didn’t work out for me at the end of this post. But in the spirit of keeping this article short, I’ll show you what did work first:
Let’s give our singleton class a name:
RxStore and make it a bit more generic, instead of working with
jokes, we’ll work with
Input/Output of any kind:
execute() now takes in a value of type
Input as a parameter to customise our request (think
event.pageNumber() if you are implementing pagination).
That’s it, our singleton is complete. Take a few seconds to familiarise with it. You’ll notice that the subscribers now register directly to a
relay as opposed to the observable. What is a relay you ask? It’s just a subject with the methods
onError() striped out, I’ll explain why this is useful in a sec, bear with me here. This is a summary of what
Observablesget created on ‘
execute()’, perform their
side effects, and lastly report their results to the
relaybefore terminating (dying forever). The
relayin turns, reports to its subscribers.
…The difference is, the
relay doesn’t terminate ever!, if it did our subscribers would never receive another notification. Let’s implement this class to see how to use its abstract methods:
— Ok, I get it, you stripped out the methods
onError to prevent the
relay from dying… but what about errors, how do we handle those now?
Great question! Without a
onError method we we’ll need to handle our errors on
onNext(), just wrap your
joke object into a
JokeResponse with methods like
getErrorString(). It’s very important that you catch any potential errors on the observable returned by
buildObservable(), a good way of doing this is using the method
relays part of
They are not, it’s a library by Jake Wharton. You can find it here
— Why are you using a BehaviourRelay? Can I use another one?
I use Behaviour relay so I receive the last retrieved joke upon subscription. You can use whichever fits your needs.
So what, what makes this approach so special?
What’s so good about this approach is that all the logic is encapsulated inside that singleton. Which makes it really easy to test. The presenter doesn’t have to know how to fetch the data, or where to fetch it from (db, memory, network).
Another advantage is that it’s independent from the android lifecycle callbacks. If the user request data and rotates the phone, the network request won’t get interrupted.
Lastly, it’s a great way of propagating changes throughout your app, in our example, all views registered to
JokesStores will get notified when someone requests a new joke, it’s like an event bus, but more event-specific.
Shut up and show me the code
The interesting classes are:
Oh I almost forgot, I promised Chuck Norris jokes:
Dead once had a near Chuck Norris experience
Chuck Norris doesn’t dial the wrong number. You answered the wrong phone
Chuck Norris can kill two stones with one bird.
I’d like to close by saying that I’m not an expert on the subject (not even close), as the title suggests I’m approaching doubts us beginners have when starting to play around in the reactive world in the hopes of making this approach more appealing for newcomers. Any constructive criticism is more than welcome and appreciated. Thanks for reading!
ConnectableObservables don’t work in this scenario:
Connectable Observables didn’t work for me. Unfortunately, adding
.publish() to our observable makes the problem with our architecture more obvious.
Every time we call execute we are creating a new
observable, then when we call
subscribe(), which one are we really subscribing to? We don’t want to have more than one observable at any given time because it would very hard for subscribers to keep up to the newest one.
That’s ok, we can still fix it. we create the observable once and we call .
This is going to work just once. Successive
connect() calls will just return the same value. The network request won’t be executed again.