Reactive Programming with Reactor

Aftab Shaikh
Published in Nerd For Tech · 6 min read · Mar 12, 2021

[Figure: Arc Reactor]

Before we get to what Reactor is and why you would use it, let us quickly recap reactive programming and Reactive Streams.

Reactive programming is a way of writing asynchronous, non-blocking code. Whenever there is a time-consuming task, such as an I/O operation or an API call, execution switches to another active task and comes back to the current one when the asynchronous processing has finished.

A simple way to think about it:
Reactive: reacts as soon as the data is available or the asynchronous execution has finished.

Reactive Streams

The dictionary says: a stream is a steady flow of something.

In computing terms, a stream consists of a producer of data and a subscriber to that data. Data starts flowing from the producer to the subscriber once the subscriber subscribes to the producer. In between, there can be multiple stages that modify the data, as depicted in the diagram below.

[Figure: General Streams]

That covers streams in general. Now, what makes a stream reactive?

  1. Asynchronicity: The producer produces the data asynchronously, and the data flows asynchronously through the processing stages.
  2. Non-blocking backpressure: A mechanism by which the consumer can ask the producer to slow down whenever the consumer cannot keep up with the incoming data.

[Figure: Reactive Streams]

You can visualise a reactive stream as a conveyor belt. Whenever raw material is available at the source, it is placed on the belt. At certain points along the belt the material is processed or transformed, and the finished product is delivered at the other end (the subscriber).

[Figure: Conveyor Belt]

That was the general background: concepts that stay the same no matter which reactive framework you use.

Coming back to Reactor: it is a fully non-blocking reactive programming framework with non-blocking backpressure.
One of the most powerful things it provides is a vast vocabulary of operators that can be composed with one another, which keeps the code readable.

Let’s break down this framework and look at it in the following steps:

  1. Publishers/Producers in Reactor
  2. Operators in Reactor

Publishers in Reactor

Mono and Flux are the two publishers provided by Reactor. Mono publishes at most one item, and Flux publishes a sequence of zero or more items; this is the only major difference between a Mono and a Flux. To create a Mono or Flux from known values, you can use the static method just():

Mono.just(1)
Flux.just(1,2,3,4,5,6,7,8)

Nothing happens until you subscribe. We have created the publishers, but no data starts flowing until someone subscribes to them.
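To see this in action, here is a minimal sketch in Java, assuming reactor-core is on the classpath. The class and method names (SubscribeExample, subscribeAndCollect) are illustrative and not part of Reactor. It builds the Flux, adds log(), and only then subscribes:

```java
import reactor.core.publisher.Flux;

import java.util.ArrayList;
import java.util.List;

class SubscribeExample {

    // Subscribes to a Flux and collects everything it emits.
    static List<Integer> subscribeAndCollect() {
        List<Integer> received = new ArrayList<>();

        // At this point the Flux is only a description of the pipeline;
        // nothing has been emitted yet.
        Flux<Integer> numbers = Flux.just(1, 2, 3, 4, 5, 6, 7, 8)
                .log(); // logs the signals exchanged with the subscriber

        // Data starts flowing only when subscribe() is called.
        // Flux.just emits synchronously, so the list is full when subscribe() returns.
        numbers.subscribe(received::add);

        return received;
    }

    public static void main(String[] args) {
        System.out.println(subscribeAndCollect()); // prints [1, 2, 3, 4, 5, 6, 7, 8]
    }
}
```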

The log() operator logs every interaction with a publisher, which is very helpful for understanding the signals a publisher sends. For example, subscribing to Flux.just(1,2,3,4,5,6,7,8).log() logs an onSubscribe signal, a request from the subscriber, one onNext signal per element, and finally an onComplete signal.

From this sequence we can conclude the following: when a consumer subscribes to a publisher, an onSubscribe signal is sent to the subscriber on successful subscription. The subscriber then requests data. The publisher responds with onNext signals (for data) or with an onError signal (for an error). When the publisher has no data left, it sends an onComplete signal to the subscriber.
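The same signal sequence can be observed programmatically. The sketch below assumes reactor-core on the classpath and uses an illustrative class name (SignalsExample). It relies on Reactor's BaseSubscriber to request one element at a time and records each signal it receives:

```java
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

import java.util.ArrayList;
import java.util.List;

class SignalsExample {

    // Subscribes with a custom subscriber and records the signals in arrival order.
    static List<String> recordSignals() {
        List<String> signals = new ArrayList<>();

        Flux.just(1, 2, 3).subscribe(new BaseSubscriber<Integer>() {
            @Override
            protected void hookOnSubscribe(Subscription subscription) {
                signals.add("onSubscribe");
                request(1); // ask the publisher for the first element only
            }

            @Override
            protected void hookOnNext(Integer value) {
                signals.add("onNext(" + value + ")");
                request(1); // ask for the next element once this one is handled
            }

            @Override
            protected void hookOnError(Throwable error) {
                signals.add("onError");
            }

            @Override
            protected void hookOnComplete() {
                signals.add("onComplete");
            }
        });

        return signals;
    }

    public static void main(String[] args) {
        // prints [onSubscribe, onNext(1), onNext(2), onNext(3), onComplete]
        System.out.println(recordSignals());
    }
}
```

Requesting one element at a time is backpressure in its simplest form: the publisher never sends more than the subscriber has asked for.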

Operators in Reactor

As I said earlier, one of Reactor’s most powerful weapons is its vast vocabulary of operators. Let’s look at a few use cases where these operators come in handy.

map()

The map() operator is used to transform the data flowing through the stream. Let’s see a simple example
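A minimal sketch of such a chain, assuming reactor-core is on the classpath; the class and method names are illustrative. The first method squares the value inside a Mono, and the second shows a further map() chained onto the first:

```java
import reactor.core.publisher.Mono;

class MapExample {

    // Squares the value carried by a Mono.
    static Mono<Integer> square(int n) {
        return Mono.just(n)
                .map(x -> x * x);
    }

    // Operators chain: the output of one map() is the input of the next.
    static Mono<String> squareAndLabel(int n) {
        return square(n)
                .map(x -> "Squared: " + x);
    }

    public static void main(String[] args) {
        square(4).subscribe(System.out::println);         // prints 16
        squareAndLabel(4).subscribe(System.out::println); // prints Squared: 16
    }
}
```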

In the above example, as soon as the Mono is subscribed, the data flows into the map operator, where it is squared and returned.

Each operator wraps its result in a new publisher (a Mono in this case) and returns it.

You can chain any number of operators. Whatever one operator returns flows into the next operator.

flatMap()

Like map(), flatMap() can be used to transform the data. In addition, the transformation itself can be asynchronous: the function passed to flatMap() returns a publisher rather than a plain value.
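A minimal sketch of this, assuming reactor-core on the classpath and Java 16+ for records. Customer, Payment, CustomerService, PaymentService and the paymentsMade counter are hypothetical stand-ins for real services; the counter only exists to show whether the payment actually ran:

```java
import reactor.core.publisher.Mono;

import java.util.concurrent.atomic.AtomicInteger;

class FlatMapExample {

    record Customer(String id) {}
    record Payment(String customerId) {}

    // Counts how many payments were actually executed.
    static final AtomicInteger paymentsMade = new AtomicInteger();

    static class CustomerService {
        Mono<Customer> getCustomer(String id) {
            return Mono.just(new Customer(id));
        }
    }

    static class PaymentService {
        // Lazy: the payment only runs when the returned Mono is subscribed.
        Mono<Payment> pay(Customer customer) {
            return Mono.fromSupplier(() -> {
                paymentsMade.incrementAndGet();
                return new Payment(customer.id());
            });
        }
    }

    // flatMap() subscribes to the inner Mono and flattens it into the chain.
    static Mono<Payment> payWithFlatMap(String customerId) {
        return new CustomerService().getCustomer(customerId)
                .flatMap(customer -> new PaymentService().pay(customer));
    }

    // map() only wraps the inner Mono; nobody subscribes to it, so no payment runs.
    static Mono<Mono<Payment>> payWithMap(String customerId) {
        return new CustomerService().getCustomer(customerId)
                .map(customer -> new PaymentService().pay(customer));
    }

    public static void main(String[] args) {
        System.out.println(payWithFlatMap("c1").block()); // a Payment for customer c1
        payWithMap("c1").block();                         // returns the inner Mono, unsubscribed
        System.out.println(paymentsMade.get());           // prints 1
    }
}
```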

In the above example, once we get the customer data, it flows into the flatMap() operator, where we perform the payment for the customer asynchronously. PaymentService().pay() therefore returns a Mono<Payment>.

The important role flatMap() plays here is that it unwraps the Mono<Payment> returned by PaymentService().pay() and merges it into the upstream, so that the chain finally returns a Mono<Payment>.

Had we used map() instead of flatMap(), the result would have been a Mono<Mono<Payment>>. And since nobody subscribes to the inner Mono, the payment would never happen. This is the most important difference between flatMap() and map().

zip()

zip() subscribes to two or more publishers together and combines their results, so independent asynchronous calls can run concurrently.

Let’s say we have a use case where we have to fetch the customer’s personal details and bank account details from two different APIs. Both tasks are independent of each other, so they can be executed concurrently.

Here we go:
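The following is a minimal sketch of that use case, assuming reactor-core on the classpath and Java 16+ for records. The record types and the two lookup methods are hypothetical placeholders for the real API calls:

```java
import reactor.core.publisher.Mono;

class ZipExample {

    record PersonalDetails(String name) {}
    record AccountDetails(String accountNumber) {}
    record Customer(String name, String accountNumber) {}

    // Hypothetical API calls; real ones would return asynchronously.
    static Mono<PersonalDetails> getPersonalDetails(String customerId) {
        return Mono.just(new PersonalDetails("Alice"));
    }

    static Mono<AccountDetails> getAccountDetails(String customerId) {
        return Mono.just(new AccountDetails("ACC-" + customerId));
    }

    // zip() waits for both results and hands them over as a Tuple2.
    static Mono<Customer> getCustomer(String customerId) {
        return Mono.zip(getPersonalDetails(customerId), getAccountDetails(customerId))
                .map(tuple -> new Customer(
                        tuple.getT1().name(),
                        tuple.getT2().accountNumber()));
    }

    public static void main(String[] args) {
        getCustomer("42").subscribe(System.out::println);
    }
}
```

Both sources here are Mono.just, so they complete immediately; the concurrency benefit shows up when the sources are genuinely asynchronous, such as HTTP calls.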

zip() returns a tuple of the responses from the publishers. In the above case it is a Tuple2<PersonalDetails, AccountDetails>. We can then merge the two and return a single Customer object.

Let’s say that for some reason we don’t get the personal details of the customer. In that case zip won’t execute the following steps; it executes them only when all of the publishers respond with data. As a result, the subscriber receives an empty response. Let’s see how to handle this situation using our next operator.

switchIfEmpty()

As the name suggests, switchIfEmpty() switches to an alternate publisher when it receives an empty response from upstream.
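A minimal sketch, assuming reactor-core on the classpath and Java 16+ for records. The service and cache lookups are hypothetical, and the service is hard-coded to know only the id "known":

```java
import reactor.core.publisher.Mono;

class SwitchIfEmptyExample {

    record Customer(String id, String source) {}

    // Hypothetical customer service: returns an empty Mono for unknown ids.
    static Mono<Customer> getCustomerFromService(String id) {
        return "known".equals(id)
                ? Mono.just(new Customer(id, "service"))
                : Mono.empty();
    }

    // Hypothetical cache lookup used as the fallback.
    static Mono<Customer> getCustomerFromCache(String id) {
        return Mono.just(new Customer(id, "cache"));
    }

    static Mono<Customer> getCustomer(String id) {
        return getCustomerFromService(id)
                // defer() builds the cache lookup only if the upstream turns out to be empty
                .switchIfEmpty(Mono.defer(() -> getCustomerFromCache(id)));
    }

    public static void main(String[] args) {
        System.out.println(getCustomer("known").block().source());   // prints service
        System.out.println(getCustomer("unknown").block().source()); // prints cache
    }
}
```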

In the above example, we switch to the customer details from the cache when we receive an empty response from the customer service.

Coming back to the scenario we discussed for the zip operator:
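One way to sketch it, assuming reactor-core on the classpath and Java 16+ for records. All types and lookups are hypothetical, and the personal-details service is hard-coded to return nothing for the id "missing" so that the fallback path can be exercised:

```java
import reactor.core.publisher.Mono;

class ZipWithFallbackExample {

    record PersonalDetails(String name) {}
    record AccountDetails(String accountNumber) {}
    record Customer(String name, String accountNumber) {}

    // Hypothetical service: has no personal details for the id "missing".
    static Mono<PersonalDetails> getPersonalDetails(String id) {
        return "missing".equals(id)
                ? Mono.empty()
                : Mono.just(new PersonalDetails("Alice"));
    }

    static Mono<AccountDetails> getAccountDetails(String id) {
        return Mono.just(new AccountDetails("ACC-" + id));
    }

    // Hypothetical cache lookup used when zip() produces nothing.
    static Mono<Customer> getCustomerFromCache(String id) {
        return Mono.just(new Customer("Cached customer", "ACC-" + id));
    }

    static Mono<Customer> getCustomer(String id) {
        return Mono.zip(getPersonalDetails(id), getAccountDetails(id))
                .map(t -> new Customer(t.getT1().name(), t.getT2().accountNumber()))
                // zip() completes empty if any source is empty, so fall back to the cache
                .switchIfEmpty(Mono.defer(() -> getCustomerFromCache(id)));
    }

    public static void main(String[] args) {
        System.out.println(getCustomer("42").block().name());      // prints Alice
        System.out.println(getCustomer("missing").block().name()); // prints Cached customer
    }
}
```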

In the above example, if we don’t get the personal or account details, we switch to the details from the cache, and the end user receives those instead.

onErrorMap()

onErrorMap() is one of the error-handling operators. Whenever an error occurs in a reactive chain, the error signal propagates downstream to the first error-handling operator.

onErrorMap() can be used to transform the exception that occurred into a domain-specific exception, or into any exception the application understands.
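A minimal sketch, assuming reactor-core on the classpath. RequestTimeoutException is a hypothetical domain exception defined here for illustration, the error code 504 is an arbitrary choice, and callSlowService() simply simulates a call that fails with a java.util.concurrent.TimeoutException:

```java
import reactor.core.publisher.Mono;

import java.util.concurrent.TimeoutException;

class OnErrorMapExample {

    // Hypothetical domain exception carrying a readable message and an error code.
    static class RequestTimeoutException extends RuntimeException {
        final int errorCode;

        RequestTimeoutException(String message, int errorCode, Throwable cause) {
            super(message, cause);
            this.errorCode = errorCode;
        }
    }

    // Simulates a remote call that fails with a low-level timeout.
    static Mono<String> callSlowService() {
        return Mono.error(new TimeoutException("no response from upstream"));
    }

    static Mono<String> fetch() {
        return callSlowService()
                // translate only TimeoutException; other errors pass through unchanged
                .onErrorMap(TimeoutException.class,
                        e -> new RequestTimeoutException(
                                "The request timed out, please try again", 504, e));
    }

    public static void main(String[] args) {
        fetch().subscribe(
                value -> System.out.println("Received: " + value),
                error -> System.out.println(error.getMessage()));
        // prints The request timed out, please try again
    }
}
```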

In the above example, we throw a RequestTimeoutException with an understandable message and an error code whenever a timeout exception occurs.

It is a good idea to identify the set of exceptions that can occur in your system. It helps you in the following ways:

1. When a known exception occurs, you can take a specific measure.

2. The error is more informative (in logs or as a response).

onErrorResume()

onErrorResume() allows you to fall back to an alternate process or publisher in case something goes wrong.
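A minimal sketch, assuming reactor-core on the classpath. The favorites service and the cache are hypothetical; the service is simulated with Flux.never() so that the 200 ms timeout (an arbitrary value) always fires:

```java
import reactor.core.publisher.Flux;

import java.time.Duration;
import java.util.concurrent.TimeoutException;

class OnErrorResumeExample {

    // Hypothetical remote service that never answers, to force a timeout.
    static Flux<String> getFavoritesFromService(String userId) {
        return Flux.never();
    }

    // Hypothetical cache used as the fallback source.
    static Flux<String> getFavoritesFromCache(String userId) {
        return Flux.just("cached-favorite-1", "cached-favorite-2");
    }

    static Flux<String> getFavorites(String userId) {
        return getFavoritesFromService(userId)
                // signals a TimeoutException if no item arrives within 200 ms
                .timeout(Duration.ofMillis(200))
                // on timeout, continue with the cached favorites instead of failing
                .onErrorResume(TimeoutException.class,
                        e -> getFavoritesFromCache(userId));
    }

    public static void main(String[] args) {
        // prints [cached-favorite-1, cached-favorite-2]
        System.out.println(getFavorites("user-1").collectList().block());
    }
}
```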

In the above example, an exception is thrown if there is a timeout while fetching favorites. In that case, we fall back to the cache service for the favorites.

The operators above are among the most commonly used ones, but they are only a small part of what Reactor offers.

Keep on exploring, as it is fun to do…

References:
1. Official docs of project reactor — https://projectreactor.io/docs/core/release/reference/

Written by Aftab Shaikh

Application Developer @Thoughtworks
