Reactor: even more fun in Kotlin!
In the previous article, I gave an introduction into Reactor: a Reactive Programming Java library that makes working with data easier and more fun. In this article, I will explore how Reactor behaves in that other, very popular, JVM language: Kotlin. I will also compare Reactor to Kotlin Flows & Coroutines.
Extension methods
One of the most powerful features of Kotlin are extension methods (also known as lambda-with-receiver): the ability to add methods to external classes in your own code. For example, this is a method that prepends “Hello” to a String
:
fun String.sayHello() {
return "Hello " + this
}
Reactor uses this to add various extension methods of its own:
//Java
Flux.fromIterable(list)
//Kotlin
list.toFlux()
//Java
Flux.error(new RuntimeException())
//Kotlin
RuntimeException().toFlux()
//Java
StepVerifier.create(flux).verifyComplete()
//Kotlin
flux.test().verifyComplete()
There are quite a few more handy extension methods available. For a complete overview, check the API Docs.
Scope functions
Scope functions are extension methods that are defined by Kotlin itself. One example is the let
method, that works on any Object
. It executes a function block and returns its result. This is particularly useful in Builder style patterns, where you invoke a chain of methods that all return an Object
of the same type. Of course a Flux
isn’t technically a Builder, because the chain of methods creates a chain of fluxes that subscribe to one another, rather than operating on a single Flux
, but the programming style is very similar.
Let’s have a look at the hotel example from the previous article. There, we have a hotelService
and a reservationService
that return a Reactor Flux
of resp. HotelRooms
and Reservations
. A Flux
is an Object
that can stream data asynchronously and exists even when the data itself isn’t there (yet). In addition, a Flux
has many, many methods to manipulate the stream.
hotelService.getAvailableRooms("Hilton")
.switchIfEmpty(hotelService.getAvailableRooms("Marriot"))
.take(50)
.map(reservationService::book)
.subscribe(
notificationService::notifyGuest,
customerService::notifyProblem
);
Now, let’s say that we only want to call the switchIfEmpty
method if the Marriot hotels are enabled. In Java, you would have to do this:
Flux<HotelRoom> hotelRoomFlux = hotelService.getAvailableRooms("Hilton");
if (marriotEnabled) {
hotelRoomFlux = hotelRoomFlux.switchIfEmpty(hotelService.getAvailableRooms("Marriot"));
}
hotelRoomFlux
.take(50)
.map(reservationService::book)
.subscribe(
notificationService::notifyGuest,
customerService::notifyProblem
);
Using the Kotlin let
scope function, we can simply do:
hotelService.getAvailableRooms("Hilton")
.let { hiltonFlux -> if (marriotEnabled) hiltonFlux.switchIfEmpty(hotelService.getAvailableRooms("Marriot")) else hiltonFlux }
.take(50)
.map(reservationService::book)
.subscribe(
notificationService::notifyGuest,
customerService::notifyProblem
);
Competition: Reactor vs Flows & Coroutines!
In addition to providing several improvements to the Reactor experience, Kotlin also comes with its own solution for Reactive Programming: Kotlin Flows & Coroutines. By adding the suspend
keyword to a method, you can turn that method into an async method, that will execute in a non-blocking, async fashion when data is actually available. Here is an example, using the Spring Framework, which offers support for both Reactor as well as Kotlin Flows & Coroutines.
In Kotlin Flows & Coroutines, you would write:
//In the Web Controller:
@GetMapping("/{id}")
suspend fun getById(@PathVariable id: Long): Hotel { //Will not block while waiting for the data
return hotelService.getById(id);
}
@GetMapping("/search/{name}")
suspend fun searchByName(@PathVariable name: String): Flow<Hotel> { //Will return the Flow immediately, without waiting for the data
return hotelService.searchByName(name);
}
//In the HotelService:
suspend fun getById(id: Long): Hotel { //Will not block while waiting for the data
return coroutinesHotelRepository.getById(id);
}
suspend fun searchByName(name: String): Flow<Hotel> { //Will return the Flow immediately, without waiting for the data
return coroutinesHotelRepository.searchByName(name);
}
This is similar to the following in Reactor:
//In the Web Controller:
@GetMapping("/{id}")
fun getById(@PathVariable id: Long): Mono<Hotel> { //Will return the Mono immediately, without waiting for the data
return hotelService.getById(id);
}
@GetMapping("/search/{name}")
fun searchByName(@PathVariable name: String): Flux<Hotel> {
return hotelService.searchByName(name); //Will return the Flux immediately, without waiting for the data
}
//In the HotelService:
fun getById(id: Long): Mono<Hotel> { //Will return the Mono immediately, without waiting for the data
return reactiveHotelRepository.getById(id);
}
fun searchByName(name: String): Flux<Hotel> {
return reactiveHotelRepository.searchByName(name); //Will return the Flux immediately, without waiting for the data
}
A much more comprehensive comparison between the two can be found in this Medium article.
Which is better
As always, this depends largely on your specific needs and preferences. Speaking for myself, I prefer Reactor for the following reasons:
- In Kotlin Flows & Coroutines, the code requires a mix between language keywords (eg.
suspend
) and API classes and methods (eg.Flow
). This is not very elegant and slightly confusing. In Reactor, the use of the language itself (Java, Kotlin or any other JVM language) doesn’t change. Instead, the reactive patterns are fully contained in the Reactor API (eg.Flux
). - Related to this, Reactor makes reactive programming more explicit. It’s obvious from the code that you’re working with a reactive API. Code written with Kotlin Flows & Coroutines looks very similar to blocking code. This may seem nice, but making explicit what is actually happening is very important for readability and maintenance. If programmers see code that differs from traditional, blocking code only because of the
suspend
keywords, they may not be aware that they are dealing with a reactive system and may make mistakes because of that. Reactor makes it obvious that you’re working with an async, non blocking API, without having to deal with callbacks. - Functions with the
suspend
keyword are only callable from other functions with thesuspend
keyword, or from a coroutine. This makes it more difficult to share logic with blocking code. The various methods ofFlux
simply take ordinary lambda’s, that can contain any logic. - Kotlin Flows & Coroutines is purely push-based: the async code executes when data becomes available from the source. Reactor, on the other hand, offers a push-pull hybrid: data gets pushed downstream, but subscribers can also request data from upstream and propagate backpressure signals upstream.
- A quick comparison between the
Flow
API from Kotlin Flows & Coroutines on one hand and theFlux
API from Reactor on the other hand shows that the amount of operations that Reactor offers is many, many times larger. It doesn’t even come close. Whatever use case you have, whatever operation you need, Reactor is likely to offer it. And in the rare cases where it doesn’t, it offers plenty of hook methods to implement your own.
Conclusion
Using Reactor as a Reactive Programming platform enables you to write async, non blocking code with any JVM language. However, by using some of Kotlin’s unique and powerful language constructs, you can make the experience even better!
Thank you for reading!