Reactive programming and Project Reactor
What is Reactive Programming?
Reactive programming is programming with asynchronous data streams. Events, messages, calls and even failures are delivered through a data stream. With reactive programming you observe these streams and react when a value is emitted.
Let’s have a look at a very simple example:
int val1 = 5;
int val2 = 10;
int sum = val1 + val2;
System.out.println(sum);
val1 = 15;
System.out.println(sum);
Now let’s see what values are printed in imperative style and in reactive style.
In imperative style, sum is computed once and keeps its value, so the program prints:
15
15
When using a reactive style, our sum variable will react to the change and it will contain the new value. In this case the program prints:
15
25
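Plain Java variables do not behave this way, so the reactive output needs some machinery behind it. Here is a minimal sketch that uses only the JDK. The Cell class is a hypothetical helper written for this illustration (it is not part of Project Reactor or any other library): it notifies its listeners whenever its value changes, and the sum is recomputed on every notification.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntConsumer;

// Hypothetical helper, not a library class: an int holder that notifies listeners on change.
class Cell {
    private int value;
    private final List<IntConsumer> listeners = new ArrayList<>();

    Cell(int value) { this.value = value; }

    int get() { return value; }

    void set(int newValue) {
        value = newValue;
        for (IntConsumer listener : listeners) {
            listener.accept(newValue);
        }
    }

    void onChange(IntConsumer listener) { listeners.add(listener); }
}

class ReactiveSumSketch {
    // Returns every value the sum takes, in the order it was computed.
    static List<Integer> run() {
        List<Integer> sums = new ArrayList<>();
        Cell val1 = new Cell(5);
        Cell val2 = new Cell(10);
        // The sum is recomputed each time one of its inputs changes.
        Runnable recompute = () -> sums.add(val1.get() + val2.get());
        val1.onChange(v -> recompute.run());
        val2.onChange(v -> recompute.run());
        recompute.run();   // initial sum
        val1.set(15);      // the change triggers a new computation
        return sums;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);   // prints 15, then 25
    }
}
```

The point of the sketch is only the direction of control: the code that computes the sum does not poll val1, it is called back when val1 changes.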
Why do we need Reactive Programming?
This is a question that you should ask yourself every time you add a new technology or use a new programming paradigm. Do I really need my code to be asynchronous? What are the benefits? Well, it depends. Writing asynchronous code can be difficult. If you don’t need it, just don’t do it. Keep it simple. I can’t tell you whether to use it or not, but I’ll show you the benefits of Reactive Programming and you can decide for yourself.
When we build a microservice architecture, we combine different services that together deliver a business solution. This is known as a distributed architecture. If a distributed architecture is designed incorrectly, performance issues surface very quickly. On top of that, more and more people rely on applications for their daily activities, so the number of users keeps growing. Some of these applications may be critical, like finding directions to the nearest hospital. We have to build applications that are scalable, so that they can grow their capacity to keep up with ever-increasing usage without a radical redesign, and highly reliable, so that they keep working even if a part of the system crashes and stay available 24/7.
With this in mind, we have to ask ourselves: what can we do better? How can I design my system so that it supports more traffic and remains available to the users? Of course, we can scale it up, but what is the biggest problem that prevents our system from supporting higher traffic? Take a couple of seconds and think about it. Have you made a guess? It’s the input and output. Every time we call an external service, fetch data from a database or write data to a file, our thread waits for a response. If it takes 2 seconds to retrieve some data from the database, our thread does nothing for 2 seconds. Is this a problem? Of course it is. In these 2 seconds our thread could serve another request or work on another task. Anything is better than waiting for a response and doing nothing. This is where Reactive Programming saves the day. Instead of sending a request to the database and waiting for the data, we let the thread do something else and we just react when the data is retrieved.
Let’s start with an example from real life. Let’s say we are in a restaurant. A blocking restaurant. What is our experience going to be?
We ask for a menu and the waiter comes to us.
And while we are deciding, the waiter waits.
Finally we are ready. The waiter comes to us and picks up the order.
He brings our order and while we are eating our food he waits again.
And waits
Finally, we finish our food and we ask to pay.
And while we are paying he waits.
Finally, we pay, and our waiter is free to go to another table.
Is this the most efficient way of doing things in a restaurant? Of course not. Are we happy clients? We may be, but the other clients waiting for our waiter are certainly not so happy.
What can we do to keep all of our clients happy? We have two options.
Let’s say there are 5 tables in the restaurant. The first option is to have a waiter for each table.
This is far from ideal. But what if we could have only one waiter who does other things instead of waiting?
While we are looking at the menu and deciding what to order he can bring food to another table.
The same happens when, for example, we retrieve data from a database.
Can we have 1000 threads for 1000 requests? Can we run 1000 threads at the same time? Of course not. Unless we have 1000 cores, it’s not going to be what we expect, because running everything in parallel is an illusion. The cores keep switching from one piece of work to another to simulate 1000 things happening at the same time.
What can we improve? As in the restaurant story, instead of waiting for a response from the database, we can let the thread work on another task or handle another request.
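The idea of reacting to a result instead of waiting for it can be sketched with the JDK’s CompletableFuture, before we bring in any reactive library. The slowQuery method below is a hypothetical stand-in for a database call, and the 200 ms delay is an arbitrary value chosen for the illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

class NonBlockingSketch {
    // Hypothetical stand-in for a slow database query.
    static String slowQuery() {
        try {
            TimeUnit.MILLISECONDS.sleep(200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "row-42";
    }

    static String run() {
        CompletableFuture<String> reaction = CompletableFuture
                .supplyAsync(NonBlockingSketch::slowQuery)   // the query runs on a pool thread
                .thenApply(row -> "Received " + row);         // the reaction runs once the data arrives

        // The calling thread is not blocked here, so it can handle something else.
        String otherWork = "Served another request";

        // join() is used only so that this demo can return the final outcome.
        return otherWork + "; " + reaction.join();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In a real reactive application there would be no join() at the end: the result would be handed to the next step of the pipeline when it arrives.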
What is Project Reactor?
Project Reactor is a fully non-blocking foundation with back-pressure support included. It’s the foundation of the reactive stack in the Spring ecosystem and is featured in projects such as Spring WebFlux, Spring Data, and Spring Cloud Gateway. — https://spring.io/reactive
It’s basically a library for creating reactive applications. We know that in a reactive application all data, including errors, is pushed as events. So, what is one of the main components when we think about writing reactive code? Yes, you’re right. The publishers. Besides the publishers we have, of course, other important components like subscribers and the back pressure mechanism.
Don’t worry, I will explain what back pressure is a little bit later.
But let’s start with the publishers.
Publishers
Project Reactor provides two main publisher implementations:
reactor.core.publisher.Mono<T> which is a reactive stream publisher representing 0 or 1 element
and
reactor.core.publisher.Flux<T> which is a reactive stream publisher representing an asynchronous sequence of 0 to N elements
Remember from the beginning that Reactive Programming is programming with asynchronous data streams. Streams are the central concept when writing reactive code, and everything can be treated as a stream.
A publisher can emit three types of signals: values, an error and completion.
Subscriber
In order to consume the data published by a publisher we have to subscribe to it. Basically, it will tell the publisher “Hey, I am here, give me some data”.
How can we create a subscriber? It’s very easy. We can do it by implementing the org.reactivestreams.Subscriber<T> interface and overriding its methods (shown here for a Subscriber<String>):
public void onSubscribe(Subscription subscription)
public void onNext(String s)
public void onError(Throwable throwable)
public void onComplete()
Let’s take each method separately.
public void onSubscribe(Subscription subscription)
it’s called when our subscriber connects to the publisher.
public void onNext(String s)
it’s called every time we receive a value from our publisher
public void onError(Throwable throwable)
it’s called when the publisher emits an error
public void onComplete()
it’s called when the publisher has pushed all the data
Let’s suppose we are at a wine tasting event.
The publisher will give to the subscriber glasses of wine, one by one, and the subscriber will taste each of them.
At the beginning, the subscriber subscribes to the publisher. The publisher confirms the subscription by calling the “onSubscribe” method, and this is where the subscriber says: “Hey, I am ready to start”.
From that point, the publisher starts giving glasses of wine to the subscriber and the subscriber tastes the wine using the “onNext” method.
When the publisher finishes all the glasses, it informs the subscriber by sending a signal which is captured by the subscriber using the “onComplete” method.
The back pressure mechanism
What is one of the biggest problems that can appear when pushing and consuming data? Let’s think about it. Our publisher constantly pushes data while the subscriber consumes it. It can push millions and millions of events while our subscriber tries to save them in a database. What can go wrong? Yes, that’s it. Our consumer may not be able to keep up with the speed of the publisher and it may be overwhelmed.
In our example, the wine taster will not be able to taste one glass of wine before receiving another one.
In order to prevent this, a back pressure mechanism is used. What exactly is this mechanism? It means that the publisher will not push any data before the consumer requests it.
The root of the back pressure is the Subscription object which has two methods:
request(long n) -> used by the subscriber to request up to n more elements
cancel() -> used by the subscriber to stop receiving events
Let’s put them all together
We have learned about publishers, subscribers, back pressure and subscriptions. What exactly does the flow look like?
The subscriber subscribes and receives a Subscription, which it uses to request data. The publisher pushes the requested data, and once all the data is published the onComplete signal is sent to the subscriber.
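The flow described above can be sketched without any dependency by using the JDK’s java.util.concurrent.Flow types (JDK 9+), which mirror the Reactive Streams interfaces (Publisher, Subscriber, Subscription). ListPublisher is a hypothetical, single-threaded publisher written only for this illustration. It skips parts of the Reactive Streams rules (for example, it does not reject non-positive requests), so treat it as a sketch and not as a compliant implementation.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Flow;

class BackPressureSketch {
    // Hypothetical synchronous publisher: it emits an item only when one was requested.
    static class ListPublisher implements Flow.Publisher<String> {
        private final List<String> items;

        ListPublisher(List<String> items) { this.items = items; }

        @Override
        public void subscribe(Flow.Subscriber<? super String> subscriber) {
            Iterator<String> it = items.iterator();
            subscriber.onSubscribe(new Flow.Subscription() {
                private boolean done;      // completed or cancelled
                private boolean emitting;  // guards against re-entrant request() calls
                private long pending;      // requested but not yet delivered

                @Override
                public void request(long n) {
                    if (done) return;
                    pending += n;
                    if (emitting) return;  // the running loop will pick up the new demand
                    emitting = true;
                    while (!done && pending > 0 && it.hasNext()) {
                        pending--;
                        subscriber.onNext(it.next());
                    }
                    emitting = false;
                    if (!done && !it.hasNext()) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() { done = true; }
            });
        }
    }

    // Subscribes with a one-at-a-time subscriber and returns the signals it received.
    static List<String> run() {
        List<String> events = new ArrayList<>();
        new ListPublisher(List.of("a", "b", "c")).subscribe(new Flow.Subscriber<String>() {
            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                events.add("subscribed");
                s.request(1);              // ask for the first element only
            }

            @Override
            public void onNext(String item) {
                events.add("next " + item);
                subscription.request(1);   // ask for one more after handling this one
            }

            @Override
            public void onError(Throwable t) { events.add("error " + t.getMessage()); }

            @Override
            public void onComplete() { events.add("complete"); }
        });
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Because the subscriber asks for one element at a time, the publisher can never get ahead of it: each onNext happens only after a matching request.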
Mono
Mono is the first publisher provided by the Project Reactor library. It can publish 0 or 1 element. When subscribing to it, we have the following possible scenarios:
- emit 1 element, followed by the completed event.
- emit no elements, followed by the completed event.
- emit an error.
How do we create a Mono publisher? Luckily, we have a few factory methods for creating a Mono.
Mono<String> publisher = Mono.just("My first publisher");
This is it. We have our first publisher, which emits the value “My first publisher”. In the same way, we can use justOrEmpty, which emits the value if it is not null and completes without a value otherwise:
Mono<String> publisher = Mono.justOrEmpty("My first publisher");
Or we can create a publisher which emits no value, only the onComplete event.
Mono<String> publisher = Mono.empty();
Besides these simple factory methods, we can create a Mono using functional interfaces, for example, using the Callable interface:
Mono<String> publisher = Mono.fromCallable(() -> "My first publisher");
or the Supplier interface:
Mono<String> publisher = Mono.fromSupplier(() -> "My first publisher");
Regardless of the method used to create the publisher, the result is the same: it publishes a string value.
How can we consume the value? We have to subscribe to it. Let’s create a subscriber by implementing the org.reactivestreams.Subscriber interface:
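import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

class MySubscriber implements Subscriber<String> {

    // Called once, when the subscriber is connected to the publisher.
    @Override
    public void onSubscribe(Subscription subscription) {
        System.out.println("Subscribed");
    }

    // Called for every value received from the publisher.
    @Override
    public void onNext(String s) {
        System.out.println("Element received " + s);
    }

    // Called when the publisher emits an error.
    @Override
    public void onError(Throwable t) {
        System.out.println("Error received " + t.getMessage());
    }

    // Called when the publisher has pushed all the data.
    @Override
    public void onComplete() {
        System.out.println("On complete");
    }
}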
How do we subscribe to the publisher? By calling the subscribe method on it.
Mono<String> publisher = Mono.just("My first publisher");
publisher.subscribe(new MySubscriber());
If you run the code, what do you think it prints? Take a couple of seconds and think about it.
You may say: “Well, since the subscriber subscribes to the publisher, consumes one element and then the publisher sends a complete event, it prints:”
- Subscribed
- Element received My first publisher
- On complete
It is a very logical way of thinking, but it prints only:
- Subscribed
It looks like we are not receiving any event. No value, no error, not even the onComplete event. Do you remember when we discussed back pressure? We said that, in order to avoid being overwhelmed, our subscriber must request the data. How can we do that? Using the Subscription object. If you look closer at the onSubscribe method implementation, you can see a Subscription object passed as an argument. So, we have to modify the method to not only print the “Subscribed” string, but also to request a value.
@Override
public void onSubscribe(Subscription subscription) {
System.out.println("Subscribed");
subscription.request(1);
}
If we run the code again, we see the “Subscribed” message, then the received element, and finally the “On complete” message.
And it makes sense. We subscribe to the publisher, request one element, the publisher sends the value and then it pushes the onComplete event.
Can we modify the data pushed by a Mono publisher? Of course we can. We have various methods that can help us operate on the data.
We can modify the data using the map method.
Mono<String> publisher = Mono.just("My first publisher");
publisher
.map(value -> value + " modified")
.subscribe(new MySubscriber());
The subscriber now receives the value “My first publisher modified”, followed by the onComplete signal.
We can filter the data:
Mono<String> publisher = Mono.just("My first publisher");
publisher
.filter(value -> value.equals("This is not my first publisher"))
.subscribe(new MySubscriber());
In this case the subscriber receives only the onComplete signal, because the value does not match the predicate and is filtered out.
Or we can emit an error signal when our Mono has no value to emit, using switchIfEmpty.
Mono<String> publisher = Mono.just("My first publisher");
publisher
.filter(value -> value.equals("This is not my first publisher"))
.switchIfEmpty(Mono.error(new RuntimeException("Something went wrong")))
.subscribe(new MySubscriber());
In this case, the onComplete method is no longer called. Only the onError method is.
Flux
Flux is the second publisher provided by the Project Reactor library, and it can publish 0 to N elements.
As with Mono, we have different factory methods to create a Flux.
Flux<String> publisher = Flux.just("a","b","c");
Because it can emit more than one element, we can also create the publisher from a collection.
Collection<String> data = Arrays.asList("a","b","c");
Flux<String> publisher = Flux.fromIterable(data);
How do we subscribe to a Flux publisher? In the same way as we do with the Mono publisher.
publisher.subscribe(new MySubscriber());
What does it print when we run the code? Does it print all the elements? No, as you may have guessed. It prints only the first element, and the onComplete signal never arrives.
This happens because in our subscriber we request only one element.
We can change it to request three elements, but this assumes that we know the number of values published. In a real-life scenario this is rarely the case. So what can we do in order to consume all the data? We have to update our subscriber to request new data every time it consumes a value.
These are the changes we have to make in our class.
private Subscription subscription;
@Override
public void onSubscribe(Subscription subscription) {
System.out.println("Subscribed");
this.subscription = subscription;
this.subscription.request(1);
}
@Override
public void onNext(String s) {
System.out.println("Element received " + s);
subscription.request(1);
}
Every time we consume a value in the onNext method, we request a new one.
This time the output is what we expect: all three strings followed by the onComplete signal.
Do we have to create a class in order to subscribe to a publisher? No, we don’t. The subscribe() method comes with several overloads that let us subscribe to the publisher and handle the data by passing only lambdas. When we subscribe this way, Reactor requests an unbounded number of elements for us, so we don’t have to call request ourselves.
We can achieve the same result with the following code:
publisher.subscribe(
value -> System.out.println("Element received " + value),
error -> System.out.println("Error received " + error.getMessage()),
() -> System.out.println("On complete")
);
Conclusion
Reactive programming is not an easy topic, but it can help you build an application that supports higher traffic. If you are already thinking about adopting the reactive programming paradigm in your application, just remember to ask yourself: do I really need it?