Creating a dead simple CountDownLatch with ZIO
In this post I’ll explain what a CountDownLatch
is, and how ZIO enables you to create concurrency primitives which are efficient, non-blocking and simple.
- Note: I assume you have basic understanding of what ZIO is
Background
At Wix, we love testing. But testing concurrent code is often more difficult than writing it.
I recently needed to write a test which involves waiting until a number of async callbacks are called before making my assertion. More precisely, the system I’m testing involves async producer / consumer (or publish / subscribe) model, and the test looks something like this (with zio-test):
testM("consume produced records") {
for {
records <- Ref.make(Set.empty[Record])
_ <- consumeFrom(topic1) { record =>
records.update(_ + record)
}.fork
_ <- consumeFrom(topic2) { record =>
records.update(_ + record)
}.fork
_ <- produceTo(topic1 -> record1, topic2 -> record2)
consumed <- records.get
} yield assert(consumed, contains(record1) && contains(record2))
}
The problem here is that if the produceTo
and consumeFrom
functions actually run asynchronously my test will be flaky, because by the time I call records.get
the result might still be empty.
We don’t like flaky tests. The simple solution is to poll, or retry the assertion at some constant interval until it succeeds. ZIO provides wonderful tools for such use cases, like the repeat
combinator:
testM("consume produced records") {
for {
records <- Ref.make(Set.empty[Record])
_ <- consumeFrom(topic1) { record =>
records.update(_ + record)
}.fork
_ <- consumeFrom(topic2) { record =>
records.update(_ + record)
}.fork
_ <- produceTo(topic1 -> record1, topic2 -> record2)
(_, consumed) <- records.get.repeat(
Schedule.spaced(100.millis) &&
Schedule.doUntil(_.size == 2))
} yield assert(consumed, contains(record1) && contains(record2))
}
But this made the test much more cumbersome, and the polling strategy feels somewhat messy.
Looking back to Java
Java’s primary way of tackling concurrency is by using threads. However, writing multithreaded code is notoriously difficult and error prone. To make writing concurrent code more sane, there are many utilities available in the java.util.concurrent
package. One of these is the CountDownLatch
.
As described in the JavaDoc:
A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.
A
CountDownLatch
is initialized with a given count. Theawait
method block until the current count reaches zero due to invocations of thecountDown
method, after which all waiting threads are released and any subsequent invocations ofawait
return immediately.
My use case seems like a perfect fit for a CountDownLatch
, however, ZIO doesn’t provide one out of the box. It does provide other primitives that we can build on top of, such as Promise
, Ref
, Queue
, Semaphore
and others. So let’s try to create our own concurrent and non-blocking CountDownLatch
using ZIO!
The API
The API we need is more simplified compared to Java’s CountDownLatch
and looks something like this:
trait CountDownLatch {
def countDown: UIO[Unit]
def await: UIO[Unit]
}
We can omit some of the original methods, such as await
with timeout:
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException
Because we could easily achieve these semantics using ZIO’s built-in combinators, which demonstrate the power of having a composable functional API:
latch.await.timeout(1.second)
For the actual implementation, all we need is a Ref
to keep the current count, and a Promise
to signal when the count reached 0. Creating a Ref
and a Promise
is an effectful operation, which means creating our CountDownLatch
will also be effectful (which makes sense, because it keeps internal state).
object CountDownLatch {
def make(count: Int): UIO[CountDownLatch] = for {
ready <- Promise.make[Nothing, Unit]
ref <- Ref.make(count)
} yield new CountDownLatch {
override def countDown: UIO[Unit] =
ref.update(_ - 1).flatMap {
case 0 => ready.succeed(()).unit
case _ => ZIO.unit
}
override def await: UIO[Unit] = ready.await
}
}
With just a few lines of code, we have created our own CountDownLatch
! Let’s rewrite our original test using our new utility:
testM("consume produced records") {
for {
records <- Ref.make(Set.empty[Record])
latch <- CountDownLatch.make(2)
_ <- consumeFrom(topic1) { record =>
records.update(_ + record) *> latch.countDown
}.fork
_ <- consumeFrom(topic2) { record =>
records.update(_ + record) *> latch.countDown
}.fork
_ <- produceTo(topic1 -> record1, topic2 -> record2)
_ <- latch.await
consumed <- records.get
} yield assert(consumed, contains(record1) && contains(record2))
}
Now we can easily synchronize the behaviour of our tests, leading to a more reliable and consistent test suite.
Conclusion
Once again ZIO proves its value when writing asynchronous and concurrent code. We created a simple and non-blocking CountDownLatch
with very little code, which is valuable for making our test suite more robust.
Read about similar experiences from my colleagues: