Easily Create New Sync Types with ZIO STM

Natan Silnitsky
Feb 8 · 4 min read

In this post I will share how I used a few lines of ZIO STM code to create a synchronisation primitive called Gate, which can suspend execution depending on the state of a boolean flag.

Background

At Wix, we take testing seriously. Each commonly used micro-service supplies a testkit to its users.

Recently, I wrote a testkit that mimics our Kafka-client Sidecar (separate container that communicates with Kafka and implements additional client-side features).

The testkit simulates the Kafka broker by directly putting messages-to-be-produced (that the sidecar received) into fake consumer queues for further processing.

Requirement: Wait until message consumption is allowed to resume

One of the Kafka-client sidecar features is to globally pause message consumption (and then resume it). For the testkit logic I was required to implement the following API:

def pauseConsume(): UIO[Unit]
def resumeConsume(): UIO[Unit]
def suspendIfPaused(): UIO[Unit]

This required me to suspend and resume the handling of produced messages in my testkit code according to the state of a flag that is altered by user requests.

I wrote the testkit code in ZIO, which was a lot of fun!

ZIO is a great pure functional Scala library that is optimised for writing concurrent/asynchronous code.

Naturally, I wanted to find out whether ZIO could synchronise between the pause request and the consumption suspension.

Does ZIO have the appropriate synchronisation primitive?

ZIO offers several synchronisation primitives such as a Semaphore, Promise, Ref, etc.

First, I thought I would use ZIO Ref with a boolean flag:
suspendConsumption <- Ref.make(false)

and then, on each incoming produced message, check whether consumption needs to be suspended:
ZIO.whenM(suspendConsumption.get)

But there is a problem: while Ref allows safe multithreaded (multi-fibered, in ZIO’s case) access, ZIO.whenM only decides whether or not to run an effect. It does not suspend the executing fiber until the flag changes.
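To make the limitation concrete, here is a minimal sketch, assuming ZIO 1.x. The names RefFlagDemo, handled and handle are hypothetical and not part of the testkit. While the flag says "paused", the guarded effect is simply skipped and the fiber carries on; nothing waits for the flag to flip.

```scala
import zio.{Ref, UIO, ZIO}

// Hypothetical demo, assuming ZIO 1.x (where ZIO.whenM exists).
object RefFlagDemo {
  // Yields how many messages were handled while consumption was "paused".
  val program: UIO[Int] =
    for {
      suspendConsumption <- Ref.make(true) // true = consumption is paused
      handled            <- Ref.make(0)
      handle              = handled.update(_ + 1) // stands in for real message handling
      // whenM reads the flag once: it either runs `handle` or skips it.
      // It never parks the fiber until the flag becomes false.
      _                  <- ZIO.whenM(suspendConsumption.get.map(!_))(handle)
      count              <- handled.get
    } yield count
}
```

The message is dropped rather than delayed, which is not what a pause/resume API should do.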

Next, I looked at ZIO Semaphore. While the Semaphore does allow suspension with
def withPermit[R, E, A](task: ZIO[R, E, A]): ZIO[R, E, A]

it does not support separate actors, where one controls the state change (e.g. a pause request) and the other executes the check-state-and-maybe-suspend effect.

Finally, I tried Promise. Promise looked promising :). It offers a way to pause consumption, simply by creating the promise:
consumersArePausedPromise <- Promise.make[Nothing, Unit]

a maybe-suspend method: consumersArePausedPromise.await

and a resume method: consumersArePausedPromise.succeed(())

but this works only once. Once the Promise is completed, awaiting it never suspends again, so it cannot be reused for the next pause.
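The one-shot behaviour can be sketched as follows, assuming ZIO 1.x. The object name PromiseOneShotDemo is hypothetical. After the first succeed, every await returns immediately, and a second succeed reports false because the promise was already completed.

```scala
import zio.{Promise, UIO}

// Hypothetical demo, assuming ZIO 1.x.
object PromiseOneShotDemo {
  // Yields the result of the second `succeed` call.
  val program: UIO[Boolean] =
    for {
      consumersArePausedPromise <- Promise.make[Nothing, Unit]
      _            <- consumersArePausedPromise.succeed(()) // "resume": completes the promise
      _            <- consumersArePausedPromise.await       // returns immediately
      secondResume <- consumersArePausedPromise.succeed(()) // already completed, so this has no effect
      _            <- consumersArePausedPromise.await       // still returns immediately: no way to pause again
    } yield secondResume
}
```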

ZIO STM to the rescue

It seemed like ZIO had failed me for the first time, as no ZIO synchronisation primitive exactly matched my requirement. However, colleagues I consulted advised me to use ZIO STM as the building block for a custom Gate that would meet it.

STM stands for Software Transactional Memory. According to its scaladoc:

STM[E,A] represents an effect that can be performed transactionally, resulting in a failure E or a value A.

Software Transactional Memory is a technique which allows composition of arbitrary atomic operations. It is the software analog of transactions in database systems.

For more information on STM check out this great article by Fredrik Skogberg.

The generalised API

The consumption pause/resume/suspendIfPaused requirement can be generalised to a Gate synchronisation primitive (some of you may be familiar with ManualResetEvent in C#).

Just replace pauseConsume with lock, resumeConsume with unlock, and suspendIfPaused with waitIfLocked.

The API implementation

Below you can see how Gate was implemented using ZIO STM. Each method is implemented with an STM atomic transaction. It only takes a single line of code!
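A minimal sketch of what such a Gate might look like, assuming ZIO 1.x. The method names lock, unlock and waitIfLocked and the TRef name signal come from this post; the Gate.make constructor, the initiallyLocked parameter and the convention that true means "locked" are assumptions made for illustration, not necessarily the testkit's actual code.

```scala
import zio.UIO
import zio.stm.{STM, TRef}

// Sketch only: `make`, `initiallyLocked` and "true means locked" are assumptions.
trait Gate {
  def lock(): UIO[Unit]
  def unlock(): UIO[Unit]
  def waitIfLocked(): UIO[Unit]
}

object Gate {
  def make(initiallyLocked: Boolean = false): UIO[Gate] =
    TRef.make(initiallyLocked).commit.map { signal =>
      new Gate {
        // Each method is one STM transaction, committed atomically.
        def lock(): UIO[Unit]   = signal.set(true).commit
        def unlock(): UIO[Unit] = signal.set(false).commit
        // STM.check retries the transaction until the gate is unlocked.
        def waitIfLocked(): UIO[Unit] =
          signal.get.flatMap(locked => STM.check(!locked)).commit
      }
    }
}
```

Under this reading, lock and unlock only flip the flag, and waitIfLocked is the only method that can suspend the calling fiber.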

.commit is just syntactic sugar for wrapping the effect in STM.atomically.

So signal.set(false).commit is identical to

STM.atomically {
  signal.set(false)
}

Notice that it uses STM’s TRef[Boolean]. The difference between TRef and Ref is that you can compose TRef with STM.check, which suspends the transaction while the condition does not hold and retries it only once the TRef value changes.

So now with STM’s TRef we have a boolean state ref that can be suspended on, unlike the regular ZIO Ref.

Using the Gate

Below you can see a snippet of the testkit’s FakeSidecarService code that sets pause/resume consumption using the Gate:

And also a snippet of the testkit’s FakeConsumerHandler code that maybe-suspends consumption in case of a pause request using the Gate:

Unlike most Java/Scala synchronisation primitives, the executing thread is not blocked by this effect suspension because ZIO is implemented using lightweight green threads (Fibers).
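The pause/resume flow can be sketched end to end as follows, assuming ZIO 1.x. Everything here (GateUsageDemo, the compact Gate class, the handle function and the message names) is hypothetical and only stands in for the testkit’s FakeSidecarService and FakeConsumerHandler. A short timeout is used to observe that the handler stays suspended while the gate is locked.

```scala
import zio.{UIO, URIO}
import zio.clock.Clock
import zio.duration._
import zio.stm.{STM, TRef}

// Hypothetical demo, assuming ZIO 1.x; none of these names come from the testkit.
object GateUsageDemo {
  // Compact gate for illustration: `true` in the TRef means "paused".
  final class Gate(signal: TRef[Boolean]) {
    def lock(): UIO[Unit]   = signal.set(true).commit
    def unlock(): UIO[Unit] = signal.set(false).commit
    def waitIfLocked(): UIO[Unit] =
      signal.get.flatMap(paused => STM.check(!paused)).commit
  }

  // Stand-in for a consumer handler: wait on the gate, then "handle" the message.
  def handle(gate: Gate, message: String): UIO[String] =
    gate.waitIfLocked().as(s"handled $message")

  val program: URIO[Clock, (Option[String], String)] =
    for {
      gate        <- TRef.make(false).commit.map(new Gate(_))
      _           <- gate.lock()                             // pause request
      whilePaused <- handle(gate, "m1").timeout(100.millis)  // None: still suspended after 100 ms
      fiber       <- handle(gate, "m2").fork                 // suspended fiber, no thread is blocked
      _           <- gate.unlock()                           // resume request
      afterResume <- fiber.join                              // completes once the gate is unlocked
    } yield (whilePaused, afterResume)
}
```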

Summary

In this post I showed how easy it is to create additional synchronisation primitives in a few lines of ZIO code, and how powerful STM can be even for very simple atomic operations.

Thank you for reading!
If you’d like to get updates on my experiences with ZIO, follow me on Twitter and Medium.

You can also visit my website, where you will find my previous blog posts, talks I gave in conferences and open-source projects I’m involved with.

If anything is unclear or you want to point out something, please comment down below.

Check out my articles on 10 ZIO pitfalls to avoid for ZIO beginners
(part 1, part 2).

Natan Silnitsky

Written by

Backend Infrastructure Developer @Wix.com

Wix Engineering

Architecture, scaling, mobile and web development, management and more, written by our very own Wix engineers. https://www.wix.engineering/
