RxAndroid: An event bus on steroids

In this article I will try to describe how to build a Event bus using RxJava through an example: A very simple app that displays random Chuck Norris jokes (or are they facts?). When the user requests a new joke (trigger), we will fetch a joke from the server (action), save the joke to the DB (side effect) and notify interested observers , if any (notification). A more generic version of the above would be:

A trigger requests an action to be performed, side effects take place, and interested parties are notified, if any.

If any? Yes, if the trigger is pulled, the action will be performed even if no one is listening.

We are going to be using rxjava and retrofit for this. With the above in mind, let’s start off by creating a singleton (preferably managed by dagger) with the following methods:

The execute() method itself will be our trigger. endpoints.getJoke() our action, and .doOnNext(joke -> ...) our side effect. We are almost there. The tricky part turns out to be notifying our observers.

We need to multicast our results to anyone listening and there are two ways of multicasting in rxjava:

  • Connectable Observables
  • Subjects / Relays

I explain why using Connectable Observables didn’t work out for me at the end of this post. But in the spirit of keeping this article short, I’ll show you what did work first: Subjects/Relays.

Let’s give our singleton class a name: RxStore and make it a bit more generic, instead of working with jokes, we’ll work with Input/Output of any kind:

The method execute() now takes in a value of type Input as a parameter to customise our request (think event.pageNumber() if you are implementing pagination).

That’s it, our singleton is complete. Take a few seconds to familiarise with it. You’ll notice that the subscribers now register directly to a relay as opposed to the observable. What is a relay you ask? It’s just a subject with the methods onComplete() and onError() striped out, I’ll explain why this is useful in a sec, bear with me here. This is a summary of what RxStore does:

Observables get created on ‘execute()’, perform their action and side effects, and lastly report their results to the relay before terminating (dying forever). The relay in turns, reports to its subscribers.

…The difference is, the relay doesn’t terminate ever!, if it did our subscribers would never receive another notification. Let’s implement this class to see how to use its abstract methods:

— Ok, I get it, you stripped out the methods onComplete and onError to prevent the relay from dying… but what about errors, how do we handle those now?

Great question! Without a onError method we we’ll need to handle our errors on onNext(), just wrap your joke object into a JokeResponse with methods like isSuccessful() and getErrorString(). It’s very important that you catch any potential errors on the observable returned by buildObservable(), a good way of doing this is using the method onErrorReturn().

— Are relays part of RxJava?

They are not, it’s a library by Jake Wharton. You can find it here

— Why are you using a BehaviourRelay? Can I use another one?

I use Behaviour relay so I receive the last retrieved joke upon subscription. You can use whichever fits your needs.

So what, what makes this approach so special?

What’s so good about this approach is that all the logic is encapsulated inside that singleton. Which makes it really easy to test. The presenter doesn’t have to know how to fetch the data, or where to fetch it from (db, memory, network).

Another advantage is that it’s independent from the android lifecycle callbacks. If the user request data and rotates the phone, the network request won’t get interrupted.

Lastly, it’s a great way of propagating changes throughout your app, in our example, all views registered to JokesStores will get notified when someone requests a new joke, it’s like an event bus, but more event-specific.

Shut up and show me the code

The interesting classes are: RxStore, JokesStore and JokesPresenter

Oh I almost forgot, I promised Chuck Norris jokes:

Dead once had a near Chuck Norris experience
Chuck Norris doesn’t dial the wrong number. You answered the wrong phone
Chuck Norris can kill two stones with one bird.

I’d like to close by saying that I’m not an expert on the subject (not even close), as the title suggests I’m approaching doubts us beginners have when starting to play around in the reactive world in the hopes of making this approach more appealing for newcomers. Any constructive criticism is more than welcome and appreciated. Thanks for reading!

Further reading:

Why ConnectableObservables don’t work in this scenario:

Using Connectable Observables didn’t work for me. Unfortunately, adding .publish() to our observable makes the problem with our architecture more obvious.

Every time we call execute we are creating a new observable, then when we call subscribe(), which one are we really subscribing to? We don’t want to have more than one observable at any given time because it would very hard for subscribers to keep up to the newest one.

That’s ok, we can still fix it. we create the observable once and we call .connect() inside execute() right?:

This is going to work just once. Successive connect() calls will just return the same value. The network request won’t be executed again.