Shared mutable state with Coroutines

A few days ago Kotlin 1.3 was announced, and starting from that release the coroutines framework is no longer experimental. The framework has moved out of the experimental package and can now be used in production applications. For this reason it is a good time to study this library more closely. As the title suggests, we are going to look at shared mutable state with coroutines.

It is not a secret that shared mutable state is the root of all evil. This subject was discussed in this article. Here is a short list of issues you can face in multithreaded programming: race conditions, visibility, ordering, etc.

So let's assume we have an integer variable that will be incremented from multiple coroutines, each of which repeats the increment many times. Each coroutine has a single job: to increment the value. Before we start, let's create a higher-order function. It accepts two arguments and prints the elapsed time in milliseconds.

  • CoroutineContext is the context the coroutines run on; the default is CommonPool, a common pool of shared threads.
  • Body is a suspend function which will be called inside each coroutine.
suspend fun runCoroutine(context: CoroutineContext = CommonPool, body: suspend () -> Unit) {
    val s = 1000 // number of coroutines to launch
    val t = 1000 // number of times the body is repeated by each coroutine
    val time = measureTimeMillis {
        val jobs = List(s) {
            launch(context) {
                repeat(t) { body() }
            }
        }
        jobs.forEach { it.join() }
    }
    println("Completed ${s * t} actions in $time ms")
}

Now we are ready to start. The first example is the simplest one, without any additional synchronization:

fun main(args: Array<String>) = runBlocking {
    var incrementalValue = 0

    runCoroutine {
        incrementalValue++
    }

    System.out.print("Value $incrementalValue")
}

The result in this case is unpredictable: on each run ‘incrementalValue’ ends up with a different value. It works almost like a random number generator:

Completed 1000000 actions in 65 ms
Value 596261

In this small example we can clearly observe a race condition, since we passed the default implementation of CoroutineContext (CommonPool). CommonPool calculates a number of threads and creates an ExecutorService. The calculation is based on the number of processors available to the Java virtual machine:

val parallelism = (Runtime.getRuntime().availableProcessors() - 1).coerceAtLeast(1)

In my case that gave 7 threads; a quick way to check the value on your own machine is shown below.
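This small snippet is illustrative only: it simply applies the same formula the library uses, so you can see what CommonPool will pick on your hardware.

fun main() {
    val cpus = Runtime.getRuntime().availableProcessors()
    // CommonPool uses one thread fewer than the number of processors, but never less than one
    val parallelism = (cpus - 1).coerceAtLeast(1)
    println("availableProcessors = $cpus, CommonPool parallelism = $parallelism")
}

Now let's try to decrease the number of threads to two by creating our own thread pool: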

fun main(args: Array<String>) = runBlocking {
    var incrementalValue = 0

    val newFixedThreadPoolContext = newFixedThreadPoolContext(2, "Test Pool")

    runCoroutine(newFixedThreadPoolContext) {
        incrementalValue++
    }
    System.out.print("Value $incrementalValue ")
}

Now the result is closer to the expected value, but it is still wrong.

Completed 1000000 actions in 37 ms
Value 900524

Moreover, the run with two threads finished nearly twice as fast as the previous one with seven threads: switching between threads is an expensive operation.

But what about the thread confinement approach, where we have just a single thread to run our coroutines on? This is a common approach for UI work and for event-dispatcher components, and the implementation is quite simple.

fun main(args: Array<String>) = runBlocking {
    var incrementalValue = 0

    val singleThreadContext = newSingleThreadContext("Single thread")

    runCoroutine(singleThreadContext) {
        incrementalValue++
    }
    System.out.print("Value $incrementalValue ")
}

Now we get the correct result. However, this approach is only appropriate for small chunks of work; we cannot run long operations this way, because the single thread becomes a bottleneck and application performance degrades.

Completed 1000000 actions in 41 ms
Value 1000000
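For completeness, here is a minimal sketch of the fine-grained variant of thread confinement: the coroutines themselves keep running on CommonPool, and only the increment hops onto the confined thread. It assumes withContext is available in the coroutines version you are using. The result is still correct, but expect it to be much slower, because every single increment switches threads.

fun main(args: Array<String>) = runBlocking {
    var incrementalValue = 0
    val counterContext = newSingleThreadContext("Counter thread")

    runCoroutine { // bodies run on CommonPool by default
        withContext(counterContext) { // only the mutation is confined to the single thread
            incrementalValue++
        }
    }
    System.out.print("Value $incrementalValue ")
}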

Let's continue looking for a more efficient way to solve this issue. Rumor has it that the volatile keyword/annotation helps with concurrency issues, so now is a good time to check it.

@Volatile
var incrementalValue = 0

fun main(args: Array<String>) = runBlocking {

    runCoroutine {
        incrementalValue++
    }
    System.out.print("Value $incrementalValue ")
}

As expected, it does not solve the problem of mutable state in a multithreaded environment: volatile guarantees visibility of reads and writes, but the increment itself is still a non-atomic read-modify-write. Reading and writing a volatile variable goes straight to main memory, which is about twice as expensive as accessing the CPU cache. Here is a really great article on this subject.

Completed 1000000 actions in 96 ms
Value 472228
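To make the lost-update scenario concrete, here is an illustrative decomposition of what the increment actually does on a @Volatile field; the property and function names below are hypothetical.

@Volatile
var counter = 0

fun unsafeIncrement() {
    val read = counter  // step 1: read the current value (always from main memory)
    val next = read + 1 // step 2: compute the new value
    counter = next      // step 3: write it back (visible to everyone)
    // Another coroutine can run between step 1 and step 3, and its increment is then
    // overwritten: @Volatile guarantees visibility, not atomicity of the whole sequence.
}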

Our arsenal also contains thread-safe structures, which solve race-condition issues. For this counter task I am going to use AtomicInteger. The AtomicInteger class provides an int value that can be read and written atomically; incrementAndGet() increments the value, and the class also offers more advanced atomic operations such as compareAndSet().

fun main(args: Array<String>) = runBlocking {

    val incrementalValue = AtomicInteger()

    runCoroutine {
        incrementalValue.incrementAndGet()
    }
    System.out.print("Value ${incrementalValue.get()} ")
}

Apparently this is the fastest way to get the task done correctly. However, problems might occur when scaling to a more complex state.

Completed 1000000 actions in 44 ms
Value 1000000
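As an illustration of the more advanced operations mentioned above, here is a small sketch of a compare-and-set retry loop. The function and the cap are hypothetical; the point is that a multi-step decision can be retried until no other thread has changed the value in between.

import java.util.concurrent.atomic.AtomicInteger

// Doubles the counter but never lets it exceed the given cap.
fun doubleWithCap(value: AtomicInteger, cap: Int) {
    while (true) {
        val current = value.get()
        val next = minOf(current * 2, cap)
        if (value.compareAndSet(current, next)) return // nobody changed it in between
        // otherwise another thread updated the value first; read again and retry
    }
}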

We can also protect our critical section with a mutex. There are many ways to build mutual exclusion in the JVM world: the first is a synchronized block using the ‘synchronized’ keyword, the second is a lock object such as ReentrantLock. In addition, the coroutines framework ships its own Mutex implementation. The conceptual differences from Java's native synchronization are that the coroutine Mutex is non-reentrant and that its lock operation is a suspending function, so it suspends the coroutine instead of blocking the thread.

fun main(args: Array<String>) = runBlocking {
    var incrementalValue = 0
    val mutex = Mutex()

    runCoroutine {
        mutex.withLock {
            incrementalValue++
        }
    }

    System.out.print("Value $incrementalValue ")
}

As we can see in the results below, the coroutine Mutex is time-consuming. If you replace it with a ‘synchronized’ block or a ReentrantLock, you will see much better numbers, around 50–60 milliseconds. However, the Mutex class suspends the coroutine instead of blocking the thread, which makes it a good choice for situations where you absolutely must modify some shared state periodically.

Completed 1000000 actions in 2528 ms
Value 1000000
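For reference, here is a minimal sketch of the ReentrantLock variant mentioned above; unlike Mutex, lock()/unlock() blocks the underlying thread instead of suspending the coroutine.

import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock

fun main(args: Array<String>) = runBlocking {
    var incrementalValue = 0
    val lock = ReentrantLock()

    runCoroutine {
        lock.withLock { // kotlin.concurrent.withLock: lock(), then unlock() in a finally block
            incrementalValue++
        }
    }
    System.out.print("Value $incrementalValue ")
}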

In the last example of this article we will use actors. The actor model is a conceptual model for dealing with concurrent computation: it defines general rules for how a system's components should behave and interact with each other.

The idea is very similar to what we have in object-oriented languages: An object receives a message (a method call) and does something depending on what message it receives (which method we are calling). The main difference is that actors are completely isolated from each other and they will never share memory. It’s also worth noting that an actor can maintain a private state that can never be changed directly by another actor.

sealed class CounterMsg
object IncCounter : CounterMsg()
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg()

fun counterActor() = actor<CounterMsg> {
    var counter = 0
    for (msg in channel) {
        when (msg) {
            is IncCounter -> counter++
            is GetCounter -> msg.response.complete(counter)
        }
    }
}

fun main(args: Array<String>) = runBlocking<Unit> {
    val counter = counterActor()

    runCoroutine(CommonPool) {
        counter.send(IncCounter)
    }

    val response = CompletableDeferred<Int>()
    counter.send(GetCounter(response))
    println("Counter = ${response.await()}")
    counter.close()
}

As the results show, the actor model works about 3 times faster than the Mutex version. Moreover, an actor is more efficient than locking under load, because it always has work to do and does not have to switch to a different context at all.

Completed 1000000 actions in 824 ms
Counter = 1000000

The counter inside the actor can be a plain primitive rather than an AtomicInteger. This is safe because execution inside a coroutine is strictly sequential, so there is no concurrency within a single coroutine. Between suspension points, execution follows the regular JVM rules: operations on a single thread establish a happens-before relation.


Thanks for reading the article and sharing your views. Please clap and recommend as much as you can.

Please don’t hesitate to contact me: Github and Facebook