RxJava recipe: Convert any Android event into a stream

The problem

First I will state the problem we will solve.

Imagine you want to listen to an event (e.g. Android OS notifications or Bluetooth state) in multiple parts of the application, but only while at least one of those parts is interested in the information.

  • The event listener must be registered only when the first client in the app is interested in it and it must un-register as soon as nobody is interested anymore.
  • When a new client subscribes to the event, it should immediately receive the last event already sent to existing clients, so that all clients are always in sync.
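
To make the two requirements concrete before bringing in RxJava, here is a minimal hand-rolled sketch in plain Java. The class SharedEventSource and its method names are hypothetical and exist only for illustration; the register and unregister callbacks stand in for whatever listener API is being wrapped.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical illustration of the requirements; not an RxJava or Android class. */
public class SharedEventSource<T> {
    private final Runnable register;    // starts listening to the real producer
    private final Runnable unregister;  // stops listening to the real producer
    private final List<Consumer<T>> clients = new ArrayList<>();
    private T lastEvent;
    private boolean hasLastEvent;

    public SharedEventSource(Runnable register, Runnable unregister) {
        this.register = register;
        this.unregister = unregister;
    }

    public synchronized void addClient(Consumer<T> client) {
        if (clients.isEmpty()) {
            register.run();             // first interested client: start listening
        }
        clients.add(client);
        if (hasLastEvent) {
            client.accept(lastEvent);   // bring the newcomer in sync with the others
        }
    }

    public synchronized void removeClient(Consumer<T> client) {
        if (clients.remove(client) && clients.isEmpty()) {
            unregister.run();           // nobody is interested anymore: stop listening
            hasLastEvent = false;       // assumption: a value nobody was watching is stale
            lastEvent = null;
        }
    }

    /** Called by the underlying listener whenever the producer fires. */
    public synchronized void onEvent(T event) {
        lastEvent = event;
        hasLastEvent = true;
        // Iterate over a copy so a client may remove itself while being notified.
        for (Consumer<T> client : new ArrayList<>(clients)) {
            client.accept(event);
        }
    }
}
```

The rest of the article obtains the same behaviour by composing existing RxJava operators instead of maintaining this bookkeeping by hand.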

In this article we will explore how to solve this problem by mixing different RxJava components. You should be familiar with concepts like Subjects, Replay and Connectable Observables before diving in; the article builds on them to explain the nuances of each component.

Spoiler alert: reaching the solution takes more than one operator.

Note: This article discusses RxJava 1. If you use RxJava 2, these solutions still work, although a simpler method is mentioned in the last paragraph.

The metaphor

Now I will illustrate the problem with a metaphor.

Imagine the living room of your house has a nicely automated sound system that works like this:

  • When the first person enters the room, music starts playing non-stop,
  • as long as there are people in the room the music keeps playing and everybody hears the same songs in the same order,
  • when a single person leaves the room the music keeps playing for those who stay in the room and the person that left simply doesn’t hear the songs anymore,
  • finally, when the last person leaves the room, the music stops playing and the sound system turns off automatically.

Preparing to cook the solution

The solution will be introduced in small steps, in the form of a cooking recipe, explaining the reason for adding each ingredient and the problem each one solves.

TL;DR: if you just want the solution, scroll directly to the end to see the sequence of operators.

Warming up the oven

The first RxJava ingredient, which covers most of our needs, is a BehaviorRelay from RxRelay (a.k.a. a safe BehaviorSubject). Combined with doOnSubscribe() and doOnUnsubscribe(), it provides subscription/unsubscription hooks, multicasting to multiple observers like a hot observable and, finally, a replay of the last emitted value to new observers.

The steps to prepare this ingredient are:

  • In doOnSubscribe() register the listener, event bus (cough!) or any other producer of events
  • In doOnUnsubscribe() un-register the listener, event bus or any other producer of events
  • When a new event is received from the producer, then call the relay to emit the event to all observers
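
The steps above can be sketched as follows. This is a reconstruction rather than the author's original listing, and it assumes RxJava 1.x and RxRelay 1.x on the classpath. The class name, the Integer events and the in-memory log are illustrative; in a real app the two log.add() calls inside the hooks would register and un-register the actual listener, and the listener callback would call relay.call(event).

```java
import com.jakewharton.rxrelay.BehaviorRelay;
import java.util.ArrayList;
import java.util.List;
import rx.Observable;
import rx.Subscription;

public class BehaviorRelayProblem {

    /** Runs two overlapping observers and returns a log of what happened. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        BehaviorRelay<Integer> relay = BehaviorRelay.create();

        Observable<Integer> events = relay
                .doOnSubscribe(() -> log.add("doOnSubscribe"))       // register the listener here
                .doOnUnsubscribe(() -> log.add("doOnUnsubscribe"));  // un-register the listener here

        log.add("observer-1 subscribes");
        Subscription first = events.subscribe(v -> log.add("observer-1 -> onNext with " + v));
        relay.call(1);  // the producer fires its first event

        log.add("observer-2 subscribes");
        Subscription second = events.subscribe(v -> log.add("observer-2 -> onNext with " + v));
        relay.call(2);  // the producer fires again; both observers are subscribed now

        log.add("observer-1 unsubscribes");
        first.unsubscribe();
        log.add("observer-2 unsubscribes");
        second.unsubscribe();
        return log;
    }

    public static void main(String[] args) {
        for (String line : run()) {
            System.out.println("BehaviorRelayProblem -> " + line);
        }
    }
}
```

Note that both hooks sit directly on the relay, so they run once per observer rather than once per group of observers.
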
A Relay is basically a Subject except without the ability to call onComplete or onError

This works fine, except for the fact that every new subscription to the observable causes an extra registration to the listener and every unsubscription from the observable causes an extra un-registration from the listener, even if there are more observers still subscribed to the observable.

This recipe cannot be used with multiple concurrent observers.

The code above produces this output:

BehaviorRelayProblem -> observer-1 subscribes
BehaviorRelayProblem -> doOnSubscribe
BehaviorRelayProblem -> observer-1 -> onNext with 1
BehaviorRelayProblem -> observer-2 subscribes
BehaviorRelayProblem -> doOnSubscribe <= PROBLEM: unwanted call
BehaviorRelayProblem -> observer-2 -> onNext with 1
BehaviorRelayProblem -> observer-1 -> onNext with 2
BehaviorRelayProblem -> observer-2 -> onNext with 2
BehaviorRelayProblem -> observer-1 unsubscribes
BehaviorRelayProblem -> doOnUnsubscribe <= PROBLEM: unwanted call
BehaviorRelayProblem -> observer-2 unsubscribes
BehaviorRelayProblem -> doOnUnsubscribe

Every new observer that subscribes will trigger an extra registration to the listener :-(

Spicing things up

A simple fix to this problem is to combine the BehaviorRelay with the share() operator. This will cause the doOnSubscribe() method in the source observable to be triggered only once when the first observer subscribes to the stream and similarly the doOnUnsubscribe() will be called only once when the last observer unsubscribes from the stream.
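
A sketch of this variation, again a reconstruction under the same assumptions (RxJava 1.x, RxRelay 1.x, illustrative class name and log). The only change from the previous recipe is the share() call at the end of the chain, placed after the two hooks so that they sit upstream of the reference counting.

```java
import com.jakewharton.rxrelay.BehaviorRelay;
import java.util.ArrayList;
import java.util.List;
import rx.Observable;
import rx.Subscription;

public class ShareRecipe {

    /** Runs two overlapping observers and returns a log of what happened. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        BehaviorRelay<Integer> relay = BehaviorRelay.create();

        Observable<Integer> events = relay
                .doOnSubscribe(() -> log.add("doOnSubscribe"))       // runs for the first observer only
                .doOnUnsubscribe(() -> log.add("doOnUnsubscribe"))   // runs when the last observer leaves
                .share();                                            // publish().refCount()

        log.add("observer-1 subscribes");
        Subscription first = events.subscribe(v -> log.add("observer-1 -> onNext with " + v));
        relay.call(1);

        log.add("observer-2 subscribes");
        // share() keeps a single upstream subscription, so the relay's cached
        // value is not replayed to this second observer.
        Subscription second = events.subscribe(v -> log.add("observer-2 -> onNext with " + v));
        relay.call(2);

        log.add("observer-1 unsubscribes");
        first.unsubscribe();
        log.add("observer-2 unsubscribes");
        second.unsubscribe();
        return log;
    }

    public static void main(String[] args) {
        for (String line : run()) {
            System.out.println("share -> " + line);
        }
    }
}
```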

The share() operator is an alias for publish().refCount()

The code above produces this output:

share -> observer-1 subscribes
share -> doOnSubscribe
share -> observer-1 -> onNext with 1
share -> observer-2 subscribes
<= PROBLEM: missing call “observer-2 -> onNext with 1”
share -> observer-1 -> onNext with 2
share -> observer-2 -> onNext with 2
share -> observer-1 unsubscribes
share -> observer-2 unsubscribes
share -> doOnUnsubscribe

Last emitted value is not replayed to new observers :-/

The perfect dish

We are close but not quite there yet, because now a second observer doesn’t get any value when it subscribes until a new event is emitted. Suppose an event was already emitted to previous observers when a new observer subscribes: wouldn’t it be nice if we could emit the last known value to the new observer immediately?

The answer to this problem is to replace .share() with RxReplayingShare. Since the replay cache is now part of the ReplayingShare object, we should use a PublishRelay instead of the BehaviorRelay.
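
A sketch of the final recipe, reconstructed under the same assumptions as before plus the RxJava 1 artifact of RxReplayingShare on the classpath. The class name and the log are illustrative, not the author's original listing.

```java
import com.jakewharton.rx.ReplayingShare;
import com.jakewharton.rxrelay.PublishRelay;
import java.util.ArrayList;
import java.util.List;
import rx.Observable;
import rx.Subscription;

public class ReplayingShareRecipe {

    /** Runs two overlapping observers and returns a log of what happened. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        // A PublishRelay is enough: the last value is cached by ReplayingShare.
        PublishRelay<Integer> relay = PublishRelay.create();

        Observable<Integer> events = relay
                .doOnSubscribe(() -> log.add("doOnSubscribe"))       // register the listener here
                .doOnUnsubscribe(() -> log.add("doOnUnsubscribe"))   // un-register the listener here
                .compose(ReplayingShare.<Integer>instance());

        log.add("observer-1 subscribes");
        Subscription first = events.subscribe(v -> log.add("observer-1 -> onNext with " + v));
        relay.call(1);

        log.add("observer-2 subscribes");
        // The late observer receives the cached value as soon as it subscribes.
        Subscription second = events.subscribe(v -> log.add("observer-2 -> onNext with " + v));
        relay.call(2);

        log.add("observer-1 unsubscribes");
        first.unsubscribe();
        log.add("observer-2 unsubscribes");
        second.unsubscribe();
        return log;
    }

    public static void main(String[] args) {
        for (String line : run()) {
            System.out.println("ReplayingShare -> " + line);
        }
    }
}
```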

ReplayingShare is an RxJava transformer which combines the replay(1), publish(), and refCount() operators.

The code above produces this output:

ReplayingShare -> observer-1 subscribes
ReplayingShare -> doOnSubscribe
ReplayingShare -> observer-1 -> onNext with 1
ReplayingShare -> observer-2 subscribes
ReplayingShare -> observer-2 -> onNext with 1
ReplayingShare -> observer-1 -> onNext with 2
ReplayingShare -> observer-2 -> onNext with 2
ReplayingShare -> observer-1 unsubscribes
ReplayingShare -> observer-2 unsubscribes
ReplayingShare -> doOnUnsubscribe

The final solution is to compose a BehaviorSubject with replay(1), publish() and refCount(). A better alternative that achieves the same result is to combine a PublishRelay with ReplayingShare.

Closing comments

One place to apply this technique is in Android services that need to run only on demand, when some part of the app is interested in the events they produce. These services keep producing elements until nobody is interested in them anymore, and then they shut down.

I personally love how you can compose different components of RxJava to create your own recipe for a very specific problem. Surely there are other ways to mix RxJava ingredients and obtain similar results, so I would love to hear your alternative recipes in the comments.

Gourmet tip: The recently introduced factory Observable.create(emitter) was designed to cover the register/unregister listener use case, so the problem stated here can also be solved with create(). The right combination is create(emitter) with ReplayingShare.
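
A minimal sketch of that combination, assuming an RxJava 1.x release recent enough to include the two-argument create(emitter, backpressureMode) factory, plus RxReplayingShare. FakeSensor is a hypothetical stand-in for a real callback API, and the LATEST backpressure mode is an assumption that suits state-like events; pick whichever mode fits your producer.

```java
import com.jakewharton.rx.ReplayingShare;
import rx.Emitter;
import rx.Observable;
import rx.functions.Action1;

public class CreateEmitterSketch {

    /** Hypothetical stand-in for an Android callback API; not a real class. */
    public static final class FakeSensor {
        private Action1<Integer> listener;
        public int registrations;

        public void register(Action1<Integer> l) { listener = l; registrations++; }
        public void unregister() { listener = null; }
        public boolean isRegistered() { return listener != null; }

        /** Simulates the producer firing an event. */
        public void fire(int value) {
            if (listener != null) {
                listener.call(value);
            }
        }
    }

    /** Wraps the sensor in a shared stream that replays the last value to late observers. */
    public static Observable<Integer> events(FakeSensor sensor) {
        return Observable.<Integer>create(emitter -> {
                    sensor.register(emitter::onNext);             // runs when the first observer arrives
                    emitter.setCancellation(sensor::unregister);  // runs when the last observer leaves
                }, Emitter.BackpressureMode.LATEST)
                .compose(ReplayingShare.<Integer>instance());
    }
}
```

Compared with the relay-based recipe, registration and un-registration live together inside the create() lambda, so no separate relay or doOnSubscribe()/doOnUnsubscribe() pair is needed.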

Thanks to Jolanda Verhoef and Olaf Achthoven for peer reviewing this article