Working with Reactor can be difficult at first. This series of articles will guide you through creating, manipulating and managing the execution of the Reactive Streams that Reactor offers through the Mono and Flux classes.
Mono and Flux are both reactive streams. They differ in what they express. A Mono is a stream of 0 to 1 element, whereas a Flux is a stream of 0 to N elements.
This semantic difference is very useful. For example, a request to an HTTP server is expected to return 0 or 1 response, so using a Flux would be inappropriate in this case. Conversely, computing the result of a mathematical function over an interval yields one result per number in the interval, so in that case a Flux is appropriate.
Mono and Flux are lazy
Being lazy is one of the properties of a reactive stream. It means that however many operators you chain on the stream, none of them is executed until you consume it.
For Mono and Flux, the method to use to consume them is .subscribe(…).
Nothing happens until you subscribe to a Flux (or Mono)
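To make the laziness concrete, here is a minimal sketch. The class name LazyDemo and the log messages are illustrative assumptions; only Flux.range, map and subscribe come from Reactor. The log shows that the mapping function does not run when the pipeline is declared, only once subscribe is called.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

class LazyDemo {
    // Records what happens, and when, in a plain list (hypothetical helper).
    static List<String> run() {
        List<String> log = new ArrayList<>();

        // Declaring the pipeline does not execute the map function.
        Flux<Integer> doubled = Flux.range(1, 3)
                .map(i -> {
                    log.add("map " + i);
                    return i * 2;
                });
        log.add("declared");

        // Subscribing triggers the execution of the whole pipeline.
        doubled.subscribe(v -> log.add("got " + v));
        return log;
    }

    public static void main(String[] args) {
        // "declared" is logged before any "map ..." entry.
        run().forEach(System.out::println);
    }
}
```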
Most common ways to create a Flux
There are several ways to create a Flux. The below code snippet presents some of the most common:
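As a rough sketch, a few factory methods that are commonly used to build a Flux could look like this. The class name FluxCreation, the method names and the sample values are assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.stream.Stream;
import reactor.core.publisher.Flux;

class FluxCreation {
    // From a fixed set of values.
    static Flux<String> fromValues() {
        return Flux.just("a", "b", "c");
    }

    // From an array.
    static Flux<String> fromArray() {
        return Flux.fromArray(new String[] {"a", "b", "c"});
    }

    // From any Iterable, such as a List.
    static Flux<String> fromIterable() {
        return Flux.fromIterable(Arrays.asList("a", "b", "c"));
    }

    // From a Java Stream; the stream itself can only be consumed once.
    static Flux<Integer> fromStream() {
        return Flux.fromStream(Stream.of(1, 2, 3));
    }

    // A range of consecutive integers: start value, then number of elements.
    static Flux<Integer> range() {
        return Flux.range(1, 5);
    }

    // A Flux that completes without emitting anything.
    static Flux<String> empty() {
        return Flux.empty();
    }
}
```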
Most common ways to create a Mono
The methods are different for a Mono, except for just, which takes a single value here: just(T data). The code snippet below presents some of the most common:
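A minimal sketch of some common Mono factory methods follows. The class name MonoCreation, the method names and the sample values are illustrative assumptions.

```java
import java.util.concurrent.CompletableFuture;
import reactor.core.publisher.Mono;

class MonoCreation {
    // From a single, non-null value.
    static Mono<String> just() {
        return Mono.just("a");
    }

    // A Mono that completes without a value.
    static Mono<String> empty() {
        return Mono.empty();
    }

    // From a value that may be null: emits it, or completes empty.
    static Mono<String> maybe(String value) {
        return Mono.justOrEmpty(value);
    }

    // From a Callable, evaluated at subscription time.
    static Mono<String> fromCallable() {
        return Mono.fromCallable(() -> "computed");
    }

    // From a Supplier, evaluated at subscription time.
    static Mono<String> fromSupplier() {
        return Mono.fromSupplier(() -> "supplied");
    }

    // From a CompletableFuture.
    static Mono<String> fromFuture() {
        return Mono.fromFuture(CompletableFuture.completedFuture("done"));
    }

    // From a Runnable: runs it on subscription, then completes empty.
    static Mono<Void> fromRunnable(Runnable task) {
        return Mono.fromRunnable(task);
    }
}
```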
As you may have noticed, because it holds at most one element, Mono is a perfect fit for creating reactive streams from a Future, a Supplier or even a Runnable, since a Java method returns at most one value.
Common methods to create Flux or Mono
Mono and Flux share 3 useful methods to create them:
Mono.error(Throwable t) and Flux.error(Throwable t) are very useful methods to handle errors while working with these reactive streams. We will discuss this subject in the second article.
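As a small illustration, here is one way an error stream could be created and observed. The class name ErrorDemo, the describe helper and the "boom" message are hypothetical; the second argument of subscribe is the callback that receives the error.

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class ErrorDemo {
    // A Mono that terminates immediately with an error.
    static Mono<String> failingMono() {
        return Mono.error(new IllegalStateException("boom"));
    }

    // The same idea for a Flux.
    static Flux<String> failingFlux() {
        return Flux.error(new IllegalStateException("boom"));
    }

    // Subscribes with a value callback and an error callback,
    // and reports which one was invoked (hypothetical helper).
    static String describe(Mono<String> mono) {
        StringBuilder out = new StringBuilder();
        mono.subscribe(
                value -> out.append("value: ").append(value),
                error -> out.append("error: ").append(error.getMessage()));
        return out.toString();
    }
}
```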
Mono.defer(Supplier<? extends Mono<? extends T>>) is similar to Mono.fromCallable(...), but its supplier must return a Mono<T>, whereas fromCallable expects a Callable returning a value of type T. Also, if you call a method that can throw a checked Exception, with defer you have to catch it yourself, because a Supplier cannot throw checked exceptions, whereas with fromCallable the Exception is automatically wrapped into a Mono.error(...). Let me show you with a little piece of code:
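The comparison can be sketched as follows. The parse method is a hypothetical stand-in for any method that throws a checked Exception, and the class and method names are assumptions.

```java
import reactor.core.publisher.Mono;

class DeferVsCallable {
    // Hypothetical method that may throw a checked exception.
    static int parse(String raw) throws Exception {
        if (raw == null || raw.isEmpty()) {
            throw new Exception("nothing to parse");
        }
        return Integer.parseInt(raw);
    }

    // fromCallable: the exception thrown by parse becomes an error signal.
    static Mono<Integer> withCallable(String raw) {
        return Mono.fromCallable(() -> parse(raw));
    }

    // defer: the checked exception must be caught and mapped by hand.
    static Mono<Integer> withDefer(String raw) {
        return Mono.defer(() -> {
            try {
                return Mono.just(parse(raw));
            } catch (Exception e) {
                // This is the place to remap the exception if needed.
                return Mono.error(e);
            }
        });
    }
}
```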
As you have seen, both ways of creating the Mono give the same result. Even though the defer method is more verbose, it makes your code easier to understand and allows for more flexibility, as you can remap the Exception into another kind of Mono or create a new Exception from it.
The create(Consumer<MonoSink<T>> callback) method is lower level than the other methods we saw, because it deals directly with the signals inside the Mono or Flux. Let’s look at an example for both Mono and Flux.
For the Mono, let’s do the same as above:
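A possible sketch with Mono.create, assuming a simple string-to-integer conversion as the work to wrap (the class and method names are illustrative). The sink is used to emit either the success signal or the error signal explicitly.

```java
import reactor.core.publisher.Mono;

class MonoCreateDemo {
    static Mono<Integer> parsed(String raw) {
        return Mono.create(sink -> {
            try {
                // Emit the value and complete the Mono.
                sink.success(Integer.parseInt(raw));
            } catch (NumberFormatException e) {
                // Terminate the Mono with an error signal instead.
                sink.error(e);
            }
        });
    }
}
```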
Here is an example for the Flux.create(…) method.
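As a minimal sketch, assuming we simply want to emit the integers from 1 to n (the class and method names are illustrative), the sink can emit any number of next signals followed by a completion signal.

```java
import reactor.core.publisher.Flux;

class FluxCreateDemo {
    static Flux<Integer> countTo(int n) {
        return Flux.create(sink -> {
            // Emit one "next" signal per value.
            for (int i = 1; i <= n; i++) {
                sink.next(i);
            }
            // Signal that no more values will follow.
            sink.complete();
        });
    }
}
```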
How to consume a Flux or a Mono?
Now that we’ve seen how to create a Flux or a Mono, let’s take a look at how to use the values they hold. This is the operation of consuming the stream.
For this example, I think that a piece of code will be sufficient :).
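One way to consume a stream is the subscribe overload that takes a callback per kind of signal: value, error and completion. In this sketch the class name ConsumeDemo and the event strings are assumptions; the callbacks collect events in a list rather than printing them.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

class ConsumeDemo {
    // Subscribes to the given Flux and records every signal it receives.
    static List<String> consume(Flux<Integer> flux) {
        List<String> events = new ArrayList<>();
        flux.subscribe(
                value -> events.add("next " + value),               // called for each element
                error -> events.add("error " + error.getMessage()), // called if the stream fails
                () -> events.add("complete"));                      // called on normal completion
        return events;
    }

    public static void main(String[] args) {
        consume(Flux.range(1, 3)).forEach(System.out::println);
    }
}
```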
In a Flux, the elements and the operations applied to them are processed sequentially. It means that for a Flux.range(1, 5), the value 3 will never be displayed before 2, because the operations on 2 are done before the ones on 3. We’ll discuss that in detail in the third article.
One word about the methods you should not call on Flux and Mono.
Mono has a block() method that returns the value it holds. Calling it blocks the current Thread, and once you do that, your program is not reactive anymore. You should always look for a way to avoid it. Sometimes, this implies refactoring some of your methods.
Flux also has methods whose names start with block: blockFirst(Duration timeout) and blockLast(Duration timeout). The rationale is the same.
All the methods on Flux that return non-asynchronous Java structures cause the same problem.
In this first article, you have learned how to create a Flux or a Mono, and how to consume it in its simplest form.
In the next article, I’ll show you how to manipulate the data they hold. For example, how to add one to every value in a Flux, or how to replace each value with the result of a function call.