RxJava2: Schedulers 101 or simplified concurrency, part 2.

Jacques Ledoux
7 min read · Jul 12, 2018


1. Overview

This post is the second installment of a series on RxJava2 Schedulers and concurrency. The first part covered the basic Scheduler-management operators: subscribe(), subscribeOn() and observeOn(). We saw that these operators dynamically switch the Scheduler at various points of the stream but that, alas, the stream remained inherently sequential.

Reactive programming is often perceived as being concurrent by default. This is far from the truth: reactive streams are inherently asynchronous but sequential. As with standard Java, concurrency has to be explicitly configured and managed, but the Reactive APIs greatly simplify both the coding and how to reason about it. In short, a stream is sequential at its ends but has the powerful capability to be concurrent in its core.

2. Sequential reactive

To demonstrate the sequential nature of reactive streams, as well as their capability to quickly morph into a concurrent animal, we'll extend the Member examples from the previous article. The member.json file has been split into three files, each representing a region's membership.

Let's first look at the basic file-fetching functions from the SyncSampleStream class (whose instance is referred to in the tests as "sample"). It simply uses Apache Commons IO's FileUtils to synchronously load a file.

Note: The code is structured to support a pedagogic objective and might not be as compact as it could be. The same reasoning explains using @Test to expose results instead of assertions. Users who clone the GitHub repo will be able to easily explore alternatives simply by removing, adding or modifying a single line in the tests.

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.commons.io.FileUtils;

public class SyncSampleStream {

    // Prints the given title along with the current thread name
    void printThread(String title) {
        System.out.println(String.format("%s thread: %s",
                title, Thread.currentThread().getName()));
    }

    // Resolves a resource file from the classpath
    File getFileObject(String filename) {
        ClassLoader cl = getClass().getClassLoader();
        return new File(cl.getResource(filename).getFile());
    }

    // Synchronously loads the whole file as a String
    String fetchFileAsString(String filename) throws IOException {
        printThread(String.format("Fetching %s from", filename));
        return FileUtils.readFileToString(getFileObject(filename),
                StandardCharsets.UTF_8);
    }
}

The first test goal is to demonstrate that Observables are implicitly sequential:

@Test
public void testingScheduling_Sequential()
        throws IOException, InterruptedException {
    //given...
    String[] regions = {"East", "West", "Down"};
    CountDownLatch latch = new CountDownLatch(1);
    Observable<String> observable = Observable.fromArray(regions)
            .map(region -> String.format("members-%swood.json",
                    region.toLowerCase()))
            .doOnEach(regionName -> sample.printThread(
                    String.format("Submit region %s", regionName.getValue())))
            .observeOn(Schedulers.io())
            .map(regionName -> sample.fetchFileAsString(regionName))
            .subscribeOn(Schedulers.single())
            .observeOn(Schedulers.single());
    sample.printThread(
            "From testingScheduling_Sequential subscribe");
    observable
            //when...
            .subscribe(list -> { //then...
                        sample.printThread("Printing members...");
                        System.out.println(list);
                    },
                    error -> System.out.println(error.getMessage()),
                    () -> latch.countDown()
            );
    latch.await();
}

We have a test function that creates an Observable construct that:

  1. Applies a map() transformation to produce a full file name such as "members-eastwood.json"
  2. Switches to Schedulers.io() before calling the I/O-bound fetch function
  3. Subscribes on and observes on Schedulers.single()

The test then subscribes to the Observable construct, turning it into a live stream.

Note: It might seem peculiar that subscribeOn() is immediately followed by observeOn() in the Observable construct. Remember that subscribeOn() bubbles up the stream at subscription time, while observeOn() is applied as the stream executes top to bottom.
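To make that distinction concrete, here is a minimal sketch (not part of the repo, purely illustrative) where the comments indicate which thread each print statement runs on:

Observable.just("members-eastwood.json")
        // Runs on an io() thread: the subscribeOn() below governs the whole
        // upstream no matter where it sits in the chain
        .doOnNext(name -> System.out.println("upstream: "
                + Thread.currentThread().getName()))
        .subscribeOn(Schedulers.io())
        // From this point down, emissions are delivered on the single() thread
        .observeOn(Schedulers.single())
        .subscribe(name -> System.out.println("downstream: "
                + Thread.currentThread().getName()));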

And here are the results:

From testingScheduling_Sequential subscribe thread: main
Submit region members-eastwood.json thread: RxSingleScheduler-1
Submit region members-westwood.json thread: RxSingleScheduler-1
Fetching members-eastwood.json from thread: RxCachedThreadScheduler-1
Submit region members-downwood.json thread: RxSingleScheduler-1
Printing members... thread: RxSingleScheduler-1
Fetching members-westwood.json from thread: RxCachedThreadScheduler-1
Fetching members-downwood.json from thread: RxCachedThreadScheduler-1
[
{
"city": "Eastwood",
"firstName": "Rosalyn Cartwright",
"lastName": "Honduras",
"email": "Newton.Thiel@kenny.name",
"id": 0
} ...
]
Printing members... thread: RxSingleScheduler-1
[
{
"city": "WestWood",
"firstName": "Elwyn Kiehn",
"lastName": "Christmas Island",
"email": "Vida@hilario.biz",
"id": 3
} ...
]
Printing members... thread: RxSingleScheduler-1
[
{
"city": "DownWood",
"firstName": "Amari Herman",
"lastName": "Vietnam",
"email": "Marquis@gaetano.net",
"id": 6
} ...
]

Note: Message ordering might be different on your system. Just verify that all messages are printed.

As we can observe for each file, the "Submit…" messages from doOnEach() are executed on Schedulers.single(). Then, Schedulers.io() is used for fetching the files, but even though io() is backed by multiple threads, all files are loaded sequentially on the same thread. Finally, we switch back to Schedulers.single() to output the results.

So, even though we provided a multi-threaded Scheduler, the stream did not automatically take advantage of it. Concurrency requires explicit setup, and the next section explains how to configure it in the stream construct.

3. Concurrent reactive

The dreaded FlatMap operator (background evil laugh)

From the official ReactiveX documentation:

The FlatMap operator transforms an Observable by applying a function that you specify to each item emitted by the source Observable(1), where that function (2) returns an Observable that itself emits items. FlatMap then merges the emissions (4) of these resulting Observables, emitting these merged results as its own sequence.

It is not quite easy to figure out how this description leads to concurrency, so let's walk through the flatMap() diagram, whose numbered callouts correspond to the comments below.

  1. The source Observable at the top of the primary stream emits items on the source stream
  2. In the flatMap() operator, you configure a new Observable construct that transforms the source emissions (file names) into their file content.
  3. Aaaa…Haaaa!!!
  4. Then, flatMap() merges the emissions of these internal Observables, emitting them sequentially on the primary stream.

Let’s now create the code that applies all the nice things we just learned. As usual and for experimentation convenience, we’ll shape it as a test:

@Test
public void testingScheduling_Concurrent() throws InterruptedException {
    //given...
    String[] regions = {"East", "West", "Down"};
    CountDownLatch latch = new CountDownLatch(1);
    Observable<String> observable = Observable.fromArray(regions)
            .map(region -> String.format("members-%swood.json",
                    region.toLowerCase()))
            .doOnEach(regionName ->
                    sample.printThread(String.format("Submit region %s",
                            regionName.getValue())))
            .flatMap(regionName ->
                    Observable.just(regionName)
                            .subscribeOn(Schedulers.io())
                            .map(new Function<String, String>() {
                                @Override
                                public String apply(String name) throws Exception {
                                    return sample.fetchFileAsString(name);
                                }
                            })
            ).observeOn(Schedulers.single());
    //when...
    sample.printThread("From testingScheduling_Concurrent subscribe");
    observable
            .subscribeOn(Schedulers.computation())
            .subscribe(list -> { //then...
                        sample.printThread("Printing member list from: ");
                        System.out.println(list);
                    },
                    error -> System.out.println(error.getMessage()),
                    () -> latch.countDown()
            );
    latch.await();
}

OK, the "given" defines the observable variable. We start with the same mapping, but we then change our previous map() to a flatMap(). The "function you specify" is an Observable construct that emits the region name received from the source stream, immediately followed by the subscribeOn(Schedulers.io()) required for concurrency. From that point on, flatMap() subscribes to each of these inner Observables, and each one runs its downstream operators on its own io() thread. Then, the final flatMap() merging step is applied.

Now let’s run this and look at the results.

From testingScheduling_Concurrent subscribe thread: main
Submit town members-eastwood.json thread: RxComputationThreadPool-1
Submit town members-westwood.json thread: RxComputationThreadPool-1
Fetching members-eastwood.json on thread: RxCachedThreadScheduler-1
Fetching members-westwood.json on thread: RxCachedThreadScheduler-2
Submit town members-downwood.json thread: RxComputationThreadPool-1
Submit town null thread: RxComputationThreadPool-1
Fetching members-downwood.json on thread: RxCachedThreadScheduler-3
Printing member list from: thread: RxSingleScheduler-1
[
{
"city": "Eastwood",
"firstName": "Rosalyn Cartwright",
"lastName": "Honduras",
"email": "Newton.Thiel@kenny.name",
"id": 0
}
...
]
Printing member list from: thread: RxSingleScheduler-1
[
{
"city": "WestWood",
"firstName": "Elwyn Kiehn",
"lastName": "Christmas Island",
"email": "Vida@hilario.biz",
"id": 3
}
...
]
Printing member list from: thread: RxSingleScheduler-1
[
{
"city": "DownWood",
"firstName": "Amari Herman",
"lastName": "Vietnam",
"email": "Marquis@gaetano.net",
"id": 6
}
...
]

Let's check the results:

  1. Submits are all run on the same computation() thread… Check
  2. Fetches are run on different io() threads… Check
  3. Prints are all run on the same single() thread… Check

A flatMap() is a strange beast. Technically, the "function that you specify" transforms each Observable<input> emission into a sub-stream, so the intermediate result is equivalent to an Observable<Observable<output>>. Then, flatMap() immediately subscribes to these sub-streams ALL AT ONCE, which means that if a concurrent context is available (i.e., we are running on a multi-thread-backed Scheduler), the sub-streams will be processed concurrently.
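A rough way to picture it (reusing the regions array and sample instance from the tests): flatMap() behaves much like a map() that produces inner Observables, followed by a merge of those inner streams.

Observable<Observable<String>> nested = Observable.fromArray(regions)
        .map(region -> String.format("members-%swood.json", region.toLowerCase()))
        .map(regionName -> Observable.just(regionName)
                .subscribeOn(Schedulers.io())
                .map(sample::fetchFileAsString));

// Observable.merge() subscribes to the inner Observables and flattens their
// emissions into a single stream, roughly what flatMap() does in one step
Observable<String> flattened = Observable.merge(nested);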

To test this assertion, just comment out the subscribeOn(Schedulers.io()) line and run the test: we're back to sequential, with all fetches on the same thread. Finally, note that by default flatMap() subscribes to at most 128 sub-streams at a time (the default maxConcurrency); this is configurable by passing a maxConcurrency argument to flatMap().
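As a sketch, the overload looks like this (the value 2 is purely illustrative):

Observable<String> limited = Observable.fromArray(regions)
        .map(region -> String.format("members-%swood.json", region.toLowerCase()))
        .flatMap(regionName ->
                        Observable.just(regionName)
                                .subscribeOn(Schedulers.io())
                                .map(sample::fetchFileAsString),
                2); // flatMap(mapper, maxConcurrency): at most 2 inner subscriptions at once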

A few things to remember:

  1. Use the right Scheduler for the task. In this case, Schedulers.io() was the right choice, but switch to another one as soon as you are done with I/O-bound work.
  2. Make sure to use a multi-thread-backed Scheduler to gain concurrency. The candidates are io(), computation() or from(Executor), for which you have to provide your own configured Executor with at least 2 threads.
  3. flatMap() does not guarantee the same output ordering as the input, but concatMap() works like flatMap() and preserves the input order.
  4. The anonymous Function<String, String> in the flatMap() could be shortened to a method reference such as sample::fetchFileAsString (see the sketch below).
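
For points 3 and 4, here is a short sketch with the same regions and sample as before. Keep in mind that concatMap() subscribes to the inner Observables one at a time, so it preserves order at the cost of concurrency (concatMapEager() subscribes eagerly while still preserving order):

Observable<String> ordered = Observable.fromArray(regions)
        .map(region -> String.format("members-%swood.json", region.toLowerCase()))
        .concatMap(regionName ->
                Observable.just(regionName)
                        .subscribeOn(Schedulers.io())
                        // Method reference instead of the anonymous Function
                        .map(sample::fetchFileAsString));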

4. Conclusion

As we've seen, flatMap() is definitely a precious arrow in RxJava's quiver but, again, "with great power comes great responsibility and, often… complexity". Once you internalize that flatMap() acts as a container that concurrently dispatches the "function that you specify" and catches/merges its output, things become much clearer.

Concurrency only makes sense for Observable and Flowable stream sources, since Single, Completable and Maybe provide at most one emission, which obviously does not warrant concurrency at all. We've seen how to implement concurrency with an Observable source, but is the same recipe applicable to Flowable as well? Nope… because of backpressure support.

Backpressure is a beast by itself, but the RxJava2 APIs help handle it largely transparently as long as you follow some strict rules. We'll talk about backpressure handling in future posts; in the meantime, suffice it to say that there is a dedicated parallel() operator (ParallelFlowable) to handle concurrency on Flowable streams.
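As a teaser, here is a hedged sketch of what that Flowable route might look like (same regions and sample as before; the details are left for a future post):

Flowable.fromArray(regions)
        .map(region -> String.format("members-%swood.json", region.toLowerCase()))
        .parallel()                       // splits the Flowable into parallel rails
        .runOn(Schedulers.io())           // each rail runs on an io() thread
        .map(sample::fetchFileAsString)   // concurrent, backpressure-aware work
        .sequential()                     // merges the rails back into a single Flowable
        .subscribe(System.out::println);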

As for this series on Schedulers, Part 3 will cover production-grade test methods for concurrent streams, so stay tuned. As always, you can find the code on GitHub.
