Converting callback async calls to RxJava
Using RxJava’s Observable.fromAsync() to convert asynchronous APIs while properly dealing with backpressure
Since we started using RxJava in our Yammer Android app, we’ve often encountered APIs that don’t follow its reactive model, requiring us to do some conversion work to integrate them with the rest of our RxJava Observable chains.
APIs usually offer one of these two options when dealing with expensive operations:
- A synchronous blocking method call (expected to be called from a background thread)
- An asynchronous non-blocking method call that uses callbacks (and/or listeners, broadcast receivers, etc)
Converting synchronous APIs to Observables
For the first type of APIs, RxJava offers a convenient factory method named Observable.fromCallable(). For example, this is how it would look to convert the commit method from Android’s SharedPreferences API:
This abstracts away all the complexity behind implementing a reactive method while automagically adhering to The Observable Contract™, which makes it very easy to convert these types of APIs to RxJava.
Converting asynchronous APIs to Observables
Converting the second type of APIs is not as straightforward. The pattern that we see again, and again, (and again) is to wrap them using the factory method Observable.create(). Unfortunately this approach has several disadvantages which will be explained in this post.
Let’s say we need to track a device’s accelerometer with the help of Android’s SensorManager, which uses both callbacks and listeners. Let’s see what the regular usage of SensorManager would look like in an Activity:
A naïve implementation of the logic needed to wrap this API in an RxJava Observable would look something like this:
Using this conversion we can then consume the accelerometer values in a reactive manner:
Although this would work, it would require a lot more effort before it can completely fulfill the requirements from the Observable contract:
1) We would need to unregister the listener after an unsubscription happens to avoid memory leaks.
2) We would need to check for errors and properly report them up the Observable chain using onError() to avoid potential crashes.
3) We would need to check if the subscriber is still subscribed before emitting values via onNext() or onError() to avoid unwanted emissions.
4) We would need to handle backpressure to avoid crashing with MissingBackpressureException.
Implementing (1) through (3) correctly is still possible, albeit laborious, using Observable.create().
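To make the first three obligations concrete, here is a minimal, library-free sketch of the bookkeeping they imply. It is a model only: FakeSensor, Listener and MiniSubscriber are hypothetical stand-ins invented for illustration, not RxJava or Android types, and treating a NaN reading as an error is likewise an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;

public class ListenerWrapperSketch {

    /** Hypothetical listener interface, standing in for a callback such as a sensor listener. */
    interface Listener {
        void onValue(float value);
    }

    /** Hypothetical callback-based API, standing in for something like SensorManager. */
    static class FakeSensor {
        // Copy-on-write so a listener may unregister itself while values are being dispatched.
        final List<Listener> listeners = new CopyOnWriteArrayList<>();

        void register(Listener l) { listeners.add(l); }
        void unregister(Listener l) { listeners.remove(l); }
        void emit(float v) { for (Listener l : listeners) l.onValue(v); }
    }

    /** Hypothetical consumer, shaped only loosely like an RxJava 1.x Subscriber. */
    static class MiniSubscriber {
        final List<Float> values = new ArrayList<>();
        Throwable error;
        Runnable onUnsubscribe = () -> { };
        private final AtomicBoolean unsubscribed = new AtomicBoolean();

        void onNext(float v) { values.add(v); }
        void onError(Throwable t) { error = t; }
        boolean isUnsubscribed() { return unsubscribed.get(); }

        void unsubscribe() {
            // Run the cleanup action at most once.
            if (unsubscribed.compareAndSet(false, true)) onUnsubscribe.run();
        }
    }

    /** Wraps the callback API for one subscriber, handling points (1) to (3) by hand. */
    static void subscribe(FakeSensor sensor, MiniSubscriber subscriber) {
        Listener listener = value -> {
            // (3) Never emit to a subscriber that has already gone away.
            if (subscriber.isUnsubscribed()) {
                return;
            }
            // (2) Report failures through onError instead of letting them escape.
            if (Float.isNaN(value)) {
                subscriber.onError(new IllegalStateException("invalid reading"));
                subscriber.unsubscribe();
                return;
            }
            subscriber.onNext(value);
        };
        // (1) Unregister the listener when the subscriber unsubscribes.
        subscriber.onUnsubscribe = () -> sensor.unregister(listener);
        sensor.register(listener);
    }
}
```

Even in this toy form, every wrapper has to repeat the same three pieces of plumbing, and none of it addresses point (4) yet.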
Implementing (4) is not trivial to say the least, since properly dealing with backpressure is a subject that entire books could be written on, and shouldn’t be expected to be implemented manually every time you need to wrap an async API.
Luckily for us, the smart people from RxJava have recently released better support for all this in v1.1.7, in the form of a new factory method called Observable.fromAsync().
UPDATE: Jake Wharton brought to my attention that the name of this method is going to change from Observable.fromAsync() to Observable.fromEmitter() in v1.2.0 of RxJava.
Observable.fromAsync() to the rescue
Documentation is still minimal, but perhaps an example would better show how this method can help us implement 1-4 above:
The first thing you should notice is that we’re not explicitly implementing (2) or (3); this is because Observable.fromAsync() now handles these cases for us.
Implementation for (1) remains very similar, albeit with a friendlier API named setCancellation().
For (4) all we have to do is specify the backpressure strategy to be used.
The deal with backpressure
Backpressure, in simple terms, is what happens when a producer emits values at a rate much faster than the consumer can handle. Undelivered values accumulate in an internal bounded buffer which, when it overflows, leads to the mysterious MissingBackpressureException being thrown.
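The overflow itself is easy to reproduce in plain Java. The sketch below is only a model of the situation, not RxJava code: OverflowSketch and its parameters are made up for illustration, and the per-tick rates are arbitrary. It counts how many values a bounded buffer accepts before a fast producer overruns a slow consumer.

```java
import java.util.concurrent.ArrayBlockingQueue;

public class OverflowSketch {

    /**
     * Simulates discrete ticks in which the producer first offers producedPerTick values
     * and the consumer then removes consumedPerTick values.
     * Returns the number of values accepted before the first rejected offer,
     * or -1 if the buffer never overflows within maxTicks.
     */
    static int valuesBeforeOverflow(int capacity, int producedPerTick,
                                    int consumedPerTick, int maxTicks) {
        ArrayBlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(capacity);
        int accepted = 0;
        for (int tick = 0; tick < maxTicks; tick++) {
            for (int i = 0; i < producedPerTick; i++) {
                // offer() returns false when the bounded buffer is full.
                if (!buffer.offer(accepted)) {
                    return accepted;
                }
                accepted++;
            }
            for (int i = 0; i < consumedPerTick; i++) {
                buffer.poll(); // removes one value, or does nothing if the buffer is empty
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        // Producer three times faster than the consumer, 16-slot buffer.
        System.out.println(valuesBeforeOverflow(16, 3, 1, 1000)); // prints 23
        // Matched rates never overflow.
        System.out.println(valuesBeforeOverflow(16, 1, 1, 1000)); // prints -1
    }
}
```

With a 16-slot buffer and a producer that is three times faster than the consumer, the backlog grows by two values per tick and the buffer rejects the 24th value. A producer that merely matches the consumer never overflows.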
Again, implementing backpressure handling isn’t easy but thanks to the fromAsync() method, its complexity has now been reduced to just understanding how these already implemented strategies behave. These are the different BackpressureModes at our disposal:
- BUFFER: switches to an internal unbounded buffer (instead of observeOn’s 16-element default buffer). This will allocate a 128 initial element buffer that will continue to dynamically grow as needed until the JVM runs out of memory.
This strategy is usually what you should choose when dealing with hot Observables (i.e. ones that never call onCompleted()), as shown in this example.
- LATEST: reports only the latest value, effectively overwriting older, undelivered values. Internally this creates a buffer of size 1.
This is usually a good candidate for a cold Observable (i.e. one that always completes), especially if we know it only emits one value.
- DROP: similar to LATEST, except it reports the first value that arrives, and ignores anything that arrives later.
Also a good candidate for cold, single-emission Observables.
- ERROR / NONE: will keep behaving as before, i.e. will throw a MissingBackpressureException when the buffer overflows.
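The difference between these strategies is easiest to see on a small burst. The following plain-Java model is a sketch under stated assumptions, not RxJava’s implementation: BackpressureModel, its Mode enum and the run() helper are hypothetical names, and an IllegalStateException stands in for MissingBackpressureException. The consumer has requested a fixed number of items, the producer emits a burst of consecutive integers, and afterwards the consumer drains whatever the strategy retained.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class BackpressureModel {

    /** Hypothetical mirror of the strategy names discussed above. */
    enum Mode { BUFFER, LATEST, DROP, ERROR }

    /**
     * The producer emits the values 1..burst back to back while the consumer has only
     * requested `requested` items. Returns everything the consumer ends up receiving,
     * including values it drains from the strategy's storage after the burst.
     */
    static List<Integer> run(Mode mode, int burst, int requested) {
        List<Integer> delivered = new ArrayList<>();
        Deque<Integer> pending = new ArrayDeque<>();
        int demand = requested;
        for (int value = 1; value <= burst; value++) {
            if (demand > 0) {
                // The consumer still has outstanding demand: deliver directly.
                delivered.add(value);
                demand--;
                continue;
            }
            switch (mode) {
                case BUFFER:
                    pending.addLast(value); // keep everything, without bound
                    break;
                case LATEST:
                    pending.clear();        // overwrite the single retained value
                    pending.addLast(value);
                    break;
                case DROP:
                    break;                  // silently discard the value
                case ERROR:
                    // Stand-in for MissingBackpressureException.
                    throw new IllegalStateException("consumer cannot keep up");
            }
        }
        delivered.addAll(pending); // the consumer catches up after the burst
        return delivered;
    }

    public static void main(String[] args) {
        System.out.println(run(Mode.BUFFER, 5, 2)); // prints [1, 2, 3, 4, 5]
        System.out.println(run(Mode.LATEST, 5, 2)); // prints [1, 2, 5]
        System.out.println(run(Mode.DROP, 5, 2));   // prints [1, 2]
    }
}
```

BUFFER loses nothing but lets its backlog grow with the burst, LATEST and DROP both bound memory and differ only in which value survives, and ERROR fails as soon as the consumer falls behind.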
It’s important to understand the differences between these strategies and choose the one that is right for your needs. Since you’ll usually be wrapping 3rd party APIs, you have to be extra careful to ensure that a producer that emits more values than you can handle in a timely fashion won’t crash your app.
Show me the code
You can find an example Android app that I created that will allow you to simulate a backpressure error scenario using the SensorManager example from above. You can use it to explore the strategies implemented by the different BackpressureModes.
When converting existing APIs to RxJava reactive Observables:
1) Try to find a synchronous version of the method and then wrap it using Observable.fromCallable().
2) If only an async method is provided:
* Avoid using the Observable.create() factory method at all costs.
* Instead use the new Observable.fromAsync() factory method making sure to:
- Call onNext(), onCompleted(), and onError() as appropriate.
- Provide cleanup logic (if needed) via the setCancellation() method.
- Understand and choose the best BackpressureMode for your use case.
All subjects discussed in this blog post pertain solely to RxJava v1.x. As of the date of this publication, version 2.0.0 of RxJava is already a Release Candidate, and it includes many changes related to backpressure, mainly the introduction of the Flowable concept along with its own factory methods. Please read about these differences if you’re planning to use RxJava v2.x in your projects.