More superpowers for our Spring Boot event-driven communication: retrying and disabling

Thomas Martin
Published in WhozApp
6 min read · Dec 11, 2023

Recently I wrote about how we enhanced our event-based communication in our Spring Boot apps. Today we’ll look into two other features we added: retrying and disabling.


Retrying

In asynchronous processing, it is usually a good idea to retry a failed operation a limited number of times: a resource may be unavailable on the first attempt but available shortly after, so a later attempt can process the event.

Reactor provides Retry.backoff() for that. It takes a maximum number of retries and a minimum backoff duration, and it increases the delay exponentially after each failure, with some random jitter. For example, with a 500 ms minimum backoff, the delay between the first two tries is roughly 500 ms, the delay between the second and third tries is roughly 1 s, the delay between the third and fourth tries is roughly 2 s, and so on.
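
To make the arithmetic concrete, here is a minimal sketch of the nominal delays of an exponential backoff, before any jitter is applied. It is plain Kotlin, not Reactor code: the function names and the 30-second cap are assumptions made for illustration only.

```kotlin
import java.time.Duration

// Nominal delay before retry number `retryIndex` (0-based): minBackoff * 2^retryIndex,
// capped at maxBackoff. Jitter is deliberately left out of this sketch.
// The 30 s default cap is an assumption for the example, not a Reactor default.
fun nominalBackoff(
    minBackoff: Duration,
    retryIndex: Int,
    maxBackoff: Duration = Duration.ofSeconds(30)
): Duration {
    require(retryIndex >= 0) { "retryIndex must be non-negative" }
    // Clamp the exponent so the multiplier stays well inside the Long range.
    val factor = 1L shl minOf(retryIndex, 30)
    val candidate = minBackoff.multipliedBy(factor)
    return if (candidate > maxBackoff) maxBackoff else candidate
}

// Nominal delays for the first `retries` retries.
fun nominalSchedule(minBackoff: Duration, retries: Int): List<Duration> =
    (0 until retries).map { nominalBackoff(minBackoff, it) }

fun main() {
    nominalSchedule(Duration.ofMillis(500), 4).forEachIndexed { index, delay ->
        println("retry ${index + 1}: ~${delay.toMillis()} ms")
    }
    // retry 1: ~500 ms
    // retry 2: ~1000 ms
    // retry 3: ~2000 ms
    // retry 4: ~4000 ms
}
```

The real delays observed in production will differ from these values because of the jitter, so treat the output as an order of magnitude rather than a schedule.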

private fun handleEventWithRetry(event: E): Mono<E> = Mono.defer {
    // Mono.defer re-runs this block on every (re)subscription, so each retry calls handleEvent again.
    val startTimestamp = System.currentTimeMillis()
    val lag = startTimestamp - event.timestamp
    logger.info("Processing event ${event.logIdentifier} - lag $lag ms")
    if (lag > maxAllowedLagMillis()) {
        logger.warn { "Handler is lagging - lag $lag ms" }
    }
    // An exception thrown by handleEvent becomes an error signal, which triggers retryWhen below.
    Mono.just(handleEvent(event))
        .doOnSuccess {
            val handlerTime = System.currentTimeMillis() - startTimestamp
            logger.debug { "Processed event ${event.logIdentifier} - time $handlerTime ms" }
            if (handlerTime > debounceTimeMillis()) {
                logger.warn { "Handler is too slow - time $handlerTime ms" }
            }
        }
}.retryWhen(
    // Exponential backoff: at most retryCount() retries, starting from backoffMillis().
    Retry.backoff(retryCount(), Duration.ofMillis(backoffMillis()))
        .doBeforeRetry { retrySignal ->
            logger.warn("Retrying: ${retrySignal.totalRetries()}; ${retrySignal.totalRetriesInARow()};", retrySignal.failure())
        }
).then(Mono.just(event))

You can see how we log the event processing, especially the lag. We consider the handler too slow when the processing time exceeds the debounce time, because the next debounced event could then arrive while the previous one is still being processed.
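
The two checks can be isolated in a small pure function, which makes the thresholds easier to reason about. This is only a sketch: TimingReport and assessTiming are hypothetical names, and the thresholds are passed in as plain parameters instead of being read from the handler.

```kotlin
// Hypothetical result type: one flag per warning logged by the handler.
data class TimingReport(val lagging: Boolean, val tooSlow: Boolean)

// lag         = time between the event being emitted and its processing starting
// handlerTime = time spent processing the event
fun assessTiming(
    eventTimestamp: Long,
    startTimestamp: Long,
    endTimestamp: Long,
    maxAllowedLagMillis: Long,
    debounceTimeMillis: Long
): TimingReport {
    val lag = startTimestamp - eventTimestamp
    val handlerTime = endTimestamp - startTimestamp
    return TimingReport(
        lagging = lag > maxAllowedLagMillis,
        tooSlow = handlerTime > debounceTimeMillis
    )
}

fun main() {
    // Event emitted at t=0, processing starts at t=50 and ends at t=200,
    // with a 1000 ms allowed lag and a 100 ms debounce time.
    println(assessTiming(0, 50, 200, maxAllowedLagMillis = 1000, debounceTimeMillis = 100))
    // TimingReport(lagging=false, tooSlow=true)
}
```

In this example the handler is not lagging (50 ms of lag), but it took 150 ms to process the event, which is longer than the 100 ms debounce window.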

Disabling handler

A few weeks ago we added another feature: the ability to disable a handler, either at startup or at runtime. This is useful when you know that a huge number of events is coming and that the handler will lag badly: you can disable it and run a batch process afterwards to deal with the missed events. This can be the case when an API client synchronizes a large dataset, for example.

Let’s get back to the flux creation method.

private fun createFlux(source: Flux<E>): Flux<E> {
    logger.info { "Initializing debouncing parallel flux for ${this::class.simpleName}" }
    // Startup switch: read once from configuration; handlers are enabled by default.
    val enabledAtStartup: Boolean = eventHandlerProperties.enable[this::class.simpleName] ?: true
    if (!enabledAtStartup) {
        logger.warn { "${this::class.simpleName} handler is disabled by configuration !" }
    }
    return partition(source)
        .parallel()
        .runOn(scheduler)
        .flatMap { partitionedFlux ->
            debounce(partitionedFlux)
                .publishOn(scheduler)
                .flatMap { event ->
                    // Runtime switch: checked for every event, so it can be toggled while the app runs.
                    if (enabledAtStartup && handlerDisablingService.isHandlerEnabled(this::class.simpleName ?: "")) {
                        handleEventWithRetry(event)
                    } else {
                        logger.debug { "${this::class.simpleName} is disabled !" }
                        Mono.just(event)
                    }
                }
        }
        .sequential()
        .onErrorContinue { throwable, _ ->
            // timeouts are normal, this is how we cancel partitions
            if (throwable !is TimeoutException) {
                logger.error("Unexpected error while publishing event to sink", throwable)
            }
        }
}

You can see that a map in the configuration can disable a handler. We use the class name as the key, so the configuration is generic for any handler implementing DebouncingParallelHandler.
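
The lookup rule is worth spelling out: a handler that is absent from the map is enabled, and only an explicit false disables it. Here is a minimal sketch of that rule. HandlerFlags is a hypothetical stand-in for the real properties class, and the handler names are made up for the example.

```kotlin
// Hypothetical stand-in for the configuration properties; only the `enable` map matters here.
data class HandlerFlags(val enable: Map<String, Boolean> = emptyMap()) {
    // Missing entries default to true, so handlers are opt-out rather than opt-in.
    fun isEnabledAtStartup(simpleName: String): Boolean = enable[simpleName] ?: true
}

fun main() {
    // Made-up handler names, keyed by class simple name as in the article.
    val flags = HandlerFlags(enable = mapOf("InvoiceEventHandler" to false))
    println(flags.isEnabledAtStartup("InvoiceEventHandler")) // false
    println(flags.isEnabledAtStartup("MissionEventHandler")) // true
}
```

Defaulting to true means that a typo in a configured class name silently leaves the handler enabled, which is the same trade-off the warnings in the JMX operation descriptions point out.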

Moreover, there is a handlerDisablingService that is responsible for disabling handlers at runtime. This service also uses a map of class names.

import mu.KLogging
import org.springframework.stereotype.Service

@Service
class HandlerDisablingService {
    // Key: handler class simple name. A missing entry means the handler is enabled.
    private val enabledHandlersByClassSimpleName: MutableMap<String, Boolean> = mutableMapOf()

    fun enableHandler(simpleName: String) {
        enabledHandlersByClassSimpleName[simpleName] = true
        logger.info { "$simpleName handler is enabled !" }
    }

    fun disableHandler(simpleName: String) {
        enabledHandlersByClassSimpleName[simpleName] = false
        logger.warn { "$simpleName handler is disabled !" }
    }

    fun isHandlerEnabled(simpleName: String) = enabledHandlersByClassSimpleName[simpleName] ?: true

    fun listDisabledHandlers() = enabledHandlersByClassSimpleName.entries.filter { !it.value }.map { it.key }

    companion object : KLogging()
}
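
Since the map is written from a management thread and read from the Reactor scheduler threads, a concurrent map may be a safer choice. The following is a possible variant under that assumption, not the implementation we run: ConcurrentHandlerSwitch is a hypothetical name, and the Spring annotation and logging are left out to keep the sketch self-contained.

```kotlin
import java.util.concurrent.ConcurrentHashMap

// Hypothetical thread-safe variant of the service, without Spring or logging.
class ConcurrentHandlerSwitch {
    // Key: handler class simple name. A missing entry means the handler is enabled.
    private val enabledByName = ConcurrentHashMap<String, Boolean>()

    fun enableHandler(simpleName: String) {
        enabledByName[simpleName] = true
    }

    fun disableHandler(simpleName: String) {
        enabledByName[simpleName] = false
    }

    fun isHandlerEnabled(simpleName: String): Boolean = enabledByName[simpleName] ?: true

    // Sorted so that the output is stable regardless of the map's internal ordering.
    fun listDisabledHandlers(): List<String> = enabledByName.filterValues { !it }.keys.sorted()
}

fun main() {
    val switch = ConcurrentHandlerSwitch()
    switch.disableHandler("InvoiceEventHandler")
    println(switch.isHandlerEnabled("InvoiceEventHandler")) // false
    println(switch.isHandlerEnabled("MissionEventHandler")) // true
    println(switch.listDisabledHandlers())                  // [InvoiceEventHandler]
}
```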

A JMX controller allows admins to call this service at runtime.

package io.biznet.event.handler

import mu.KLogging
import org.springframework.jmx.export.annotation.ManagedOperation
import org.springframework.jmx.export.annotation.ManagedResource

@ManagedResource(
    objectName = "whoz:category=MBeans,name=handler_managed_operation",
    description = "Disabling Handler"
)
class HandlerDisablingController(private val handlerDisablingService: HandlerDisablingService) {

    @ManagedOperation(
        description = "Enable a handler. Please give the simple name of the class. Handler MUST implement DebouncingParallelHandler." +
            "\nNote that there is no control about the validity of the provided class name."
    )
    fun enableHandler(simpleName: String) {
        logger.info { "JMX START: Enabling $simpleName handler" }
        handlerDisablingService.enableHandler(simpleName)
        logger.info { "JMX STOP: $simpleName handler enabled" }
    }

    @ManagedOperation(
        description = "Disable a handler. Please give the simple name of the class. Handler MUST implement DebouncingParallelHandler." +
            "\nNote that there is no control about the validity of the provided class name."
    )
    fun disableHandler(simpleName: String) {
        logger.info { "JMX START: Disabling $simpleName handler" }
        handlerDisablingService.disableHandler(simpleName)
        logger.info { "JMX STOP: $simpleName handler disabled" }
    }

    @ManagedOperation(description = "List currently disabled handlers.")
    fun listDisabledHandlers(): List<String> {
        logger.info { "JMX START: list disabled handlers" }
        return handlerDisablingService.listDisabledHandlers()
    }

    companion object : KLogging()
}

It is a pretty basic implementation without much input validation, but it is only used by our handful of technical administrators, who have full access to the servers anyway, so that’s fine.
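
If you have never called a JMX operation from code, the sketch below shows roughly what happens under the hood, using only the JDK. It is an illustration under assumptions, not our setup: HandlerSwitchOperations and InMemoryHandlerSwitch are hypothetical stand-ins for the Spring-exported controller, the handler name is made up, and the MBean is registered by hand on the local platform MBean server instead of being exported by Spring. Only the object name is taken from the controller above.

```kotlin
import java.lang.management.ManagementFactory
import javax.management.ObjectName
import javax.management.StandardMBean

// Hypothetical management interface that mirrors the controller's three operations.
interface HandlerSwitchOperations {
    fun enableHandler(simpleName: String)
    fun disableHandler(simpleName: String)
    fun listDisabledHandlers(): List<String>
}

// Hypothetical in-memory implementation, standing in for the Spring-managed controller.
class InMemoryHandlerSwitch : HandlerSwitchOperations {
    private val disabled = sortedSetOf<String>()

    @Synchronized
    override fun enableHandler(simpleName: String) { disabled.remove(simpleName) }

    @Synchronized
    override fun disableHandler(simpleName: String) { disabled.add(simpleName) }

    @Synchronized
    override fun listDisabledHandlers(): List<String> = disabled.toList()
}

// Registers the MBean, disables one handler through JMX, and returns the disabled list.
fun demoJmxCall(): List<String> {
    val server = ManagementFactory.getPlatformMBeanServer()
    val name = ObjectName("whoz:category=MBeans,name=handler_managed_operation")
    server.registerMBean(StandardMBean(InMemoryHandlerSwitch(), HandlerSwitchOperations::class.java), name)
    try {
        // Operations are invoked by name, with their arguments and the argument type names.
        server.invoke(name, "disableHandler", arrayOf<Any>("InvoiceEventHandler"), arrayOf("java.lang.String"))
        val result = server.invoke(name, "listDisabledHandlers", emptyArray<Any>(), emptyArray<String>())
        return (result as List<*>).map { it.toString() }
    } finally {
        // Unregister so the demo can be run several times in the same JVM.
        server.unregisterMBean(name)
    }
}

fun main() {
    println(demoJmxCall()) // [InvoiceEventHandler]
}
```

In practice an administrator would more likely use a JMX console to reach the same operations, but the call has the same shape: an object name, an operation name, and the handler's simple class name as the only argument.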

Let’s try those features

Writing unit tests for Reactor code is not the most intuitive thing, so here is a handful of tests for retrying and disabling.

package io.biznet.event.handler

import io.biznet.event.EventHandlerProperties
import io.biznet.event.LoggableEvent
import io.kotest.core.spec.Spec
import io.kotest.core.spec.style.StringSpec
import io.kotest.core.test.TestCase
import io.kotest.matchers.collections.shouldContainAll
import io.kotest.matchers.collections.shouldNotContain
import io.kotest.matchers.shouldBe
import org.reactivestreams.Publisher
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.core.scheduler.Scheduler
import reactor.core.scheduler.Schedulers
import reactor.test.StepVerifier
import java.time.Duration

@Suppress("unused")
class DebouncingParallelHandlerSpec : StringSpec() {

    private lateinit var eventHandler: DebouncingParallelHandler<DebouncedPartitionedEvent>
    private val handlerDisablingService: HandlerDisablingService = HandlerDisablingService()
    private lateinit var eventHandlerProperties: EventHandlerProperties

    private val event1 = DebouncedPartitionedEvent("event-1", "debounce-1", "partition-1")
    private val event2 = DebouncedPartitionedEvent("event-2", "debounce-1", "partition-1", true)
    private val event3 = DebouncedPartitionedEvent("event-3", "debounce-2", "partition-1")
    private val event4 = DebouncedPartitionedEvent("event-4", "debounce-3", "partition-2")
    private val event5 = DebouncedPartitionedEvent("event-5", "debounce-1", "partition-3")
    private val event6 = DebouncedPartitionedEvent("event-6", "debounce-1", "partition-1")

    private val poolSize = 10
    private val debounceTimeMillis = 100L
    private val retryTimeMillis = 10L
    private val retryCount = 2L

    private lateinit var handledEventsCounts: MutableMap<String, Int>

    override suspend fun beforeSpec(spec: Spec) {
        super.beforeSpec(spec)
        // Do not remove: this configuration is needed for the CI.
        // It synchronizes the schedulers' pool sizes.
        System.setProperty("reactor.schedulers.defaultPoolSize", "$poolSize")
        System.setProperty("reactor.schedulers.defaultBoundedElasticSize", "$poolSize")
    }

    override suspend fun beforeTest(testCase: TestCase) {
        super.beforeTest(testCase)
        handledEventsCounts = mutableMapOf()
        eventHandlerProperties = EventHandlerProperties(
            debounceTimeMillis = debounceTimeMillis,
            defaultRetryCount = retryCount,
            defaultBackoffMillis = retryTimeMillis
        )
        eventHandler = TestDebouncingParallelHandler(
            eventHandlerProperties,
            handlerDisablingService,
            Schedulers.newBoundedElastic(2, poolSize, "test handler"),
            debounceTimeMillis,
            retryCount,
            handledEventsCounts
        )
    }

    init {

        "errors should be retried" {
            val createFlux = DebouncingParallelHandler::class.java.getDeclaredMethod("createFlux", Flux::class.java)
            createFlux.isAccessible = true

            StepVerifier.create(
                createFlux.invoke(
                    eventHandler,
                    Flux.concat(
                        Mono.just(event2),
                        // The last one is heavily delayed to let event-2 exhaust its retries
                        Mono.delay(Duration.ofMillis(debounceTimeMillis * 20)).map { event6 },
                    )
                ) as Publisher<out Any>
            )
                .expectNextCount(1)
                .expectComplete()
                .verify()

            handledEventsCounts[event2.name] shouldBe retryCount + 1
            handledEventsCounts[event6.name] shouldBe 1
        }

        "handler can be disabled" {
            val createFlux = DebouncingParallelHandler::class.java.getDeclaredMethod("createFlux", Flux::class.java)
            createFlux.isAccessible = true
            handlerDisablingService.disableHandler(TestDebouncingParallelHandler::class.simpleName!!)

            StepVerifier.create(
                createFlux.invoke(
                    eventHandler,
                    Flux.concat(
                        Mono.just(event3),
                        Mono.just(event4),
                        Mono.just(event5),
                    )
                ) as Publisher<out Any>
            )
                .expectNextCount(3)
                .expectComplete()
                .verify()

            handledEventsCounts[event3.name] shouldBe null
            handledEventsCounts[event4.name] shouldBe null
            handledEventsCounts[event5.name] shouldBe null
        }

        "handler can be enabled" {
            val createFlux = DebouncingParallelHandler::class.java.getDeclaredMethod("createFlux", Flux::class.java)
            createFlux.isAccessible = true
            handlerDisablingService.disableHandler(TestDebouncingParallelHandler::class.simpleName!!)
            handlerDisablingService.enableHandler(TestDebouncingParallelHandler::class.simpleName!!)

            StepVerifier.create(
                createFlux.invoke(
                    eventHandler,
                    Flux.concat(
                        Mono.just(event3),
                        Mono.just(event4),
                        Mono.just(event5),
                    )
                ) as Publisher<out Any>
            )
                .expectNextCount(3)
                .expectComplete()
                .verify()

            handledEventsCounts[event3.name] shouldBe 1
            handledEventsCounts[event4.name] shouldBe 1
            handledEventsCounts[event5.name] shouldBe 1
        }

        "handler can be disabled at startup" {
            eventHandlerProperties = EventHandlerProperties(
                debounceTimeMillis = debounceTimeMillis,
                defaultRetryCount = retryCount,
                defaultBackoffMillis = retryTimeMillis,
                enable = mapOf(TestDebouncingParallelHandler::class.simpleName!! to false)
            )
            eventHandler = TestDebouncingParallelHandler(
                eventHandlerProperties,
                handlerDisablingService,
                Schedulers.newBoundedElastic(2, poolSize, "test handler"),
                debounceTimeMillis,
                retryCount,
                handledEventsCounts
            )

            val createFlux = DebouncingParallelHandler::class.java.getDeclaredMethod("createFlux", Flux::class.java)
            createFlux.isAccessible = true

            StepVerifier.create(
                createFlux.invoke(
                    eventHandler,
                    Flux.concat(
                        Mono.just(event3),
                        Mono.just(event4),
                        Mono.just(event5),
                    )
                ) as Publisher<out Any>
            )
                .expectNextCount(3)
                .expectComplete()
                .verify()

            handledEventsCounts[event3.name] shouldBe null
            handledEventsCounts[event4.name] shouldBe null
            handledEventsCounts[event5.name] shouldBe null
        }

    }

    class TestDebouncingParallelHandler(
        eventHandlerProperties: EventHandlerProperties,
        handlerDisablingService: HandlerDisablingService,
        scheduler: Scheduler,
        val debounceTimeMillis: Long,
        val retryCount: Long,
        private var handledEventsCounts: MutableMap<String, Int>
    ) : DebouncingParallelHandler<DebouncedPartitionedEvent>(eventHandlerProperties, handlerDisablingService, scheduler) {

        override fun debounceKey(event: DebouncedPartitionedEvent) = event.debounceKey

        override fun partitionKey(event: DebouncedPartitionedEvent) = event.partitionKey

        override fun handleEvent(event: DebouncedPartitionedEvent) {
            logger.info { "handle event ${event.name}" }
            handledEventsCounts[event.name] =
                handledEventsCounts[event.name]?.plus(1) ?: 1
            Thread.sleep(debounceTimeMillis)
            if (event.throwsError) throw Exception("oops, error for event ${event.name}")
        }

        override fun retryCount() = retryCount
    }
}

With retrying, our event processing is more robust, and with disabling, we have a way to protect the servers if things go wild.

These additional features have already greatly improved our reliability. However, there is still room for improvement, such as better handling of failed events that have exhausted all their retries, and automatic batch processing of missed events. So stay tuned: we’ll continue our work on this topic in 2024, and I’ll share our progress with you.
