In this post I will share how I used a few lines of ZIO STM code to create a synchronisation primitive called Gate, which suspends execution depending on the state of a boolean flag.
At Wix, we take testing seriously. Each commonly used micro-service supplies a testkit to its users.
Recently, I wrote a testkit that mimics our Kafka-client sidecar (a separate container that communicates with Kafka and implements additional client-side features).
The testkit simulates the Kafka broker by directly putting messages-to-be-produced (that the sidecar received) into fake consumer queues for further processing.
Requirement: Wait until message consumption is allowed to resume
One of the Kafka-client sidecar features is to globally pause message consumption (and then resume it). For the testkit logic I was required to implement the following API:
def pauseConsume(): UIO[Unit]
def resumeConsume(): UIO[Unit]
def suspendIfPaused(): UIO[Unit]
This required my testkit code to suspend and resume the handling of produced messages according to the state of a flag that is changed by a user request.
I wrote the testkit code in ZIO, which was a lot of fun!
ZIO is a great pure functional Scala library that is optimised for writing concurrent/asynchronous code.
Naturally, I wanted to find out whether ZIO had a way to synchronise between the pause request and the consumption suspension.
Does ZIO have the appropriate synchronisation primitive?
First, I thought I would use ZIO Ref with a boolean flag:
suspendConsumption <- Ref.make(false)
and then, on each incoming produced message, check whether consumption needs to be suspended using ZIO.whenM.
But there is a problem: while Ref allows safe multithreaded (multi-fibered in ZIO’s case) access, ZIO.whenM does not suspend the executing fiber. It evaluates the boolean expression once, and if it is false it simply skips the effect and moves on instead of waiting for the flag to change.
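To make the limitation concrete, here is a minimal sketch, assuming ZIO 1.x (where ZIO.whenM is available). The handle helper, the message names and the log Ref are hypothetical stand-ins, not the testkit's actual code. It shows that a message arriving while the flag says "paused" is skipped rather than delayed:

```scala
import zio.{Ref, UIO, ZIO}

object RefFlagSketch {
  // Hypothetical stand-in for the testkit's message handling: records the message.
  def handle(message: String, log: Ref[List[String]]): UIO[Unit] =
    log.update(message :: _)

  val program: UIO[List[String]] =
    for {
      suspendConsumption <- Ref.make(true) // consumption starts out "paused"
      log                <- Ref.make(List.empty[String])
      // whenM reads the flag once: "m1" is skipped, the fiber is NOT suspended
      _                  <- ZIO.whenM(suspendConsumption.get.map(!_))(handle("m1", log))
      _                  <- suspendConsumption.set(false) // "resume"
      _                  <- ZIO.whenM(suspendConsumption.get.map(!_))(handle("m2", log))
      handled            <- log.get
    } yield handled.reverse
}
```

Running the program yields only "m2": the first message is lost instead of waiting for the resume, which is not what a pause/resume feature needs.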
Next, I looked at ZIO Semaphore, but while the Semaphore does allow suspension with def withPermit[R, E, A](task: ZIO[R, E, A]), it does not allow for separate actors, where one controls the state change (e.g. a pause request) and the other executes the check-state-and-maybe-suspend effect.
Finally, I tried Promise.
Promise looked promising :). It offers a way to pause consumption, simply by creating the promise:
consumersArePausedPromise <- Promise.make[Nothing, Unit]
a maybe-suspend method (awaiting the promise) and a resume method (completing the promise), but you can only perform this once, not multiple times. Once the Promise is completed, it can never be suspended on again.
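The one-shot behaviour can be sketched as follows, assuming ZIO 1.x. Only the consumersArePausedPromise name comes from the text above; the surrounding program is a hypothetical illustration:

```scala
import zio.{Promise, UIO}

object PromiseOneShotSketch {
  val program: UIO[(Boolean, Boolean)] =
    for {
      consumersArePausedPromise <- Promise.make[Nothing, Unit]
      // maybe-suspend: the forked fiber waits until the promise is completed
      waiter                    <- consumersArePausedPromise.await.fork
      // resume: completing the promise releases the waiter
      firstResume               <- consumersArePausedPromise.succeed(())
      _                         <- waiter.join
      // a second completion is a no-op, and there is no way to "un-complete"
      secondResume              <- consumersArePausedPromise.succeed(())
      // awaiting a completed promise returns immediately, so it cannot pause again
      _                         <- consumersArePausedPromise.await
    } yield (firstResume, secondResume)
}
```

succeed reports whether this call was the one that completed the promise, so the pair is (true, false): the second "resume" finds the promise already completed.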
ZIO STM to the rescue
It seemed like ZIO had failed me for the first time, as none of its synchronisation primitives exactly matched my requirement. The colleagues I consulted advised me to use ZIO STM as the building block for a custom Gate that would meet it.
STM[E,A] represents an effect that can be performed transactionally, resulting in a failure E or a value A.
Software Transactional Memory is a technique which allows composition of arbitrary atomic operations. It is the software analog of transactions in database systems.
For more information on STM check out this great article by Fredrik Skogberg.
The generalised API
The consumption pauseConsume/resumeConsume/suspendIfPaused requirement can be generalised to a Gate synchronisation primitive (some of you may be familiar with ManualResetEvent in C#).
The API implementation
Gate is implemented using ZIO STM. Each method is a single STM atomic transaction, so each one takes only a single line of code!
.commit is just syntactic sugar for wrapping the effect in STM.atomically, so signal.set(false).commit is identical to STM.atomically(signal.set(false)).
Notice that the implementation uses STM’s TRef[Boolean]. The difference between TRef and Ref is that a TRef can be composed with STM.check, which suspends the effect and retries only once the TRef value has changed.
So now, with STM’s TRef, we have a boolean state ref that can be suspended on, unlike the regular ZIO Ref.
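As an illustration of the idea, here is a minimal sketch of such a Gate, assuming ZIO 1.x. The trait shape and the method names open, close and waitIfClosed are assumptions made for this example and are not necessarily the names used in the testkit; only signal, TRef, STM.check and .commit come from the text above.

```scala
import zio.{Ref, UIO}
import zio.stm.{STM, TRef}

// Hypothetical API: the method names are assumptions for this sketch.
trait Gate {
  def open: UIO[Unit]
  def close: UIO[Unit]
  def waitIfClosed: UIO[Unit]
}

object Gate {
  def make(initiallyOpen: Boolean): UIO[Gate] =
    TRef.makeCommit(initiallyOpen).map { signal =>
      new Gate {
        def open: UIO[Unit]  = signal.set(true).commit
        def close: UIO[Unit] = signal.set(false).commit
        // STM.check retries the transaction until `signal` holds true,
        // which suspends the calling fiber while the gate is closed
        def waitIfClosed: UIO[Unit] =
          signal.get.flatMap(isOpen => STM.check(isOpen)).commit
      }
    }
}

object GateDemo {
  val program: UIO[List[String]] =
    for {
      gate    <- Gate.make(initiallyOpen = false)
      events  <- Ref.make(List.empty[String])
      waiter1 <- (gate.waitIfClosed *> events.update("consumed" :: _)).fork
      _       <- events.update("resuming" :: _)
      _       <- gate.open
      _       <- waiter1.join
      // unlike a Promise, the gate can be closed and waited on again
      _       <- gate.close
      waiter2 <- (gate.waitIfClosed *> events.update("consumed again" :: _)).fork
      _       <- events.update("resuming again" :: _)
      _       <- gate.open
      _       <- waiter2.join
      result  <- events.get
    } yield result.reverse
}
```

In the demo each waiter can only record its event after the gate has been opened, so the events come out as "resuming", "consumed", "resuming again", "consumed again".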
In the testkit, the FakeSidecarService code sets pause/resume consumption using the Gate, and the FakeConsumerHandler code maybe-suspends consumption in case of a pause request using the same Gate.
Unlike most Java/Scala synchronisation primitives, the executing thread is not blocked by this effect suspension because ZIO is implemented using lightweight green threads (Fibers).
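The wiring between the two sides might look roughly like the sketch below, assuming ZIO 1.x. The class shapes, constructor parameters and the consumed Ref are hypothetical; only the class names and pauseConsume/resumeConsume come from this post. A compact Gate is included so that the block compiles on its own.

```scala
import zio.{Ref, UIO}
import zio.stm.{STM, TRef}

// Compact stand-in for the Gate described in this post (names are assumptions).
final class Gate private (signal: TRef[Boolean]) {
  def toggle(open: Boolean): UIO[Unit] = signal.set(open).commit
  def waitIfClosed: UIO[Unit]          = signal.get.flatMap(STM.check(_)).commit
}

object Gate {
  def make(initiallyOpen: Boolean): UIO[Gate] =
    TRef.makeCommit(initiallyOpen).map(new Gate(_))
}

// Hypothetical shape: one actor controls the state change...
final class FakeSidecarService(consumptionGate: Gate) {
  def pauseConsume(): UIO[Unit]  = consumptionGate.toggle(open = false)
  def resumeConsume(): UIO[Unit] = consumptionGate.toggle(open = true)
}

// ...and another one checks the state and maybe suspends before consuming.
final class FakeConsumerHandler(consumptionGate: Gate, consumed: Ref[List[String]]) {
  def handle(message: String): UIO[Unit] =
    consumptionGate.waitIfClosed *> consumed.update(message :: _)
}

object TestkitWiringSketch {
  val program: UIO[(List[String], List[String])] =
    for {
      gate        <- Gate.make(initiallyOpen = true)
      consumed    <- Ref.make(List.empty[String])
      sidecar      = new FakeSidecarService(gate)
      handler      = new FakeConsumerHandler(gate, consumed)
      _           <- handler.handle("m1")      // gate is open: consumed right away
      _           <- sidecar.pauseConsume()
      pending     <- handler.handle("m2").fork // fiber suspends, no thread is blocked
      whilePaused <- consumed.get
      _           <- sidecar.resumeConsume()
      _           <- pending.join
      afterResume <- consumed.get
    } yield (whilePaused.reverse, afterResume.reverse)
}
```

While paused, only "m1" has been consumed; after the resume, "m2" is consumed as well instead of being dropped.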
In this post I showed how easy it is to create additional synchronisation primitives in a few lines of ZIO code, and how powerful STM can be even for very simple atomic operations.
Thank you for reading!