Monitoring with Coroutines @ heycar

Guillermo Campelo
Published in heycar · 8 min read · Jun 28, 2021
Photo by Mika Baumeister on Unsplash - Workers working together

Introduction

Kotlin Coroutines are a powerful alternative to several libraries for reactive programming on the JVM. We’ll first give a background on what they are and how they work.

Then we’ll explain our use case: how we used coroutines to turn sequential code into asynchronous code, and how we later improved our initial implementation by making it concurrent.

Finally, we’ll give our conclusions about our experience using them, analysing aspects such as how challenging it was to introduce an improvement, how they affect code readability, and how difficult it is to test the code.

Background on coroutines

A coroutine can be thought of as a “lightweight thread” (official explanation). Coroutines are provided by the official kotlinx.coroutines library (GitHub repo) and are built on the suspend modifier provided by the Kotlin language. They don’t block the thread they run on; instead, they suspend their execution when they reach a suspension point. Potential suspension points are indicated by marking a function with the suspend modifier.

Because coroutines suspend rather than block the thread, another coroutine can use that thread and execute its code until the suspended coroutine resumes. This means we can execute more code in the same amount of time with the same number of threads (assuming some coroutine suspends; if none does, the workload is not automagically distributed). There is a good runnable example in their docs.
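A minimal sketch of this effect: two coroutines waiting on the single thread of runBlocking. Because delay() suspends instead of blocking, the thread is free to serve the other coroutine while one is waiting.

```kotlin
import kotlinx.coroutines.*
import kotlin.system.measureTimeMillis

// Both coroutines wait concurrently on one thread: delay() suspends
// the coroutine instead of blocking the thread, so the two 1-second
// waits overlap and the total time is roughly 1s, not 2s.
fun timeTwoDelays(): Long = runBlocking {
    measureTimeMillis {
        val a = launch { delay(1000) }
        val b = launch { delay(1000) }
        a.join()
        b.join()
    }
}

fun main() {
    println("took ${timeTwoDelays()} ms")
}
```

If delay were replaced with Thread.sleep (a blocking call), the same code would take roughly two seconds, since the single thread could not be handed over at a suspension point.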

Under the hood, the compiler uses CPS (continuation-passing style) for suspending functions, creating a state machine that records where a coroutine suspended and from where it should continue its execution. One can think of the coroutines’ execution as being managed by an event loop (similar to a `select` on sockets done by operating systems, or how JavaScript is able to handle events and execute code “at the same time”). This video gives a clear picture of how this works.

Ecosystem

As mentioned before, there are already many libraries in the JVM ecosystem to do reactive programming.

Fortunately, the coroutines library provides many out-of-the-box integrations (Guava, JDK8, NIO, Project Reactor, RxJava), available as separate dependencies (reactive and integration). They’re really handy for turning a CompletableFuture from Java, or a Mono or Flux from Project Reactor, into suspending functions. We used them to interact with the R2DBC (a reactive database client) library, as it is built on Project Reactor.
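As a sketch of what these integrations look like in practice (PageHealthRepository and its methods are hypothetical names, not from our codebase): awaitSingle() from kotlinx-coroutines-reactor turns a Mono into a suspending call, and asFlow() from kotlinx-coroutines-reactive turns any Reactive Streams Publisher, such as a Flux, into a Flow.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactor.awaitSingle
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono

// Hypothetical reactive client exposing Reactor types, e.g. backed by R2DBC.
interface PageHealthRepository {
    fun findByPath(path: String): Mono<String>
    fun findAll(): Flux<String>
}

// awaitSingle() suspends until the Mono emits its single value.
suspend fun loadHealth(repo: PageHealthRepository, path: String): String =
    repo.findByPath(path).awaitSingle()

// asFlow() adapts the Flux into a cold Flow consumable from coroutines.
fun streamHealth(repo: PageHealthRepository): Flow<String> =
    repo.findAll().asFlow()
</test>
```

The calling code only sees suspend functions and Flows; the Reactor types stay at the integration boundary.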

Project Reactor is the reactive library of choice for Spring WebFlux; it is based on the Reactive Streams specification and is used for building non-blocking applications on the JVM. A nice code comparison between coroutines and Reactor can be seen in this video.

There is a good comparison between Kotlin coroutines and goroutines (Go, not JVM) by Roman Elizarov.

Our use case

One of the projects my team was involved in was the development of landing pages for makes, models and model variants of the vehicles on the hey.car platform. These pages give our users an overview of the available models of a make and the different variants of a model, plus plenty of information about the history and features of each one, along with technical details and links to vehicles on our site in case the user likes one or many.

Our lovely landing pages

They are automatically created, by one of our microservices, on Contentful (a headless CMS) and later edited by the SEO team. So far, we have ~1.5k pages, and more are coming as we grow.

Our approach to solving this problem looked like this:

  • Get the list of pages on Contentful from its API
  • For each page, make a request to the page, e.g. hey.car/auto/{make}/ or hey.car/auto/{make}/{model}, and notify depending on the HTTP status and the previous state (non-200 or 200).

The requirement also included running this health check periodically (ideally every 5 minutes).
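The periodic part can be sketched as a long-running coroutine that alternates between running the check and suspending for the interval (schedulePeriodicHealthCheck and checkAllPages are illustrative names, not from our code):

```kotlin
import kotlinx.coroutines.*
import kotlin.time.Duration
import kotlin.time.Duration.Companion.minutes

// Runs checkAllPages, then suspends for the interval, until the job is
// cancelled. checkAllPages stands in for the pipeline described above
// (fetch pages from Contentful, request each page, notify on changes).
fun CoroutineScope.schedulePeriodicHealthCheck(
    interval: Duration = 5.minutes,
    checkAllPages: suspend () -> Unit,
): Job = launch {
    while (isActive) {
        checkAllPages()
        delay(interval) // suspends; the thread stays free for other work
    }
}
```

Cancelling the returned Job stops the loop cleanly, since both delay and a well-behaved checkAllPages are cancellable at their suspension points.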

Architecture and high-level overview of the requirement

Using coroutines

For our use case, we need to make ~3 calls (10 with the variants) to Contentful, ~1.5k calls (7.5k with variants, although not all are published) to the site, ~1.5k updates to the database, and some requests to Slack (the amount depends on how much we need to notify). Given the amount of IO involved, not blocking the threads allows us to finish the whole process earlier (without needing to increase the number of threads).

One important thing about coroutines is that they are sequential by default. This is how our first implementation of getPublishedEntries looked:

Initial implementation - Synchronous

The functions getMakeEntries and getModelEntries are suspend functions (this is not visible in the gist, but the IDE marks them with an icon on the left). The execution is as follows: the client calls getMakeEntries, which makes an HTTP call; the coroutine suspends until the response is back and stores the result in the makesPaginatedResponse variable. Only after that does it call getModelEntries, which makes another HTTP call, suspends until that response is back, and stores the result in the modelsPaginatedResponse variable.
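Since the gist is not reproduced here, a rough sketch of that sequential version, using the names from the text (Entry, PaginatedResponse and the ContentfulApiService interface are hypothetical reconstructions):

```kotlin
data class Entry(val path: String)
data class PaginatedResponse(val items: List<Entry>)

// Hypothetical Contentful client; both calls are suspending, so each one
// suspends the calling coroutine while the HTTP response is in flight.
interface ContentfulApiService {
    suspend fun getMakeEntries(): PaginatedResponse
    suspend fun getModelEntries(): PaginatedResponse
}

// Sequential by default: models are only requested after the makes
// response has fully arrived, even though the two calls are independent.
suspend fun getPublishedEntries(api: ContentfulApiService): List<Entry> {
    val makesPaginatedResponse = api.getMakeEntries()
    val modelsPaginatedResponse = api.getModelEntries()
    return makesPaginatedResponse.items + modelsPaginatedResponse.items
}
```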

Given that the makes and models requests are independent - we don’t need make information to request models nor the other way around - we can execute them asynchronously with the async coroutine builder and wait for the results in the place we need them.

This requires a few code changes: first, we need a CoroutineScope (more about structured concurrency in this medium post and video), which guarantees, among other things, that the coroutines started from it are joined before the call returns. We actually used a channelFlow, which had two benefits: it gives us a scope (ProducerScope) where we can call async and launch (coroutine builders), and it allows us to start processing the entries as soon as they are received (it is a flow, which one can think of as a stream, although there are differences).

We also need to wrap getMakeEntries and getModelEntries with async so that they are executed asynchronously. Because coroutines are sequential by default, we need to `launch` the sequence “makes → await → send” so that models can also be processed concurrently. Because we launch from the scope, it is guaranteed that all makes and models will be sent before the flow is closed.
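A sketch of those modifications (again a hypothetical reconstruction, the embedded gist is not reproduced here; Entry, PaginatedResponse and ContentfulApiService are illustrative names):

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

data class Entry(val path: String)
data class PaginatedResponse(val items: List<Entry>)

interface ContentfulApiService { // hypothetical Contentful client
    suspend fun getMakeEntries(): PaginatedResponse
    suspend fun getModelEntries(): PaginatedResponse
}

fun getPublishedEntries(api: ContentfulApiService): Flow<Entry> = channelFlow {
    // async starts both independent requests concurrently.
    val makes = async { api.getMakeEntries() }
    val models = async { api.getModelEntries() }
    // Each "await -> send" sequence runs in its own launched coroutine,
    // so model entries can be emitted while makes are still in flight.
    // channelFlow waits for all launched coroutines before closing.
    launch { makes.await().items.forEach { send(it) } }
    launch { models.await().items.forEach { send(it) } }
}
```

Collectors of the returned Flow start receiving entries as soon as either response arrives, rather than after both complete.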

Resulting code after the modifications to make it asynchronous and concurrent

Benchmarks

We ran benchmarks comparing the current implementation, a sequential implementation and an implementation using a pool of actors, on an i7-8565U @ 1.8 GHz (4 cores, 8 logical) with 16 GB of RAM. The Contentful entries processed were those with status published as of March 31. The min, avg and max values were taken from 5 samples of each execution.

Implementations

The initial implementation uses a flow (backed by a channel) to emit the entries from Contentful as soon as they are available, but processes each element of the flow sequentially (only saving the health is async).

The sequential implementation mimics a non-reactive flow by calling toList() on the flow, forcing all entries to be emitted before they are iterated.

The second implementation uses a pool of workers (coroutines) to improve the processing of each element received from ContentfulApiService. This allows for concurrent processing of the elements: every worker processes at most one element at a time, but the workers consume different elements received via the Channel.

The modifications we needed to make to the initial implementation were the following:

Minimal changes to achieve concurrency of processing

Because getPublishedEntries returned a channelFlow, it was simple to return a channel (ReceiveChannel) instead, using the produce extension function on a CoroutineScope. We tried different sizes for the worker pool (10 in the screenshot); each value gave us different results.
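The worker-pool part can be sketched like this (processWithWorkers is an illustrative name; the entries channel would come from the produce-based getPublishedEntries):

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// N worker coroutines all read from the same ReceiveChannel: each worker
// handles one element at a time, but up to N elements are processed
// concurrently, and the channel distributes elements between workers.
// coroutineScope waits until the channel is closed and all workers finish.
suspend fun <T> processWithWorkers(
    entries: ReceiveChannel<T>,
    workers: Int = 10, // 10 as in the screenshot; we tried several sizes
    process: suspend (T) -> Unit,
) = coroutineScope {
    repeat(workers) {
        launch {
            for (entry in entries) process(entry)
        }
    }
}
```

The `for (entry in entries)` loop terminates when the channel is closed, so the whole call returns exactly when every entry has been processed.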

We also did some executions with -Dkotlinx.coroutines.scheduler.core.pool.size=2 -Dkotlinx.coroutines.scheduler.max.pool.size=128, because by default core.pool.size is equal to the number of processors, and our pods report 1 CPU without any configuration (the machine where the tests ran had 4). This gave us a better picture of how it would behave once deployed.

Results

WP: Worker pool implementation; xT: x amount of threads; yWC: y amount of worker coroutines

Comparing by average, the best is the worker pool with 4 threads and 10 worker coroutines, followed by the worker pool with 2 threads and 20 worker coroutines; the worst is the sequential solution. There is an 8.23% reduction in time comparing the current and sequential implementations. We could get an 80% reduction on our current instance by using a worker pool with 20 coroutines, allowing us (with minimal code changes) to check the health of our published landing pages in only 17 seconds on average.

Conclusion

Coroutines offer a great alternative to existing reactive libraries for the JVM. Here are the key points we took from our implementation:

  • It is quite easy to turn sequential code into asynchronous code (and the other way around). The only requirement is having a CoroutineScope from which to call launch, async or any other coroutine builder. One could also use GlobalScope, but that is a delicate API and is not subject to structured concurrency;
  • It is explicit where an asynchronous computation is done and where new coroutines are launched. And depending on your IDE, the suspension points are highlighted;
  • There is minimal compromise of code readability, and what there is, is intentional and proves useful. A sequential version of the code needs only a few modifications to become asynchronous, and the “logic” can still be read. There are no extra classes for managing different types of results from asynchronous computations (just Deferred); there is no mental overhead of functional composition, unlike other libraries that force the developer into it;
  • Structured concurrency is really powerful and it is there to help. We have seen throughout the code changes that having a scope that guarantees waiting for all the coroutines started from it helps a lot in determining what gets executed (code inside the scope) and when it gets executed (before the scope completes);
  • Great interoperability with other JVM reactive libraries. The coroutine integration libraries not only support consuming other reactive libraries, but also producing for them. One could, for example, write a library that uses coroutines internally; the person using that library would not need to write coroutines in their code, just integrate using the appropriate integration library.
  • There is a good support library for tests, kotlinx-coroutines-test, to help developers write tests that involve suspending functions or launching coroutines. We were able to verify that our application closed correctly when a coroutine was cancelled, and also that we didn’t accidentally leak a coroutine when an incorrect scope was used.
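As a sketch of what such a test can look like (the scenario and names are illustrative, not our actual test code): runTest from kotlinx-coroutines-test runs suspending code on a scheduler with virtual time, so delays are skipped and a test of a 5-minute interval finishes in milliseconds.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.test.runTest

// runTest gives us a TestScope with virtual time: delay() does not
// actually wait, and advanceTimeBy() moves the virtual clock forward,
// running whatever was scheduled in the meantime.
fun `scheduler runs again after the interval`() = runTest {
    var runs = 0
    val job = launch {
        while (isActive) {
            runs += 1
            delay(5 * 60 * 1000L) // 5 minutes of virtual time
        }
    }
    advanceTimeBy(5 * 60 * 1000L + 1) // jump just past one interval
    job.cancel()
    check(runs == 2) // ran once at start, once after the interval
}
```

runTest also fails the test if a coroutine launched in its scope is still running when the body completes, which is what lets it catch accidentally leaked coroutines.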

References

There are many links throughout the text, but in case you are interested in reading/watching more:
