Advanced RxJava programming with stream nesting

Łukasz Szczygłowski
Published in AndroidPub
15 min read · Nov 20, 2017

Since you have opened this article, I assume that you are a passionate reactive programmer. You know the basics of the reactive algebra well and seek a deeper understanding of more complex techniques.

The goal of this article is to present what stream nesting is and how we can benefit from it. We will start by analyzing a hypothetical backpressure issue and solving it with the stream nesting technique. With that foundation in place, we will move on to the more complex reactive problems listed below.

  • Marrying together URI change notifications with ContentResolver query in Android.
  • Avoiding helper classes while using stream combining operators like zip() and combineLatest().
  • Modeling more complex, multi-dimensional reactive problems.

A story of backpressure and overly producing Publisher

Think of a typical reactive stream that emits any data. This could be strings, integers or a custom immutable object consumed downstream. Using the marbles notation we can represent such a stream as below.

source: reactivex.io

The stream emits 2 items and then completes. This is the most basic example of a reactive sequence. Upon subscription, the Observer receives all the data, processes it and then unsubscribes. A reactive stream is also defined as a sequence of Subscribers receiving the data and Publishers providing it. A reactive algebra operator is both a Subscriber and a Publisher, as it receives data from the upstream producer, transforms it accordingly and emits it down the stream.

Nothing fancy so far, huh? Let’s add more drama. Imagine the stream emits contact book entries and the subscriber is a dispatcher that writes them into a relational database. Each write operation requires a new database connection to be established and then closed, which takes some time. We could model the problem using the following code.

Before we continue, let’s stop for a while to understand the code.

In our example we have a very basic Repository implementation that allows us to “save” the contacts into a storage as they come. We won’t implement a real write operation here, since adding a database would complicate our code. For now let’s keep it simple: we will print each saved contact to LogCat and simulate the operations specific to a relational storage. In order to write to the “database” we need to establish a connection with it (simulating a real RDBMS connection). This operation takes 2 seconds. Once completed, it returns a Connection class instance communicating with the storage. We can then write our data and close the connection once it is no longer needed. The former is pretty fast, taking roughly 10 milliseconds, and the latter is more expensive, requiring 2 seconds to process. Now that we know how the storage works, let’s focus on the reactive part.
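The storage described above could be modeled roughly as follows. This is only a sketch under assumptions: the names FakeStorage, connect() and save() are hypothetical, contacts are plain strings, and the delays are added to a virtual clock instead of sleeping so that the cost of each step is easy to inspect.

```java
// Hypothetical model of the storage described in the text. Names are
// illustrative; delays are accumulated on a virtual clock, nothing sleeps.
final class FakeStorage {
    static final long CONNECT_MS = 2_000; // establishing a connection
    static final long WRITE_MS = 10;      // a single write
    static final long CLOSE_MS = 2_000;   // closing the connection

    long elapsedMs = 0;        // virtual time spent in storage operations
    int connectionsOpened = 0; // how many connections were established

    // Simulates the slow connection set-up.
    Connection connect() {
        elapsedMs += CONNECT_MS;
        connectionsOpened++;
        return new Connection();
    }

    final class Connection {
        private boolean open = true;

        // Simulates one fast write; the println stands in for the LogCat output.
        void write(String contact) {
            if (!open) {
                throw new IllegalStateException("connection is closed");
            }
            elapsedMs += WRITE_MS;
            System.out.println("Saved " + contact);
        }

        // Simulates the expensive close.
        void close() {
            open = false;
            elapsedMs += CLOSE_MS;
        }
    }

    // Naive save: a fresh connection for every single contact.
    void save(String contact) {
        Connection connection = connect();
        connection.write(contact);
        connection.close();
    }
}
```

With these numbers a single save() call costs 2000 + 10 + 2000 = 4010 virtual milliseconds, which is the "roughly 4 seconds per contact" the rest of the analysis relies on.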

The save() function receives Contacts from some Publisher, wrapped in the Flowable algebra. Since we are working with a hypothetical database stored on a file system, we need to delegate contact processing to the Schedulers.io() workers. There are various schedulers available, but this one is highly recommended for our scenario as it is based on a dynamic thread pool. This means that it can spawn new threads when existing workers are busy processing heavy I/O operations. Using Schedulers.computation() would not be a good choice here, as it houses a fixed thread pool with a size equal to the device’s CPU count. When all the computation workers are busy, no further requests can be processed at the same time - oops.

The stream subscriber extends the DefaultSubscriber class. It is a pretty useful abstract implementation that wraps the Subscription and allows us to conveniently use the request() method. What is this method anyway, and why do we need it? It all comes down to the paradigm hated by most reactive programmers at the very beginning of their journey: backpressure. The request() method is basically a way for a Subscriber to tell its upstream (the sequence of Publishers above it providing the items) how many items it wants to receive. In our example that Subscriber is the DefaultSubscriber instance. From its perspective, the first upstream Publisher is the observeOn() operator. Once the Subscriber says “Hey, I request one item” by calling request(1), observeOn() will respect the demand and propagate it up the stream to the contacts Publisher, also by calling the request() method. This is where things get tricky: the observeOn() Subscriber will not call request(1) on its Subscription once the emission starts, but request(128) instead. I will explain why as we move on. Anyway, the chain of request() calls goes up the stream through the Subscriber+Publisher operators until it reaches the source Publisher.

The source. The Matrix trilogy (1999).

Now the source can either respect the request(N) call from the downstream Subscriber and emit no more than N items or be a villain, ignore the call, and emit all the items anyways.

By using the request() method in our DefaultSubscriber, we can really benefit from the backpressure and control the whole reactive chain. We are doing database operations and can request new items only when we know we can process them. To turn the DefaultSubscriber into a backpressure user, we need to follow the steps below.

  • Override the onStart() method and call request(1) to tell the upstream that we want to receive only one item upon subscription.

We need to do this as the onStart() method requests Long.MAX_VALUE items by default.

  • Each time a new item is received through the onNext() method, we have to request the next item once the current one has been processed.

As we do so, we will be getting one item at a time, only when we need it. We should not worry that we will be overwhelmed with items we cannot process in a timely manner. The Subscriber benefiting from backpressure can be like an ideal software engineer, notifying the management immediately about any obstacles that could slow down a project. This is so beautiful!
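The two steps above can be sketched without RxJava by using the JDK's java.util.concurrent.Flow interfaces, which mirror the Reactive Streams Publisher/Subscriber/Subscription contract. Everything here is an assumption made for illustration: OnDemandPublisher is a deliberately tiny, single-threaded source that honors request(n) (it is not fully spec compliant), and OneAtATimeSubscriber plays the role of the DefaultSubscriber, with onSubscribe() standing in for the overridden onStart().

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Flow;

// A tiny synchronous publisher that emits only what has been requested.
final class OnDemandPublisher<T> implements Flow.Publisher<T> {
    private final Iterable<T> items;

    OnDemandPublisher(Iterable<T> items) {
        this.items = items;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        Iterator<T> it = items.iterator();
        subscriber.onSubscribe(new Flow.Subscription() {
            private long requested = 0;
            private boolean emitting = false;
            private boolean done = false;

            @Override
            public void request(long n) {
                if (done) return;
                requested += n;
                if (emitting) return; // re-entrant call from onNext: the loop below picks it up
                emitting = true;
                while (requested > 0 && !done && it.hasNext()) {
                    requested--;
                    subscriber.onNext(it.next());
                }
                emitting = false;
                if (!done && !it.hasNext()) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }
}

// Requests exactly one item up front and one more after each processed item.
final class OneAtATimeSubscriber implements Flow.Subscriber<String> {
    final List<String> log = new ArrayList<>();
    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription s) {
        subscription = s;
        log.add("request(1)");
        s.request(1); // step 1: ask for a single item instead of Long.MAX_VALUE
    }

    @Override
    public void onNext(String contact) {
        log.add("saved " + contact); // the slow database write would happen here
        log.add("request(1)");
        subscription.request(1);     // step 2: ask for the next item only when ready
    }

    @Override
    public void onError(Throwable t) {
        log.add("error");
    }

    @Override
    public void onComplete() {
        log.add("complete");
    }
}
```

Subscribing a OneAtATimeSubscriber to new OnDemandPublisher<>(List.of("Alice", "Bob")) leaves the log as: request(1), saved Alice, request(1), saved Bob, request(1), complete. The source never runs ahead of the consumer, which is exactly the behavior a well-behaved upstream gives us.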

Beauty, though, is not what interests us here. We wanted to add some drama, remember? Here it comes! Let’s design the source Publisher emitting the contacts. The ugliest thing we could have here is an overly producing, villain type of source that does not implement the backpressure. Awesome, let’s do it! The Flowable.interval() source is one of the approaches we can go with. Why? A quick look at the method’s JavaDoc reveals the reason.

Source: http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html#interval-long-long-java.util.concurrent.TimeUnit-

Ignores the backpressure? Why? Well, the reason comes down to the fact that the intervals at which the items are emitted must be equal. Having any kind of default “hold on / I want X items” mechanism would prevent this behavior from being guaranteed. It’s up to the downstream to implement the backpressure, using for example one of the onBackpressureXXX operators (there’s a very nice article by Jag Saund, here, explaining those). This is especially important when we have one interval() source and share it with many subscribers using the family of publish() operators (more on multicasting by Pavlos-Petros Tournaris, here). Each subscriber can then apply its own onBackpressureXXX strategy without affecting the source, which would still guarantee to emit an item every N seconds. As we will learn going forward, this will not solve the issue we run into. Our code would be defined as below.

It basically creates a Contact instance every 200 milliseconds. Now let’s take a step back. The save() method has a Subscriber under the hood that, well, for each received contact opens a database (takes 2 seconds), writes the contact (10 milliseconds) and then closes the database (another 2 seconds). This means we need roughly 4 seconds to process each contact. However, the source produces a contact every 200 milliseconds and doesn’t implement the backpressure. Oops. Once we run the code, we can observe how our data is being processed in the LogCat window.

MissingBackpressureException, why? What happened?
Our simple math showed that the Subscriber cannot handle so many items emitted in very short intervals (having 4s processing time and 200ms emission interval, we’re getting roughly 20 times more items than we can process). Even though interval() does not implement the backpressure, it has some sort of internal mechanism used to check if we’re not trying to rely on the backpressure while using it anyways.

Let’s start digging. IntervalSubscriber is the core of the FlowableInterval responsible for emitting the interval items down the stream once triggered by a periodic worker. The class implements 2 interfaces: Runnable and Subscription.

Flowable.interval() source, FlowableInterval, RxJava 2.1.6
  • Runnable
    Upon subscription, FlowableInterval schedules periodic work on the provided Scheduler (Schedulers.computation() by default, using the schedulePeriodicallyDirect() method). When we schedule a periodic job on the worker, it requires a callback Runnable whose run() method is called each time a period completes. IntervalSubscriber uses that as a trigger to increment the value counter and send the new value down the stream, calling the onNext() method on the downstream Subscriber (the actual field in the class).
  • Subscription
    This interface provides 2 operations: request(long) and cancel(). The latter is used to unsubscribe from the stream and cancel the scheduled periodic job. The former is more interesting. When a downstream Subscriber calls, for example, request(2), IntervalSubscriber’s value (yes, value, since it’s also an AtomicLong) will be incremented by that number. When the next item is created inside the run() method, it also checks the internal value to see if the next emitted item would exceed what the Subscriber requested. When that is the case, the MissingBackpressureException is raised, solving our mystery. It’s also worth noticing that even though interval() does not support backpressure, it signals overproduction with a crash.
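The bookkeeping described in the two bullets above can be modeled in a few lines. This is a simplified, single-threaded sketch and not the RxJava source: IntervalModel, tick() and the error string are assumptions made for illustration, the real class also deals with disposal and scheduling, and a plain string stands in for the MissingBackpressureException.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Simplified model: the AtomicLong value holds the outstanding demand,
// request(n) adds to it and every tick consumes one unit of it.
final class IntervalModel extends AtomicLong {
    final List<Long> delivered = new ArrayList<>();
    String error;       // set once the model terminates (illustrative message)
    private long count; // next value to emit

    // Counterpart of Subscription.request(n): add n to the outstanding demand.
    void request(long n) {
        addAndGet(n);
    }

    // Counterpart of Runnable.run(): invoked by the periodic worker on each tick.
    void tick() {
        if (error != null) {
            return; // already terminated
        }
        if (get() != 0L) {
            delivered.add(count++); // stands in for onNext(value) on the downstream
            decrementAndGet();      // one requested item has been produced
        } else {
            // stands in for signalling MissingBackpressureException
            error = "Can't deliver value " + count + " due to lack of requests";
        }
    }
}
```

If the downstream calls request(128) once and never again, the first 128 ticks deliver the values 0 to 127 and the 129th tick terminates the model while trying to deliver the value 128.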

We have clarified why the stream has been terminated by the Exception. There’s one more interesting thing... let’s take another quick look at the stack trace.

Value 128?! Haven’t we requested only one item in the Subscriber by calling the request(1) method? Yes, we did. Does this mean RxJava is broken and our request is ignored up the stream? Well, not really. Remember when at the very beginning I mentioned that even though our subscriber calls request(1), the observeOn() operator will call request(128) on its upstream subscription? Time to get down to it.

Let’s take a look at how the observeOn(Scheduler) operator is implemented in the Flowable class.

Flowable.observeOn(Scheduler), RxJava 2.1.6

The method we use by default in most reactive streams is in fact an overload of a more complex observeOn() variant.

Flowable.observeOn(Scheduler, delayError, bufferSize), RxJava 2.1.6

The observeOn() implementation, embedded in the FlowableObserveOn class, is more complex than the interval() code we discussed before, so I will try to stay away from the algorithm’s details. The bufferSize() method returns the BUFFER_SIZE constant defined in the Flowable class. By default that value is set to 128. The number was determined based on benchmarks performed on RxJava; details can be found in the RxRingBuffer class from RxJava 1.x. Interestingly, this value defaults to 16 on the Android platform when running RxJava 1.x. observeOn() is backed by a lock-free, thread-safe single-producer-single-consumer queue used to hand emissions over from one Scheduler worker to another and to cache the overflow. I really recommend taking a deeper look into it to understand how RxJava implements lock-free queues, starting with the SimplePlainQueue.java interface and its implementations.

Under the hood, the bufferSize value is used to determine how many items the operator requests once it subscribes to the upstream (which would be request(bufferSize) in the onSubscribe() method).

Coming back to our example, observeOn() requests 128 items from its upstream, the interval() source. FlowableInterval will respect this value and crash with the MissingBackpressureException once the 129th item is created. Even though our subscriber requested only one item, observeOn() will hold the additional 127 items in its queue, waiting to be processed. I will discuss the internals of backpressure in my further articles to show what behavior can be expected from a backpressured stream and how we can control it.

So.. we went pretty deep, didn’t we? Time to get back to the surface. Our stream terminates with a MissingBackpressureException. We have identified the root cause and have to fix it. One of the theoretical solutions would be to apply one of the onBackpressureXXX operators mentioned before. However, that would make us miss some contacts (we would not be able to use buffering provided by onBackpressureBuffer as the buffer would overflow, so skipping the items is the only option with these operators). We need something better…

Stream nesting to help

Recalling our math, processing an item takes roughly 4 seconds (2 seconds each to open and close the database and 10 milliseconds to write a contact). Once we have the database opened, we can potentially save more than one contact before we close it. This way we could make our approach more efficient and reduce the number of expensive connections. In a real relational database we could make it significantly faster by using transactions, precompiled SQL queries and even dropping the indexes of the affected tables for the duration of the transaction (though that would give us a huge benefit only with a large amount of data, definitely more than 10k database rows to insert in Android SQLite).

In theory, the issue could be resolved if we were writing at least 22 contacts to the database within a single connection (processing would take 4s 220ms and emitting the items 4s 400ms). Let’s round it up to 30. Now, we need some way to divide the linear contacts stream into “windows”, each containing 30 items (or fewer if the stream completes before a window is full). Every window would be a Flowable itself, giving us not only the possibility to save 30 items at once but also control over the laziness, e.g. opening multiple connections to the database to save the transactions in parallel and speed things up even more.
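The figure of 22 follows from requiring that one connection finishes a window no later than the source fills the next one: open + close + n · write ≤ n · interval. A small helper makes the arithmetic explicit; WindowMath and minWindowSize are hypothetical names used only for this sketch.

```java
final class WindowMath {
    // Smallest window size n for which a single connection keeps up with the source:
    //   openMs + closeMs + n * writeMs <= n * intervalMs
    static long minWindowSize(long openMs, long closeMs, long writeMs, long intervalMs) {
        if (intervalMs <= writeMs) {
            throw new IllegalArgumentException("a single write is already slower than the source");
        }
        long overhead = openMs + closeMs;         // fixed cost per connection
        long gainPerItem = intervalMs - writeMs;  // time won back by each item in the window
        long n = (overhead + gainPerItem - 1) / gainPerItem; // ceiling division
        return Math.max(1, n);
    }
}
```

For the numbers used here, minWindowSize(2000, 2000, 10, 200) is the ceiling of 4000 / 190, which is 22. With 21 items, processing (4210 ms) would still be slower than emission (4200 ms).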

RxJava is already equipped with such an operator and, not by coincidence, it’s called window().

http://reactivex.io/documentation/operators/window.html

Window basically nests our stream into sub-streams, each buffering and emitting no more items than the provided count. The sub-stream is an instance of UnicastProcessor, which allows only one subscriber to receive the data (which is worth knowing if you get too far with your reactive programming like I did the other day…). The method signature itself looks a bit scary but there’s nothing to worry about. There are also more complex overloads of the operator but we won’t discuss them in this article.

Flowable.window(count), RxJava 2.1.6

We can see that our good friend, bufferSize(), is also used by window()! Now we know why. Anyway, we can apply the stream nesting provided by the window() operator to resolve the MissingBackpressureException issue in our code.

save() method patched to cope with an overly producing source

Notice that the window() operator emits Flowable<Contact> objects downstream - the method itself returns Flowable<Flowable<Contact>>. Having the nested stream, we apply concatMap() to collect all the items of each window into a List<Contact> instance that can be atomically saved to the database. The toFlowable() conversion is necessary as toList() returns Single<List<T>> in RxJava 2.x. Once we run our enhanced code, we can see that no exceptions are thrown and the data is processed correctly.
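To see what the windowing buys us in terms of connections, here is a list-based analogue of "window(count), then collect each window into a List". It is plain Java rather than RxJava, so it says nothing about threading or demand; WindowedSave, windows() and costMs() are hypothetical names, and the cost formula reuses the 2 s / 10 ms / 2 s figures from the storage description.

```java
import java.util.ArrayList;
import java.util.List;

final class WindowedSave {
    // Splits the items into consecutive windows of at most `count` elements;
    // the last window may be smaller, just like the last window of a stream.
    static <T> List<List<T>> windows(List<T> items, int count) {
        if (count <= 0) {
            throw new IllegalArgumentException("count must be positive");
        }
        List<List<T>> result = new ArrayList<>();
        for (int start = 0; start < items.size(); start += count) {
            int end = Math.min(start + count, items.size());
            result.add(new ArrayList<>(items.subList(start, end)));
        }
        return result;
    }

    // Virtual cost in milliseconds of saving every window over its own connection.
    static long costMs(List<? extends List<?>> windows) {
        long total = 0;
        for (List<?> window : windows) {
            total += 2_000 + window.size() * 10L + 2_000; // open + writes + close
        }
        return total;
    }
}
```

For 70 contacts, windows of 30 give three lists (30, 30 and 10 items) and a total cost of 3 · 4000 + 70 · 10 = 12700 ms, whereas one connection per contact costs 70 · 4010 = 280700 ms.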

The windowing technique is one of the examples where we can benefit from stream nesting but there are more. Let’s discuss them.

Problem 1, reactive ContentObserver

Let’s take the stock Android Contacts ContentProvider as an example. We want to build a reactive app that queries for contacts and observes content changes. Let’s start building our reactive code by representing a Cursor as a reactive stream.

The itemsFor() function turns the provided Cursor into a stream by converting every row into a particular data class and emitting the results downstream. Flowable.generate() is the right way to go here as it handles the backpressure on demand. The itemsFor() function takes in the following parameters:

  • queryRawData is a function needed to obtain a Cursor object containing raw data from the ContentProvider, through the ContentResolver api.
  • mapRawData is a transformer that maps each Cursor row into a data class. It is important as we should be passing only immutable data down the stream. If we were passing the Cursor itself downstream, a subscriber could modify its state, affecting our generation logic.

The next thing we need is a way to observe ContentProvider changes. The need is to get a trigger once the data changes so we can obtain a new Cursor, transform it and pass down the stream using the itemsFor() function.

The function registers a ContentObserver for the provided Uris and emits the next item downstream (returned by the onUriChanged function) once a data change is detected. Notice that ContentObserver requires a Handler to instantiate, thus the thread we initialize this source on must have a Looper (a HandlerThread, for example). Our backpressure strategy is LATEST as we only care about the most recent data set.

We have both the needed pieces. Let’s connect them together.

Let’s explain what is going on. The uriChangesOf() function calls the onUriChanged method inside. Now, the onUriChanged parameter will be the itemsFor() function, querying for a Cursor from the ContactsProvider and mapping each of its rows to a SimpleContact instance.

It is all about threading. The ContentObserver instance that’s running under the hood of the uriChangesOf() method requires a thread with a Looper, such as a HandlerThread, to create the Handler. Since creating a custom HandlerThread requires additional handling (starting and stopping), we will go with the UI thread to keep things simple. On the other hand, the ContentResolver.query() operation that’s called in the itemsFor() method has to be executed on a non-UI thread. The solution presented in the code snippet above allows us to make sure the ContentObserver is created and maintained on the UI thread while I/O database queries are done on one of the Schedulers.io() background workers.

We used the stream nesting to do it. Let’s explain how. The itemsFor() function returns Flowable<SimpleContact>. The uriChangesOf() function returns Flowable<T>, where T is the type returned by the onUriChanged method. Since we pass itemsFor() as onUriChanged, our source returns Flowable<Flowable<SimpleContact>>, which is a nested stream. The subscribeOn(AndroidSchedulers.mainThread()) at line 20 affects the outer producer (uriChangesOf()), ensuring that the ContentObserver will be running on the UI thread. The outer stream emits Flowable<SimpleContact>, which can be thought of as a source of contacts that will start emitting once triggered. How can we subscribe to it? By using either flatMap, concatMap or switchMap. We rely on the alphabetic order of the contacts, so flatMap is definitely not an option. Now let’s think of a database that contains many contacts, so emitting all the SimpleContact instances would take a while. When the data changes, we would like to immediately switch to the new source and query only for the most recent data, abandoning the previous ongoing query. This means switchMap is our way to go. Now, we need to make sure that the Flowable<SimpleContact> is subscribed on a non-UI thread, thus we need to precede it with the observeOn(Schedulers.io()) call.

Problem 2, bypassing helper classes

Stream nesting can also be used to avoid intermediate data classes when working with operators like zip and combineLatest. For the purpose of this example, let’s assume we have a Retrofit API that returns contacts from the server and mock it with the code below.

In order to get the contacts we need to have 2 things: the server url and the user credentials. Since both are dynamic and can change at any time, we should represent them with reactive streams. Now, the function that would process both streams and query for contacts would look as follows.

Notice that we had to create an intermediate data class to hold the results of combineLatest. We can apply the stream nesting technique and bypass the class by doing the following.
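The reason nesting removes the helper class is that the inner lambda can see the outer value through its closure. The same principle can be shown with the JDK's CompletableFuture, a single-value analogue of a stream; this is not the RxJava code of the article, and NestedCompose, fetchContacts() and the string-based types are assumptions made for this sketch.

```java
import java.util.concurrent.CompletableFuture;

final class NestedCompose {
    // Hypothetical stand-in for the remote API call.
    static CompletableFuture<String> fetchContacts(String url, String credentials) {
        return CompletableFuture.completedFuture("contacts from " + url + " as " + credentials);
    }

    // Nesting the second composition inside the first keeps `u` in scope,
    // so no intermediate pair-like class is needed to carry both values.
    static CompletableFuture<String> contacts(CompletableFuture<String> url,
                                              CompletableFuture<String> credentials) {
        return url.thenCompose(u -> credentials.thenCompose(c -> fetchContacts(u, c)));
    }
}
```

The flat alternative would first combine both inputs into a holder object and only then call the API, which is exactly the extra class the nested form avoids.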

Problem 3, returning nothing

Imagine we have 2 streams emitting boolean values, named triggers and gatekeepers. We then combine them using the withLatestFrom operator with the following business logic:

  • if the most recent value emitted by gatekeepers is true, we emit all the values provided by triggers downstream,
  • otherwise we don’t emit anything down the stream upon receiving a value from triggers.

This problem can be easily modeled using the stream nesting.
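The core of the idea is to map every trigger to a small stream holding either that one value or nothing at all, and then flatten the result. Below is an analogue using java.util.stream instead of RxJava; Gatekeeper, Trigger and passThrough() are hypothetical names, and the latest gatekeeper value is assumed to have already been paired with each trigger (the part withLatestFrom takes care of).

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class Gatekeeper {
    // A trigger value together with the most recent gatekeeper value seen when it fired.
    static final class Trigger {
        final boolean value;
        final boolean latestGate;

        Trigger(boolean value, boolean latestGate) {
            this.value = value;
            this.latestGate = latestGate;
        }
    }

    // Open gate: a one-element stream. Closed gate: an empty stream, i.e. "return nothing".
    static Stream<Boolean> gate(Trigger trigger) {
        if (trigger.latestGate) {
            return Stream.of(trigger.value);
        }
        return Stream.empty();
    }

    // Flattening the nested streams drops the triggers that hit a closed gate.
    static List<Boolean> passThrough(List<Trigger> triggers) {
        return triggers.stream()
                .flatMap(Gatekeeper::gate)
                .collect(Collectors.toList());
    }
}
```

Given the triggers (true, gate open), (false, gate closed) and (false, gate open), passThrough() returns [true, false]: the second trigger produces no emission at all rather than some placeholder value.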

Summary

It was a long journey. We started by analyzing the case of a fast producing stream and a slowly processing subscriber. We ran into the MissingBackpressureException and deeply analyzed its root cause. Looking for solutions, we discovered that the backpressure issue can be resolved by windowing the data using the window operator. This revealed the stream nesting technique and how we could benefit from it. We then moved on to applying the technique to 3 sample reactive problems: a reactive Android ContentObserver, removing intermediate helper classes while using stream combining operators and suppressing downstream emission with Observable.empty().

I hope you will find a way to apply the technique in your projects and that you enjoyed reading the article. My next plan is to write about the deeper implementation of the RxJava backpressure mechanism and how we can benefit more from it. The stream nesting technique also allows us to model more complex reactive problems, which I will also focus on in my upcoming articles.

The source code for all snippets mentioned is available on GitHub, here.

Thank you!
