Creating a dead simple CountDownLatch with ZIO

Amitay Horwitz
Wix Engineering
Published in
4 min readFeb 16, 2020
Photo by Chris Barbalis on Unsplash

In this post I’ll explain what a CountDownLatch is, and how ZIO enables you to create concurrency primitives which are efficient, non-blocking and simple.

  • Note: I assume you have basic understanding of what ZIO is

Background

At Wix, we love testing. But testing concurrent code is often more difficult than writing it.

I recently needed to write a test which involves waiting until a number of async callbacks are called before making my assertion. More precisely, the system I’m testing involves async producer / consumer (or publish / subscribe) model, and the test looks something like this (with zio-test):

testM("consume produced records") {
for {
records <- Ref.make(Set.empty[Record])
_ <- consumeFrom(topic1) { record =>
records.update(_ + record)
}.fork
_ <- consumeFrom(topic2) { record =>
records.update(_ + record)
}.fork
_ <- produceTo(topic1 -> record1, topic2 -> record2)
consumed <- records.get
} yield assert(consumed, contains(record1) && contains(record2))
}

The problem here is that if the produceTo and consumeFrom functions actually run asynchronously my test will be flaky, because by the time I call records.get the result might still be empty.

We don’t like flaky tests. The simple solution is to poll, or retry the assertion at some constant interval until it succeeds. ZIO provides wonderful tools for such use cases, like the repeat combinator:

testM("consume produced records") {
for {
records <- Ref.make(Set.empty[Record])
_ <- consumeFrom(topic1) { record =>
records.update(_ + record)
}.fork
_ <- consumeFrom(topic2) { record =>
records.update(_ + record)
}.fork
_ <- produceTo(topic1 -> record1, topic2 -> record2)
(_, consumed) <- records.get.repeat(
Schedule.spaced(100.millis) &&
Schedule.doUntil(_.size == 2))

} yield assert(consumed, contains(record1) && contains(record2))
}

But this made the test much more cumbersome, and the polling strategy feels somewhat messy.

Looking back to Java

Java’s primary way of tackling concurrency is by using threads. However, writing multithreaded code is notoriously difficult and error prone. To make writing concurrent code more sane, there are many utilities available in the java.util.concurrent package. One of these is the CountDownLatch.

As described in the JavaDoc:

A synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes.

A CountDownLatch is initialized with a given count. The await method block until the current count reaches zero due to invocations of the countDown method, after which all waiting threads are released and any subsequent invocations of await return immediately.

My use case seems like a perfect fit for a CountDownLatch, however, ZIO doesn’t provide one out of the box. It does provide other primitives that we can build on top of, such as Promise, Ref, Queue, Semaphore and others. So let’s try to create our own concurrent and non-blocking CountDownLatch using ZIO!

The API

The API we need is more simplified compared to Java’s CountDownLatch and looks something like this:

trait CountDownLatch {
def countDown: UIO[Unit]
def await: UIO[Unit]
}

We can omit some of the original methods, such as await with timeout:

public boolean await(long timeout, TimeUnit unit)
throws InterruptedException

Because we could easily achieve these semantics using ZIO’s built-in combinators, which demonstrate the power of having a composable functional API:

latch.await.timeout(1.second)

For the actual implementation, all we need is a Ref to keep the current count, and a Promise to signal when the count reached 0. Creating a Ref and a Promise is an effectful operation, which means creating our CountDownLatch will also be effectful (which makes sense, because it keeps internal state).

object CountDownLatch {
def make(count: Int): UIO[CountDownLatch] = for {
ready <- Promise.make[Nothing, Unit]
ref <- Ref.make(count)
} yield new CountDownLatch {
override def countDown: UIO[Unit] =
ref.update(_ - 1).flatMap {
case 0 => ready.succeed(()).unit
case _ => ZIO.unit
}

override def await: UIO[Unit] = ready.await
}
}

With just a few lines of code, we have created our own CountDownLatch! Let’s rewrite our original test using our new utility:

testM("consume produced records") {
for {
records <- Ref.make(Set.empty[Record])
latch <- CountDownLatch.make(2)
_ <- consumeFrom(topic1) { record =>
records.update(_ + record) *> latch.countDown
}.fork
_ <- consumeFrom(topic2) { record =>
records.update(_ + record) *> latch.countDown
}.fork
_ <- produceTo(topic1 -> record1, topic2 -> record2)
_ <- latch.await
consumed <- records.get
} yield assert(consumed, contains(record1) && contains(record2))
}

Now we can easily synchronize the behaviour of our tests, leading to a more reliable and consistent test suite.

Conclusion

Once again ZIO proves its value when writing asynchronous and concurrent code. We created a simple and non-blocking CountDownLatch with very little code, which is valuable for making our test suite more robust.

Read about similar experiences from my colleagues:

--

--