In this post I will show how I was able to make an efficient non-blocking rate limiter, using very few lines of code, by leveraging ZIO's built-in non-blocking concurrency primitives and its fiber-based concurrency model.
Recently I needed to implement a rate limiter in one of our internal Scala services. This service (among other things) allows users to get a live view of Kafka messages as they fly by in the topic they select. When a user selects a topic to view, the server starts consuming this topic and streams the messages back to the client via a WebSocket.
Naturally, some topics have throughput that far exceeds the human eye's ability to follow, and the brain's ability to process. That's where rate limiting comes in. When the consumption rate exceeds a certain maximum, the Kafka consumer should slow down.
Rate limiters (or throttlers) are not easy to write. There are several algorithms to choose from (such as Token Bucket, Fixed Window, Sliding Log and others) and writing efficient and correct concurrent code is hard.
ZIO to the Rescue
Fortunately in our service we are using ZIO, which is a library for:
“Type-safe, composable asynchronous and concurrent programming for Scala”.
ZIO comes with tons of building blocks for writing async, non-blocking, concurrent code. Let's see how we can leverage them to implement our rate limiter.
First, let's define the rate limiter API. All we need is a single function that, given an effect, returns a new effect that may suspend itself (if the maximum throughput rate is exceeded) before executing the original effect:
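A minimal sketch of such an API, assuming ZIO 1.x (the `RateLimiter` and `rateLimit` names here are illustrative):

```scala
import zio.ZIO

trait RateLimiter {
  // Given an effect, return a new effect that may first suspend
  // (without blocking a thread) until the rate limit allows it to run.
  def rateLimit[R, E, A](effect: ZIO[R, E, A]): ZIO[R, E, A]
}
```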
ZIO provides a data structure that does something very similar: the bounded Queue. A bounded queue is created with a finite capacity, and if the queue is full, the offer method will suspend until some other fiber calls take (the same happens in reverse when calling take on an empty queue).
So all we need is to create a bounded queue and have a fiber draining it at a constant rate:
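A sketch of the implementation, assuming ZIO 1.x (where `repeat` needs a `Clock` in the environment); the `make` signature and the `buffer`/`period` parameter names are assumptions:

```scala
import zio._
import zio.clock.Clock
import zio.duration.Duration

object RateLimiter {
  // `buffer` is the queue capacity (the allowed burst size);
  // one element is drained from the queue every `period`.
  def make(buffer: Int, period: Duration): URIO[Clock, RateLimiter] =
    for {
      queue <- Queue.bounded[Unit](buffer)
      // Drain the queue at a constant rate, in its own fiber.
      _     <- queue.take.repeat(Schedule.fixed(period)).fork
    } yield new RateLimiter {
      def rateLimit[R, E, A](effect: ZIO[R, E, A]): ZIO[R, E, A] =
        // Suspends (without blocking a thread) while the queue is full.
        queue.offer(()) *> effect
    }
}
```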
That's it! This is our rate limiter, in only about twenty lines of code, most of which is necessary boilerplate.
Actually, to understand it, you mostly need to understand two lines: the one that forks the draining fiber and the one that offers to the queue. That's where the magic is happening. Let's break them down:
- The draining fiber:
_ <- queue.take.repeat(Schedule.fixed(period)).fork
The expression on the right side of <- constructs a program that, when executed, will take an element from the queue repeatedly, every period, forever (or until interrupted). It will do so in a separate Fiber, which is a lightweight logical thread provided by ZIO.
If we omitted the .fork, our for comprehension, and hence the whole effect returned by RateLimiter.make, would never finish, as the repeated queue.take would run in the main fiber of the effect.
- The rate-limited effect:
queue.offer(()) *> effect
We modify the effect to first offer an element to the queue. If the queue is full, the fiber is suspended until an element is taken from the queue (by the draining fiber). Once there is room in the queue, the fiber proceeds to execute the original effect.
A *> B is equivalent to A.flatMap(_ => B): it runs the two effects sequentially, ignoring the result of the first.
Using the Rate Limiter
How do we use our Rate Limiter? Here’s a contrived example:
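A sketch of such an example, assuming a RateLimiter.make(buffer, period) constructor and the ZIO 1.x console (the exact numbers come from the description below):

```scala
import zio._
import zio.console._
import zio.duration._

// Print 1..1000, but at a rate of at most one line per second
// (after an initial burst of 10, the queue's capacity).
val program =
  for {
    limiter <- RateLimiter.make(buffer = 10, period = 1.second)
    _       <- ZIO.foreach(1 to 1000)(i => limiter.rateLimit(putStrLn(i.toString)))
  } yield ()
```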
The second line in our for comprehension attempts to print the numbers between 1 and 1000 as fast as it can, but because the putStrLn effect is wrapped in limiter.rateLimit, the numbers will be printed at a rate not exceeding 1 per second (after an initial burst of 10, allowed by buffer = 10).
Note that the throughput is capped without blocking any threads!
Using outside ZIO
Can we also use our rate limiter in non-ZIO code?
Yes! But this will require some boilerplate.
We will need an instance of zio.Runtime to run our ZIO effects. In most cases the default environment is fine, and we can create it like this:
object Runtime extends zio.DefaultRuntime
First, let's make an async version of the rate limiter that works with Scala Futures:
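A sketch of a Future-based wrapper, assuming ZIO 1.x's `Runtime#unsafeRunToFuture` and `ZIO.fromFuture` (the `AsyncRateLimiter` name is my own):

```scala
import scala.concurrent.Future
import zio.ZIO

class AsyncRateLimiter(limiter: RateLimiter, runtime: zio.Runtime[zio.ZEnv]) {
  // Accepts a function rather than a Future, since a Future is already running.
  def rateLimit[T](effect: () => Future[T]): Future[T] =
    runtime.unsafeRunToFuture(
      limiter.rateLimit(ZIO.fromFuture(_ => effect()))
    )
}
```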
Note that since a Scala Future represents an already started computation, which can't be delayed, the rateLimit method accepts a function that returns a Future (a by-name parameter => Future[T] would work as well, but I think in this case () => Future[T] expresses the intent better).
We could also make a blocking Rate Limiter:
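A sketch of a blocking wrapper, assuming ZIO 1.x's `zio.blocking.effectBlocking` (the `BlockingRateLimiter` name is my own):

```scala
import zio.blocking.effectBlocking

class BlockingRateLimiter(limiter: RateLimiter, runtime: zio.Runtime[zio.ZEnv]) {
  // Blocks the calling thread until the rate limiter lets the task run.
  def rateLimit[T](task: => T): T =
    runtime.unsafeRun(
      limiter.rateLimit(effectBlocking(task))
    )
}
```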
Note that we use effectBlocking to wrap potentially blocking code in a ZIO effect. This ensures such code runs on a thread pool separate from the one used for non-blocking ZIO operations.
To summarize: I've shown how, using ZIO, I was able to put together a concurrent, non-blocking rate limiter in very few lines of code. I've also shown how this rate limiter can be used both in ZIO-based code and in more traditional Future-based and even blocking applications.
Thanks for reading!
Comment below, follow me on Medium and Twitter.