One of the great advantages of using a Reactive Streams implementation is that it abstracts away concurrency. That doesn’t mean it divorces the user from needing to run things on different threads or having to occasionally control where they run. There are a couple of useful operators that can be used to schedule tasks on different threads. In RxJava they are observeOn and subscribeOn, and in reactor-core they are publishOn and subscribeOn. The observeOn and publishOn operators let all emissions after the operator happen on another thread. The subscribeOn operator moves your entire Observable or Publisher to a new thread. This can be tricky, and so to make it clear I have some examples below — first using the JDK8 ForkJoinPool, and then using reactor-core. In the examples, I’m going to be printing random numbers and the threads that produced them. The last example will be a bit trickier and add the numbers up.
The first example is a baseline — you want to run your code on the currently executing thread. It’s a pretty simple scenario, shown first in JDK8 and then in Reactor.
Here’s the reactor example:
Nothing special to see here — just iterate from 0 to 9 and print a random number. The next example is more complicated. What if you want to print the numbers on a different thread? Here’s an example using JDK8 and ForkJoinPool:
Now here’s the example using reactor:
The import thing to note here is the subscribeOn operating. In this example, it’s telling reactor to subscribe on a worker provided by the parallel scheduler. The blockLast operator is called instead of subscribe because the code is running in a different thread we must wait until it’s completed or the JVM would exit.
Another thing to note is that the subscriber of the publisher can choose to run it on different threads independent of the publisher’s creator.
In this example the ‘observable’ object was returned by a method. The subscriber could either run on the current thread or do what the example did and run it using a scheduler on potentially another thread.
This next example shows how to run each random number generation in a different thread. Here’s what it looks like using JDK8’s ForkJoinPool:
This time we had to add the ‘ForkJoinTask’ objects a list and then block on them to make sure each task was executed.
Here is the equivalent example using reactor-core:
This example uses a flatMap to map and merge the results of another Publisher together. The publishers created inside the flatMap — in this case a Mono that prints a message — is told to subscribe using a scheduler. This causes each of these different Monos to run on a different thread. The main publisher is then blocked until everything is finished. What’s interesting about this is that the Monos could be anything — a synchronous Mono that prints a message, a runnable that prints a message on a different thread, or an off-box network call. They are all treated the same.
The final example does work on two different threads. The first thread generates a random number, prints it and then makes it available to another thread. The second thread then adds up the random incoming numbers printing an intermediate total, and then a final total after it gets 10 random numbers. Here’s what the example looks like in JDK8 using the ForkJoinPool:
Things are starting to get trickier here. We must use a latch and count how many receive to make sure that we have ten items. Also, we needed us some kind of thread-safe queue to send data between the threads.
Here’s what the example looks like with reactor-core:
Before we get to the publishOn operators let’s look at the map, and reduce operators. The map operator takes an integer generated by range, and then returns the random number that is generated. This is then passed to the reduce operator. In this case we are just adding two numbers, and it will add all the different random number as they are generated. At the end we call block to wait until everything is done.
This example uses the publishOn method. The first publishOn method is before the map operator. This makes the map operators happen on another thread. There is another publishOn right after the map operator and right before a reduce operator. This will cause all the reduce operations to also happen on a different thread. There isn’t another publishOn before the doOnNext, so it happens on the same thread of the reduce operator.
It is in more complex examples like this that Reactive Streams schedulers really shine. Rather than having to spend time reasoning about complex and potentially buggy multithreaded code using Java’s concurrency utilities, having understood the scheduleOn and publishOn operators it is a simple matter to introduce parallelism where is it needed in a way that it both concise and easy to reason about.