Reactor: making data more fun in Java!
In this article I’d like to give a short introduction to Reactor: a Java library that enables Reactive Programming techniques that make working with data easier and more fun!
What is reactive programming?
With Big Data platforms producing more and more data and AI/ML systems needing more and more input, applications increasingly have to deal with very large data streams. Processing these with traditional programming paradigms is possible, but rarely ideal.
Reactive programming is a programming style that allows you to specify what should happen with data, without specifying how it should be achieved. It also doesn’t require the actual data to be present (yet).
Most Java programs use an imperative programming style with blocking code to execute business logic. This works well when the data is readily available and there isn’t too much of it. When the amount of data grows, or the data only arrives in the future, the approach has severe limitations: it blocks the executing thread until all data has arrived and has been processed.
One way to approach this problem is to make the program run asynchronously, with callbacks that respond to the arrival of data. Consider this example:
hotelService.getAvailableRooms("Hilton",
new Callback<List<HotelRoom>>() {
public void onSuccess(List<HotelRoom> hiltonRooms) {
if (hiltonRooms.isEmpty()) {
hotelService.getAvailableRooms("Marriot",
new Callback<List<HotelRoom>>() {
public void onSuccess(List<HotelRoom> marriotRooms) {
marriotRooms.stream()
.limit(50)
.forEach(room -> reservationService.book(room,
new Callback<Reservation>() {
public void onSuccess(Reservation reservation) {
notificationService.notifyGuest(reservation);
}
public void onError(Throwable t) {
customerService.notifyProblem(t);
}
}
));
}
public void onError(Throwable t) {
customerService.notifyProblem(t);
}
}
);
} else {
hiltonRooms.stream()
.limit(50)
.forEach(room -> reservationService.book(room,
new Callback<Reservation>() {
public void onSuccess(Reservation reservation) {
notificationService.notifyGuest(reservation);
}
public void onError(Throwable t) {
customerService.notifyProblem(t);
}
}
));
}
}
public void onError(Throwable t) {
customerService.notifyProblem(t);
}
}
);
This code gets HotelRooms, trying one hotel first and falling back to another if the first has none available. It takes a maximum of 50 rooms and reserves them. If anything goes wrong, the customer service is notified.
The first problem with this code is that it’s complex. Nothing special is happening here, but it’s already difficult to understand. We could move the reservation logic into a private method to avoid duplication, but that would not remove the nested callbacks.
The second problem is that it doesn’t solve the problem of very large amounts of data. We only take 50 elements from the List of HotelRooms, but the whole List still has to be loaded into memory first, because it is passed through the callback parameter. What if it, hypothetically, contains a million elements?
Instead of a List, Reactor uses a Flux to manage an ordered sequence of elements. A Flux is an object that can stream data asynchronously, and it exists even when the data itself isn’t there (yet). In addition, a Flux has many, many methods to manipulate the stream.
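Here is a minimal sketch that makes “exists even when the data isn’t there (yet)” concrete. It uses a Reactor Sinks.Many (available since Reactor 3.4) to stand in for a source that delivers its elements later. The class name FluxBeforeData and the room strings are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;

public class FluxBeforeData {

    // Returns the elements the subscriber received, in order.
    static List<String> demo() {
        // A unicast sink: a Flux whose elements are pushed in later.
        Sinks.Many<String> sink = Sinks.many().unicast().onBackpressureBuffer();
        Flux<String> rooms = sink.asFlux();

        List<String> received = new ArrayList<>();
        // The pipeline is declared and subscribed before any data exists.
        rooms.map(String::toUpperCase).subscribe(received::add);

        // The data "arrives" afterwards and flows through the pipeline as it comes in.
        sink.tryEmitNext("room 101");
        sink.tryEmitNext("room 102");
        sink.tryEmitComplete();
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [ROOM 101, ROOM 102]
    }
}
```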
Now, let’s imagine that the hotelService and reservationService methods don’t accept Callbacks, but instead return a Reactor Flux of HotelRooms and Reservations, respectively:
hotelService.getAvailableRooms("Hilton")
.switchIfEmpty(hotelService.getAvailableRooms("Marriot"))
.take(50)
.flatMap(reservationService::book) // book returns a Flux, so flatMap (not map) flattens the Reservations into one stream
.subscribe(
notificationService::notifyGuest,
customerService::notifyProblem
);
That’s all it takes!
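The sketch below lets you run the chain without real services. It is self-contained, but several parts are hypothetical stand-ins for the article’s hotelService and reservationService: the in-memory methods getAvailableRooms and book, the HotelRoom and Reservation records, and the room counts. Here book is assumed to return a Mono, and flatMap handles a Mono the same way it handles a Flux.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class HotelBookingSketch {

    // Hypothetical domain types; the article does not define them.
    record HotelRoom(String hotel, int number) {}
    record Reservation(HotelRoom room) {}

    // Hypothetical in-memory hotel service: "Hilton" has no rooms, "Marriot" has 100.
    static Flux<HotelRoom> getAvailableRooms(String hotel) {
        int count = hotel.equals("Marriot") ? 100 : 0;
        return Flux.range(1, count).map(n -> new HotelRoom(hotel, n));
    }

    // Hypothetical booking call that returns the reservation asynchronously.
    static Mono<Reservation> book(HotelRoom room) {
        return Mono.just(new Reservation(room));
    }

    static List<Reservation> run() {
        List<Reservation> notified = new ArrayList<>();
        List<Throwable> problems = new ArrayList<>();
        getAvailableRooms("Hilton")
            .switchIfEmpty(getAvailableRooms("Marriot")) // fallback when Hilton emits nothing
            .take(50)                                    // at most 50 rooms
            .flatMap(HotelBookingSketch::book)           // book returns a publisher, so flatten it
            .subscribe(notified::add, problems::add);    // stand-ins for notifyGuest / notifyProblem
        if (!problems.isEmpty()) {
            throw new IllegalStateException(problems.get(0));
        }
        return notified;
    }

    public static void main(String[] args) {
        List<Reservation> reservations = run();
        System.out.println(reservations.size());                // 50
        System.out.println(reservations.get(0).room().hotel()); // Marriot
    }
}
```

Everything here runs synchronously on the calling thread because none of the stand-ins introduce a scheduler. Real services backed by a network driver would typically emit on their own threads.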
The amazing thing about Reactor is that nothing happens until you subscribe! This means that the fluxes returned from the hotelService don’t actually start pumping data until that data is requested. Data flows downstream, from the source towards the subscriber. Signals such as the subscription itself, requests for more data and cancellation flow upstream, and those upstream signals determine when, and how much, data is produced.
Each method in the chain actually creates a new Flux that, once it is subscribed to, subscribes to the previous Flux.
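This laziness can be observed directly. The sketch below wraps a hypothetical room source in Flux.defer, which runs its supplier only when someone subscribes. It then counts how often that happens before and after subscribe is called. The class and method names are illustrative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;

public class LazyFluxDemo {

    static final AtomicInteger sourceStarts = new AtomicInteger();

    // Hypothetical source: Flux.defer postpones creating it until subscription time.
    static Flux<String> rooms() {
        return Flux.defer(() -> {
            sourceStarts.incrementAndGet(); // counts how often the source is actually started
            return Flux.just("101", "102", "103");
        });
    }

    // Returns {starts before subscribe, starts after subscribe, elements received}.
    static int[] demo() {
        sourceStarts.set(0);
        // Assembling the chain only builds new Flux objects; nothing runs yet.
        Flux<String> pipeline = rooms().map(r -> "Room " + r).take(2);
        int before = sourceStarts.get();

        List<String> out = new ArrayList<>();
        pipeline.subscribe(out::add); // subscribing triggers the whole chain
        int after = sourceStarts.get();
        return new int[] {before, after, out.size()};
    }

    public static void main(String[] args) {
        int[] r = demo();
        System.out.println(r[0] + " " + r[1] + " " + r[2]); // 0 1 2
    }
}
```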
Threading
The chain processes data as it arrives, and you can fully control the threads on which the different parts of the chain run. For example, suppose you want to run the hotelService and reservationService operations on the thread that you subscribed from, but the notificationService and customerService handlers on parallel threads:
hotelService.getAvailableRooms("Hilton")
.switchIfEmpty(hotelService.getAvailableRooms("Marriot"))
.take(50)
.flatMap(reservationService::book)
.publishOn(Schedulers.parallel())
.subscribe(
notificationService::notifyGuest,
customerService::notifyProblem
);
The publishOn method affects everything that happens below it in the chain: in this case, the handler methods that are specified in the subscribe method.
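You can check the effect of publishOn by recording thread names on either side of it. Here is a minimal sketch with an illustrative class name. blockLast is used only to wait for the asynchronous result in a demo; it does not belong in reactive production code.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class PublishOnDemo {

    // Returns [thread before publishOn, thread after publishOn] for a single element.
    static List<String> threads() {
        return Flux.just(1)
            .map(n -> Thread.currentThread().getName())                       // runs on the subscribing thread
            .publishOn(Schedulers.parallel())
            .map(before -> List.of(before, Thread.currentThread().getName())) // runs on a parallel worker
            .blockLast();                                                     // demo only: wait for the result
    }

    public static void main(String[] args) {
        // Prints something like [main, parallel-1]; the worker number may differ.
        System.out.println(threads());
    }
}
```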
Perhaps you want to start the entire subscription on a bounded elastic thread pool:
hotelService.getAvailableRooms("Hilton")
.switchIfEmpty(hotelService.getAvailableRooms("Marriot"))
.take(50)
.flatMap(reservationService::book)
.subscribeOn(Schedulers.boundedElastic())
.subscribe(
notificationService::notifyGuest,
customerService::notifyProblem
);
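The subscribeOn method determines the thread on which the subscription happens, and therefore where the data flow starts at the top of the chain, regardless of where subscribeOn appears in the chain. Operators keep running on that thread until a publishOn switches to another one.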
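The same approach works for subscribeOn. In this sketch it is the last operator before subscribing. Even so, the source (created lazily with Flux.defer) and the map after it both run on a boundedElastic worker thread, because no publishOn switches threads in between. The class name is illustrative.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class SubscribeOnDemo {

    // Returns [thread the source ran on, thread the map ran on].
    static List<String> threads() {
        return Flux.defer(() -> Flux.just(Thread.currentThread().getName())) // source starts where subscription happens
            .map(source -> List.of(source, Thread.currentThread().getName()))
            .subscribeOn(Schedulers.boundedElastic())                        // placed last, still moves the source
            .blockLast();                                                    // demo only: wait for the result
    }

    public static void main(String[] args) {
        // Prints something like [boundedElastic-1, boundedElastic-1].
        System.out.println(threads());
    }
}
```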
Backpressure
Reactor is built with large flows of data in mind. If a Flux at the top produces more data than the services below it can handle, there are various mechanisms available to deal with that.
The take(50) method, for example, takes 50 items from upstream and then cancels the subscription. However, depending on the Reactor version, it does not necessarily tell upstream that it is only going to take 50, so the Flux upstream could still produce a lot of data.
To propagate this limit upstream, we can pass true as the optional limitRequest argument:
hotelService.getAvailableRooms("Hilton")
.switchIfEmpty(hotelService.getAvailableRooms("Marriot"))
.take(50, true)
.flatMap(reservationService::book)
.subscribe(
notificationService::notifyGuest,
customerService::notifyProblem
);
Now only 50 items are requested from the Flux instances upstream, preventing the production of superfluous data. There are many methods for tuning the rate at which data is processed: limitRate, takeWhile, takeUntil and many more. You can use them anywhere in the chain, and they signal the corresponding demand, or cancellation, upstream.
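The request signals themselves can be made visible with doOnRequest, which is called for every request that passes upstream through it. This sketch compares take(50, true) with limitRate(10). The class name and source sizes are illustrative, and take(long, boolean) assumes a recent Reactor version (3.4 or later).

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

public class RequestDemo {

    // Request amounts that reach the source when take(50, true) is used.
    static List<Long> takeWithLimitRequest() {
        List<Long> requests = new ArrayList<>();
        Flux.range(1, 1_000_000)
            .doOnRequest(requests::add) // records each request signal arriving from downstream
            .take(50, true)             // caps the demand sent upstream at 50
            .subscribe();               // subscribe() itself requests an unbounded amount
        return requests;
    }

    // Request amounts that reach the source when limitRate(10) is used.
    static List<Long> limitRate() {
        List<Long> requests = new ArrayList<>();
        Flux.range(1, 100)
            .doOnRequest(requests::add)
            .limitRate(10)              // never requests more than 10 elements at a time
            .subscribe();
        return requests;
    }

    public static void main(String[] args) {
        System.out.println(takeWithLimitRequest());
        System.out.println(limitRate());
    }
}
```

In both cases the subscriber asks for “everything”, yet the source only ever sees bounded requests.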
Testing
Reactor comes with useful tools for testing instances of Flux, in the reactor-test artifact. Suppose we have the following Flux:
Flux<String> greetings = Flux.just("Jim", "Joe", "Tom")
.map(s -> "Hello " + s);
It can be unit tested like this:
StepVerifier.create(greetings)
.expectNext("Hello Jim")
.expectNext("Hello Joe")
.expectNext("Hello Tom")
.expectComplete()
.verify();
StepVerifier supports many, many verification methods, including support for virtual time. This is useful if you want to test Flux instances that perform actions over a long period of time, like processing a continuous live stream of data.
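Here is a sketch of virtual time, assuming the reactor-test dependency is on the classpath. A hypothetical stream that emits one value per hour is verified in well under a second. This works because StepVerifier.withVirtualTime replaces Reactor’s time-based schedulers with a virtual clock, which thenAwait advances. The Flux must be created inside the supplier so that it picks up that virtual clock.

```java
import java.time.Duration;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;

public class VirtualTimeDemo {

    // Hypothetical slow stream: one value per hour, three values in total.
    static Flux<Long> hourlyTicks() {
        return Flux.interval(Duration.ofHours(1)).take(3);
    }

    // Throws AssertionError if the stream misbehaves; returns the real time the check took.
    static Duration verify() {
        return StepVerifier
            .withVirtualTime(VirtualTimeDemo::hourlyTicks) // Flux is built on the virtual clock
            .expectSubscription()
            .expectNoEvent(Duration.ofHours(1))            // nothing during the first hour
            .expectNext(0L)
            .thenAwait(Duration.ofHours(2))                // fast-forward two virtual hours
            .expectNext(1L, 2L)
            .verifyComplete();
    }

    public static void main(String[] args) {
        System.out.println("verified in " + verify().toMillis() + " ms of real time");
    }
}
```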
Logging
Because a lot of logic takes place inside the methods of a Flux, stack traces can be hard to read when something goes wrong. For that purpose, Reactor has sophisticated tooling to make sense of error messages.
When debugging, you can call Hooks.onOperatorDebug(), which instruments the calls to the Reactor methods and uses them to create stack traces with very useful meta information. This instrumentation comes at a significant performance cost.
For production, Reactor offers a Java agent, in the reactor-tools artifact, that instruments your code without the performance penalty associated with the debugging alternative. To use it, call ReactorDebugAgent.init() when your application starts.
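Here is a minimal sketch of the debug hook. The hook has to be enabled before the chain is assembled, and Hooks.resetOnOperatorDebug() switches it off again. The failing parse is just a convenient way to trigger an error, and the class name is illustrative. In debug mode, Reactor attaches the assembly information to the original exception as a suppressed exception, which is the extra section printStackTrace shows.

```java
import java.util.concurrent.atomic.AtomicReference;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Hooks;

public class DebugHookDemo {

    // Triggers an error inside map(...) while operator debugging is switched on.
    static Throwable failWithDebugInfo() {
        Hooks.onOperatorDebug();             // enable before the chain below is assembled
        try {
            AtomicReference<Throwable> error = new AtomicReference<>();
            Flux.just("101", "oops")
                .map(Integer::parseInt)       // "oops" throws NumberFormatException
                .subscribe(n -> { }, error::set);
            return error.get();
        } finally {
            Hooks.resetOnOperatorDebug();     // the debug mode is costly, so switch it off again
        }
    }

    public static void main(String[] args) {
        // The suppressed part of the trace points at where map(...) was assembled.
        failWithDebugInfo().printStackTrace();
    }
}
```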
And, since not all logging is about errors, you can call one of the many overloaded log() methods anywhere in the call chain of a Flux. Reactor will then log a useful message for the signals (subscription, requests, elements, completion) that pass through that link of the chain:
hotelService.getAvailableRooms("Hilton")
.switchIfEmpty(hotelService.getAvailableRooms("Marriot"))
.log()
.take(50)
.flatMap(reservationService::book)
.log("reservation", Level.INFO, SignalType.ON_NEXT)
.subscribe(
notificationService::notifyGuest,
customerService::notifyProblem
);
Reactor and Spring
Spring offers extensive support for Reactive Programming, and it chose Reactor as the library to implement it. Not only does this confirm that Reactor is the go-to standard for Reactive Programming in Java, it is also very convenient!
When connecting to backend systems like Cassandra or Kafka, Spring provides several integrations that generally follow the same pattern:
- Operations, like CassandraOperations or KafkaOperations
- Templates, like CassandraTemplate or KafkaTemplate
- Repositories, like CassandraRepository
These classes provide several methods that return data from the underlying system, like CassandraTemplate.select(query, entityClass), which returns a List of objects of the appropriate type.
Spring also provides reactive versions of many of these! For Cassandra, for example, Spring provides ReactiveCassandraOperations, ReactiveCassandraTemplate and ReactiveCassandraRepository. Instead of a List<T>, they return a Flux<T>!
On the web layer, Spring WebFlux uses Reactor to implement reactive endpoints. This makes it easy to, for example, process a multipart request reactively:
@PostMapping(path = "/handle")
public String handle(@RequestBody Flux<Part> parts) {
// ...
}
The output itself can also be reactive, creating a continuous data stream over HTTP:
@GetMapping(path = "/data", produces = "text/event-stream")
public Flux<String> getData() {
return Flux.range(1, 20)
.map(n -> "Line " + n)
.delayElements(Duration.ofSeconds(1));
}
Comparison with java.util.stream.Stream
You may wonder: doesn’t java.util.stream.Stream already offer this declarative, method-chaining style of programming, including parallel processing?
It does. However, there are several reasons why Reactor is a better fit for data flows:
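- Reactor has a subscriber mechanism. Data will only start to flow once you subscribe.
- Reactor is designed for asynchronous processing: data is pushed to the subscriber whenever it becomes available. A Stream, by contrast, is pulled synchronously by the thread that runs its terminal operation.
- Stream does not handle backpressure: there is no way for the consumer to tell an asynchronous producer how much data it is ready to receive.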
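The difference is easiest to see with a subscriber that controls its own demand. In this sketch, a hypothetical BaseSubscriber asks for one element at a time from a source of a million numbers and cancels after five. Meanwhile, doOnRequest records every request signal that reaches the source. The class and record names are illustrative.

```java
import java.util.ArrayList;
import java.util.List;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

public class OneAtATime {

    // A subscriber that requests one element at a time and stops after five.
    static class FiveSlowly extends BaseSubscriber<Integer> {
        final List<Integer> received = new ArrayList<>();

        @Override
        protected void hookOnSubscribe(Subscription subscription) {
            request(1);              // initial demand: a single element
        }

        @Override
        protected void hookOnNext(Integer value) {
            received.add(value);
            if (received.size() < 5) {
                request(1);          // ready for the next one
            } else {
                cancel();            // enough: tell upstream to stop producing
            }
        }
    }

    record Result(List<Integer> received, List<Long> requests) {}

    static Result run() {
        List<Long> requests = new ArrayList<>();
        FiveSlowly subscriber = new FiveSlowly();
        Flux.range(1, 1_000_000)         // a large source, produced only on demand
            .doOnRequest(requests::add)  // record every request signal from the subscriber
            .subscribe(subscriber);
        return new Result(subscriber.received, requests);
    }

    public static void main(String[] args) {
        System.out.println(run()); // received [1, 2, 3, 4, 5], five requests of 1
    }
}
```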
Also, Stream has a rather basic API. In addition to the things that we’ve already seen, Reactor has a lot more to offer:
- Do you need metrics on the amount of data that goes through a Flux? Reactor integrates with Micrometer.
- Do you want to start the flow only when a Flux has a certain number of subscribers? Reactor offers ConnectableFlux.
- Do you need grouping of a Flux’s values, for example, grouping a Flux of numbers into a Flux of odd and a Flux of even numbers? Reactor offers Flux.groupBy.
- Do you need windowing of a Flux’s values, for example, windowing a Flux of numbers into windows of 1 hour each? Reactor offers many Flux.window methods.
- Do you need buffering of a Flux’s values, for example, processing a Flux of numbers in buffers of 1000? Reactor offers many Flux.buffer methods.
And the list goes on. There’s too much to discuss in one article: Reactor has a method for almost every conceivable use case, and where it doesn’t, it provides plenty of hooks to implement your own.
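Here are a few of the operators from the list above as a small runnable sketch. The class and method names are illustrative. Micrometer is left out because it needs an extra dependency. The window and buffer examples use element counts rather than time spans so the results are deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import reactor.core.publisher.Flux;

public class OperatorTour {

    // groupBy: split 1..10 into a group of odd and a group of even numbers.
    static Map<String, List<Integer>> byParity() {
        return Flux.range(1, 10)
            .groupBy(n -> n % 2 == 0 ? "even" : "odd")
            .flatMap(group -> group.collectList().map(list -> Map.entry(group.key(), list)))
            .collectMap(Map.Entry::getKey, Map.Entry::getValue)
            .block();
    }

    // window: sum each window of 5 consecutive numbers.
    static List<Integer> windowSums() {
        return Flux.range(1, 10)
            .window(5)                                     // one inner Flux per window
            .concatMap(window -> window.reduce(0, Integer::sum))
            .collectList()
            .block();
    }

    // buffer: collect the numbers into lists of at most 4 elements.
    static List<List<Integer>> buffers() {
        return Flux.range(1, 10).buffer(4).collectList().block();
    }

    // ConnectableFlux: publish() + autoConnect(2) starts the source once two subscribers are present.
    record Shared(int seenBeforeSecond, List<Integer> first, List<Integer> second) {}

    static Shared twoSubscribers() {
        List<Integer> first = new ArrayList<>();
        List<Integer> second = new ArrayList<>();
        Flux<Integer> shared = Flux.range(1, 3).publish().autoConnect(2);
        shared.subscribe(first::add);
        int seenBeforeSecond = first.size(); // the source has not started yet
        shared.subscribe(second::add);       // the second subscriber triggers the connection
        return new Shared(seenBeforeSecond, first, second);
    }

    public static void main(String[] args) {
        System.out.println(byParity().get("odd")); // [1, 3, 5, 7, 9]
        System.out.println(windowSums());          // [15, 40]
        System.out.println(buffers());             // [[1, 2, 3, 4], [5, 6, 7, 8], [9, 10]]
        System.out.println(twoSubscribers());
    }
}
```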
Conclusion
I hope I’ve been able to give you an idea about what Reactor is and what it can do for you.
If you have an application landscape that includes Big Data systems like NoSQL databases or Streaming platforms and you need to process that data, for example to create an AI/ML model, Reactor can be the spider in the web that connects all these components together in a scalable, asynchronous, parallel and flexible manner.
At the same time, Reactor makes the work more fun for developers by offering a Reactive programming model and taking away the boilerplate code that usually comes with these use cases.
If you want to learn more about Reactor, I recommend the Reference Manual, in particular the section that lists which method to use for which use case.
The main website offers plenty of learning material, and this interactive web-based tutorial is an excellent way to get started.
Next time…
In the next article, I will explain how Reactor integrates with Kotlin and takes advantage of a few unique language constructs that Kotlin offers.
Thanks a lot for reading and stay tuned!