Event-driven communication in Spring Boot is lacking, so we augmented it

Thomas Martin
Published in WhozApp
9 min read · Nov 27, 2023

Spring Boot events are quite simple. As we delved deeper into event-driven communication in our Whoz microservices, we quickly hit a wall because of missing functionality. I’m happy to share with you the superpowers we gave to Spring Boot application listeners! Join us as we unlock their potential.

Photo by Pavan Trikutam on Unsplash

What is lacking in Spring Boot event listeners?

A deep dive into Spring Boot events is not the topic of this article, but let’s review the basics of what you can do:

  • Write custom events
  • Publish them when needed in your code thanks to the ApplicationEventPublisher
  • Listen to them with beans implementing the ApplicationListener interface, or using the @EventListener annotation.
  • By default, the listener is invoked synchronously. You need to write an additional configuration to enable async.
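As a reminder of what these basics look like, here is a minimal sketch assuming only spring-context on the classpath. The event, service, and listener names (TalentUpdatedEvent, TalentService, TalentUpdatedListener) are hypothetical and not taken from our code base.

```kotlin
import org.springframework.context.ApplicationEvent
import org.springframework.context.ApplicationEventPublisher
import org.springframework.context.annotation.AnnotationConfigApplicationContext
import org.springframework.context.event.EventListener
import org.springframework.stereotype.Component

// Hypothetical custom event: the name and payload are illustrative only
class TalentUpdatedEvent(source: Any, val talentId: String) : ApplicationEvent(source)

// Publishes the event wherever the business code needs it
@Component
class TalentService(private val publisher: ApplicationEventPublisher) {
    fun rename(talentId: String) {
        // ... update the entity, then notify the listeners
        publisher.publishEvent(TalentUpdatedEvent(this, talentId))
    }
}

// Listens with the @EventListener annotation; invoked synchronously by default
@Component
class TalentUpdatedListener {
    val seen = mutableListOf<String>()

    @EventListener
    fun onTalentUpdated(event: TalentUpdatedEvent) {
        seen += "${event.talentId} on ${Thread.currentThread().name}"
    }
}

// Boots a minimal context, publishes one event and returns what the listener recorded
fun publishOnce(talentId: String): List<String> =
    AnnotationConfigApplicationContext(TalentService::class.java, TalentUpdatedListener::class.java).use { ctx ->
        ctx.getBean(TalentService::class.java).rename(talentId)
        ctx.getBean(TalentUpdatedListener::class.java).seen.toList()
    }
```

Calling publishOnce("t-1") records the name of the calling thread, which illustrates the synchronous default: the listener runs on the publisher's thread before publishEvent returns.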

The last point is the first pain point we spotted. In our apps, events are used either for non-critical updates or very taxing processes. Whatever the reason, we clearly don’t want synchronous invocation hindering API response time.

If the clients are other microservices, async comes naturally because Kafka sits in the middle. But we also have internal events inside a microservice.

Wait, Spring Boot does support async, right? We are not afraid of half a dozen lines of configuration. That’s right, but it distributes the events to threads. We publish a lot of events: every creation, update, or deletion of any entity publishes at least one event, and we can plug several listeners on each of them. Threads are costly, and the whole point of doing heavy computation asynchronously is to keep resources free for HTTP clients, so dedicating a thread to each event is not what we’re looking for.

On top of that, we usually have event handlers that are not interested in every event: they just need to know that updates have been done on a certain entity or group of entities. To give an example, we have an internal search engine to find the best talents for a mission, depending on expected skills, availability, location, etc. All the information is stored in an index that has to be updated when the talent or one of its related entities is updated. What we really want to do is update the index once for all changes related to this talent, i.e. grouping and debouncing the events. As long as the user is updating their profile, their timesheet, etc., we don’t need to reindex. We just want to detect the end of this updating session, and then reindex the talent. Also, since events are grouped, we would like to handle the groups in parallel.

Kotlin Coroutines or Reactor?

When dealing with flows and asynchronous processes, we have two types of implementation in Whoz apps: Kotlin coroutines or Reactor.

I’m a Kotlin enthusiast and love the apparent simplicity of coroutines. So why consider Reactor? Well, we have a mix of Kotlin and Groovy in our code base, so if a microservice is not yet fully migrated to Kotlin, we cannot just add suspend to methods.

Since we wanted to have a generic implementation for all event handlers, Groovy ones included, we have chosen Reactor in this case.

We’ll see later that a thread pool is configured for Reactor. So why bother with Reactor instead of the Spring async configuration? Because reactive programming is non-blocking, one thread can process many events, instead of the one thread per event that Spring uses. If you need more information on that topic, this Baeldung article is a good start.
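To make the "one thread, many events" point concrete, here is a small sketch that is not part of our handler. It assumes reactor-core on the classpath, and the function name runOnSingleThread is made up for the illustration. It starts many simulated slow operations on a single-threaded scheduler, using Mono.delay as a stand-in for non-blocking work.

```kotlin
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.core.scheduler.Schedulers
import java.time.Duration

// Runs `taskCount` simulated slow, non-blocking operations on ONE thread and returns
// the names of the threads that completed them plus the elapsed time in milliseconds.
fun runOnSingleThread(taskCount: Int, waitMillis: Long): Pair<Set<String>, Long> {
    val scheduler = Schedulers.newSingle("single-demo")
    val start = System.nanoTime()
    try {
        val threads = Flux.range(1, taskCount)
            .flatMap({ _ ->
                // Mono.delay schedules a timer instead of blocking the thread
                Mono.delay(Duration.ofMillis(waitMillis), scheduler)
                    .map { Thread.currentThread().name }
            }, taskCount) // allow all the waits to be in flight at the same time
            .collectList()
            .block()!!
            .toSet()
        return threads to (System.nanoTime() - start) / 1_000_000
    } finally {
        scheduler.dispose()
    }
}
```

With runOnSingleThread(200, 100), all the waits overlap on the same thread, so the total time stays far below the 20 seconds that 200 sequential blocking waits of 100 ms would take. This only holds for work that is genuinely non-blocking; a handler that blocks (for example with Thread.sleep) still occupies its thread.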

A new handler base class…

To implement those so-called superpowers, we created an abstract class called DebouncingParallelHandler. It is not an interface because it needs a constructor to inject the properties and a Reactor Scheduler, which configures the processing strategy and the acceptable volume (number of threads, queue size, bounds…).

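abstract class DebouncingParallelHandler<E : LoggableApplicationEvent>(
    val eventHandlerProperties: EventHandlerProperties,
    private val scheduler: Scheduler,
) : ApplicationListener<E> {
    // Events sharing this key are debounced together
    abstract fun debounceKey(event: E): String
    // Business logic, formerly located in onApplicationEvent(event: E)
    abstract fun handleEvent(event: E)
    // Events sharing this key are never processed in parallel
    abstract fun partitionKey(event: E): String
}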

All our application listeners (that we named handlers) are now based on DebouncingParallelHandler and so implement these three methods:

  • partitionKey(event: E) defines a key from the event. This key will ensure that events with the same partition key are not processed in parallel.
  • debounceKey(event: E) also defines a key from the event. This key is used to debounce events: processing starts only once no event with the same key has been received for a certain amount of time. For example, with a debounce time of 2 seconds, if 3 events with the same debounce key are received at T0, T0+1s, and T0+2.5s, only the last one is processed, because less than 2 seconds elapsed between the first and the second event, and between the second and the third.
  • handleEvent(event: E) is how the event has to be processed. This is the code we previously had in our onApplicationEvent(event: E) methods.
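The debounce arithmetic can be checked with a tiny pure-Kotlin simulation. This helper is hypothetical and only models the rule described in the list above; the handler itself relies on Reactor, not on this function. What happens when the next event arrives exactly at the end of the window is an arbitrary choice made for this sketch.

```kotlin
// Hypothetical helper, not part of the handler: given the arrival times (in ms) of
// events sharing one debounce key, returns the arrival times that survive debouncing.
// An event survives when no other event with the same key follows within the window.
fun debouncedSurvivors(arrivalsMillis: List<Long>, windowMillis: Long): List<Long> {
    val sorted = arrivalsMillis.sorted()
    return sorted.filterIndexed { index, arrival ->
        val next = sorted.getOrNull(index + 1)
        // The last event always survives; the others need a quiet window after them
        next == null || next - arrival >= windowMillis
    }
}
```

With a 2000 ms window, debouncedSurvivors(listOf(0L, 1000L, 2500L), 2000L) keeps only 2500, which matches the T0, T0+1s, T0+2.5s example. Moving the third event to 3500 keeps both 1000 and 3500, because 2.5 seconds of silence follow the second event.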

Note that the debounce key and the partition key can be the same. In our indexing example, this is the talentId for both. This is usually the same in our handlers, but we have a use case where we need to process some events sequentially, and a subset of this sequence is debounced.
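To give an idea of what a concrete handler looks like, here is a sketch of the indexing example. To keep it self-contained, SketchEvent and SketchHandler are simplified stand-ins for our real base types (no Spring, no Reactor), and TalentRelatedEvent and TalentIndexingHandler are hypothetical names.

```kotlin
// Stand-in for the event base type, reduced to what this sketch needs (assumption)
abstract class SketchEvent(val logIdentifier: String)

// Stand-in for DebouncingParallelHandler, reduced to its three abstract methods (assumption)
abstract class SketchHandler<E : SketchEvent> {
    abstract fun partitionKey(event: E): String
    abstract fun debounceKey(event: E): String
    abstract fun handleEvent(event: E)
}

// Hypothetical event: something changed on an entity related to a talent
class TalentRelatedEvent(val talentId: String, val entityType: String) :
    SketchEvent("$entityType of talent $talentId")

// Hypothetical handler: one partition and one debounce group per talent
class TalentIndexingHandler : SketchHandler<TalentRelatedEvent>() {
    val reindexed = mutableListOf<String>()

    // Same key for both: all events of a talent are serialized and debounced together
    override fun partitionKey(event: TalentRelatedEvent) = event.talentId
    override fun debounceKey(event: TalentRelatedEvent) = event.talentId

    override fun handleEvent(event: TalentRelatedEvent) {
        // The real handler would rebuild the search index entry of the talent here
        reindexed += event.talentId
    }
}
```

A profile update and a timesheet update for the same talent yield the same keys, so they end up in the same partition and the same debounce group, and a single reindexing is triggered once the updates stop.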

You are probably also wondering what IdentifiedApplicationEvent is (it is named LoggableApplicationEvent in the class signature above). It is just an abstract class on top of Spring’s ApplicationEvent. What is interesting here is the IdentifiedEvent interface that it implements, giving it better logging capabilities thanks to a logIdentifier property:


interface IdentifiedEvent {
val logIdentifier: String
}

… with new capabilities thanks to Reactor

Now let’s see how DebouncingParallelHandler implements all these capabilities. I’ll use some short pieces of code in this article, but you can find the complete class here.

Configuration

There are two levels of configuration. Overridable methods allow a particular handler to override the default configuration.

open fun retryCount(): Long = eventHandlerProperties.defaultRetryCount
open fun backoffMillis(): Long = eventHandlerProperties.defaultBackoffMillis
open fun debounceTimeMillis(): Long = eventHandlerProperties.debounceTimeMillis
open fun maxAllowedLagMillis(): Long = eventHandlerProperties.maxAllowedLagMillis

If not overridden, the configuration is set by a Spring configuration properties class.


@ConfigurationProperties(prefix = "myconf.handler")
@ConstructorBinding
class EventHandlerProperties(
    val backpressureBufferSize: Int = 10000,
    val debounceTimeMillis: Long = 2000L,
    val maxAllowedLagMillis: Long = 10000L,
    val partitionTtlFactor: Int = 10,
    val handlerThreadPerCore: Int = 5,
    val handlerQueueCap: Int = 20000,
    val defaultRetryCount: Long = 3L,
    val defaultBackoffMillis: Long = 1000L,
    val enable: Map<String, Boolean> = emptyMap()
)

Parallel processing and flow initialization

The flow of events is emitted into a sink.


private val sink: Sinks.Many<E> = Sinks
    .many()
    .multicast()
    .onBackpressureBuffer(eventHandlerProperties.backpressureBufferSize)

override fun onApplicationEvent(event: E) = emitEvent(event)

fun emitEvent(event: E) {
    try {
        // tryEmitNext reports a rejected emission through the returned EmitResult
        sink.tryEmitNext(event).also { emitResult ->
            if (emitResult.isSuccess) {
                logger.debug("Emitted event ${event.logIdentifier}")
            } else {
                logger.warn("Emitting event ${event.logIdentifier} failed with status ${emitResult.name}")
            }
        }
    } catch (e: Exception) {
        logger.error("Error while publishing to sink ${event.logIdentifier}", e)
    }
}
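The status returned by tryEmitNext is what the warning above logs. The following sketch, which assumes reactor-core on the classpath and uses a made-up function name (emitResultsDemo), shows two of the possible EmitResult values on a sink of the same kind.

```kotlin
import reactor.core.publisher.Sinks
import java.util.concurrent.CopyOnWriteArrayList

// Emits into a multicast sink before and after completion.
// Returns the EmitResult of each call and the elements the subscriber received.
fun emitResultsDemo(): Pair<List<Sinks.EmitResult>, List<String>> {
    val sink = Sinks.many().multicast().onBackpressureBuffer<String>(16)
    val received = CopyOnWriteArrayList<String>()
    sink.asFlux().subscribe { received.add(it) }

    val results = listOf(
        sink.tryEmitNext("event-1"), // a subscriber is ready, so the emission is accepted
        sink.tryEmitComplete(),      // terminates the sink
        sink.tryEmitNext("event-2"), // rejected: the sink is already terminated
    )
    return results to received.toList()
}
```

Other failure statuses exist. In particular, sinks created through Sinks.many() detect concurrent calls and reject them with FAIL_NON_SERIALIZED, which is worth keeping in mind if events are published from several threads at once.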

A flux is created on this sink and split into partitions, which Reactor’s parallel() distributes across the scheduler’s threads. Each partition is a Flux itself, which is debounced and then processed, with retry handling.


init {
    createFlux(sink.asFlux()).subscribe()
}

private fun createFlux(source: Flux<E>): Flux<E> {
    logger.info { "Initializing debouncing parallel flux for ${this::class.simpleName}" }

    return partition(source)
        .parallel()
        .runOn(scheduler)
        .flatMap { partitionedFlux ->
            debounce(partitionedFlux)
                .publishOn(scheduler)
                .flatMap { event ->
                    handleEventWithRetry(event)
                }
        }
        .sequential()
        .onErrorContinue { throwable, _ ->
            // timeouts are normal, this is how we cancel partitions
            if (throwable !is TimeoutException) {
                logger.error("Unexpected error while publishing event to sink", throwable)
            }
        }
}

We’ll get into the implementation of partition() and debounce() right after this part.

Before going on, please note the onErrorContinue call, which is fundamental. Without it, any error would end the flux, meaning that no subsequent event would ever be processed!
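The difference is easy to reproduce on a toy flux. This sketch assumes reactor-core on the classpath and is unrelated to our handler code (processAll is a made-up name); it compares onErrorContinue with an operator that merely swallows the error.

```kotlin
import reactor.core.publisher.Flux

// Processes 1, 2, 3 where 2 always fails; `resilient` toggles onErrorContinue
fun processAll(resilient: Boolean): List<Int> {
    val processed = Flux.just(1, 2, 3)
        .map { if (it == 2) throw IllegalStateException("boom on $it") else it * 10 }
    val guarded = if (resilient) {
        // Drops the failing element and keeps the sequence alive
        processed.onErrorContinue { _, _ -> }
    } else {
        // Swallows the error, but the sequence has already been terminated by it
        processed.onErrorResume { Flux.empty<Int>() }
    }
    return guarded.collectList().block().orEmpty()
}
```

processAll(true) returns 10 and 30, while processAll(false) stops at 10: element 3 is never processed. Note that onErrorContinue only affects upstream operators that support this mode (map and flatMap do), so it is worth testing on the actual pipeline.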

Partitioning

What we call partitioning is translated into a groupBy() in Reactor.


private fun partition(eventFlux: Flux<E>): Flux<Flux<E>> =
    eventFlux
        .groupBy(::partitionKey)
        .doOnNext { flux ->
            val partitionCountValue = partitionCount.incrementAndGet()
            logger.debug { "Created partition flux for ${flux.key()} - total count $partitionCountValue" }
        }
        .map { flux ->
            flux
                .timeout(Duration.ofMillis(eventHandlerProperties.partitionTtlFactor * eventHandlerProperties.debounceTimeMillis))
                .doAfterTerminate {
                    val partitionCountValue = partitionCount.decrementAndGet()
                    logger.debug { "Terminated partition flux for ${flux.key()} - total count $partitionCountValue" }
                }
        }

The timeout() is important here: without it, the number of Flux instances in memory would constantly increase.
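Both operators can be tried in isolation. The sketch below assumes reactor-core on the classpath; the "key:payload" string format and the function names are made up for the illustration. With the default properties shown earlier, the partition timeout is 10 × 2000 ms, so an idle partition is closed after 20 seconds.

```kotlin
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import java.time.Duration
import java.util.concurrent.TimeoutException

// Groups "key:payload" strings by key and collects the payloads of each group
fun groupByKey(events: List<String>): Map<String, List<String>> =
    Flux.fromIterable(events)
        .groupBy { it.substringBefore(':') }
        .flatMap { group ->
            group.map { it.substringAfter(':') }
                .collectList()
                .map { payloads -> group.key() to payloads.toList() }
        }
        .collectList()
        .block()
        .orEmpty()
        .toMap()

// An idle group (simulated here by a flux that never emits) is ended by timeout()
fun idleGroupTimesOut(ttl: Duration): Boolean =
    Flux.never<String>()
        .timeout(ttl)
        .then(Mono.just(false))
        .onErrorResume(TimeoutException::class.java) { Mono.just(true) }
        .block() == true
```

The TimeoutException raised by timeout() is the one that the onErrorContinue callback in createFlux deliberately ignores.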

Debouncing

What we call debouncing is translated into a sampleTimeout() in Reactor.


private fun debounce(eventFlux: Flux<E>): Flux<E> =
    eventFlux
        .groupBy(::debounceKey)
        .doOnNext { flux ->
            val debounceCountValue = debounceCount.incrementAndGet()
            logger.debug { "Created debounce flux for ${flux.key()} - total count $debounceCountValue" }
        }
        .flatMap { flux ->
            flux
                .sampleTimeout { Mono.delay(Duration.ofMillis(eventHandlerProperties.debounceTimeMillis)) }
                .doAfterTerminate {
                    val debounceCountValue = debounceCount.decrementAndGet()
                    logger.debug { "Terminated debounce flux for ${flux.key()} - total count $debounceCountValue" }
                }
        }
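Here is sampleTimeout() on its own, as a minimal sketch assuming reactor-core on the classpath. The timings are arbitrary values chosen to leave comfortable margins, and debounceDemo is a made-up name.

```kotlin
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import java.time.Duration

// Emits "a", then "b" 20 ms later, then "c" after a 300 ms pause,
// debounced with a 100 ms quiet window.
fun debounceDemo(): List<String> =
    Flux.concat(
        Mono.just("a"),
        Mono.just("b").delayElement(Duration.ofMillis(20)),
        Mono.just("c").delayElement(Duration.ofMillis(300)),
    )
        // Each element opens a 100 ms window; a newer element arriving in time replaces it
        .sampleTimeout { Mono.delay(Duration.ofMillis(100)) }
        .collectList()
        .block()
        .orEmpty()
```

"a" is dropped because "b" arrives within its window, "b" is emitted once its window elapses quietly, and "c" is emitted when the source completes, so the result is b then c.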

Let’s see those superpowers in action


Such a central piece of code needs to be fully tested. Unit tests in Reactor aren’t the simplest thing on Earth, so I am giving you the unit test class.

package io.biznet.event.handler

import io.biznet.event.EventHandlerProperties
import io.biznet.event.LoggableEvent
import io.kotest.core.spec.Spec
import io.kotest.core.spec.style.StringSpec
import io.kotest.core.test.TestCase
import io.kotest.matchers.collections.shouldContainAll
import io.kotest.matchers.collections.shouldNotContain
import io.kotest.matchers.shouldBe
import org.reactivestreams.Publisher
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.core.scheduler.Scheduler
import reactor.core.scheduler.Schedulers
import reactor.test.StepVerifier
import java.time.Duration

@Suppress("unused")
class DebouncingParallelHandlerSpec : StringSpec() {

    private lateinit var eventHandler: DebouncingParallelHandler<DebouncedPartitionedEvent>
    private val handlerDisablingService: HandlerDisablingService = HandlerDisablingService()
    private lateinit var eventHandlerProperties: EventHandlerProperties

    private val event1 = DebouncedPartitionedEvent("event-1", "debounce-1", "partition-1")
    private val event2 = DebouncedPartitionedEvent("event-2", "debounce-1", "partition-1", true)
    private val event3 = DebouncedPartitionedEvent("event-3", "debounce-2", "partition-1")
    private val event4 = DebouncedPartitionedEvent("event-4", "debounce-3", "partition-2")
    private val event5 = DebouncedPartitionedEvent("event-5", "debounce-1", "partition-3")
    private val event6 = DebouncedPartitionedEvent("event-6", "debounce-1", "partition-1")

    private val poolSize = 10
    private val debounceTimeMillis = 100L
    private val retryTimeMillis = 10L

    private lateinit var handledEventsCounts: MutableMap<String, Int>

    override suspend fun beforeSpec(spec: Spec) {
        super.beforeSpec(spec)
        // Do not remove: this configuration is needed for the CI.
        // It synchronizes the schedulers' pool sizes.
        System.setProperty("reactor.schedulers.defaultPoolSize", "$poolSize")
        System.setProperty("reactor.schedulers.defaultBoundedElasticSize", "$poolSize")
    }

    override suspend fun beforeTest(testCase: TestCase) {
        super.beforeTest(testCase)
        handledEventsCounts = mutableMapOf()
        eventHandlerProperties = EventHandlerProperties(
            debounceTimeMillis = debounceTimeMillis,
            defaultRetryCount = retryCount,
            defaultBackoffMillis = retryTimeMillis
        )
        eventHandler = TestDebouncingParallelHandler(
            eventHandlerProperties,
            handlerDisablingService,
            Schedulers.newBoundedElastic(2, poolSize, "test handler"),
            debounceTimeMillis,
            retryCount,
            handledEventsCounts
        )
    }

    init {
        "Events are partitioned into distinct flux according to their partition key" {
            val partition = DebouncingParallelHandler::class.java.getDeclaredMethod("partition", Flux::class.java)
            partition.isAccessible = true
            val eventList = arrayOf(event1, event2, event3, event4, event5, event6)
            val flux = Flux.just(*eventList)

            @Suppress("UNCHECKED_CAST")
            StepVerifier.create(partition.invoke(eventHandler, flux) as Flux<Flux<LoggableEvent>>)
                .expectNextCount(eventList.distinctBy { it.partitionKey }.size.toLong())
                .expectComplete()
                .verify()
        }

        "Events are debounced according to their debounce key" {
            val debounce = DebouncingParallelHandler::class.java.getDeclaredMethod("debounce", Flux::class.java)
            debounce.isAccessible = true
            val eventList = arrayOf(event1, event2, event3, event4, event5, event6)
            val flux = Flux.just(*eventList)

            @Suppress("UNCHECKED_CAST")
            StepVerifier.create(debounce.invoke(eventHandler, flux) as Flux<LoggableEvent>)
                .expectNextCount(eventList.distinctBy { it.debounceKey }.size.toLong())
                .expectComplete()
                .verify()
        }

        "Events are debounced according to their debounce key and time" {
            val debounce = DebouncingParallelHandler::class.java.getDeclaredMethod("debounce", Flux::class.java)
            debounce.isAccessible = true
            val eventList = arrayOf(event1, event2, event3, event4, event5, event6)
            val flux = Flux.concat(
                *eventList.map { Mono.just(it).delayElement(Duration.ofMillis(debounceTimeMillis / 2)) }.toTypedArray()
            )

            @Suppress("UNCHECKED_CAST")
            StepVerifier.create(debounce.invoke(eventHandler, flux) as Flux<LoggableEvent>)
                // There will be 2 events for the "debounce-1" key: event1 and event5 are dropped because
                // another event follows within the debounce window, while event2 and event6 are emitted
                // because enough time passes (or the flux completes) after them.
                .expectNextCount(eventList.distinctBy { it.debounceKey }.size.toLong() + 1)
                .expectComplete()
                .verify()
        }

        "Events are debounced and partitioned according to their grouping keys" {
            val createFlux = DebouncingParallelHandler::class.java.getDeclaredMethod("createFlux", Flux::class.java)
            createFlux.isAccessible = true

            StepVerifier.create(
                createFlux.invoke(
                    eventHandler,
                    Flux.concat(
                        Mono.just(event1),
                        Mono.delay(Duration.ofMillis(debounceTimeMillis / 2)).map { event2 },
                        Mono.just(event3),
                        Mono.just(event4),
                        Mono.just(event5),
                        // The last one is heavily delayed to let the retries of event-2 be exhausted
                        Mono.delay(Duration.ofMillis(debounceTimeMillis * 20)).map { event6 },
                    )
                ) as Publisher<out Any>
            )
                .recordWith(::ArrayList)
                .expectNextCount(4)
                .consumeRecordedWith { records ->
                    // * event-5 is emitted even though it has the same debounceKey as event-6 because
                    //   they aren't in the same partition.
                    // * event-2 produced an error but did not prevent event-6 from going through
                    records shouldContainAll listOf(event3, event4, event5, event6)
                    records shouldNotContain event1
                }
                .expectComplete()
                .verify()
        }

        "errors don't kill the flux" {
            val createFlux = DebouncingParallelHandler::class.java.getDeclaredMethod("createFlux", Flux::class.java)
            createFlux.isAccessible = true

            StepVerifier.create(
                createFlux.invoke(
                    eventHandler,
                    Flux.concat(
                        Mono.just(event2),
                        Mono.just(event3),
                        Mono.just(event4),
                        Mono.just(event5),
                        // The last one is heavily delayed to let the retries of event-2 be exhausted
                        Mono.delay(Duration.ofMillis(debounceTimeMillis * 20)).map { event6 },
                    )
                ) as Publisher<out Any>
            )
                .expectNextCount(4)
                .expectComplete()
                .verify()
        }

    }

    class TestDebouncingParallelHandler(
        eventHandlerProperties: EventHandlerProperties,
        handlerDisablingService: HandlerDisablingService,
        scheduler: Scheduler,
        val debounceTimeMillis: Long,
        val retryCount: Long,
        private var handledEventsCounts: MutableMap<String, Int>
    ) : DebouncingParallelHandler<DebouncedPartitionedEvent>(
        eventHandlerProperties, handlerDisablingService, scheduler
    ) {

        override fun debounceKey(event: DebouncedPartitionedEvent) = event.debounceKey

        override fun partitionKey(event: DebouncedPartitionedEvent) = event.partitionKey

        override fun handleEvent(event: DebouncedPartitionedEvent) {
            logger.info { "handle event ${event.name}" }
            handledEventsCounts[event.name] =
                handledEventsCounts[event.name]?.plus(1) ?: 1
            Thread.sleep(debounceTimeMillis)
            if (event.throwsError) throw Exception("oops, error for event ${event.name}")
        }
    }
}

We are quite pleased with the outcome of these improvements on Spring Boot event-driven communication and also enjoy the ease of adding new features to our event processing system. By the way, we’ll share more features in our next article, like the retries you can see mentioned in the code we shared.

We look forward to seeing your Spring Boot applications reach new heights with augmented event-driven communication. Maybe you have augmented Spring Boot capabilities in your project as well. If so, please share in the comments!
