I created a Flowable and I liked it.
Nothing more than being a father has taught me than children’s routine is one of the most important aspects of their young life. It is what makes them feel secure and helps us, parents, to plan our personal daily activities.
The same rule applies to software engineering. By following SOLID and other OOP design principles, engineers can make source code clean and understandable. Introducing custom in-project patterns on how particular areas have to be extended makes every developer on a team feel secure. Extend one thing, you will not break other system areas. Follow the routine and whole the product will speak the same language.
Routine, is the keyword for today.
Routine is what I feel being missing when it comes to creating custom RxJava Flowables
. That being said, the goal for this article is to fill the gap and specify 3 generic patterns on how imperative sources should be properly converted into backpressured RxJava streams.
Controlled sources
Think of a controlled source as of an object you iterate through. It’s totally up to you on when the onNext
event occurs while converting that into a reactive stream. This could be Android’s Cursor
, file-reading InputStream
or any sort of Iterable
(although Flowable.fromIterable
can take care of those).
When converting a controlled source into a reactive stream, use Flowable.generate
.
Example (a stream of Contacts Provider contacts)
Let’s introduce a data class representing an individual Contact. We will be passing it down the stream from a source. Note that anything you want to emit through any RxJava stream must be immutable. By doing so you guarantee that no two separate Subscribers
can ever interfere with each other. We will call the class SimpleContact
.
Now let’s specify how we will be utilizing provided SimpleContact
instances — let’s print them to Android’s LogCat
window.
The next step is to expose the magic hidden under the contacts
method. Under the hood we will be using Flowable.generate
as mentioned — the right approach for controlled sources.
Representing the method as Flowable.generate(1
, 2
, 3
):1
— a function that creates our state2
— a function that takes a current state, must call at most once either of onNext
, onComplete
or onError
and return the updated state. The generator
is called each time when a downstream Subscriber
is ready to consume the next event. This means the function is being called multiple times. In every invocation, the state will represent what has been returned by the previous invocation (or the function 1
when called for the first time).3
— (optional) a function responsible for disposing the state. This is where we should close a Cursor
or an InputStream
. It is called once a downstream Subscriber
unsubscribes or onComplete
is called on the emitter within the generator (function 2
).
In order to obtain Contacts Cursor
we will need to query it on a ContentResolver
.
Using the Flowable.generate
the contacts
method would look as below.
Note how we hid the mutable Cursor
within the generate
method and are emitting only immutable SimpleContact
down the stream. A new Cursor
row is only obtained when needed, saving unnecessary resource drainage caused by CursorWindow
allocations. More on windowing in a great article by Chris Craik, here.
Uncontrolled sources
Uncontrolled sources refer mostly to objects like callbacks or any sort of listeners, where we cannot predict how often those will be invoked.
When converting an uncontrolled source into a reactive stream, use Flowable.create
.
Simplifying the method into Flowable.create(1
, 2
):1
— a function that receives an Emitter
(basically a downstream Subscriber
instance wrapped into internal Emitters
). Within the function we can call onNext
as many times as needed and terminate optionally either by onError
or onComplete
. Note that when terminated, no additional invocation of onNext
is allowed.2
— Since we cannot control the speed in which the items are emitted, we are required to provide a BackpressureStrategy
governing on how the emission is throttled. Based on the selection, a downstream Subscriber
will be wrapped into a special Emitter
class under the hood, providing the requested backpressure. Jag Saund wrote a good article on the BackpressureStrategy
, although applicable to RxJava 1.0, here (in case you would want to dive deeper).
Example (a stream of the latest SharedPreferences
value).
Note that the BackpressureStrategy.LATEST
has been applied as we are interested only in processing the most recent value. It was also necessary to call setCancellable
on the Emitter
, providing a way to cleanup once the stream completes or a Subscriber
unsubscribes.
We can then use it in our code as follow.
Uncontrolled multithreaded sources
The previous example with OnSharedPreferenceChangeListener
benefits from one significant simplification — the callback is always called within the same, Android’s UI thread. This is not always true for all the cases, especially when it comes to cross-process (AIDL-backed) callbacks invocations.
Android’s ConnectivityManager.requestNetwork
method is a good example of the issue. It can be used to request a network matching specific, provided criteria (i.e. WIFI or Cellular). The method requires a caller to specify a NetworkCallback
instance. Once a network matching the criteria becomes active, onAvailable
is called. Respectively if the network goes down, the callback will be notified with onLost
.
For purpose of the example we will create a stream that requests a Cellular network under the hood, and emits whether it has been connected or not down the stream. First we need to define the data to be emitted.
We clearly have no control over when onAvailable
and onLost
are called. This makes the source uncontrolled — we should be usingFlowable.create
as previously discussed. Note that before obtaining anything from the ConnectivityManager
, the stream should start with Network.NONE
to indicate that the state is yet to be determined. A naive approach to the problem would look as below.
The code then can be tested by printing out both the network and a thread that performed the emission.
Following output can be seen on the LogCat
window as the code is executed.
The stream is subscribed on the main thread (no subscribeOn
specified) and the NetworkCallback
methods are called on internal ConnectivityManager
’s thread. As not problematic as it would seem, this might be a real issue in a case when a callback that is expected to be called on a thread A out of a sudden emits on a thread B. This can happen if, in example, we are registering callbacks on a remote service that initially is running in the same process as the callback. One can then establish initial expectation upon the code that all the callbacks are called from the Main thread. By simply moving the Service
to an external process things will change. A BinderThread
, instantiated for each remote connection, will take over instead.
The way to fix that is to use Schedulers
. By calling createWorker
one can obtain a Worker
instance which can be then used to delegate all the Emitter
’s onNext
, onComplete
and onError
invocations.
The implementation is very similar to what RxJava sources like Flowable.interval
and Flowable.delay
do. The emission is performed by default on a particular Scheduler (could be whatever we need), additionally allowing a caller to change it as needed.
A single Worker
instance is guaranteed to provide sequential execution of jobs internally, thus running on a single thread (one will be picked up in case of pools like io
or computation
). Note that Workers
are Disposable
—registering them as Emitter
’s Disposable
guarantees that no jobs will be executed once the unsubscribe event takes place. This pattern can be seen in the line 30 of the code snippet above.
One important thing to note here is that applying subscribeOn
instead will not solve the problem — it won’t affect the thread that’s notifying the callback. Additionally, observeOn
could be applied down the stream to streamline the emission. This would be however only a workaround to a serious issue occurring upstream.
Summing up, when converting an uncontrolled multithreaded (or potentially multithreaded) source into a reactive stream, use Flowable.create
, scheduling emission on a Worker class.
Sample application with all the code presented in this article can be found on my GitHub. Click below.
That’s it for today. In case you wanted to dive deeper remember to check out my other article on Advanced Reactive Programming.
Let the life flow smoothly like a reactive stream and remember to apply the backpressure where needed.
Thank you!