Building a Super Easy Rate Limiter with ZIO

Dmitry Karlinsky
Published in Wix Engineering
Feb 10, 2020

In this post I will show how I built an efficient, non-blocking rate limiter in very few lines of code by leveraging ZIO's built-in non-blocking concurrency primitives and its fiber-based concurrency model.

The Challenge

Recently I needed to implement a rate limiter in one of our internal Scala services. This service (among other things) allows users to get a live view of Kafka messages as they fly by in the topic they select. When a user selects a topic to view, the server starts consuming this topic and streams the messages back to the client via a WebSocket.

Naturally, some topics have a throughput that far exceeds the human eye's ability to follow and the brain's ability to process. That's where rate limiting comes in. When the consumption rate exceeds a certain maximum, the Kafka consumer should slow down.

Rate limiters (or throttlers) are not easy to write. There are several algorithms to choose from (such as Token Bucket, Fixed Window, Sliding Log and others) and writing efficient and correct concurrent code is hard.

ZIO to the Rescue

Fortunately, our service uses ZIO, which is a library for:

“Type-safe, composable asynchronous and concurrent programming for Scala”.

ZIO comes with tons of building blocks for writing async, non-blocking concurrent code. Let's see how we can leverage them to implement our rate limiter.

The API

First, let's define the Rate Limiter API. All we need is a single function that, given an effect a, returns a new effect a' that may suspend itself if the maximum throughput rate is exceeded, before executing the original effect a:
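A minimal sketch of what such an API could look like, assuming ZIO 1.x. The names RateLimiter and rateLimit follow the ones used later in this post; the exact type parameters and the by-name argument are assumptions:

```scala
import zio.ZIO

// Sketch of the API: wrap an effect so that it waits its turn before running.
trait RateLimiter {
  // The returned effect has the same environment, error and result types as
  // the original, but may suspend first when the maximum rate is exceeded.
  def rateLimit[R, E, A](effect: => ZIO[R, E, A]): ZIO[R, E, A]
}
```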

The Implementation

ZIO provides a data structure that does something very similar: the bounded Queue. A bounded queue is created with a finite capacity, and if the queue is full, the offer method suspends until some other fiber calls take (the same happens in reverse when calling take on an empty queue).
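To make the suspending behaviour concrete, here is a small sketch, assuming ZIO 1.x, with a queue of capacity 1. The object name BoundedQueueDemo is made up for illustration:

```scala
import zio._

object BoundedQueueDemo {
  val demo: UIO[(Int, Int)] =
    for {
      queue  <- Queue.bounded[Int](1)
      _      <- queue.offer(1)        // fits, completes immediately
      fiber  <- queue.offer(2).fork   // own fiber; suspends for as long as the queue is full
      first  <- queue.take            // frees the only slot
      _      <- fiber.join            // the pending offer can now complete
      second <- queue.take
    } yield (first, second)
}
```

Running demo yields (1, 2): the second offer only goes through once the first element has been taken, and no thread is blocked while it waits.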

So all we need is to create a bounded queue and have a fiber draining it at a constant rate:
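The following is a sketch of such an implementation, assuming ZIO 1.x. It is a reconstruction based on the description in this post, not necessarily the exact original listing, so the line numbers cited in the text will not match it; the two key lines are marked with comments instead. The parameter names perSecond and buffer and the way the period is computed are assumptions:

```scala
import zio._
import zio.clock.Clock
import zio.duration._

trait RateLimiter {
  def rateLimit[R, E, A](effect: => ZIO[R, E, A]): ZIO[R, E, A]
}

object RateLimiter {
  // perSecond: maximum sustained rate; buffer: size of the allowed initial burst.
  def make(perSecond: Double, buffer: Int): ZIO[Clock, Nothing, RateLimiter] = {
    require(perSecond > 0 && buffer > 0)
    val period: Duration = Duration.fromNanos((1000000000L / perSecond).toLong)
    for {
      queue <- Queue.bounded[Unit](buffer)
      // Key line 1: a background fiber frees one slot in the queue every `period`.
      _ <- queue.take.repeat(Schedule.fixed(period)).fork
    } yield new RateLimiter {
      def rateLimit[R, E, A](effect: => ZIO[R, E, A]): ZIO[R, E, A] =
        // Key line 2: suspend while the queue is full, then run the original effect.
        queue.offer(()) *> effect
    }
  }
}
```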

That's it! This is our rate limiter! Only twenty lines of code, most of which is necessary boilerplate (such as the for comprehension).

Actually, to understand it, you mostly need to understand two lines: line 8 and line 19. That's where the magic happens.

Let's break it down:

  • Line 8:
_ <- queue.take.repeat(Schedule.fixed(period)).fork

The expression on the right side of <- constructs a program that, when executed, takes an element from the queue once every period, forever (or until interrupted). It does so in a separate Fiber, which is a lightweight logical thread provided by ZIO.
If we omitted the .fork, our for comprehension, and hence the whole effect returned by RateLimiter.make, would never finish, as the repeated queue.take would run in the main fiber of the effect.

  • Line 19:

queue.offer(()) *> effect

We modify the effect to first offer an element to the queue. If the queue is full, the fiber is suspended until an element is taken from the queue (by line 8). Once there is room in the queue, the fiber proceeds to run the original effect.

Note: A *> B is equivalent to A.flatMap(_ => B) (running the two effects sequentially, ignoring the result of the first).
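A tiny sketch of that equivalence, assuming ZIO 1.x. The object and value names are made up for illustration:

```scala
import zio._

object ZipRightDemo {
  val a: UIO[Int]    = ZIO.succeed(1)
  val b: UIO[String] = ZIO.succeed("b")

  // Both run `a` first, discard its result, then run `b` and keep its result.
  val viaOperator: UIO[String] = a *> b
  val viaFlatMap: UIO[String]  = a.flatMap(_ => b)
}
```

Both effects produce "b" when run.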

Using the Rate Limiter

How do we use our Rate Limiter? Here’s a contrived example:
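A sketch of such an example, assuming ZIO 1.0.x (zio.App with ExitCode, ZIO.foreach_). The limiter is repeated in compact form so that the snippet compiles on its own, the object name RateLimiterExample is made up, and perSecond = 1.0 and buffer = 10 are taken from the description in this section:

```scala
import zio._
import zio.clock.Clock
import zio.console._
import zio.duration._

trait RateLimiter {
  def rateLimit[R, E, A](effect: => ZIO[R, E, A]): ZIO[R, E, A]
}

object RateLimiter {
  def make(perSecond: Double, buffer: Int): ZIO[Clock, Nothing, RateLimiter] = {
    val period = Duration.fromNanos((1000000000L / perSecond).toLong)
    for {
      queue <- Queue.bounded[Unit](buffer)
      _     <- queue.take.repeat(Schedule.fixed(period)).fork
    } yield new RateLimiter {
      def rateLimit[R, E, A](effect: => ZIO[R, E, A]): ZIO[R, E, A] =
        queue.offer(()) *> effect
    }
  }
}

object RateLimiterExample extends zio.App {
  val program =
    for {
      limiter <- RateLimiter.make(perSecond = 1.0, buffer = 10)
      // Tries to print as fast as possible; the limiter slows it down.
      _       <- ZIO.foreach_(1 to 1000)(i => limiter.rateLimit(putStrLn(i.toString)))
    } yield ()

  def run(args: List[String]): URIO[ZEnv, ExitCode] = program.exitCode
}
```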

The second line in our for comprehension attempts to print the numbers between 1 and 1000 as fast as it can, but because the putStrLn effect is wrapped in limiter.rateLimit, the numbers will be printed at a rate not exceeding 1 per second (after an initial burst of 10, allowed by buffer = 10).

Note that the throughput is capped without blocking any threads!

Using outside ZIO

Can we also use our Rate Limiter in non-ZIO code?
Yes! But this will require some boilerplate.

We will need an instance of zio.Runtime to run our ZIO effects. In most cases the default environment is fine, which we can create like this:

object Runtime extends zio.DefaultRuntime

First, let's make an async version of the rate limiter that works with Scala Futures:
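A sketch of such an adapter, assuming ZIO 1.x. The class name FutureRateLimiter and its constructor parameters are made up for illustration, and the RateLimiter trait is repeated so that the snippet compiles on its own:

```scala
import scala.concurrent.Future
import zio._

trait RateLimiter {
  def rateLimit[R, E, A](effect: => ZIO[R, E, A]): ZIO[R, E, A]
}

// Hypothetical adapter exposing a ZIO-based limiter to Future-based code.
class FutureRateLimiter(limiter: RateLimiter, runtime: Runtime[Any]) {
  // Takes a function so that the Future is only started once the limiter allows it.
  def rateLimit[T](makeFuture: () => Future[T]): Future[T] =
    runtime.unsafeRunToFuture(
      limiter.rateLimit(ZIO.fromFuture(_ => makeFuture()))
    )
}
```

One caveat, as far as I understand ZIO 1.0.x: a fiber started with .fork is tied to the lifetime of the fiber that forked it, so if the limiter itself is created by running RateLimiter.make on its own through the runtime, forkDaemon is probably the safer choice for the draining fiber.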

Note that since a Scala Future represents an already started computation, which can't be delayed, the rateLimit method accepts a function that returns a Future. (A by-name parameter => Future[T] would work as well, but I think in this case () => Future[T] expresses the intent better.)

We could also make a blocking Rate Limiter:
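A sketch of a blocking variant, assuming ZIO 1.x, where effectBlocking lives in the zio.blocking package. The class name BlockingRateLimiter and its constructor parameters are made up for illustration, and the RateLimiter trait is repeated so that the snippet compiles on its own:

```scala
import zio._
import zio.blocking.{Blocking, effectBlocking}

trait RateLimiter {
  def rateLimit[R, E, A](effect: => ZIO[R, E, A]): ZIO[R, E, A]
}

// Hypothetical adapter exposing a ZIO-based limiter to plain blocking code.
class BlockingRateLimiter(limiter: RateLimiter, runtime: Runtime[Blocking]) {
  // Blocks the calling thread until the limiter lets `action` through
  // and `action` itself has completed.
  def rateLimit[T](action: => T): T =
    runtime.unsafeRun(limiter.rateLimit(effectBlocking(action)))
}
```

Unlike the ZIO and Future variants, this one does block the caller's thread while it waits, and a failure inside action surfaces as an exception thrown by unsafeRun.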

Note that we use effectBlocking to wrap potentially blocking code in a ZIO effect. This makes sure such code runs on a thread pool separate from the one used for non-blocking ZIO operations.

Summary

To summarize: I've shown how, using ZIO, I was able to put together a concurrent, non-blocking rate limiter in very few lines of code. I've also shown how this rate limiter can be used in ZIO-based code, in more traditional Future-based code, and even in blocking applications.

Thanks for reading!

Comment below, follow me on Medium and Twitter.

Dmitry Karlinsky
Wix Engineering

Backend developer at Wix.com. Working on data streaming infrastructure, built around Kafka. Love Scala and typed FP. Contributing to ZIO.