Reactor: even more fun in Kotlin!

GJ Schouten
Published in Team Rockstars IT
5 min read, Jan 12, 2023

In the previous article, I gave an introduction to Reactor: a Reactive Programming library for Java that makes working with data easier and more fun. In this article, I will explore how Reactor behaves in that other, very popular JVM language: Kotlin. I will also compare Reactor to Kotlin Flows & Coroutines.

Extension methods

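One of the most powerful features of Kotlin is extension methods (officially called extension functions): the ability to add methods to existing classes, including ones you don’t own, from your own code. They should not be confused with lambdas-with-receiver, which are a related but separate feature. For example, this is a method that prepends “Hello” to a String: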

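//The return type must be declared, otherwise the function returns Unit and does not compile
fun String.sayHello(): String {
    return "Hello " + this
}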
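A quick usage sketch of such an extension, using only the Kotlin standard library. The helper greetAll is a made-up name for this example. Note that an extension does not actually modify the String class: the call is resolved at compile time.

```kotlin
// Extension function: inside the body, `this` refers to the receiver String.
fun String.sayHello(): String = "Hello " + this

// Hypothetical helper, only here to show the extension used inside a chain.
fun greetAll(names: List<String>): List<String> = names.map { it.sayHello() }

fun main() {
    println("World".sayHello())             // prints "Hello World"
    println(greetAll(listOf("Ann", "Bob"))) // prints "[Hello Ann, Hello Bob]"
}
```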

Reactor uses this to add various extension methods of its own, which are shipped in the separate reactor-kotlin-extensions module:

//Java
Flux.fromIterable(list)

//Kotlin
list.toFlux()


//Java
Flux.error(new RuntimeException())

//Kotlin
RuntimeException().toFlux()


//Java
StepVerifier.create(flux).verifyComplete()

//Kotlin
flux.test().verifyComplete()

There are quite a few more handy extension methods available. For a complete overview, check the API Docs.
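To make the comparison concrete, here is a minimal sketch that you can run. It assumes that reactor-core, reactor-kotlin-extensions and reactor-test are on the classpath. The function names roomNumbers and failing are made up for this example.

```kotlin
import reactor.core.publisher.Flux
import reactor.kotlin.core.publisher.toFlux
import reactor.kotlin.test.test

// Builds a Flux from a list via the toFlux() extension.
fun roomNumbers(): Flux<Int> = listOf(101, 102, 103).toFlux()

// Builds a Flux that fails on subscription; the element type is
// inferred from the declared return type.
fun failing(): Flux<Int> = IllegalStateException("no rooms").toFlux()

fun main() {
    roomNumbers()
        .test()                     // equivalent to StepVerifier.create(flux)
        .expectNext(101, 102, 103)
        .verifyComplete()

    failing()
        .test()
        .verifyError(IllegalStateException::class.java)
}
```

Both verifications throw an AssertionError when the expectation is not met, so the program finishing silently means that both passed.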

Scope functions

Scope functions are functions defined by the Kotlin standard library itself, most of them as extension methods. One example is the let function, which works on any object: it passes the object to a function block and returns the block’s result. This is particularly useful in Builder-style patterns, where you invoke a chain of methods that all return an object of the same type. Of course, a Flux isn’t technically a Builder, because the chain of methods creates a chain of fluxes that subscribe to one another rather than operating on a single Flux, but the programming style is very similar.

Let’s have a look at the hotel example from the previous article. There, we have a hotelService and a reservationService that return a Reactor Flux of HotelRooms and Reservations, respectively. A Flux is an object that can stream data asynchronously and exists even when the data itself isn’t there (yet). In addition, a Flux has many, many methods to manipulate the stream.

hotelService.getAvailableRooms("Hilton")
    .switchIfEmpty(hotelService.getAvailableRooms("Marriot"))
    .take(50)
    .map(reservationService::book)
    .subscribe(
        notificationService::notifyGuest,
        customerService::notifyProblem
    );

Now, let’s say that we only want to call the switchIfEmpty method if the Marriot hotels are enabled. In Java, you would have to do this:

Flux<HotelRoom> hotelRoomFlux = hotelService.getAvailableRooms("Hilton");

if (marriotEnabled) {
    hotelRoomFlux = hotelRoomFlux.switchIfEmpty(hotelService.getAvailableRooms("Marriot"));
}

hotelRoomFlux
    .take(50)
    .map(reservationService::book)
    .subscribe(
        notificationService::notifyGuest,
        customerService::notifyProblem
    );

Using the Kotlin let scope function, we can simply do:

hotelService.getAvailableRooms("Hilton")
    .let { hiltonFlux ->
        if (marriotEnabled) hiltonFlux.switchIfEmpty(hotelService.getAvailableRooms("Marriot"))
        else hiltonFlux
    }
    .take(50)
    .map(reservationService::book)
    .subscribe(
        notificationService::notifyGuest,
        customerService::notifyProblem
    )
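If you want to try this pattern without the hotel services, here is a self-contained sketch. It assumes reactor-core as a dependency. The availableRooms function is a hypothetical in-memory stand-in for hotelService.getAvailableRooms, and roomsToBook is a made-up name as well.

```kotlin
import reactor.core.publisher.Flux

// Hypothetical stand-in for the hotel service: only "Marriot" has rooms here.
fun availableRooms(hotel: String): Flux<String> =
    if (hotel == "Marriot") Flux.just("M-1", "M-2") else Flux.empty<String>()

// The fallback is only wired into the chain when the flag is set.
fun roomsToBook(marriotEnabled: Boolean): Flux<String> =
    availableRooms("Hilton")
        .let { hilton ->
            if (marriotEnabled) hilton.switchIfEmpty(availableRooms("Marriot")) else hilton
        }
        .take(50)

fun main() {
    println(roomsToBook(marriotEnabled = true).collectList().block())  // prints "[M-1, M-2]"
    println(roomsToBook(marriotEnabled = false).collectList().block()) // prints "[]"
}
```

The let block returns either the original Flux or the one with the fallback attached, so the rest of the chain stays in one expression.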

Competition: Reactor vs Flows & Coroutines!

In addition to providing several improvements to the Reactor experience, Kotlin also comes with its own solution for Reactive Programming: Kotlin Flows & Coroutines. By adding the suspend keyword to a function, you allow it to suspend: instead of blocking a thread while it waits for data, it pauses and resumes once the data is actually available. A stream of multiple values is represented by a Flow. Here is an example using the Spring Framework, which supports both Reactor and Kotlin Flows & Coroutines.

In Kotlin Flows & Coroutines, you would write:

//In the Web Controller:
@GetMapping("/{id}")
suspend fun getById(@PathVariable id: Long): Hotel { //Will not block while waiting for the data
    return hotelService.getById(id)
}

@GetMapping("/search/{name}")
fun searchByName(@PathVariable name: String): Flow<Hotel> { //Will return the Flow immediately, so no suspend is needed here
    return hotelService.searchByName(name)
}

//In the HotelService:
suspend fun getById(id: Long): Hotel { //Will not block while waiting for the data
    return coroutinesHotelRepository.getById(id)
}

fun searchByName(name: String): Flow<Hotel> { //Will return the Flow immediately, so no suspend is needed here
    return coroutinesHotelRepository.searchByName(name)
}

This is similar to the following in Reactor:

//In the Web Controller:
@GetMapping("/{id}")
fun getById(@PathVariable id: Long): Mono<Hotel> { //Will return the Mono immediately, without waiting for the data
    return hotelService.getById(id)
}

@GetMapping("/search/{name}")
fun searchByName(@PathVariable name: String): Flux<Hotel> { //Will return the Flux immediately, without waiting for the data
    return hotelService.searchByName(name)
}

//In the HotelService:
fun getById(id: Long): Mono<Hotel> { //Will return the Mono immediately, without waiting for the data
    return reactiveHotelRepository.getById(id)
}

fun searchByName(name: String): Flux<Hotel> { //Will return the Flux immediately, without waiting for the data
    return reactiveHotelRepository.searchByName(name)
}

A much more comprehensive comparison between the two can be found in this Medium article.

Which is better?

As always, this depends largely on your specific needs and preferences. Speaking for myself, I prefer Reactor for the following reasons:

  • In Kotlin Flows & Coroutines, the code requires a mix of language keywords (e.g. suspend) and API classes and methods (e.g. Flow). This is not very elegant and slightly confusing. In Reactor, the use of the language itself (Java, Kotlin or any other JVM language) doesn’t change. Instead, the reactive patterns are fully contained in the Reactor API (e.g. Flux).
  • Related to this, Reactor makes reactive programming more explicit. It’s obvious from the code that you’re working with a reactive API. Code written with Kotlin Flows & Coroutines looks very similar to blocking code. This may seem nice, but making explicit what is actually happening is very important for readability and maintenance. If programmers see code that differs from traditional, blocking code only because of the suspend keywords, they may not be aware that they are dealing with a reactive system and may make mistakes because of that. Reactor makes it obvious that you’re working with an async, non-blocking API, without having to deal with callbacks.
  • Functions with the suspend keyword are only callable from other functions with the suspend keyword, or from a coroutine. This makes it more difficult to share logic with blocking code. The various methods of Flux simply take ordinary lambdas, which can contain any logic.
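  • Backpressure is handled differently. In Kotlin Flows & Coroutines it is implicit: emitting a value is itself a suspending call, so a slow collector simply suspends the producer. Reactor, on the other hand, offers an explicit push-pull hybrid: data gets pushed downstream, but subscribers can also request a specific amount of data from upstream, and these backpressure signals propagate all the way up to the source. I prefer having that explicit control.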
  • A quick comparison between the Flow API from Kotlin Flows & Coroutines and the Flux API from Reactor shows that Reactor offers far more operators. It doesn’t even come close. Whatever your use case, whatever operation you need, Reactor is likely to offer it. And in the rare cases where it doesn’t, it offers plenty of hook methods to implement your own.
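The request side of Reactor’s push-pull model can be sketched as follows. BaseSubscriber is part of reactor-core, which this sketch assumes as a dependency. The OneAtATime class and the firstThree function are hypothetical names for this example.

```kotlin
import org.reactivestreams.Subscription
import reactor.core.publisher.BaseSubscriber
import reactor.core.publisher.Flux

// Subscriber that pulls one element at a time and records what it received.
class OneAtATime : BaseSubscriber<Int>() {
    val received = mutableListOf<Int>()

    override fun hookOnSubscribe(subscription: Subscription) {
        request(1) // initial demand: ask upstream for a single element
    }

    override fun hookOnNext(value: Int) {
        received += value
        // Ask for the next element, or cancel once three have arrived.
        if (received.size < 3) request(1) else cancel()
    }
}

fun firstThree(): List<Int> {
    val subscriber = OneAtATime()
    // Flux.range emits synchronously on the subscribing thread,
    // so the list is complete when subscribe returns.
    Flux.range(1, 100).subscribe(subscriber)
    return subscriber.received
}

fun main() {
    println(firstThree()) // prints "[1, 2, 3]"
}
```

Even though the source could produce 100 elements, it only emits what the subscriber asks for, which is exactly the pull half of the hybrid.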

Conclusion

Using Reactor as a Reactive Programming platform enables you to write async, non-blocking code with any JVM language. However, by using some of Kotlin’s unique and powerful language constructs, you can make the experience even better!

Thank you for reading!

GJ Schouten
Team Rockstars IT

Software Architect / Engineer and creator of The Lukashian Calendar. The most important things in software? Simplicity, consistency and naming :-)