Published in Wix Engineering

Creating a dead simple CountDownLatch with ZIO

Photo by Chris Barbalis on Unsplash
  • Note: I assume you have a basic understanding of what ZIO is.

Background

At Wix, we love testing. But testing concurrent code is often more difficult than writing it.

Consider the following test, which starts two consumers and then produces one record to each topic:

testM("consume produced records") {
  for {
    records <- Ref.make(Set.empty[Record])
    _ <- consumeFrom(topic1) { record =>
      records.update(_ + record)
    }.fork
    _ <- consumeFrom(topic2) { record =>
      records.update(_ + record)
    }.fork
    _ <- produceTo(topic1 -> record1, topic2 -> record2)
    consumed <- records.get
  } yield assert(consumed, contains(record1) && contains(record2))
}
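
This test is flaky. The consumers run in forked fibers, so records.get can execute before either record has been consumed. One workaround is to poll until both records have arrived:

testM("consume produced records") {
  for {
    records <- Ref.make(Set.empty[Record])
    _ <- consumeFrom(topic1) { record =>
      records.update(_ + record)
    }.fork
    _ <- consumeFrom(topic2) { record =>
      records.update(_ + record)
    }.fork
    _ <- produceTo(topic1 -> record1, topic2 -> record2)
    (_, consumed) <- records.get.repeat(
      Schedule.spaced(100.millis) &&
        Schedule.doUntil(_.size == 2))
  } yield assert(consumed, contains(record1) && contains(record2))
}

This works, but only by polling: the test re-reads the Ref every 100 milliseconds instead of being notified when both records have arrived.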

Looking back to Java

Java’s primary way of tackling concurrency is by using threads. However, writing multithreaded code is notoriously difficult and error-prone. To make it more manageable, the java.util.concurrent package offers many utilities. One of these is the CountDownLatch, which lets one or more threads wait until a set of operations performed by other threads has completed.
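
For readers who have not used it, here is a minimal sketch of the Java latch driven from Scala. It uses only JDK classes. The object name JavaLatchSketch, the helper runWorkers, and the five-second timeout are illustrative choices and do not come from our test suite.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object JavaLatchSketch {
  // Hypothetical helper: starts `workers` tasks (must be >= 1), waits until
  // each has counted down, and returns how many of them finished.
  def runWorkers(workers: Int): Int = {
    val latch    = new CountDownLatch(workers)
    val finished = new AtomicInteger(0)
    val pool     = Executors.newFixedThreadPool(workers)
    try {
      (1 to workers).foreach { _ =>
        pool.execute(new Runnable {
          def run(): Unit = {
            finished.incrementAndGet()
            latch.countDown() // one unit of work is done
          }
        })
      }
      // Blocks the calling thread until the count reaches zero or 5 seconds pass
      val opened = latch.await(5, TimeUnit.SECONDS)
      if (opened) finished.get() else -1
    } finally pool.shutdown()
  }

  def main(args: Array[String]): Unit =
    println(s"workers finished: ${runWorkers(3)}")
}
```

Note that await here blocks the calling thread until the count reaches zero.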

The API

The API we need is simpler than Java’s CountDownLatch and looks something like this:

trait CountDownLatch {
  def countDown: UIO[Unit]
  def await: UIO[Unit]
}
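
Java’s version also offers a timed variant of await:

public boolean await(long timeout, TimeUnit unit)
    throws InterruptedException

With ZIO there is no need for a separate method, because a timeout can be applied to any effect:

latch.await.timeout(1.second)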
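
To illustrate how timeout behaves on an effect that waits, here is a small sketch. It assumes ZIO 1.0.x on the classpath (zio.duration and Runtime.default belong to that line, and other versions differ). A bare Promise stands in for the latch, and the names TimeoutSketch, timedOut and completed are made up for the example.

```scala
import zio._
import zio.clock.Clock
import zio.duration._

object TimeoutSketch {
  // Nobody completes this promise, so await can only end through the timeout.
  val timedOut: URIO[Clock, Option[Unit]] =
    Promise.make[Nothing, Unit].flatMap(_.await.timeout(100.millis))

  // The promise is completed before we wait, so await returns right away.
  val completed: URIO[Clock, Option[Unit]] =
    for {
      promise <- Promise.make[Nothing, Unit]
      _       <- promise.succeed(())
      result  <- promise.await.timeout(100.millis)
    } yield result

  def main(args: Array[String]): Unit = {
    val runtime = Runtime.default
    println(runtime.unsafeRun(timedOut))  // None: the timeout won
    println(runtime.unsafeRun(completed)) // Some(()): the await won
  }
}
```

The result is an Option, so a caller can tell a timeout (None) apart from a normal completion (Some) without catching an exception.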
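
The implementation needs only a Promise and a Ref:

object CountDownLatch {
  def make(count: Int): UIO[CountDownLatch] = for {
    ready <- Promise.make[Nothing, Unit]
    ref <- Ref.make(count)
  } yield new CountDownLatch {
    override def countDown: UIO[Unit] =
      // modify decrements the counter and returns the new value atomically.
      // Ref#update returns Unit as of ZIO 1.0, so its result cannot be matched on.
      ref.modify { n => (n - 1, n - 1) }.flatMap {
        case 0 => ready.succeed(()).unit // last count: release every waiting fiber
        case _ => ZIO.unit
      }

    // Suspends the calling fiber, without blocking a thread, until the promise completes
    override def await: UIO[Unit] = ready.await
  }
}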
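
To see the latch in action outside a test, here is a self-contained sketch. It assumes ZIO 1.0.x (ZIO.foreach_ and zio.duration are from that line). It restates the latch so the block compiles on its own, using Ref#modify to read the decremented value. The names LatchDemo, makeLatch and run, and the 500 millisecond timeout, are illustrative.

```scala
import zio._
import zio.clock.Clock
import zio.duration._

object LatchDemo {
  trait CountDownLatch {
    def countDown: UIO[Unit]
    def await: UIO[Unit]
  }

  // Same idea as the latch above, restated so this sketch stands alone.
  def makeLatch(count: Int): UIO[CountDownLatch] =
    for {
      ready <- Promise.make[Nothing, Unit]
      ref   <- Ref.make(count)
    } yield new CountDownLatch {
      def countDown: UIO[Unit] =
        ref.modify(n => (n - 1, n - 1)).flatMap {
          case 0 => ready.succeed(()).unit
          case _ => ZIO.unit
        }
      def await: UIO[Unit] = ready.await
    }

  // Forks `workers` fibers that each count down once, then reports whether
  // a latch created with `count` opened within 500 milliseconds.
  def run(count: Int, workers: Int): URIO[Clock, Boolean] =
    for {
      latch  <- makeLatch(count)
      _      <- ZIO.foreach_(1 to workers)(_ => latch.countDown.fork)
      opened <- latch.await.timeout(500.millis)
    } yield opened.isDefined

  def main(args: Array[String]): Unit = {
    val runtime = Runtime.default
    println(runtime.unsafeRun(run(count = 3, workers = 3))) // true
    println(runtime.unsafeRun(run(count = 3, workers = 2))) // false
  }
}
```

With three workers the third countDown completes the promise and await returns. With only two, the counter stops at one and the timeout fires instead.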

With the latch in place, each consumer counts down after storing its record, and the test waits until both have done so:

testM("consume produced records") {
  for {
    records <- Ref.make(Set.empty[Record])
    latch <- CountDownLatch.make(2)
    _ <- consumeFrom(topic1) { record =>
      records.update(_ + record) *> latch.countDown
    }.fork
    _ <- consumeFrom(topic2) { record =>
      records.update(_ + record) *> latch.countDown
    }.fork
    _ <- produceTo(topic1 -> record1, topic2 -> record2)
    _ <- latch.await
    consumed <- records.get
  } yield assert(consumed, contains(record1) && contains(record2))
}

Conclusion

Once again ZIO proves its value when writing asynchronous and concurrent code. We created a simple and non-blocking CountDownLatch with very little code, which is valuable for making our test suite more robust.
