RxJava as event bus, the right way
Otto from Square got officially deprecated the previous days. In the Android world we can cheer now something like “EventBusses are dead long live RxJava”.

The raising question tough is how can you efficiently migrate from an existing event bus such as Otto or Greenbot’s EventBus to Rx, or sometimes known as RxBus.
Before we talk about RxJava as a event bus let’s be sure that one understands the functionality of an event bus.
What’s an event bus
The main idea behind it is that you can connect two objects/two classes that have different lifecycles or a very different hierarchy or items dependency in the simplest way possible. That’s all.
How many times have you wondered what’s the best way to connect a Activity, Fragment, Service or Dialog with each other?
Some case examples:
- You have a Activity and an Service. When something happens on the Service, then you e.g. change color of a button in the Activity — if it runs.
- You have a Fragment which tells your Activity to show a Dialog and on its result you need to change something on the Fragment. Take a pause here from reading. Think the callback relationships between those 3 files. Now try to think how you can scale this with 10 features related in those files. Heavy dependencies, lot’s of stuff.
Event bus’ solution to the above:
An event bus allows you to connect those two classes, directly. You can have an event starting from the class A notifying class B with the only mediator itself.
Example of triggering an event from class A:
// EventBus
eventBus.post(new AnswerAvailableEvent(42));
// Otto
bus.post(new AnswerAvailableEvent(42));
Example of “waiting” for an event in class B:
// EventBus
eventBus.register(this); // e.g. in onCreate
...
public void onEvent(AnswerAvailableEvent event) {
};
// Otto
@Subscribe
public void answerAvailable(AnswerAvailableEvent event) {
}
That’s it. You can place those lines in any of your files.
The Rx dawn
Listeners, Rx and event buses are using the reactive pattern but RxJava does it better. You want to know why? Some of my top personal arguments are:
- Ridiculously easier tracking of your code logic
- Much much easier testing
- Ridiculously easier threading management
- Both busses mentioned above have “one pipeline” and then you create the events going down e.g. AnswerAvailableEvent . In RxJava you do not need all these XxxxEvent, you can create multiple pipelines — I will show you.
- Higher manipulation/reusability/modularity of code and objects
Of course it hurts for the guys who contributed in those libraries to see that their code is becoming obsolete but that’s part of the nature of coding.
One pipeline vs multiple pipelines
Now after you understood how both RxJava and event buses work, you question yourself, I can make something similar to an existing event bus with one pipeline and multiple events. I will suggest you not to do so.
Let’s see how one pipeline looks like:
public class MyRxBus {
private static MyRxBus instance;
private PublishSubject<Object> subject = PublishSubject.create();
public static MyRxBus instanceOf() {
if (instance == null) {
instance = new MyRxBus();
}
return instance;
}
/**
* Pass any event down to event listeners.
*/
public void setString(Object object) {
subject.onNext(object);
}
/**
* Subscribe to this Observable. On event, do something
* e.g. replace a fragment
*/
public Observable<Object> getEvents() {
return subject;
}
}The above will need “XxxxxEvents” to work, which you later have to check for e.g.
getEvents().subscribe(new Action1<Object>() {
@Override
public void call(Object o) {
if(o instanceof AnswerAvailableEvent){
// do something
}
}
});This code suffers from testability, modularity and unnecessary xxxEvent classes which leads us to the next approach:
The concept of reactive models
The idea of modular reactive code came to me by Futurice’s repo. The concept is that you have multiple models doing the data or network logic for you while having clean Activities and Fragments.
For example let’s say you have a UserLocationModel which is responsible for providing you the physical location of the device. Let’s also assume that your activity “feeds” the UserLocationModel with events, which then returns the results in the Fragment.


And will look like this:
public class UserLocationModel {private PublishSubject<LatLng> subject = PublishSubject.create();
public void setLocation(LatLng latLng) {
subject.onNext(latLng);
}
public Observable<LatLng> getUserLocation() {
return subject;
}
}
What’s nice about it is the modularity it enables. If for some reason you decide that you need to extend these model you can do so:


And here you have it, no rocket science. The above architecture has been tested pretty well in the projects I work at and I can’t think of a better one to use with RxJava.
Small tips
- Make small function-wise model classes that can be (and are) tested
- Put as much logic in your models and not in the Activities/Fragment/Dialogs
- Keep the threading logic in the Activities/Fragments/Dialogs because unit testing is a bit harder (but doable)
- Use Dagger to inject the models at ease, or create connections with each other.
Do you have any more ideas? Write a comment!