The sweetest fruit salad recipe for energy saving services

Pau García
DEXMA Engineering Blog
3 min read · Feb 19, 2021

Chapter 1: Asynchronous

(This story begins in Chapter 0. Launching an energy-saving rocket to the stars)

Asynchronous is a broad term and may lead to misunderstanding. In our context, we need to receive the request, validate that it is correct, tell the client that we have properly received it and then process it in the background, decoupling this (maybe long) process from the caller. In other words, the connection is closed as soon as possible and the consumer just receives an acknowledgement.
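
The receive, validate, acknowledge and process-in-background sequence can be sketched without any framework. The following is only a minimal illustration that uses a plain JDK executor, not the WebFlux approach described later in this chapter; `Ack`, `handleRequest` and the `process` callback are hypothetical names introduced for the example.

```kotlin
import java.util.concurrent.CountDownLatch
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

// Hypothetical acknowledgement returned to the caller.
data class Ack(val accepted: Boolean, val message: String)

// Hypothetical handler: validates the payload, hands the work to a
// background executor and returns without waiting for the result.
fun handleRequest(
    payload: List<String>,
    worker: ExecutorService,
    process: (List<String>) -> Unit
): Ack {
    // Validation happens before the request is acknowledged.
    if (payload.isEmpty()) return Ack(false, "empty payload")
    // Fire and forget: the outcome of the job is never awaited here.
    worker.execute { process(payload) }
    return Ack(true, "received ${payload.size} items")
}

fun main() {
    val worker = Executors.newSingleThreadExecutor()
    val done = CountDownLatch(1)
    val ack = handleRequest(listOf("a", "b"), worker) {
        Thread.sleep(200) // simulate a long-running job
        done.countDown()
    }
    println(ack) // the caller gets its answer while the job is still running
    done.await(2, TimeUnit.SECONDS)
    worker.shutdown()
}
```

The caller only learns that the request was accepted; any failure in the background job has to be reported through another channel, such as logs or metrics.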

This led to our first technology analysis and design decision. We needed to provide asynchronicity at both the server level (Fire & Forget) and the client level. We analysed the following two options:

  • Kotlin Coroutines (https://kotlinlang.org/docs/reference/coroutines-overview.html)
  • Spring WebFlux

At DEXMA we already had asynchronous components in production built with WebFlux, but Kotlin had recently introduced Coroutines, a feature designed to solve the same problem we wanted WebFlux to solve. We explored Kotlin Coroutines and ran some spikes, but we finally chose WebFlux because of its maturity and our successful experience with it in the organization. The component already had too many new pieces, so this time we discarded Kotlin Coroutines and took the safe, known path.

The component is a Spring web service, so we take advantage of the Spring WebFlux starter.
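
As an illustration only, and assuming a Gradle build written in the Kotlin DSL with the Spring Boot plugin managing versions (the article does not say which build tool the project uses), the starter could be declared like this:

```kotlin
// build.gradle.kts (assumed build setup, not taken from the project)
dependencies {
    // Spring Boot starter that brings in Spring WebFlux and Reactor
    implementation("org.springframework.boot:spring-boot-starter-webflux")
}
```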

Having chosen WebFlux, let's focus on asynchronicity at the server level; the client level is more common and there are tons of examples out there. We achieve the Fire and Forget pattern by having our feature return a Flux:

class TranslateAndPublishTrend(…, private val datasourcesRepository: DatasourcesRepository, …) {
    fun execute(trends: List<Trend>): Flux<String> {
        (...)
        return Flux
            .fromIterable(trends)
            .flatMap({ messagesOfDevice ->
                datasourcesRepository
                    .lookup(messagesOfDevice.key)
                    // Skip data sources that have no MAC address
                    .filter { t: SimpleDataSource -> t.mac.isNotEmpty() }
                    .flatMap { t: SimpleDataSource -> readingsRepository.send(t, messagesOfDevice.value) }
            }, apiConfiguration.maxConcurrencyPerRequest.toInt()) // cap on concurrent inner subscriptions
    }
}
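
The second argument of `flatMap` limits how many inner publishers are subscribed at the same time. Below is a minimal, self-contained sketch of that operator, assuming `reactor-core` is on the classpath. `lookup`, `send` and `publish` are hypothetical stand-ins for the repository calls and the feature, and the input is assumed to be a map from device key to readings.

```kotlin
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import java.time.Duration

// Hypothetical stand-in for the data source lookup:
// emits no data source for a blank key, one otherwise.
fun lookup(deviceKey: String): Flux<String> =
    if (deviceKey.isBlank()) Flux.empty() else Flux.just("$deviceKey-ds")

// Hypothetical stand-in for sending readings: simulates a slow outbound call.
fun send(dataSource: String, readings: List<Int>): Mono<String> =
    Mono.just("$dataSource: ${readings.size} readings sent")
        .delayElement(Duration.ofMillis(50))

fun publish(messagesByDevice: Map<String, List<Int>>, maxConcurrency: Int): Flux<String> =
    Flux.fromIterable(messagesByDevice.entries)
        .flatMap({ entry ->
            lookup(entry.key).flatMap { ds -> send(ds, entry.value) }
        }, maxConcurrency) // at most maxConcurrency devices are in flight at once

fun main() {
    val devices = mapOf("dev1" to listOf(1, 2), "dev2" to listOf(3), "" to listOf(4))
    val result = publish(devices, 2)
        .collectList()
        .block() // blocking only for this demo; a service would subscribe instead
    println(result) // order may vary because flatMap interleaves inner results
}
```

Nothing runs until something subscribes to the returned Flux, which is why the caller decides when and how the work starts.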

We then subscribe to the Flux returned by the feature in our controller, which returns an HTTP 202 (Accepted) immediately, without waiting for the feature to complete:

@RestController
@RequestMapping("/trends", produces = [APPLICATION_JSON_VALUE])
class TrendsController(...): TrendsResource {
    @PostMapping
    override fun processTrend(@RequestBody trends: Array<Trend>): ResponseEntity<Response> {
        // Subscribe and move on: processing continues in the background
        translateAndPublishTrend.execute(trends.asList())
            .subscribe(
                { t: String? -> logger.info("$t") },
                { t: Throwable -> logger.error("Sending metrics failed with response ${t.message}") },
                { logger.info("${trends.size} trends processed") })
        // Acknowledge the request without waiting for the Flux to complete
        return ResponseEntity
            .accepted()
            .contentType(MediaType.APPLICATION_JSON)
            .body(Response(configuration.responseProcessOK))
    }
}
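
To see why the response does not wait for the work, here is a minimal sketch outside Spring, assuming only `reactor-core` on the classpath. `slowFeature` and `accept` are hypothetical names; `accept` plays the role of the controller method and returns a plain string instead of a `ResponseEntity`.

```kotlin
import reactor.core.publisher.Flux
import java.time.Duration
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit

// Hypothetical stand-in for the feature: emits one result per item, 100 ms apart.
fun slowFeature(items: List<String>): Flux<String> =
    Flux.fromIterable(items)
        .delayElements(Duration.ofMillis(100))
        .map { "processed $it" }

// Subscribes with the three callbacks (onNext, onError, onComplete)
// and returns without waiting for any of them to be invoked.
fun accept(items: List<String>, onDone: () -> Unit): String {
    slowFeature(items).subscribe(
        { result -> println(result) },
        { error -> println("failed: ${error.message}") },
        { println("${items.size} items processed"); onDone() }
    )
    return "202 Accepted" // reached before any item has been processed
}

fun main() {
    val done = CountDownLatch(1)
    println(accept(listOf("a", "b", "c")) { done.countDown() })
    done.await(2, TimeUnit.SECONDS) // keep the JVM alive until the background work ends
}
```

One consequence of this design is that errors raised after the acknowledgement can only reach the error callback, so the client never sees them in the HTTP response.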

When we send a request that takes some time to process, we can see that the response comes back as soon as the request has been received, while our service is still processing it in the background. You can see it in the following Postman example:

[Image: Postman example]

Note that this behaviour had previously been derived from the user story and specified with Cucumber, following a TDD and BDD approach. We will cover this later in this series of articles.

The trip continues in Chapter 2: Outbound Pressure Control

Pau García
DEXMA Engineering Blog

Software Engineer who loves to find the simplest and most robust solution for each use case by applying architectural, design and clean-code principles