Kotlin: Canceling Coroutines, Interrupting Threads, and Bits of Aspect-Oriented Programming (AOP)

Ondřej Štumpf
GoodData Developers


You might think that the three topics mentioned in the article’s title have nothing in common. You might also think that the topics, each so complex on its own, would deserve our undivided attention in separate articles. You would be right on both counts. But let me show you that when dealing with real-life scenarios, knowledge of coroutines, thread interruptions, and AOP is sometimes needed all at once.

Cooperative Cancellation

The good news is that there is plenty of study material related to coroutine cancellation. Let me recommend a great article by Florina Muntenescu — a must-read for anyone trying to stop execution of their coroutine, whether explicitly with the “cancel()” method or possibly utilizing the built-in “withTimeout()” method.

Of course, Java threads have been with us significantly longer than Kotlin coroutines. Unsurprisingly, there is an abundance of tutorials and articles dealing with thread states and, ultimately, thread interruption. If an exhaustive explanation is what your heart desires, I recommend a post by Wynn Teo.

But let me cut to the chase here: no matter whether your code runs in a coroutine or in a dedicated thread, its interruption must be cooperative. What does that mean? Your code must be aware that it could be interrupted, and it must explicitly check whether that has already happened.

In the coroutine world, you would do something like this:

val job = GlobalScope.launch {
    files.map { file ->
        ensureActive() // explicitly check if the coroutine is canceled
        doSomethingBlockingWith(file)
    }
}
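To see the check in action from the canceling side, here is a minimal sketch, assuming kotlinx.coroutines is on the classpath. The names `pretendToProcess` and `processUntilCanceled`, as well as the timings, are made up for illustration and are not part of any library.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for doSomethingBlockingWith(file): blocks for a short while.
fun pretendToProcess(item: Int) {
    Thread.sleep(50)
}

// Processes items one by one and returns how many were finished before cancellation.
fun processUntilCanceled(items: List<Int>, cancelAfterMillis: Long): Int = runBlocking {
    val processed = AtomicInteger(0)
    val job = launch(Dispatchers.Default) {
        for (item in items) {
            ensureActive() // throws CancellationException once the job is canceled
            pretendToProcess(item)
            processed.incrementAndGet()
        }
    }
    delay(cancelAfterMillis)
    job.cancelAndJoin() // request cancellation, then wait for the coroutine to finish
    processed.get()
}

fun main() {
    val done = processUntilCanceled((1..100).toList(), cancelAfterMillis = 120)
    println("Processed $done of 100 items before cancellation")
}
```

Note that the item being processed at the moment of cancellation still runs to completion; the coroutine only stops at the next check.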

The “ensureActive()” method throws an instance of CancellationException if the coroutine was canceled already. Meanwhile in the thread world, you would need something like this to check (and reset) the “interrupted” flag on the current thread:

val thread = Thread {
    files.map { file ->
        if (Thread.interrupted()) {
            // check for the interrupted flag, reset it, and throw exception
            throw InterruptedException()
        }
        doSomethingBlockingWith(file)
    }
}

It’s a bit more verbose since there is no convenience method that would throw the InterruptedException for you, but otherwise it’s a very similar business to coroutines.
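Nothing stops you from writing such a convenience method yourself. The following is a minimal sketch using only the standard library; `throwIfInterrupted` and `runAndInterrupt` are names I made up, not JDK or Kotlin APIs.

```kotlin
import java.util.concurrent.atomic.AtomicReference
import kotlin.concurrent.thread

// Hypothetical helper (not part of the JDK): a thread-world counterpart of ensureActive().
// Thread.interrupted() returns the flag and clears it, so the exception takes over as the signal.
fun throwIfInterrupted() {
    if (Thread.interrupted()) throw InterruptedException("thread was interrupted")
}

// Runs a busy loop on a worker thread, interrupts it, and reports how it ended.
fun runAndInterrupt(): String {
    val outcome = AtomicReference("still running")
    val worker = thread {
        try {
            while (true) {
                throwIfInterrupted() // the only way out of this loop
            }
        } catch (e: InterruptedException) {
            outcome.set("interrupted")
        }
    }
    Thread.sleep(50)   // let the worker spin for a moment
    worker.interrupt() // sets the interrupted flag on the worker
    worker.join(5_000) // wait for the worker to notice and finish
    return outcome.get()
}

fun main() {
    println(runAndInterrupt()) // prints "interrupted"
}
```

Without the `throwIfInterrupted()` call, the loop would ignore the interruption request and spin forever, which is exactly what "cooperative" means here.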

The Reality of Life

In theory, then, we know how our code should cooperate in order to be interruptible. But how do you actually do that in real life? I mean — you have your codebase. It might be large, it might be legacy, you may not know it all too well. It most likely contains 3rd party dependencies. And all of a sudden, you are tasked with making the code interruptible. How do you do that? Are you going to go through it all and add “ensureActive()” calls and checks for thread interruption into every method? And what if your codebase actually contains a mixture of coroutines and blocking calls, which is, in fact, very likely?

From Coroutines to Blocking Calls and Back

If your code starts its life as a coroutine and contains blocking calls, you need to create a bridge between the coroutine cancellation and interruption of the blocking call. The authors of Kotlin thought of that and provided us with a convenient “runInterruptible()” method, which you use like this:

val job = GlobalScope.launch {
    files.map { file ->
        runInterruptible {
            doSomethingBlockingWith(file)
        }
    }
}

When someone calls “cancel()” on the coroutine’s job, “runInterruptible()” interrupts the thread that is executing the block, which sets that thread’s interrupted flag. Within the “doSomethingBlockingWith()” function, you can therefore detect the state with “Thread.interrupted()” as you normally would, and blocking JDK calls such as “Thread.sleep()” throw InterruptedException as usual. It is completely transparent to the blocking code that the flag was actually set thanks to the state of the originating coroutine.
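Here is a small sketch of that bridge at work, assuming kotlinx.coroutines is available. The function name `cancelBlockingCall`, the `Thread.sleep()` stand-in for the blocking work, and the timings are illustrative assumptions.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.atomic.AtomicBoolean

// Cancels a coroutine that is stuck in a blocking call and reports whether
// the blocking code observed a thread interruption.
fun cancelBlockingCall(): Boolean = runBlocking {
    val sawInterrupt = AtomicBoolean(false)
    val job = launch(Dispatchers.IO) {
        runInterruptible {
            try {
                Thread.sleep(10_000) // stand-in for doSomethingBlockingWith(file)
            } catch (e: InterruptedException) {
                sawInterrupt.set(true)
                throw e // rethrow; runInterruptible reports it as a CancellationException
            }
        }
    }
    delay(100)          // give the blocking call time to start
    job.cancelAndJoin() // cancellation interrupts the thread running the block
    sawInterrupt.get()
}

fun main() {
    println("Blocking call saw the interruption: ${cancelBlockingCall()}")
}
```

If the same “Thread.sleep()” call ran directly inside “launch” without “runInterruptible()”, the cancellation would not interrupt it, and “cancelAndJoin()” would have to wait for the full sleep to finish.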

Manual Labor or AOP?

Having dealt with “runInterruptible()”, let’s now focus on the elephant in the room — how do we place checks for coroutine/thread state in our codebase?

Obviously, you could do it by hand. This would mean going through your codebase (at least the critical parts) and adding the respective checks. There are some drawbacks in this approach:

  • It pollutes the code. Why should your business logic contain something as technical as checks for thread interruption?
  • You never know if you missed a spot. Inherently, you can only add so many checks. What if there is a path in the code that you miss and is therefore uninterruptible?
  • There is no way of doing this for 3rd party dependencies.

In certain scenarios, these drawbacks might not be that significant, and using this approach might be perfectly valid. But what if you are allergic to repetitive code and despise the idea of manually adding checks everywhere? Well, there is a way: have someone else add the checks for you, preferably in a way you wouldn’t even see them in your code. Enter Aspect-Oriented Programming; if you have never heard of it, I recommend a post by Philipp Gysel.

Spring AOP

If your code runs mostly in Spring beans, you might use Spring’s AOP module to weave your checks like this:

@Aspect
@Component
class InterruptionAspect {

    @Pointcut("execution(* com.gooddata..*(..))")
    private fun anyMethodExecution() {
    }

    @Before("anyMethodExecution()")
    fun beforeAdvice() {
        if (Thread.currentThread().isInterrupted) {
            throw InterruptedException()
        }
    }
}

It addresses the drawback of polluting the codebase very nicely; this piece of code is everything you need to do. On the other hand:

  • It only works on Spring beans and method entries/exits. If your method contains a long-running loop without any external method calls, it remains uninterruptible. However, this will always be the case when using any AOP library.
  • You need to be a bit careful with the pointcut expression. If you are not, many checks might be added, and overall performance will suffer.
  • It still doesn’t address the issue in 3rd party dependencies — at least those that are not beans.
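Spring AOP is proxy-based, and that explains the first limitation. The sketch below is not Spring itself; it is a hand-rolled JDK dynamic proxy that mimics a “@Before” advice, so you can see that the check only fires when a call passes through the proxy. The names `FileService`, `RealFileService`, and `withInterruptionCheck` are hypothetical.

```kotlin
import java.lang.reflect.InvocationTargetException
import java.lang.reflect.Proxy

// Hypothetical service interface. @Throws lets the proxy propagate the checked
// exception instead of wrapping it in UndeclaredThrowableException.
interface FileService {
    @Throws(InterruptedException::class)
    fun process(name: String): String
}

class RealFileService : FileService {
    override fun process(name: String): String = "processed $name"
}

// Wraps the target in a proxy that checks the interrupted flag before every
// method call, similar in spirit to a @Before advice.
fun withInterruptionCheck(target: FileService): FileService =
    Proxy.newProxyInstance(
        FileService::class.java.classLoader,
        arrayOf(FileService::class.java)
    ) { _, method, args ->
        if (Thread.currentThread().isInterrupted) throw InterruptedException()
        try {
            method.invoke(target, *(args ?: emptyArray()))
        } catch (e: InvocationTargetException) {
            throw e.targetException // unwrap exceptions thrown by the real method
        }
    } as FileService

fun main() {
    val service = withInterruptionCheck(RealFileService())
    println(service.process("a.txt"))  // passes the check, then runs the real method
    Thread.currentThread().interrupt() // simulate an interruption request
    try {
        service.process("b.txt")
    } catch (e: InterruptedException) {
        println("rejected at method entry")
    }
    Thread.interrupted()               // clear the flag again
}
```

Whatever happens inside `RealFileService.process()` after the entry check, including any loop it might contain, is invisible to the proxy.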

AspectJ

If you are not using Spring, or a significant part of your codebase lies outside beans, you might want to use Java’s ultimate AOP tool: AspectJ.

@Aspect
class CheckInterruptedAspect {

    @Before("call(* com.gooddata..*(..))")
    fun throwOnInterruptedThread() {
        if (Thread.currentThread().isInterrupted) {
            throw InterruptedException()
        }
    }
}

And a bit of config (placed in “META-INF/aop.xml” in resources, which is where the AspectJ load-time weaver looks by default):

<aspectj>
    <aspects>
        <aspect name="com.gooddata.aspects.CheckInterruptedAspect"/>
    </aspects>
    <weaver>
        <include within="com.gooddata.service1..*"/>
        <include within="com.gooddata.service2..*"/>
        <exclude within="*Test"/>
    </weaver>
</aspectj>

Finally, add the AspectJ Java agent to the JVM at startup, using “-javaagent:/app/BOOT-INF/lib/aspectjweaver-<version>.jar” as a command-line argument.

This example adds the thread interruption checks to the “service1” and “service2” packages, excluding test classes. This really is the ultimate approach:

  • You can specify even 3rd party packages, so that the aspect is woven into them as well.
  • You are not restricted to Spring beans; any code can be extended like this.
  • Same as with Spring AOP, your codebase remains (visually) unspoiled.

But as Uncle Ben once said, “With great power comes great responsibility”:

  • You need to be very careful with the pointcut expression. You have the power to completely kill your application’s performance if too many methods are matched by the expression.
  • There’s a bit of dark magic happening behind the scenes. Proper testing is essential. For example, forgetting to add the Java agent simply results in no aspects being applied without any warning. This means your code runs uninterruptibly and you might not even realize it.
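One possible guard against the silently missing agent is a fail-fast check at startup. The sketch below is only an illustration: `hasAspectjWeaverAgent` and `requireAspectjWeaverAgent` are hypothetical names, and the check merely inspects the JVM arguments for a “-javaagent” entry mentioning “aspectjweaver”, so it would not recognize a renamed jar or an agent attached in some other way.

```kotlin
import java.lang.management.ManagementFactory

// Returns true if the given JVM arguments contain a -javaagent entry
// that mentions the AspectJ weaver jar.
fun hasAspectjWeaverAgent(jvmArgs: List<String>): Boolean =
    jvmArgs.any { it.startsWith("-javaagent:") && "aspectjweaver" in it }

// Hypothetical fail-fast guard, meant to be called once at application startup.
fun requireAspectjWeaverAgent() {
    val jvmArgs = ManagementFactory.getRuntimeMXBean().inputArguments
    check(hasAspectjWeaverAgent(jvmArgs)) {
        "AspectJ weaver agent is missing; interruption checks would not be woven in"
    }
}

fun main() {
    // Throws IllegalStateException unless the JVM was started with the weaver agent.
    requireAspectjWeaverAgent()
    println("AspectJ weaver agent detected")
}
```

A test that interrupts a thread and asserts that a woven method actually throws would be a stronger safety net, since it verifies the behavior rather than the configuration.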

Final Words

So, what’s your poison? Manual work? Spring AOP, or perhaps AspectJ? This is indeed a tough one, and the answer varies for each use case. At GoodData, we opted for manually added checks, as not all of our code is placed in Spring beans and AspectJ seemed like overkill. However, I must admit that every time I see that “ensureActive()” call in our codebase, it stings a bit.
