I created a Flowable and I liked it.

Łukasz Szczygłowski
Published in AndroidPub
Apr 30, 2018
My small engineer studying physics of a wheel

Nothing has taught me more clearly than being a father that a child’s routine is one of the most important aspects of their young life. It is what makes children feel secure and helps us parents plan our own daily activities.

The same rule applies to software engineering. By following SOLID and other OOP design principles, engineers can make source code clean and understandable. Introducing custom in-project patterns for how particular areas are to be extended makes every developer on a team feel secure: extend one thing and you will not break other areas of the system. Follow the routine and the whole product will speak the same language.

Routine is the keyword for today.

Routine is what I feel is missing when it comes to creating custom RxJava Flowables. That being said, the goal of this article is to fill the gap and specify three generic patterns for properly converting imperative sources into backpressured RxJava streams.

Controlled sources

Think of a controlled source as an object you iterate through. When converting it into a reactive stream, it is entirely up to you when the onNext event occurs. This could be Android’s Cursor, a file-reading InputStream or any sort of Iterable (although Flowable.fromIterable can take care of those).

When converting a controlled source into a reactive stream, use Flowable.generate.

Example (a stream of Contacts Provider contacts)

Let’s introduce a data class representing an individual contact. We will be passing it down the stream from the source. Note that anything you emit through an RxJava stream should be immutable; that way no two Subscribers can ever interfere with each other through a shared item. We will call the class SimpleContact.
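As a rough illustration of what such an immutable value could look like in plain Java, here is a minimal sketch. The fields (id and displayName) are assumptions made for this example only; the article does not say which columns SimpleContact carries.

```java
import java.util.Objects;

// Hypothetical shape of SimpleContact: the field names are assumptions,
// chosen only to show the immutability pattern.
final class SimpleContact {
    private final long id;
    private final String displayName;

    SimpleContact(long id, String displayName) {
        this.id = id;
        this.displayName = Objects.requireNonNull(displayName, "displayName");
    }

    long getId() {
        return id;
    }

    String getDisplayName() {
        return displayName;
    }

    // Value semantics: two contacts with the same data are interchangeable.
    @Override
    public boolean equals(Object other) {
        if (this == other) return true;
        if (!(other instanceof SimpleContact)) return false;
        SimpleContact that = (SimpleContact) other;
        return id == that.id && displayName.equals(that.displayName);
    }

    @Override
    public int hashCode() {
        return Objects.hash(id, displayName);
    }

    @Override
    public String toString() {
        return "SimpleContact(id=" + id + ", displayName=" + displayName + ")";
    }
}
```

With only final fields and no setters, an instance can be handed to any number of Subscribers without one of them being able to change what the others see.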

Now let’s specify how we will be utilizing provided SimpleContact instances — let’s print them to Android’s LogCat window.

The next step is to expose the magic hidden under the contacts method. Under the hood we will be using Flowable.generate as mentioned — the right approach for controlled sources.

Representing the method as Flowable.generate(1, 2, 3):
1. A function that creates our state.
2. A function that takes the current state, should call one of onNext, onComplete or onError (onNext at most once per invocation) and returns the updated state. The generator is called each time a downstream Subscriber is ready to consume the next event, which means the function is called multiple times. In every invocation, the state is whatever was returned by the previous invocation (or by function 1 when called for the first time).
3. (Optional) A function responsible for disposing of the state. This is where we should close a Cursor or an InputStream. It is called once a downstream Subscriber unsubscribes, or once onComplete or onError is called on the emitter within the generator (function 2).
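The three functions described above can be sketched without any Android dependency. The example below is only an illustration under a few assumptions: it targets RxJava 2.x (the io.reactivex packages), it uses a BufferedReader over an in-memory string as a stand-in for a Cursor or InputStream, and the LineSource class and lines method are hypothetical names.

```java
import io.reactivex.Emitter;
import io.reactivex.Flowable;
import io.reactivex.functions.BiFunction;
import io.reactivex.functions.Consumer;

import java.io.BufferedReader;
import java.io.StringReader;
import java.util.concurrent.Callable;

// Hypothetical example class; the reader stands in for a Cursor or InputStream.
final class LineSource {

    static Flowable<String> lines(final String text) {
        // 1: creates the state, once per Subscriber.
        Callable<BufferedReader> createState =
                () -> new BufferedReader(new StringReader(text));

        // 2: called once per requested item; signals exactly one event
        //    and returns the state for the next invocation.
        BiFunction<BufferedReader, Emitter<String>, BufferedReader> generator =
                (reader, emitter) -> {
                    String line = reader.readLine();
                    if (line == null) {
                        emitter.onComplete();
                    } else {
                        emitter.onNext(line);
                    }
                    return reader;
                };

        // 3: releases the state on completion, error or cancellation.
        Consumer<BufferedReader> disposeState = BufferedReader::close;

        return Flowable.generate(createState, generator, disposeState);
    }
}
```

Declaring the three functions as typed variables is a deliberate choice here: Flowable.generate has overloads taking either a BiFunction or a BiConsumer, and explicit types keep it obvious which one is meant. Something like LineSource.lines("a\nb").subscribe(System.out::println) would print each line in turn.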

To obtain a contacts Cursor, we need to query a ContentResolver.

Using Flowable.generate, the contacts method would look as below.

Note how we hid the mutable Cursor within the generate method and emit only immutable SimpleContact instances down the stream. A new Cursor row is obtained only when needed, avoiding the unnecessary resource drain caused by CursorWindow allocations. For more on windowing, see the great article by Chris Craik.
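The "only when needed" behaviour can be observed directly by counting generator invocations. The sketch below assumes RxJava 2.x; OnDemandRows and rows are hypothetical names, and an incrementing integer stands in for the Cursor position.

```java
import io.reactivex.Emitter;
import io.reactivex.Flowable;
import io.reactivex.functions.BiFunction;

import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo: an integer "row index" stands in for a Cursor position.
final class OnDemandRows {

    // Emits 0, 1, 2, ... and records how many times the generator ran.
    static Flowable<Integer> rows(final AtomicInteger generatorCalls) {
        Callable<Integer> firstRow = () -> 0;

        BiFunction<Integer, Emitter<Integer>, Integer> nextRow = (row, emitter) -> {
            generatorCalls.incrementAndGet(); // one call per requested item
            emitter.onNext(row);
            return row + 1;
        };

        return Flowable.generate(firstRow, nextRow);
    }
}
```

Subscribing with rows(calls).test(2), which requests two items up front, should leave the counter at two; a further request(1) on the returned TestSubscriber advances it by exactly one. The source is never read ahead of demand.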

Uncontrolled sources

Uncontrolled sources refer mostly to objects like callbacks or any sort of listeners, where we cannot predict how often those will be invoked.

When converting an uncontrolled source into a reactive stream, use Flowable.create.

Simplifying the method into Flowable.create(1, 2):
1. A function that receives an Emitter (basically a downstream Subscriber instance wrapped into internal Emitters). Within the function we can call onNext as many times as needed and optionally terminate with either onError or onComplete. Note that once terminated, no further onNext invocations are allowed.
2. Since we cannot control the speed at which items are emitted, we are required to provide a BackpressureStrategy governing how the emission is throttled. Based on the selection, a downstream Subscriber is wrapped into a special Emitter class under the hood, providing the requested backpressure. If you want to dive deeper, Jag Saund wrote a good article on BackpressureStrategy, although it applies to RxJava 1.0.
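To make the effect of the strategy concrete, here is a small sketch of what BackpressureStrategy.LATEST does when the Subscriber is slower than the source. It assumes RxJava 2.x; LatestStrategyDemo is a hypothetical name, and the emitter is captured in an AtomicReference purely so the demo can push values by hand.

```java
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import io.reactivex.subscribers.TestSubscriber;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo of how LATEST behaves with a Subscriber that lags behind.
final class LatestStrategyDemo {

    static List<Integer> valuesSeenBySlowSubscriber() {
        // Capture the emitter so values can be pushed manually below.
        AtomicReference<FlowableEmitter<Integer>> emitterRef = new AtomicReference<>();
        Flowable<Integer> source =
                Flowable.create(emitterRef::set, BackpressureStrategy.LATEST);

        // Subscribed, but with no demand signalled yet.
        TestSubscriber<Integer> subscriber = source.test(0);

        FlowableEmitter<Integer> emitter = emitterRef.get();
        emitter.onNext(1); // no demand: kept as the latest value
        emitter.onNext(2); // overwrites 1
        emitter.onNext(3); // overwrites 2

        subscriber.request(1); // the Subscriber finally asks for one item

        List<Integer> seen = new ArrayList<>(subscriber.values());
        subscriber.dispose();
        return seen;
    }
}
```

Only the most recent value survives until the Subscriber asks for it; the earlier ones are dropped. Other strategies (BUFFER, DROP, ERROR, MISSING) make a different trade-off at the same point.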

Example (a stream of the latest SharedPreferences value).

Note that BackpressureStrategy.LATEST has been applied, as we are interested only in processing the most recent value. It was also necessary to call setCancellable on the Emitter, providing a way to clean up once the stream completes or a Subscriber unsubscribes.
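The same two ingredients, LATEST and setCancellable, can be sketched outside Android. In the example below, InMemoryPreferences is a hypothetical, much simplified stand-in for SharedPreferences and its change listener; only the Flowable.create wiring is the point. RxJava 2.x is assumed.

```java
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical stand-in for SharedPreferences: one string value plus listeners.
final class InMemoryPreferences {

    interface Listener {
        void onChanged(String newValue);
    }

    private final List<Listener> listeners = new CopyOnWriteArrayList<>();

    void register(Listener listener) {
        listeners.add(listener);
    }

    void unregister(Listener listener) {
        listeners.remove(listener);
    }

    int listenerCount() {
        return listeners.size();
    }

    // Notifies every registered listener on the calling thread.
    void set(String newValue) {
        for (Listener listener : listeners) {
            listener.onChanged(newValue);
        }
    }

    // Converts the listener API into a backpressured stream of changes.
    Flowable<String> changes() {
        return Flowable.create(emitter -> {
            Listener listener = emitter::onNext;
            // Runs when the stream terminates or the Subscriber cancels.
            emitter.setCancellable(() -> unregister(listener));
            register(listener);
        }, BackpressureStrategy.LATEST);
    }
}
```

Registering the cancellable before the listener means the cleanup is already in place by the time the first callback can arrive. After the Disposable returned by subscribe is disposed, listenerCount() should drop back to zero.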

We can then use it in our code as follows.

Uncontrolled multithreaded sources

The previous example with OnSharedPreferenceChangeListener benefits from one significant simplification: the callback is always called on the same thread, Android’s UI thread. This is not true in all cases, especially when it comes to cross-process (AIDL-backed) callback invocations.

Android’s ConnectivityManager.requestNetwork method is a good example of the issue. It can be used to request a network matching specific criteria (e.g. Wi-Fi or cellular). The method requires the caller to specify a NetworkCallback instance. Once a network matching the criteria becomes active, onAvailable is called. Conversely, if the network goes down, the callback is notified via onLost.

For the purpose of this example we will create a stream that requests a cellular network under the hood and emits down the stream whether it has been connected or not. First we need to define the data to be emitted.

We clearly have no control over when onAvailable and onLost are called. This makes the source uncontrolled, so we should be using Flowable.create as previously discussed. Note that before obtaining anything from the ConnectivityManager, the stream should start with Network.NONE to indicate that the state is yet to be determined. A naive approach to the problem would look as below.

The code can then be tested by printing out both the network and the thread that performed the emission.

The following output can be seen in the LogCat window as the code is executed.

The stream is subscribed on the main thread (no subscribeOn specified), while the NetworkCallback methods are called on an internal ConnectivityManager thread. Harmless as it may seem, this becomes a real issue when a callback that is expected to be called on thread A suddenly emits on thread B. This can happen if, for example, we register callbacks on a remote service that initially runs in the same process as the caller. One might then build the code on the expectation that all callbacks arrive on the main thread. Simply moving the Service to an external process changes that: a Binder thread, taken from the process’s Binder thread pool, takes over instead.

The way to fix that is to use Schedulers. By calling createWorker one can obtain a Worker instance, which can then be used to delegate all of the Emitter’s onNext, onComplete and onError invocations.

The implementation is very similar to what RxJava operators like Flowable.interval and Flowable.delay do. The emission is performed by default on a particular Scheduler (whichever one we need), while still allowing a caller to change it.

A single Worker instance is guaranteed to execute its jobs sequentially, and so effectively runs them on a single thread (one thread is picked in the case of pools like io or computation). Note that Workers are Disposable: registering the Worker as the Emitter’s Disposable guarantees that no jobs will be executed once the unsubscribe event takes place. This pattern can be seen in line 30 of the code snippet above.
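For readers without the sample project at hand, the Worker delegation can be approximated in plain Java. This is a sketch under stated assumptions, not the article’s original code: FakeConnectivity and Callback are hypothetical stand-ins for ConnectivityManager and NetworkCallback, the AVAILABLE and LOST members of Network are invented for the example (the article names only Network.NONE), and RxJava 2.x is assumed.

```java
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.Scheduler;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.disposables.Disposables;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical example; none of these types are Android classes.
final class CellularMonitor {

    // Only NONE is named in the article; the other members are assumptions.
    enum Network { NONE, AVAILABLE, LOST }

    interface Callback {
        void onAvailable();
        void onLost();
    }

    // Stand-in for ConnectivityManager: callbacks fire on the caller's thread.
    static final class FakeConnectivity {
        private final List<Callback> callbacks = new CopyOnWriteArrayList<>();

        void request(Callback callback) { callbacks.add(callback); }
        void unregister(Callback callback) { callbacks.remove(callback); }
        int callbackCount() { return callbacks.size(); }

        void fireAvailable() { for (Callback c : callbacks) c.onAvailable(); }
        void fireLost() { for (Callback c : callbacks) c.onLost(); }
    }

    static Flowable<Network> network(FakeConnectivity connectivity, Scheduler scheduler) {
        return Flowable.create(emitter -> {
            Scheduler.Worker worker = scheduler.createWorker();

            // Every emission is handed to the Worker, whatever thread calls back.
            Callback callback = new Callback() {
                @Override
                public void onAvailable() {
                    worker.schedule(() -> emitter.onNext(Network.AVAILABLE));
                }

                @Override
                public void onLost() {
                    worker.schedule(() -> emitter.onNext(Network.LOST));
                }
            };

            // Disposing the stream stops the Worker and removes the callback.
            emitter.setDisposable(new CompositeDisposable(
                    worker,
                    Disposables.fromAction(() -> connectivity.unregister(callback))));

            // Initial state first, then start listening.
            worker.schedule(() -> emitter.onNext(Network.NONE));
            connectivity.request(callback);
        }, BackpressureStrategy.LATEST);
    }
}
```

Because an Emitter holds a single resource, the Worker and the unregister action are bundled into one CompositeDisposable here rather than set separately. Passing the Scheduler as a parameter mirrors how operators such as Flowable.interval let the caller override the default.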

One important thing to note here is that applying subscribeOn instead will not solve the problem, as it does not affect the thread that notifies the callback. Alternatively, observeOn could be applied downstream to move the emission onto a single thread. This would, however, be only a workaround for a serious issue occurring upstream.

Summing up, when converting an uncontrolled multithreaded (or potentially multithreaded) source into a reactive stream, use Flowable.create and schedule the emissions on a Worker.

A sample application with all the code presented in this article can be found on my GitHub.

That’s it for today. If you want to dive deeper, remember to check out my other article on Advanced Reactive Programming.

Let life flow smoothly like a reactive stream, and remember to apply backpressure where needed.

Thank you!
