Crunching RxAndroid — Part 10

In the latest episode of this series, we talked about how an Observable can be retained through the lifecycle of the Activity, but we still lack a very important piece that will make Reactive Programming even more valuable in our every day life.

A step back

We shall all be pretty familiar with the way Android is heavily based upon a listener pattern derived from the Observer one, and we are pretty sure that most of the times there’s a *Listener that will be notified whenever something changes. But we don’t really have to deal with that and we can actually use the power of RxJava to convert every listener into something emitting information that we can subscribe to.

First of all, let’s see how we could wrap a very long operation (such as a File download) into a more Reactive approach. Let’s suppose we have something like this:

Now, we can make use of the Observable.fromCallable() factory method, so that we create (out of our method) a new Observable that will not only give us a Reactive approach, but also redirect our Exception to the onError method of our chain:

Of course, with the magic of lambdas this would be downsized to just a single line, but we get the idea.

Reacting to callbacks

Luckily, RxJava doesn’t come with just a wrapper for synchronous functions, but it also allows us to wrap asynchronous callbacks by making use of the factory method Observable.fromAsync(). Let’s see how it looks like:

As we can see, this method takes two parameters, the first one being an Action1, while the second one defines the kind of Backpressure wanted for the Observable, picking from a wide variety of types defined by the BackpressureMode enum (we will cover it in a following episode).

Studying more carefully Action1<>, we see that its generic type resolves to AsyncEmitter<T>, where the T is the type of data we will be emitting. Inside this Action1, we can create our listener, so that we can react properly to the events.

Let’s see an example made upon the Firebase Database api, which returns us a DataSnapshot when a ValueEventListener is registered:

Apart from the implementation of the ValueEventListener interface, we can see that we inject an AsyncEmitter<DataSnapshot> in the constructor, then we emit the data as soon as it changes by using the onNext() method in the emitter (we complete right away as we just want one value) and we instead propagate the error in case of cancelation.

At this point, we just need to wire things up, and this is done pretty easily:

What’s important here is that when we use the AsyncEmitter<> we get in the Action1 to create a listener that we attach to the database, so that we can then deal with the stream of data returned from the database.

Note: starting from RxJava 1.1.10, the Observable.fromAsync() factory method has been renamed Observable.fromEmitter(), as Plastix mentioned in the comments.


Of course, we could have done it with the Observable.create() factory method that would work in a very similar way, but the very reason why these two methods have been created is that create() doesn’t give us any weapon for dealing with Backpressure and this is why we shall always pick those upon the latter (if you want to know more about it, check out the amazing speech made from my friend Sasa Sekulic, together with the related slides).