Reactor: making data more fun in Java!

GJ Schouten
Team Rockstars IT
Published in
7 min readOct 26, 2022

In this article I’d like to give a short introduction into Reactor: a Java library that enables Reactive Programming techniques that make working with data easier and more fun!

What is reactive programming?

With Big Data platforms producing more and more data and AI/ML systems needing more and more input, applications increasingly have to deal with very large data streams. Processing these with traditional programming paradigms is possible, but rarely ideal.

Reactive programming is a programming style that allows you to specify what should happen with data, without specifying how it should be achieved. It also doesn’t require the actual data to be present (yet).

Most Java programs use an imperative programming style with blocking code to execute business logic. This works well when the data is readily present and the amount of data is not too large, but when the number of data increases or data might arrive in the future, this approach has severe limitations, because it blocks the executing thread until all data has arrived and has been processed.

One way to approach this problem is to make the program run asynchronously, with callbacks that respond to the arrival of data. Consider this example:

hotelService.getAvailableRooms("Hilton",
new Callback<List<HotelRoom>>() {

public void onSuccess(List<HotelRoom> hiltonRooms) {
if (hiltonRooms.isEmpty()) {
hotelService.getAvailableRooms("Marriot",
new Callback<List<HotelRoom>>() {

public void onSuccess(List<HotelRoom> marriotRooms) {
marriotRooms.stream()
.limit(50)
.forEach(room -> reservationService.book(room,
new Callback<Reservation>() {

public void onSuccess(Reservation reservation) {
notificationService.notifyGuest(reservation);
}

public void onError(Throwable t) {
customerService.notifyProblem(t);
}
}
));
}
public void onError(Trowable t) {
customerService.notifyProblem(t);
}
}
);
} else {
hiltonRooms.stream()
.limit(50)
.forEach(room -> reservationService.book(room,
new Callback<Reservation>() {

public void onSuccess(Reservation reservation) {
notificationService.notifyGuest(reservation);
}

public void onError(Throwable t) {
customerService.notifyProblem(t);
}
}
));
}
}

public void onError(Throwable t) {
customerService.notifyProblem(t);
}
}
);

This code gets HotelRooms, trying one hotel first, then another. It takes a maximum of 50 rooms and reserves them. If anything goes wrong, the customer service is notified.

The first problem with this code is that it’s complex. Nothing special is happening here, but it’s already difficult to understand. We could put the reservation logic in a private method, to avoid duplication, but it would not avoid the complexity.

The second problem is that it doesn’t solve the problem of very large amounts of data. We only take 50 elements from the List of HotelRooms, but the whole List still has to be loaded in memory first, because it is passed through the callback parameter. What if it, hypothetically, contains a million elements?

Instead of List, Reactor uses Flux to manage an ordered collection of elements. A Flux is an Object that can stream data asynchronously and exists even when the data itself isn’t there (yet). In addition, a Flux has many, many methods to manipulate the stream.

Now, let’s imagine that the hotelService and reservationService methods don’t accept Callbacks, but instead return a Reactor Flux of resp. HotelRooms and Reservations:

hotelService.getAvailableRooms("Hilton")
.switchIfEmpty(hotelService.getAvailableRooms("Marriot"))
.take(50)
.map(reservationService::book)
.subscribe(
notificationService::notifyGuest,
customerService::notifyProblem
);

That’s all it takes!

The amazing thing about Reactor is that nothing happens until you subscribe! This means that the fluxes that are returned from the hotelService don’t actually start pumping data until that data is requested. The downstream flow of data is started and can be tuned by an upstream flow of signals.

Each method in the chain actually creates a new Flux that subscribes to the previous Flux.

Threading

The entire chain executes asynchronously, only processing data when it arrives. You can fully control the threads on which the different parts of the chain run. For example, if you want to run the hotelService and reservationService operations on the same thread that you subscribed from, but the notificationService and the customerService on parallel threads:

hotelService.getAvailableRooms("Hilton")
.switchIfEmpty(hotelService.getAvailableRooms("Marriot"))
.take(50)
.map(reservationService::book)
.publishOn(Schedulers.parallel())
.subscribe(
notificationService::notifyGuest,
customerService::notifyProblem
);

The publishOn method affects everything that happens below it in the chain, in this case the handler methods that are specified in the subscribe method.

Perhaps you want to start the entire subscription on a bounded elastic thread pool:

hotelService.getAvailableRooms("Hilton")
.switchIfEmpty(hotelService.getAvailableRooms("Marriot"))
.take(50)
.map(reservationService::book)
.subscribeOn(Schedulers.boundedElastic())
.subscribe(
notificationService::notifyGuest,
customerService::notifyProblem
);

The subscribeOn method affects everything that happens from the top of the chain, where the data flow will start upon subscription.

Backpressure

Reactor is created with large flows of data in mind. If a Flux at the top produces more data than the services below it can handle, there are various mechanisms available to deal with that.

The take(50) method, for example, takes 50 items from upstream and then cancels the subscription. However, it does not tell upstream that it is only going to take 50. The Flux upstream could still produce a lot of data.

To propagate this signal upstream, we could set the optional limitRequest boolean to true:

hotelService.getAvailableRooms("Hilton")
.switchIfEmpty(hotelService.getAvailableRooms("Marriot"))
.take(50, true)
.map(reservationService::book)
.subscribe(
notificationService::notifyGuest,
customerService::notifyProblem
);

