Antoine Cheron
Jan 18, 2018 · 4 min read

Reactor is a Java library for creating reactive non-blocking applications on the JVM based on the Reactive Streams Specification.

Working with this library can be difficult at first. This series of articles will guide you through the process of creating, manipulating and managing the execution of the Reactive Streams that offer Reactor through the Mono and Flux classes.

How a Flux works (credit: https://projectreactor.io/docs/core/release/reference/)

Mono and Flux are both reactive streams. They differ in what they express. A Mono is a stream of 0 to 1 element, whereas a Flux is a stream of 0 to N elements.

This difference in the semantics of these two streams is very useful, as for example making a request to a http server expects to receive 0 or 1 response, it would be inappropriate to use a Flux in this case. On the opposite, computing the result of a mathematical function on an interval expects one result per number in the interval. In this other case, using a Flux is appropriate.

Mono and Flux are lazy

Being lazy is one of the properties of a reactive stream. It means that whatever the number of function call you make on the stream, they won’t be executed until you consume it.

For Mono and Flux, the method to use to consume them is .subscribe(…).

Nothing happens until you subscribe to a Flux (or Mono)

Most common ways to create a Flux

There are several ways to create a Flux. The below code snippet presents some of the most common:

Most common ways to create a Mono

Methods are different for a Mono, except the just(T... data) method. The below code snippet presents some of the most common:

As you may have noticed, being a 0–1 element stream, Mono is a perfect fit to create reactive streams from Futures, Suppliers or even Runnable as Java methods return at most 1 element.

Common methods to create Flux or Mono

Mono and Flux share 3 useful methods to create them: create, defer and error.

Error

Mono.error(Trowable t) and Flux.error(Throwable t) are very useful methods to handle errors while working with these reactive streams. We will discuss this subject in the second article.

Defer

Mono.defer(Supplier<? extends Mono<? extends T>>) is similar to Mono.fromCallable(...) but its supplier should return a Mono<T> whereas fromCallable expects a supplier returning a value of type T. Also, if you call a method that can throw an Exception, with defer you will have to catch it yourself, whereas with FromCallable, the Exception will automatically be wrapped into a Mono.error(...) . Let me show you with a little piece of code:

As you have seen, both way of creating the Mono gives the same result. Even though the defer method is more verbose, it makes your code easier to understand and allow fore more flexibility as you can remap the Exception into another kind of Mono, or create a new Exception from it.

Create

The create(Consumer<MonoSink<T>> callback) method is a lower level one than the other methods we saw, because it deals with the signals inside the Mono and Flux. Let’s look at an example for both Mono and Flux.

For the Mono, let’s do the same as above:

Here is an example for the Flux.create(…) method.

How to consume a Flux or a Mono ?

Now that we’ve seen how to create a Flux or a Mono, let’s take a look at how to use the values they hold. This is the operation of consuming the stream.

For this example, I think that a piece of code will be sufficient :).

In a Flux, data and operations on these data are treated sequentially. It means that for a Flux.range(1, 5) , the value 3 will never be displayed before 2 as the treatments on it are done before the ones on 3. We’ll discuss that in details in the third article.

Bad practices

One word about the methods you should not call on Flux and Mono.

Mono has a block() method that returns the value it holds. When calling it, the method block the current Thread. When doing it, your program is not reactive anymore. You should always find a way not to use it. Sometimes, it implies refactoring some of your methods.

Flux has methods that also starts with block. They are blockFirst() , blockFirst(Duration timeout) , blockLast() and blockLast(Duration timeout) . The rationale is the same.

All the methods on the Flux that returns non-asynchronous Java structures cause the same problem. These methods are toStream(...) and toIterable(...) .

Conclusion

In this first article, you have learned how to create a Flux or a Mono, and how to consume it in its simplest form.

In the next article, I’ll show you how to manipulate the data they hold. For example, how to add one to every value in a Flux or how to replace the value with its result to a function call.

Article #2 : https://medium.com/@cheron.antoine/reactor-java-2-how-to-manipulate-the-data-inside-mono-and-flux-b36ae383b499

Antoine Cheron

Written by

Ph.D. student @INRIA and FABERNOVEL Technologies

Welcome to a place where words matter. On Medium, smart voices and original ideas take center stage - with no ads in sight. Watch
Follow all the topics you care about, and we’ll deliver the best stories for you to your homepage and inbox. Explore
Get unlimited access to the best stories on Medium — and support writers while you’re at it. Just $5/month. Upgrade