RxJava2: Schedulers 101 or simplified concurrency, part 1.

Jacques Ledoux
Jul 9, 2018

Today, we’re going to talk about Schedulers but first, let’s clarify that they have nothing to do with a hockey game or a dentist appointment. Schedulers, in the asynchronous/reactive universe, are objects that simplify concurrency management. To summarize the definition provided by the RxJava2 Javadoc, a Scheduler is basically an API used to “schedule” a unit of work to be processed by an underlying execution scheme, be it a Thread, an event loop, an Executor or an Actor.

However, don’t let the title fool you into thinking that, all of a sudden, concurrency management has become a trivial given. Schedulers have simplified the concurrency management API but one still has to understand and reason about what’s going on underneath.

Note: This post only addresses cold Observable stream constructs. We’ll look at hot Observables in a future post.

1. Schedulers

Basically, a Scheduler closely resembles a Java Executor or a thread pool. RxJava2 provides ready-made Schedulers, each configured for a specific type of work. Each has its own rules regarding the number of Threads and the order in which jobs run. A few links to detailed documentation for each type are provided as you read. For now, we’ll simply enumerate the provided Schedulers with a brief description of their respective behavior:

  1. Schedulers.computation: used for expensive CPU computations; as many threads as CPUs
  2. Schedulers.io: used for blocking, high-latency I/O operations such as file or network access; unbounded thread pool
  3. Schedulers.single: performs tasks sequentially and in order on a single Thread
  4. Schedulers.trampoline: runs queued work sequentially on the current thread; used for recursive operations to avoid stack overflow
  5. Schedulers.newThread: creates a new Thread for each scheduled unit of work
  6. Schedulers.from(Executor executor): creates and returns a Scheduler backed by the provided executor.
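
To make the list above more concrete, here is a minimal sketch (not part of the original code) that runs a trivial task on each ready-made Scheduler and reports which thread executed it. The class and method names are hypothetical, and the thread-name prefixes mentioned in the comments are the RxJava 2 defaults as far as I know; they may differ in other versions.

```java
import io.reactivex.Observable;
import io.reactivex.Scheduler;
import io.reactivex.schedulers.Schedulers;

// Hypothetical helper class, for illustration only.
class SchedulerThreadNames {

    // Runs a trivial task on the given Scheduler and returns the name
    // of the thread that executed it.
    static String threadNameOn(Scheduler scheduler) {
        return Observable
                .<String>fromCallable(() -> Thread.currentThread().getName())
                .subscribeOn(scheduler)
                .blockingFirst();
    }

    // Default prefix: RxComputationThreadPool
    static String onComputation() { return threadNameOn(Schedulers.computation()); }

    // Default prefix: RxCachedThreadScheduler
    static String onIo() { return threadNameOn(Schedulers.io()); }

    // Default prefix: RxSingleScheduler
    static String onSingle() { return threadNameOn(Schedulers.single()); }

    // Default prefix: RxNewThreadScheduler
    static String onNewThread() { return threadNameOn(Schedulers.newThread()); }

    // trampoline() does not switch threads: the task runs on the caller.
    static String onTrampoline() { return threadNameOn(Schedulers.trampoline()); }

    public static void main(String[] args) {
        System.out.println("computation: " + onComputation());
        System.out.println("io:          " + onIo());
        System.out.println("single:      " + onSingle());
        System.out.println("newThread:   " + onNewThread());
        System.out.println("trampoline:  " + onTrampoline());
    }
}
```

Running main() should print one line per Scheduler; the numeric suffix of each thread name will vary from run to run.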

2. Operators

Now, let’s get familiar with stream operators that provide control over the Schedulers.

subscribe(): this might sound strange but it is of utmost importance to understand that any Observable stream is, by default, executed on the thread from which subscribe() is called.

subscribeOn(): sets the Scheduler on which the source Observable is subscribed to and, therefore, on which it starts emitting. It can be positioned anywhere in the stream, as it only takes effect when subscribe() is called.

observeOn(): changes the emitting Scheduler from its declared stream position and for all downstream operators. Can be applied as many times as required in a stream and is often used to avoid processing operators on the UI thread.
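
As a small illustration of the claim that subscribeOn() can sit anywhere in the stream, the sketch below declares it once right after the source and once after a map(). The class and method names are made up for this example. In both cases the map() function should report a thread from Schedulers.io.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

// Hypothetical class, for illustration only.
class SubscribeOnPosition {

    // subscribeOn() declared right after the source, before map().
    static String mapThreadWithEarlySubscribeOn() {
        return Observable.just("member")
                .subscribeOn(Schedulers.io())
                .map(value -> Thread.currentThread().getName())
                .blockingFirst();
    }

    // subscribeOn() declared after map(): the subscription signal still
    // travels upstream through it, so map() runs on the same Scheduler.
    static String mapThreadWithLateSubscribeOn() {
        return Observable.just("member")
                .map(value -> Thread.currentThread().getName())
                .subscribeOn(Schedulers.io())
                .blockingFirst();
    }

    public static void main(String[] args) {
        System.out.println("early: " + mapThreadWithEarlySubscribeOn());
        System.out.println("late:  " + mapThreadWithLateSubscribeOn());
    }
}
```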

Now we’ll look at some practical aspects of the theory we just learned. First, let’s code a stream we’ll use in all our subsequent examples:

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.reactivex.Observable;
import java.io.File;
import java.io.IOException;
import java.util.List;

Observable<Member> getEachMemberAsObservableItem(File source)
        throws IOException {
    return Observable.<Member>create(emitter -> {
        try {
            // Read the whole file synchronously into a list
            ObjectMapper mapper = new ObjectMapper(); //---1---
            List<Member> list = mapper.readValue(source,
                    new TypeReference<List<Member>>() {});
            list.forEach(member -> { //---2---
                printThread(
                        String.format("Emitter for %s =",
                                member.getEmail()));
                emitter.onNext(member);
            });
        } catch (IOException e) {
            printThread("Emitter for onError =");
            emitter.onError(e);
            return;
        }
        printThread("Emitter for onComplete =");
        emitter.onComplete();
    });
}

// Prints the given title followed by the name of the current thread
void printThread(String title) {
    System.out.println(String.format("%s thread: %s",
            title, Thread.currentThread().getName()));
}

  1. Using Jackson’s ObjectMapper, we synchronously read all members from the source JSON file into a list, from which we emit members individually.
  2. We record the emitting thread for each member emission, including the member’s email, so we can track each emitted Member in the subscription.

I’ll leave it to the reader as an exercise to rewrite that part to load data asynchronously. You can refer to the same article sections mentioned below for examples.

All examples will subscribe to this Observable stream with various subscribeOn() and observeOn() configurations and show their results.

2.1. SubscribeOn

A reactive stream must be fed somehow with data. The most frequently seen methods are:

  1. A source operator ( from…(), just(), etc.)
  2. An input from another stream
  3. Building an adapter using Observable.create(emitter -> {…})

Note: For examples of items 2 and 3 above, see sections 3.2 and 3.3 here

Subscription is the trigger that starts the execution of a cold Observable stream construct. Calling subscribe() on a stream sends an upstream subscription signal that eventually reaches the stream source and triggers its downstream execution. That’s fine and dandy but what does this have to do with Schedulers? Well, we mentioned earlier that the default thread on which the Observable emits is the same thread subscribe() is called from. As we’ll see, this is not always the best choice.
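
The "nothing runs until subscribe()" behavior of a cold Observable can be checked with a small counter. This is only a sketch with hypothetical names: the body passed to create() increments a counter each time it runs, and we read the counter before and after subscribing. No Scheduler is involved, so everything runs synchronously on the calling thread.

```java
import io.reactivex.Observable;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical class, for illustration only.
class ColdObservableDemo {

    // Returns how many times the source body has run: before any
    // subscription, after the first one, and after the second one.
    static int[] runsBeforeAndAfterSubscribe() {
        AtomicInteger sourceRuns = new AtomicInteger();

        // Declaring the stream does not execute the body below.
        Observable<String> cold = Observable.<String>create(emitter -> {
            sourceRuns.incrementAndGet();
            emitter.onNext("member");
            emitter.onComplete();
        });

        int beforeSubscribe = sourceRuns.get();

        // Each subscription sends its own signal upstream and
        // re-executes the source body.
        cold.subscribe(item -> { });
        int afterFirst = sourceRuns.get();

        cold.subscribe(item -> { });
        int afterSecond = sourceRuns.get();

        return new int[] {beforeSubscribe, afterFirst, afterSecond};
    }

    public static void main(String[] args) {
        int[] runs = runsBeforeAndAfterSubscribe();
        System.out.println(runs[0] + ", " + runs[1] + ", " + runs[2]);
    }
}
```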

Selecting the right Scheduler to subscribe on is quite important if, for instance, you need to build an emitting adapter with a long initializing process. The point is that all the code contained in Observable.create(emitter -> {…}) is executed on the subscriber’s thread. For instance, it might not be a good idea to synchronously fetch a large local file or do a remote API call if subscribing from the UI thread. We’ll see below that we may define a stream in one thread and subscribe on another. As for source operators, bear in mind that they are adapters as well and use a create() function internally.

That is where the subscribeOn() operator comes in handy. By carefully choosing the Scheduler on which the source operator starts emitting data, we avoid burdening a Scheduler meant for a different type of work. Still, one must choose wisely: a Scheduler backed by an unbounded thread pool (e.g. Schedulers.io) can grow quite large and ultimately degrade performance.
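
One possible way to keep the pool size under control is to wrap a bounded Executor with Schedulers.from(). The sketch below is an assumption on my part rather than something taken from the sample project: the pool size of 2, the thread name "bounded-io" and the class name are all hypothetical.

```java
import io.reactivex.Observable;
import io.reactivex.Scheduler;
import io.reactivex.schedulers.Schedulers;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical class, for illustration only.
class BoundedIoScheduler {

    // Subscribes on a Scheduler backed by a fixed-size pool and returns
    // the name of the thread that ran the source.
    static String loadOnBoundedPool() {
        // At most 2 threads, named so they are easy to spot in logs.
        ExecutorService pool = Executors.newFixedThreadPool(2, runnable -> {
            Thread thread = new Thread(runnable, "bounded-io");
            thread.setDaemon(true);
            return thread;
        });
        Scheduler boundedIo = Schedulers.from(pool);
        try {
            return Observable
                    .<String>fromCallable(() -> Thread.currentThread().getName())
                    .subscribeOn(boundedIo)
                    .blockingFirst();
        } finally {
            // The caller owns the Executor and must shut it down.
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("source ran on: " + loadOnBoundedPool());
    }
}
```

The trade-off is that work queues up once both threads are busy, which is usually preferable to an ever-growing number of threads.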

Let’s look at some examples, but bear in mind that the result order shown here might not be the same on your setup. Just make sure each emission has a corresponding reception.

First, with neither subscribeOn() nor observeOn():

@Test
public void givenSubscribeOnMain_whenNoChange_thenResult()
        throws IOException, InterruptedException {
    CountDownLatch latch = new CountDownLatch(1);
    printThread("Subscribe");
    sample.getEachMemberAsObservableItem(memberFile)
            .subscribe(member -> {
                        printThread(String.format(
                                "Observer for %s =", member.getEmail()));
                    },
                    error -> {
                        // Report the failure and release the latch
                        // so the test cannot hang
                        System.err.println(error.getMessage());
                        latch.countDown();
                    },
                    () -> latch.countDown());
    latch.await();
}

The results are:

Subscribe thread: main
Emitter for Newton.Thiel@kenny.name = thread: main
Observer for Newton.Thiel@kenny.name = thread: main
Emitter for Gerardo@addison.co.uk = thread: main
Observer for Gerardo@addison.co.uk = thread: main
Emitter for George@nedra.co.uk = thread: main
Observer for George@nedra.co.uk = thread: main
Emitter for onComplete = thread: main

As we observe, everything happens on the calling thread, “main” in this case.

Now let’s use subscribeOn() to switch to Schedulers.io, since we read from disk:

@Test
public void givenSubscribeOnIo_whenNoChange_thenResult()
        throws IOException, InterruptedException {
    CountDownLatch latch = new CountDownLatch(1);
    printThread("Subscribe");
    sample.getEachMemberAsObservableItem(memberFile)
            .subscribeOn(Schedulers.io()) //---1---
            .subscribe(member -> {
                        printThread(String.format(
                                "Observer for %s =", member.getEmail()));
                    },
                    error -> {
                        // Report the failure and release the latch
                        // so the test cannot hang
                        System.err.println(error.getMessage());
                        latch.countDown();
                    },
                    () -> latch.countDown());
    latch.await();
}

We declared a subscribeOn() just before subscribing and here is the effect:

Subscribe thread: main
Emitter for Newton.Thiel@kenny.name = thread: RxCachedThreadScheduler-1
Observer for Newton.Thiel@kenny.name = thread: RxCachedThreadScheduler-1
Emitter for Gerardo@addison.co.uk = thread: RxCachedThreadScheduler-1
Observer for Gerardo@addison.co.uk = thread: RxCachedThreadScheduler-1
Emitter for George@nedra.co.uk = thread: RxCachedThreadScheduler-1
Observer for George@nedra.co.uk = thread: RxCachedThreadScheduler-1
Emitter for onComplete = thread: RxCachedThreadScheduler-1

Interestingly, the first message declares “Subscribe thread: main” while we positioned the subscribeOn(Schedulers.io) before subscribe(). Well, as explained earlier, subscribe() sends a signal that moves upstream to the stream source, so it’s only after the message has reached the subscribeOn() that the Scheduler is switched to io. Consequently, the subscribe() itself is executed in the main thread.

We can verify that on the second line where the first emission is sent on the RxCachedThreadScheduler-1 from the Schedulers.io. That same thread is used for all downstream operators since we have not yet used observeOn() to switch Scheduler as we roll.

A more detailed explanation is that many source operators and, of course, the create() static function typically produce side-effects. They need to reach outside of their own scope to establish a connection with the actual data source. Conversely, most instance operators such as map(), filter(), take(), etc. are pure, as they only subscribe to their direct upstream and call their direct downstream operators. Also, many blogs posit that having several subscribeOn() in a stream has no effect, since only the one closest to the source will apply. Well, there are some side-effect edge cases for which this is not really true. Check this post for more details.
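
The common rule mentioned above, that the subscribeOn() closest to the source decides where the source emits, can be sketched as follows. The class name is hypothetical, and the example deliberately covers only the simple case, not the side-effect edge cases.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

// Hypothetical class, for illustration only.
class StackedSubscribeOn {

    // Returns the name of the thread on which the source runs when two
    // subscribeOn() operators are stacked.
    static String emittingThread() {
        return Observable
                .<String>fromCallable(() -> Thread.currentThread().getName())
                .subscribeOn(Schedulers.single()) // closest to the source
                .subscribeOn(Schedulers.io())     // further downstream
                .blockingFirst();
    }

    public static void main(String[] args) {
        System.out.println("source ran on: " + emittingThread());
    }
}
```

The subscription signal first hops onto an io thread, then onto the single thread, and that last hop is the one the source sees.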

2.2. ObserveOn

Choosing the right subscribing Scheduler is important, but it might not be the ideal place to process all of the stream’s operators. Let’s now see how we can change the Scheduler as we progress downstream. The observeOn() operator changes the stream’s effective Scheduler from where it is declared until either another observeOn() is declared or the final subscribe() is reached.

There is not much more to say besides “with great power comes great responsibility”. Selecting the right Scheduler for the next operator’s job(s) in the stream is important to maintain the performance of the overall application. In our example, it wouldn't be wise to keep processing the stream on the io thread. So we’ll switch to Schedulers.single until the end of the stream.

Let’s now look at observeOn() in action:

@Test
public void givenSubscribeOnIo_whenChangeToSingle_thenResult()
        throws IOException, InterruptedException {
    CountDownLatch latch = new CountDownLatch(1);
    printThread("Subscribe");
    sample.getEachMemberAsObservableItem(memberFile)
            .subscribeOn(Schedulers.io()) //---1---
            .observeOn(Schedulers.single())
            .subscribe(member -> {
                        printThread(String.format(
                                "Observer for %s =", member.getEmail()));
                    },
                    error -> {
                        // Report the failure and release the latch
                        // so the test cannot hang
                        System.err.println(error.getMessage());
                        latch.countDown();
                    },
                    () -> latch.countDown());
    latch.await();
}

Let’s check the results:

Subscribe thread: main
Emitter for Newton.Thiel@kenny.name = thread: RxCachedThreadScheduler-1
Emitter for Gerardo@addison.co.uk = thread: RxCachedThreadScheduler-1
Emitter for George@nedra.co.uk = thread: RxCachedThreadScheduler-1
Emitter for onComplete = thread: RxCachedThreadScheduler-1
Observer for Newton.Thiel@kenny.name = thread: RxSingleScheduler-1
Observer for Gerardo@addison.co.uk = thread: RxSingleScheduler-1
Observer for George@nedra.co.uk = thread: RxSingleScheduler-1

The subscription is on main, and we now know why. All emissions come from the same Schedulers.io thread as before. However, the Observer now runs on RxSingleScheduler-1.
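
Since observeOn() stays in effect only until the next observeOn(), several of them can be chained. The following sketch, with a hypothetical class name, records the thread seen by a map() after each switch. The choice of computation followed by single is arbitrary and only serves the illustration.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

// Hypothetical class, for illustration only.
class ChainedObserveOn {

    // Returns "<thread of first map> -> <thread of second map>".
    static String threadsAfterEachSwitch() {
        return Observable.just("member")
                .observeOn(Schedulers.computation())
                // Runs on a computation thread
                .map(value -> Thread.currentThread().getName())
                .observeOn(Schedulers.single())
                // Runs on the single thread
                .map(first -> first + " -> " + Thread.currentThread().getName())
                .blockingFirst();
    }

    public static void main(String[] args) {
        System.out.println(threadsAfterEachSwitch());
    }
}
```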

3. Conclusion

Conclusion??? What about concurrency? All that was shown is how to change thread as we move downstream to the final subscription. No concurrency at all… everything is sequential, going from one Scheduler to another.

That’s true, but the good news is that the few but important operators and Schedulers we have seen in this post are going to be the stars of part 2, which covers Observable stream concurrency.

You can check the code on GitHub.
