Building a Super Easy Rate Limiter with ZIO
In this post I will show how I built an efficient, non-blocking rate limiter in very few lines of code by leveraging ZIO’s built-in non-blocking concurrency primitives and its fiber-based concurrency.
The Challenge
Recently I needed to implement a rate limiter in one of our internal Scala services. This service (among other things) allows users to get a live view of Kafka messages as they fly by in the topic they select. When a user selects a topic to view, the server starts consuming this topic and streams the messages back to the client via a WebSocket.
Naturally, some topics have throughput that far exceeds the human eye’s ability to follow and the brain’s ability to process. That’s where rate limiting comes in: when the consumption rate exceeds a certain maximum, the Kafka consumer should slow down.
Rate limiters (or throttlers) are not easy to write. There are several algorithms to choose from (such as Token Bucket, Fixed Window, Sliding Log and others) and writing efficient and correct concurrent code is hard.
ZIO to the Rescue
Fortunately, in our service we are using ZIO, which is a library for:
“Type-safe, composable asynchronous and concurrent programming for Scala”.
ZIO comes with tons of building blocks for writing asynchronous, non-blocking concurrent code. Let’s see how we can leverage them to implement our rate limiter.
The API
First, let’s define the Rate Limiter API. All we need is a single function that, given an effect a, returns a new effect a’. If the maximum throughput rate is exceeded, a’ may suspend itself before executing the original effect a:
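Here is a minimal sketch of such an API in Scala, assuming ZIO 1.0.x. The names RateLimiter and rateLimit come from this post. The exact type parameters are my assumption. The unlimited instance is a hypothetical addition that only shows the shape of the contract: the returned effect has the same environment, error and result types as the one passed in.

```scala
import zio._

// Sketch of the rate limiter API (assumes ZIO 1.0.x).
trait RateLimiter {
  // Returns an effect that behaves like `effect`, but may first suspend
  // (without blocking a thread) if the maximum throughput has been exceeded.
  def rateLimit[R, E, A](effect: ZIO[R, E, A]): ZIO[R, E, A]
}

object RateLimiter {
  // Hypothetical no-op limiter: never suspends, returns the effect unchanged.
  val unlimited: RateLimiter = new RateLimiter {
    def rateLimit[R, E, A](effect: ZIO[R, E, A]): ZIO[R, E, A] = effect
  }
}
```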
The Implementation
ZIO provides a data structure that does something very similar: the bounded Queue. A bounded queue is created with a finite capacity. If the queue is full, the offer method suspends the calling fiber until some other fiber calls take (the same happens in reverse when calling take on an empty queue).
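The following small sketch shows that suspension, assuming ZIO 1.0.x (BoundedQueueDemo is a hypothetical name). A second offer to a full queue of capacity 1 cannot complete, so the timeout wins. The element that was already queued is still the one take returns.

```scala
import zio._
import zio.clock.Clock
import zio.duration._

object BoundedQueueDemo {
  // Yields (outcome of the second offer, value taken afterwards).
  val demo: ZIO[Clock, Nothing, (Option[Boolean], Int)] =
    for {
      queue   <- Queue.bounded[Int](1)
      _       <- queue.offer(1)                     // fills the only slot
      blocked <- queue.offer(2).timeout(100.millis) // no room: the offer suspends, so the timeout wins with None
      first   <- queue.take                         // returns 1 and frees the slot
    } yield (blocked, first)
}
```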
So all we need is to create a bounded queue and have a fiber draining it at a constant rate:
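Below is a minimal sketch of what such an implementation could look like, assuming ZIO 1.0.x. The make(period, buffer) signature is my assumption, since the original parameters are not shown.

I have also used forkDaemon where the text below says .fork. In ZIO 1.0.x, a fiber started with fork is interrupted when its parent fiber exits. That would silently stop the draining fiber if make were run on its own through unsafeRun, as you would when using the limiter from non-ZIO code.

```scala
import zio._
import zio.clock.Clock
import zio.duration._

trait RateLimiter {
  def rateLimit[R, E, A](effect: ZIO[R, E, A]): ZIO[R, E, A]
}

object RateLimiter {
  // Assumed parameters:
  //   period: minimum spacing between permits once the buffer is used up
  //   buffer: capacity of the bounded queue, i.e. the size of the initial burst
  def make(period: Duration, buffer: Int): URIO[Clock, RateLimiter] =
    for {
      queue <- Queue.bounded[Unit](buffer)
      // Drain one element every `period`, forever, in a background fiber.
      // forkDaemon (instead of fork) keeps this fiber alive after the fiber
      // that ran `make` has finished.
      _ <- queue.take.repeat(Schedule.fixed(period)).forkDaemon
    } yield new RateLimiter {
      // Offer first: if the queue is full this suspends the calling fiber
      // (no thread is blocked) until the draining fiber frees a slot.
      def rateLimit[R, E, A](effect: ZIO[R, E, A]): ZIO[R, E, A] =
        queue.offer(()) *> effect
    }
}
```

The buffer size also sets the initial burst, because offers succeed immediately until the queue is full.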
That’s it! This is our rate limiter, in only twenty lines of code, most of which are necessary boilerplate (such as the for comprehension).
To understand it, you mostly need to understand just two lines: line 8 and line 19. That’s where the magic happens.
Let’s break it down:
- Line 8:
_ <- queue.take.repeat(Schedule.fixed(period)).fork
The expression on the right side of <- constructs a program that, when executed, takes an element from the queue repeatedly, every period, forever (or until interrupted). It does so in a separate Fiber, which is a lightweight logical thread provided by ZIO.
If we omitted the .fork, our for comprehension, and hence the whole effect returned by RateLimiter.make, would never finish, because the repeated queue.take would run in the main fiber of the effect.
- Line 19:
queue.offer(()) *> effect
We modify the effect to first offer an element to the queue. If the queue is full, this suspends the fiber until an element is taken from the queue (by line 8). Once there is room in the queue, the fiber proceeds to execute the original effect.
Note: A *> B is equivalent to A.flatMap(_ => B) (running the two effects sequentially, ignoring the result of the first).
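To make that equivalence concrete, here is a small sketch assuming ZIO 1.0.x (SequencingDemo, program1 and program2 are hypothetical names). Both programs run the update first, discard its Unit result, and then read the counter.

```scala
import zio._

object SequencingDemo {
  // Uses *> to sequence the update and the read.
  val program1: UIO[Int] = for {
    ref    <- Ref.make(0)
    result <- ref.update(_ + 1) *> ref.get
  } yield result

  // The same sequencing written with flatMap, ignoring the first result.
  val program2: UIO[Int] = for {
    ref    <- Ref.make(0)
    result <- ref.update(_ + 1).flatMap(_ => ref.get)
  } yield result
}
```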
Using the Rate Limiter
How do we use our Rate Limiter? Here’s a contrived example:
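Here is a sketch of such an example, assuming ZIO 1.0.x. It repeats a compact version of the rate limiter so that it compiles on its own. The make(period, buffer) signature and the RateLimiterExample name are assumptions. The limiter allows one permit per second, with a buffer of 10.

```scala
import zio._
import zio.clock.Clock
import zio.console.putStrLn
import zio.duration._

// Compact rate limiter sketch so this example compiles on its own.
trait RateLimiter {
  def rateLimit[R, E, A](effect: ZIO[R, E, A]): ZIO[R, E, A]
}

object RateLimiter {
  // Assumed signature: one permit per `period`, plus a burst of up to `buffer`.
  def make(period: Duration, buffer: Int): URIO[Clock, RateLimiter] =
    for {
      queue <- Queue.bounded[Unit](buffer)
      _     <- queue.take.repeat(Schedule.fixed(period)).forkDaemon
    } yield new RateLimiter {
      def rateLimit[R, E, A](effect: ZIO[R, E, A]): ZIO[R, E, A] =
        queue.offer(()) *> effect
    }
}

object RateLimiterExample extends zio.App {
  def run(args: List[String]): URIO[ZEnv, ExitCode] =
    (for {
      limiter <- RateLimiter.make(1.second, buffer = 10)
      // Tries to print 1 to 1000 as fast as possible; each print must first get a permit.
      _       <- ZIO.foreach_(1 to 1000)(i => limiter.rateLimit(putStrLn(i.toString)))
    } yield ()).exitCode
}
```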
The second line in our for comprehension attempts to print the numbers from 1 to 1000 as fast as it can. Because the putStrLn effect is wrapped in limiter.rateLimit, the numbers are printed at a rate not exceeding 1 per second (after an initial burst of 10, allowed by buffer = 10).
Note that the throughput is capped without blocking any threads!
Using It Outside ZIO
Can we also use our Rate Limiter in non-ZIO code?
Yes! But this will require some boilerplate.
We will need an instance of zio.Runtime to run our ZIO effects. In most cases the default environment is fine, and we can create a runtime for it like this:
object Runtime extends zio.DefaultRuntime
First, let’s make an async version of the rate limiter that works with Scala Futures:
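A sketch of such a wrapper, assuming ZIO 1.0.x. In that version zio.DefaultRuntime no longer exists, and Runtime.default plays the same role, so the sketch uses it directly. FutureRateLimiter and the make(period, buffer) signature are hypothetical. A compact copy of the rate limiter is included so the block compiles on its own. The draining fiber is started with forkDaemon so that it keeps running after the unsafeRun call that creates the limiter has returned.

```scala
import scala.concurrent.Future
import zio._
import zio.clock.Clock
import zio.duration._

trait RateLimiter {
  def rateLimit[R, E, A](effect: ZIO[R, E, A]): ZIO[R, E, A]
}

object RateLimiter {
  def make(period: Duration, buffer: Int): URIO[Clock, RateLimiter] =
    for {
      queue <- Queue.bounded[Unit](buffer)
      _     <- queue.take.repeat(Schedule.fixed(period)).forkDaemon
    } yield new RateLimiter {
      def rateLimit[R, E, A](effect: ZIO[R, E, A]): ZIO[R, E, A] =
        queue.offer(()) *> effect
    }
}

// Hypothetical wrapper for Future-based code.
final class FutureRateLimiter(period: Duration, buffer: Int) {
  private val runtime: Runtime[ZEnv] = Runtime.default
  // Created once; the daemon fiber keeps draining after unsafeRun returns.
  private val limiter: RateLimiter = runtime.unsafeRun(RateLimiter.make(period, buffer))

  // Takes a thunk so the Future is created (and started) only after a permit is obtained.
  def rateLimit[T](task: () => Future[T]): Future[T] =
    runtime.unsafeRunToFuture(limiter.rateLimit(ZIO.fromFuture(_ => task())))
}
```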
Note that a Scala Future represents an already started computation, which can’t be delayed. For that reason, the rateLimit method accepts a function that returns a Future. (A by-name parameter, => Future[T], would work as well, but I think in this case () => Future[T] expresses the intent better.)
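The difference between the two parameter styles can be shown without ZIO at all. In this hypothetical sketch, makeTask stands in for code that creates (and therefore starts) a Future, and the counter records how often it has been called.

```scala
import scala.concurrent.Future

object ThunkDemo {
  // Number of times a task has been created.
  var created = 0

  // Stand-in for code that creates a Future; Future.successful avoids
  // needing an ExecutionContext in this sketch.
  def makeTask(): Future[Int] = {
    created += 1
    Future.successful(42)
  }

  // The argument is evaluated before this body runs, so the Future already exists.
  def takesFuture(task: Future[Int]): Unit = ()

  // Only a function is passed; nothing is created until someone calls task().
  def takesThunk(task: () => Future[Int]): Unit = ()
}
```

Calling ThunkDemo.takesThunk(() => ThunkDemo.makeTask()) leaves created at 0. Calling ThunkDemo.takesFuture(ThunkDemo.makeTask()) increments it before the body of takesFuture even runs.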
We could also make a blocking Rate Limiter:
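A sketch of a blocking variant, assuming ZIO 1.0.x with Runtime.default (the 1.0.x replacement for zio.DefaultRuntime). It also assumes a hypothetical make(period, buffer) signature and a hypothetical BlockingRateLimiter class. A compact copy of the rate limiter is included so the block compiles on its own, with the draining fiber started by forkDaemon so it outlives the unsafeRun call that creates it.

```scala
import zio._
import zio.blocking.effectBlocking
import zio.clock.Clock
import zio.duration._

trait RateLimiter {
  def rateLimit[R, E, A](effect: ZIO[R, E, A]): ZIO[R, E, A]
}

object RateLimiter {
  def make(period: Duration, buffer: Int): URIO[Clock, RateLimiter] =
    for {
      queue <- Queue.bounded[Unit](buffer)
      _     <- queue.take.repeat(Schedule.fixed(period)).forkDaemon
    } yield new RateLimiter {
      def rateLimit[R, E, A](effect: ZIO[R, E, A]): ZIO[R, E, A] =
        queue.offer(()) *> effect
    }
}

// Hypothetical wrapper for blocking (non-ZIO, non-Future) code.
final class BlockingRateLimiter(period: Duration, buffer: Int) {
  private val runtime: Runtime[ZEnv] = Runtime.default
  private val limiter: RateLimiter = runtime.unsafeRun(RateLimiter.make(period, buffer))

  // Blocks the calling thread until a permit is available, then runs `task`
  // on ZIO's blocking thread pool and returns its result
  // (unsafeRun throws a FiberFailure if the task fails).
  def rateLimit[T](task: => T): T =
    runtime.unsafeRun(limiter.rateLimit(effectBlocking(task)))
}
```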
Note that we use effectBlocking to wrap potentially blocking code in a ZIO effect. This ensures that such code runs on a separate thread pool from non-blocking ZIO operations.
Summary
To summarize: I’ve shown how, using ZIO, I was able to put together a concurrent, non-blocking rate limiter in very few lines of code. I’ve also shown how this rate limiter can be used in ZIO-based code as well as in more traditional Future-based and even blocking applications.
Thanks for reading!
Comment below, follow me on Medium and Twitter.