Declarative Concurrency with Reactor

Using Reactive Programming to write concurrent readable code

In this article, we’ll look at different ways to solve a common task. Let’s assume that we have a list containing user IDs and a method fetchUser() to query a user by ID. The fetchUser() method blocks the calling thread until the requested user is loaded and throws a RuntimeException if the user can’t be loaded. Our goal is to implement a method loadUsers() that tries to load all users corresponding to the given IDs. It should return the users that were loaded successfully and the IDs of any users that couldn’t be loaded.

Loading users sequentially using Collection.stream()

Our first approach is to load all users sequentially. To keep track of the users that were successfully loaded, as well as the IDs of any users that can’t be loaded, we’ll use the Either class provided by the Javaslang library (since renamed Vavr). This class has exactly two subtypes: Left and Right. In general, the subtype Right is used to wrap the expected result and the subtype Left is used to signal a failure.
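A minimal sketch of this sequential approach could look as follows. To keep it runnable without any dependency, it uses a tiny hand-written Either as a stand-in for the Javaslang class. The User class and the simulated fetchUser() (which sleeps briefly and fails for IDs starting with "bad") are assumptions made purely for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class SequentialUserLoader {

    /** Minimal stand-in for Javaslang's Either: Left = failed ID, Right = loaded user. */
    static final class Either<L, R> {
        final L left;
        final R right;
        private Either(L left, R right) { this.left = left; this.right = right; }
        static <L, R> Either<L, R> left(L value) { return new Either<>(value, null); }
        static <L, R> Either<L, R> right(R value) { return new Either<>(null, value); }
        boolean isRight() { return right != null; }
    }

    static final class User {
        final String cwid;
        User(String cwid) { this.cwid = cwid; }
    }

    /** Hypothetical blocking lookup: sleeps briefly and fails for IDs starting with "bad". */
    static User fetchUser(String cwid) {
        try {
            Thread.sleep(20);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        if (cwid.startsWith("bad")) {
            throw new RuntimeException("cannot load " + cwid);
        }
        return new User(cwid);
    }

    /** Wraps the outcome of fetchUser(): Right(user) on success, Left(cwid) on failure. */
    static Either<String, User> fetchUserAsEither(String cwid) {
        try {
            return Either.right(fetchUser(cwid));
        } catch (RuntimeException e) {
            return Either.left(cwid);
        }
    }

    /** Loads the users one after another on the calling thread. */
    static List<Either<String, User>> loadUsers(List<String> cwids) {
        return cwids.stream()
                .map(SequentialUserLoader::fetchUserAsEither)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        for (Either<String, User> r : loadUsers(Arrays.asList("u1", "bad2", "u3"))) {
            System.out.println(r.isRight() ? "loaded " + r.right.cwid : "failed " + r.left);
        }
    }
}
```

Running main() prints "loaded u1", "failed bad2" and "loaded u3", in that order.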

This implementation works fine — but it loads the users sequentially. This means if it takes 200 milliseconds to load one user, the method will take approximately 20 seconds to load 100 users. So we’d like to call fetchUser() concurrently instead.

Parallel loading of users with Collection.parallelStream()

The easiest way to call fetchUser() concurrently is to use a parallel stream. Instead of transforming the list cwids into a sequential stream by calling cwids.stream(), we now call cwids.parallelStream(). This means the fetchUserAsEither() method that is used to map the elements of the stream will be executed on a ForkJoinPool.
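The following sketch shows the switch to parallelStream() in isolation. Error handling is left out for brevity, users are represented as plain Strings, and the simulated fetchUser() is an assumption. It also records the names of the threads that execute the mapping function, so we can see where the work ends up.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

class ParallelStreamLoader {

    /** Names of the threads that executed fetchUser(), recorded for illustration only. */
    static final Set<String> THREADS = ConcurrentHashMap.newKeySet();

    /** Hypothetical blocking lookup that sleeps and echoes the ID. */
    static String fetchUser(String cwid) {
        THREADS.add(Thread.currentThread().getName());
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        return "user-" + cwid;
    }

    /** Same pipeline as the sequential version, but started with parallelStream(). */
    static List<String> loadUsers(List<String> cwids) {
        return cwids.parallelStream()
                .map(ParallelStreamLoader::fetchUser)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(loadUsers(Arrays.asList("a", "b", "c", "d", "e", "f")));
        System.out.println("executed on: " + THREADS);
    }
}
```

The result list keeps the order of the input list, even though the lookups run on different threads. Which thread names are printed depends on the machine.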

If we use this implementation to load 100 users and it takes 200 milliseconds to load each one as before, this method will need approximately 2.9 seconds to complete on an average machine. This is still a long time compared to the time needed to load a single user.

The reason for this behaviour is the following: by calling the terminal method collect() on a parallel stream, the operations given as parameters to methods like map() will be evaluated as ForkJoinTasks. These tasks are by default executed in the pool ForkJoinPool.commonPool(), and its default level of parallelism is equal to the number of available processor cores minus one. That means: if there are eight cores available, the pool provides only seven worker threads. Since the calling thread also takes part in processing the stream, at most eight users can be fetched concurrently.

The level of parallelism of the common ForkJoinPool can be configured by setting the system property java.util.concurrent.ForkJoinPool.common.parallelism. However, there’s another reason why we shouldn’t rely on ForkJoinPool.commonPool().
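To check what the common pool looks like on a given machine, a small sketch like the following can help. The class name CommonPoolInfo is made up for this example. It only reads values and does not change any configuration.

```java
import java.util.concurrent.ForkJoinPool;

class CommonPoolInfo {

    /** Summarises the core count, the common pool's parallelism and any explicit override. */
    static String describe() {
        int cores = Runtime.getRuntime().availableProcessors();
        int parallelism = ForkJoinPool.getCommonPoolParallelism();
        String override = System.getProperty("java.util.concurrent.ForkJoinPool.common.parallelism");
        return "cores=" + cores + ", commonPoolParallelism=" + parallelism + ", override=" + override;
    }

    public static void main(String[] args) {
        System.out.println(describe());
    }
}
```

Starting the JVM with -Djava.util.concurrent.ForkJoinPool.common.parallelism=20 should change the reported parallelism to 20. The property has to be set before the common pool is used for the first time.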

If we are developing a web application and the processing of incoming requests involves parallel streams, a single blocking task that occupies the common ForkJoinPool can make the whole application unresponsive. So it’s better to specify a dedicated thread pool for invoking blocking operations. This helps to isolate unresponsive resources and erroneous functions from the rest of the system. Protecting systems by isolating certain parts is also known as the Bulkhead Pattern.

The question now is: how can we configure the thread pool on which the operations of a parallel stream will be executed?

To answer this question, let’s look at what the fork() method in the ForkJoinTask class does.

This method checks if the current thread is managed by a ForkJoinPool. If so, the execution of the ForkJoinTask will be scheduled on the same thread pool. Consequently, if we submit the processing of a parallel stream to a custom ForkJoinPool, this pool will also be used to execute the operations of the stream.

We can use this approach to configure the thread pool used by the parallel stream. Submitting a task to a ForkJoinPool returns a ForkJoinTask, which implements the Future interface, so we need to block the execution of the calling thread by invoking Future.get().
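Here is a minimal sketch of that idea. The pool size, the simulated fetchUser() and the thread name appended to each result are assumptions for illustration. As far as I know, this behaviour follows from how fork() is implemented rather than from a documented guarantee of the Stream API, so it should be treated with some care.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;

class CustomPoolLoader {

    /** Hypothetical blocking lookup; the result records the thread that ran it. */
    static String fetchUser(String cwid) {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        return "user-" + cwid + "@" + Thread.currentThread().getName();
    }

    /** Runs the whole parallel stream as one task inside a dedicated ForkJoinPool. */
    static List<String> loadUsers(List<String> cwids, int parallelism) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        try {
            Callable<List<String>> task = () -> cwids.parallelStream()
                    .map(CustomPoolLoader::fetchUser)
                    .collect(Collectors.toList());
            // submit() returns a ForkJoinTask, which is a Future, so get() blocks the caller.
            return pool.submit(task).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        loadUsers(Arrays.asList("a", "b", "c", "d"), 4).forEach(System.out::println);
    }
}
```

The thread names printed after the "@" sign show which pool did the work.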

Configuring a timeout for fetching a user

So far, we have figured out how to call the fetchUser() method concurrently and supply a customised thread pool. But we still need a way to control the execution of the individual tasks, for example by setting a timeout.

Instead of using a parallel stream with a custom ForkJoinPool, we’ll work with a sequential stream again. But this time, we’ll combine it with an ExecutorService.

We divided the loading of all users into two steps. First, we map the given IDs to tuples consisting of an ID and a Future wrapping the corresponding invocation of fetchUser(). The Tuple class is provided by the Javaslang library we used earlier. As a second step, we block the calling thread to collect the loaded users. Every time Future.get() throws an exception, we return the ID of the user that could not be loaded. This is why we used tuples in the first step: we needed to pass on the IDs.

Now we can control the execution of the individual tasks. In this example, we set a timeout of two seconds by calling Future.get() with appropriate parameters.
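The two steps can be sketched as follows. The small Pending class stands in for Javaslang's Tuple, the hand-written Either stands in for the Javaslang class, and the simulated fetchUser() (failing for "bad" IDs, taking five seconds for "slow" IDs) is an assumption. The timeout is a parameter here so that the example finishes quickly.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;

class FutureUserLoader {

    /** Minimal stand-in for Javaslang's Either: Left = failed ID, Right = loaded user. */
    static final class Either<L, R> {
        final L left;
        final R right;
        private Either(L left, R right) { this.left = left; this.right = right; }
        static <L, R> Either<L, R> left(L value) { return new Either<>(value, null); }
        static <L, R> Either<L, R> right(R value) { return new Either<>(null, value); }
        boolean isRight() { return right != null; }
    }

    static final class User {
        final String cwid;
        User(String cwid) { this.cwid = cwid; }
    }

    /** Pairs an ID with its pending lookup; stands in for Javaslang's Tuple. */
    static final class Pending {
        final String cwid;
        final Future<User> user;
        Pending(String cwid, Future<User> user) { this.cwid = cwid; this.user = user; }
    }

    /** Hypothetical blocking lookup: "bad*" IDs fail, "slow*" IDs take five seconds. */
    static User fetchUser(String cwid) {
        try {
            Thread.sleep(cwid.startsWith("slow") ? 5000 : 20);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        if (cwid.startsWith("bad")) {
            throw new RuntimeException("cannot load " + cwid);
        }
        return new User(cwid);
    }

    static List<Either<String, User>> loadUsers(
            List<String> cwids, ExecutorService executor, long timeoutMillis) {
        // Step 1: submit every lookup before waiting for any of them.
        List<Pending> pending = cwids.stream()
                .map(id -> new Pending(id, executor.submit(() -> fetchUser(id))))
                .collect(Collectors.toList());
        // Step 2: block on each Future in turn and keep the ID if the lookup failed.
        return pending.stream()
                .map(p -> await(p, timeoutMillis))
                .collect(Collectors.toList());
    }

    static Either<String, User> await(Pending p, long timeoutMillis) {
        try {
            return Either.right(p.user.get(timeoutMillis, TimeUnit.MILLISECONDS));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return Either.left(p.cwid);
        } catch (ExecutionException | TimeoutException e) {
            p.user.cancel(true); // interrupt a lookup that is still running
            return Either.left(p.cwid);
        }
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        try {
            loadUsers(Arrays.asList("u1", "bad2", "slow3"), executor, 500)
                    .forEach(r -> System.out.println(r.isRight() ? "loaded " + r.right.cwid : "failed " + r.left));
        } finally {
            executor.shutdownNow();
        }
    }
}
```

Note that the timeout applies to each call of Future.get(), so it is measured from the moment we start waiting for a particular user, not from the moment the task was submitted.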

But our solution has a flaw: we need to iterate twice. In the first iteration we map IDs to Futures and in the second iteration we block and wait for the results. This is necessary because a stream is lazy. If we simply chained the task submission and the blocking in two consecutive map operations, the loadUsers() method would not be parallel at all. It would submit one task and immediately block and wait for its result before submitting the second task.
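This element-by-element processing is easy to observe. In the following sketch, the two map() operations only record an event, where the first one stands in for submitting a task and the second one for blocking on Future.get(). The class and method names are made up for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class LazyStreamTrace {

    /** Records the order in which a sequential stream applies two chained map() operations. */
    static List<String> trace(List<String> cwids) {
        List<String> events = new ArrayList<>();
        cwids.stream()
                .map(id -> { events.add("submit " + id); return id; }) // stands in for executor.submit(...)
                .map(id -> { events.add("wait " + id); return id; })   // stands in for Future.get()
                .collect(Collectors.toList());
        return events;
    }

    public static void main(String[] args) {
        // prints [submit u1, wait u1, submit u2, wait u2]
        System.out.println(trace(Arrays.asList("u1", "u2")));
    }
}
```

Each element passes through both operations before the next element is touched, which is why the submission and the waiting have to be separated into two iterations.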

Luckily, we can eliminate this flaw by working with CompletableFutures instead of plain Futures.

This time we only need one iteration that is completely non-blocking. First, we map each ID to a CompletableFuture by using the static factory method CompletableFuture.supplyAsync(). The outcome is a stream consisting of Futures. But what we really want is a Future wrapping our result list. To achieve this, we can use the Stream.reduce() method with two custom combiners: combine() and combineAll().

As before, we can control the execution of the individual tasks. Here we use the non-blocking methods thenApply(), orTimeout() and exceptionally(). We wrap the users that were successfully loaded with the Right type, and those that couldn’t be loaded with Left. We also set a timeout of two seconds for each task.
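A sketch of this variant is shown below. The method names combine() and combineAll() are taken from the text above, but their bodies are my own guess at a reasonable implementation. The hand-written Either and the simulated fetchUser() are stand-ins as well. Because of orTimeout(), the code needs Java 9 or later.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class CompletableUserLoader {

    /** Minimal stand-in for Javaslang's Either: Left = failed ID, Right = loaded user. */
    static final class Either<L, R> {
        final L left;
        final R right;
        private Either(L left, R right) { this.left = left; this.right = right; }
        static <L, R> Either<L, R> left(L value) { return new Either<>(value, null); }
        static <L, R> Either<L, R> right(R value) { return new Either<>(null, value); }
        boolean isRight() { return right != null; }
    }

    static final class User {
        final String cwid;
        User(String cwid) { this.cwid = cwid; }
    }

    /** Hypothetical blocking lookup: "bad*" IDs fail, "slow*" IDs take five seconds. */
    static User fetchUser(String cwid) {
        try {
            Thread.sleep(cwid.startsWith("slow") ? 5000 : 20);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
        if (cwid.startsWith("bad")) {
            throw new RuntimeException("cannot load " + cwid);
        }
        return new User(cwid);
    }

    static CompletableFuture<List<Either<String, User>>> loadUsers(
            List<String> cwids, Executor executor, long timeoutMillis) {
        CompletableFuture<List<Either<String, User>>> empty =
                CompletableFuture.completedFuture(Collections.<Either<String, User>>emptyList());
        return cwids.stream()
                .map(id -> CompletableFuture.supplyAsync(() -> fetchUser(id), executor)
                        .thenApply(user -> Either.<String, User>right(user))
                        .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS) // requires Java 9+
                        .exceptionally(error -> Either.<String, User>left(id)))
                .reduce(empty, CompletableUserLoader::combine, CompletableUserLoader::combineAll);
    }

    /** Appends one eventual result to the eventual list. */
    static CompletableFuture<List<Either<String, User>>> combine(
            CompletableFuture<List<Either<String, User>>> results,
            CompletableFuture<Either<String, User>> next) {
        return results.thenCombine(next, (list, item) -> {
            List<Either<String, User>> copy = new ArrayList<>(list);
            copy.add(item);
            return copy;
        });
    }

    /** Concatenates two eventual lists; Stream.reduce() only needs this for parallel streams. */
    static CompletableFuture<List<Either<String, User>>> combineAll(
            CompletableFuture<List<Either<String, User>>> first,
            CompletableFuture<List<Either<String, User>>> second) {
        return first.thenCombine(second, (a, b) -> {
            List<Either<String, User>> copy = new ArrayList<>(a);
            copy.addAll(b);
            return copy;
        });
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        try {
            loadUsers(Arrays.asList("u1", "bad2", "slow3"), executor, 500).join()
                    .forEach(r -> System.out.println(r.isRight() ? "loaded " + r.right.cwid : "failed " + r.left));
        } finally {
            executor.shutdownNow();
        }
    }
}
```

One thing to keep in mind: orTimeout() only completes the resulting future exceptionally. It does not stop the lookup that is still running on the executor.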

Sadly, I had to cheat a bit: the examples are intended to run with Java 8, but the orTimeout() method was only added in Java 9. Luckily, Tomasz Nurkiewicz explains in his blog how to use asynchronous timeouts with Java 8.
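One common way to get such a timeout on Java 8 is to let a scheduler fail a second future after the delay and then race the two futures with applyToEither(). The sketch below follows that idea. The names failAfter() and within() and the single-threaded daemon scheduler are choices made for this example, not part of any library.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

class Java8Timeouts {

    /** Daemon scheduler, so that pending timeouts never keep the JVM alive. */
    private static final ScheduledExecutorService SCHEDULER =
            Executors.newSingleThreadScheduledExecutor(runnable -> {
                Thread thread = new Thread(runnable, "timeout-scheduler");
                thread.setDaemon(true);
                return thread;
            });

    /** Returns a future that fails with a TimeoutException once the delay has elapsed. */
    static <T> CompletableFuture<T> failAfter(long delay, TimeUnit unit) {
        CompletableFuture<T> promise = new CompletableFuture<>();
        SCHEDULER.schedule(
                () -> promise.completeExceptionally(new TimeoutException("timed out after " + delay + " " + unit)),
                delay, unit);
        return promise;
    }

    /** Completes with the outcome of whichever finishes first: the given future or the timeout. */
    static <T> CompletableFuture<T> within(CompletableFuture<T> future, long delay, TimeUnit unit) {
        CompletableFuture<T> timeout = failAfter(delay, unit);
        return future.applyToEither(timeout, Function.identity());
    }

    public static void main(String[] args) {
        CompletableFuture<String> never = new CompletableFuture<>();
        String result = within(never, 100, TimeUnit.MILLISECONDS)
                .exceptionally(error -> "fallback")
                .join();
        System.out.println(result); // prints fallback
    }
}
```

In the loadUsers() pipeline, a call such as within(future, 2, TimeUnit.SECONDS) would take the place of orTimeout().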

Becoming declarative with Flux

The examples presented so far were rather involved and perhaps hard to understand. This is a pity because the original problem is not complex at all. So let’s consider another way to solve it: by using the reactive library Reactor.

We start by converting the given list of IDs into a Flux using the factory method fromIterable(). Then we use the flatMap() method to control the asynchronous execution of fetchUser(). We wrap the execution as a Mono and define a timeout as well as the error handling. We can use the subscribeOn() method to provide a Scheduler that loads the users in parallel. As soon as we subscribe to eventualUsers, the flatMap() operator will take care of subscribing to each Mono using the provided Scheduler.

There are some differences between Flux and CompletableFuture. One difference is that a Flux is lazy by default. If no one subscribes to it, it will not load any users at all. Conversely, the existence of a Future denotes that the execution of a task has already begun. Here we start loading the users by calling the Flux.toStream() method.

Declarative Concurrency with Reactive Programming

In this article, we looked at different ways to load users concurrently. We started with code that was hardly understandable and managed to turn it into something readable using Reactive Programming techniques. But reactive libraries like Reactor not only help to write concurrent code in a declarative fashion, they also bring along specialised Schedulers that help to optimise the use of multiple cores by improving the cache locality of processed data.

In closing, I’d like to point out two problems with the way we used thread pools, which would need to be addressed in an actual implementation. First of all, we never took care to shut them down and clean up the resources. Secondly, you should pay attention when using the convenient factory method newFixedThreadPool(). It returns an ExecutorService that uses an unbounded LinkedBlockingQueue as a work queue. This can be dangerous if you are not in control of the number of tasks to be scheduled.
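Both points can be addressed with a few lines of code. The following sketch creates a pool with a bounded work queue that rejects tasks once the queue is full, and shuts the pool down when it is no longer needed. The helper names, the chosen rejection policy and the sizes are assumptions for illustration. A real application would pick them based on its own load.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class BoundedPool {

    /** Fixed-size pool whose work queue holds at most queueCapacity waiting tasks. */
    static ThreadPoolExecutor newBoundedPool(int threads, int queueCapacity) {
        return new ThreadPoolExecutor(
                threads, threads, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(queueCapacity),
                new ThreadPoolExecutor.AbortPolicy()); // reject instead of queueing without limit
    }

    /** Stops accepting tasks, waits for running ones and interrupts them if they take too long. */
    static boolean shutdownAndAwait(ExecutorService executor, long timeoutMillis) {
        executor.shutdown();
        try {
            if (executor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS)) {
                return true;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        executor.shutdownNow();
        return false;
    }

    static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Submits tasks that block until released and counts how many the pool rejects. */
    static int countRejections(int tasks, int threads, int queueCapacity) {
        ThreadPoolExecutor pool = newBoundedPool(threads, queueCapacity);
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    pool.execute(() -> awaitQuietly(release));
                } catch (RejectedExecutionException e) {
                    rejected++;
                }
            }
        } finally {
            release.countDown();
            shutdownAndAwait(pool, 1000);
        }
        return rejected;
    }

    public static void main(String[] args) {
        // One task runs, one waits in the queue, the remaining three are rejected.
        System.out.println("rejected: " + countRejections(5, 1, 1)); // prints rejected: 3
    }
}
```

Rejecting tasks is only one option. ThreadPoolExecutor.CallerRunsPolicy, for example, slows down the submitting thread instead of throwing an exception.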