Now only 50 items are requested from the Flux instances upstream, preventing the production of superfluous data. There are a lot of methods available to tune the rate at which data will be processed: limitRate, takeWhile, takeUntil and many more. You can use them anywhere in the chain and they will signal the corresponding demand upstream.

Testing

Reactor comes with useful tools for testing instances of Flux. Suppose we have the following Flux:

Flux greetings = Flux.just("Jim", "Joe", "Tom")
.map(s -> "Hello " + s);

It can be unit tested like this:

StepVerifier.create(greetings) 
.expectNext("Hello Jim")
.expectNext("Hello Joe")
.expectNext("Hello Tom")
.expectComplete()
.verify();

StepVerifiers support many, many methods for verification, including functionality for using virtual time, if you want to test Flux instances that perform certain actions over a long period of time, like processing a continuous live stream of data.

Logging

Because a lot of logic takes place inside the methods of a Flux, stacktraces can be hard to read when something goes wrong. For that purpose, Reactor has sophisticated tooling to make sense of error messages.

When debugging, you can activate Hooks.onOperatorDebug(); which will instrument the calls to the Reactor methods and use them to create stacktraces with very useful meta information.

For production, Reactor comes with a Java Agent that instruments your code without paying the performance penalty associated with the debugging alternative. To use it, simply call ReactorDebugAgent.init();

And, since not all logging is about errors, you can simply call one of the many overloaded log() methods anywhere in the call chain of a Flux and Reactor will log a useful message whenever an element passes through that link of the chain:

hotelService.getAvailableRooms("Hilton")
.switchIfEmpty(hotelService.getAvailableRooms("Marriot"))
.log()
.take(50)
.map(reservationService::book)
.log("reservation", Level.INFO, SignalType.ON_NEXT)
.subscribe(
notificationService::notifyGuest,
customerService::notifyProblem
);

Reactor and Spring

Spring offers extensive support for Reactive Programming and it chose Reactor as the library to implement it. Not only is this a confirmation of the fact that Reactor is the go-to standard of Reactive Programming in Java, it is also very convenient!

When connecting to backend systems, like Cassandra or Kafka, Spring provides several integrations that generally follow the same pattern:

Operations, like CassandraOperations or KafkaOperations

Templates, like CassandraTemplate or KafkaTemplate

Repositories, like CassandraRepository or KafkaRepository

These classes provide several methods that return data from the underlying system, like CassandraTemplate.select(query), which returns a List of objects of the appropriate type.

Spring also provides Reactive versions of each of these! So, for example, for Cassandra, Spring provides a ReactiveCassandraOperations, ReactiveCassandraTemplate and ReactiveCassandraRepository. Instead of a List<T>, they return a Flux<T>!

On the frontend, Spring Webflux uses Reactor to implement Reactive endpoints. This makes it easy to, for example, process a multipart request reactively:

@PostMapping(path = "/handle")
public String handle(@RequestBody Flux<Part> parts) {
// ...
}

The output itself can also be reactive, creating a continuous data stream over HTTP:

@GetMapping(path = "/data", produces = "text/event-stream")
public Flux<String> getData() {
return Flux.range(1, 20)
.map(n -> "Line " + 1)
.delayElements(Duration.ofSeconds(1))
}

Comparison with java.util.Stream

You may wonder: doesn’t java.util.Stream already have this declarative, method chaining style of programming that can process data in parallel?

It does. However, there are several reasons why using Reactor is superior when dealing with data flows:

  • Reactor has a subscriber mechanism. Data will only start to flow once you subscribe.
  • Reactor is asynchronous by default.
  • Stream does not handle backpressure, you have to be ready to receive the entire stream of data, even if you only process a part of it.

Also, Stream has a rather basic API. In addition to the things that we’ve already seen, Reactor has a lot more to offer:

  • Do you need metrics on the amount of data that goes through a Flux? Reactor integrates with Micrometer.
  • Do you want to only start the flow when a Flux has a certain number of subscribers? Reactor offers ConnectableFlux.
  • Do you need grouping of a Flux’s values, for example, grouping a Flux of numbers into a Flux of odd and a Flux of even numbers? Reactor offers Flux.groupBy.
  • Do you need windowing of a Flux’s values, for example, windowing a Flux of numbers into windows of 1 hour each? Reactor offers many Flux.window methods.
  • Do you need buffering of a Flux’s values, for example, processing a Flux of numbers in buffers of 1000? Reactor offers many Flux.buffer methods.

And the list goes on. There’s too much to discuss in one article, as Reactor has a method for every conceivable use case and if not, it provides plenty of hooks to implement your own.

Conclusion

I hope I’ve been able to give you an idea about what Reactor is and what it can do for you.

If you have an application landscape that includes Big Data systems like NoSQL databases or Streaming platforms and you need to process that data, for example to create an AI/ML model, Reactor can be the spider in the web that connects all these components together in a scalable, asynchronous, parallel and flexible manner.

At the same time, Reactor makes the work more fun for developers by offering a Reactive programming model and taking away the boilerplate code that usually comes with these use cases.

If you want to learn more about Reactor, I recommend the Reference Manual, in particular the section that lists which method to use for which use case.

The main website offers plenty of learning material and this interactive web based tutorial is an excellent way to get started.

Next time…

In the next article, I will explain how Reactor integrates with Kotlin and takes advantage of a few unique language constructs that Kotlin offers.

Thanks a lot for reading and stay tuned!

--

--

GJ Schouten
Team Rockstars IT

Software Architect / Engineer and creator of The Lukashian Calendar. The most important things in software? Simplicity, consistency and naming :-)