Client rate limiter

Danny Mor
Analytics Vidhya
Published in
7 min readApr 20, 2020

Implementing an async python client rate limiter using the queue based Token-Bucket algorithm

First of all, why would we want to limit our awesome software from shooting gazillion requests per second?

Well, sometimes we don’t have a choice.
Most of the available data providers cannot afford not to put a limit on the volumes and rate of which their clients consume through their APIs.
Putting limits on a service usage not only helps control resources allocation, which eventually translates to the money being spent,
But also a tool for dealing with DDoS attacks.

Ok, so we know data providers need to put rate limits, but again, why do we? and how do we do it?

Answering why is simple, we just have to deal with it somehow!

But the question of how has multiple answers.
We will answer it by implementing one of the solutions,
it’s called Token Bucket.

Token Bucket Algorithm

First, The original idea!

Imagine a bucket that can hold up to n tokens.
A new token is added to the bucket at a constant rate.
Just before sending a request, a token must be taken out of the bucket.
In case the bucket is empty a request needs to wait until a new token is added to the bucket and can be taken out.
In case the bucket is full no more tokens are added until tokens are taken out

We are going to do the exact opposite!

The one trying to send a request must add a token to the bucket first (like paying a tax) meanwhile a token is taken out of the bucket at a constant rate.
In this case, when a bucket is full a request needs to wait until a new token can be added, and when the bucket is empty no tokens are taken out (dah!!!)

Let’s lay out the plan.
We need a queue to hold the total amount of tokens for requests allowed at any given time. The max number of tokens is the rate limit.
If we want to create a request we first try to add a token to the queue.
If the queue is full we block the request until we have a free slot in the queue.
while adding tokens, we need a task that consumes tokens and free slots from the queue at a constant rate which will make sure we never exceed the rate allowed.

Now let’s implement!

Let’s set up a skeleton for a RateLimiter class according to the plan

We initiate the class with a rate_limit parameter which is the number of requests per second or RPS in short,
a tokens_queue queue with a max size of rate_limit,
and a tokens_consumer_task to consume tokens from the queue at a fairly constant rate.

We need to implement 2 functions, namely
add_token and consume_tokens

add_token

add_token is fairly simple because all the heavy lifting here is done by asyncio.Queue’s put function:

Put an item into the queue. If the queue is full, wait until a free slot is available before adding the item.

so we put 1 into the tokens_queue, if the queue is full it will block the request until a token is consumed from the queue.

consume_tokens

This is where all the logic is!

we first set a consumption_rate to be 1/rate_limit.

for example, if we have a rate_limit of 100 requests per second,
it means the rate is
1 request per 0.01 of a second meaning,
the consumption_rate is 0.01.

we start an endless loop to continuously consume tokens from the queue and free slots from the queue.

if the queue is empty we have nothing to do but sleep for a consumption_rate seconds (or fraction of a second) and try again.

now we just need to make sure we do it at a constant rate.
The problem is we don’t have any control over the scheduling of coroutines.
so what we need to do is find out how much time has passed since the last iteration and translate that to the number of tokens we need to consume.

so let's implement the function get_tokens_amount_to_consume

time_from_last_consumption is the time between iterations.
calculated_tokens_to_consume is basically finding out how many tokens should have been consumed until now if the rate was the consumption_rate.
we need to “floor” the value because we can’t consume 3.5 tokens from the queue.
finally, the tokens_to_consume is the min between the total_tokens and the tokens_to_consume because we can’t consume more tokens than there are in the queue.

for example, if the rate limit is 20 RPS then the consumption rate is 0.05.
now let’s say
0.2 of a second has passed since the last time we consumed tokens. we now need to make up for it by consuming at most 4 tokens.
0.2/0.05 = 4 means how many units of 0.05 makes a 0.2.
but let’s say only 3 tokens were put into the queue during that time,
then the
tokens_to_consume in this example is 3

Ok, so now we can consume one or more tokens at once.
We do it in a synchronical fashion because this is our chance to keep up with the pace. we know the event loop scheduling has no regard for our rate.

We’re actually done with the implementation of the rate-limiting.
But there’s another issue we need to deal with that we haven’t talked about.

Concurrency limits

sometimes a data provider will also put limits on how many requests a client can send concurrently. this is something that the rate limiter we’ve created can not deal with.

To demonstrate why the rate limiter is not sufficient,
let's say we have a
rate limit of 20 RPS and concurrency limit of 10
we can by mistake, launch
20 requests at the same time and our rate limiter will allow such burst of requests because it is within the permitted limits, but we will be denied by the service because we exceeded the number of concurrent connections allowed.
Another question arises, isn’t concurrency limiter sufficient enough?
well, no! let’s say our concurrency limit is 1 request at a time,
the service rate limit is 2 requests per second, and a request takes 0.25 seconds to finish, then we can potentially send 4 requests per second

luckily, asyncio gives us a tool to deal with that, it’s called semaphore!
a semaphore is actually an old synchronization concept.
It limits the number of processes that can access a shared resource at any given time, so it is also a limiter of a sort but for concurrency of processes.

Let’s upgrade our rate limiter.
We’ll initiate the class with an extra parameter concurrency_limit and create a semaphore.
We’ll also create a function called throttle that a user can use for limiting both concurrency and rate.

throttle

before implementing throttle we need to understand what it is we need to do to enforce limits in a safe way.
the flow should be in the following order:

  1. acquire the semaphore
  2. add a token to the rate limiter queue
  3. send a request (or any other async operation we want to control its rate)
  4. release the semaphore

So we need to somehow make sure we release the semaphore after stage 3 has finished executing.
We don’t mind what runs in that stage,
we only need some kind of way to run it within our context.
luckily we can do it with an async generator context manager

Before producing a token we will try to acquire the semaphore.
If more than concurrency_limit coroutines are trying to add tokens then we block until one has finished,
remember that producing a token may also block.

Great, we’re done!
We still need to do some grown-up software stuff like
cleaning and playing safe.
We can also add a context manager as a good practice.
But since this is not the main focus, we’ll just skip to the complete code.
let’s just add everything and finish the class

RateLimiter

Use case

let's see how we can use it

Notes

This class is not thread-safe!

It is only meant to be used in an “async world”.
since it is meant to be used as a shared resource among multiple coroutines it is practically useless in the multi-threaded/multi-processes realm.

There are other solutions

the queue base token bucket is great! but there are other solutions also.
there are also sliding window log, sliding window counter, leaky bucket, etc
and there is the HTTP protocol way, which is responding to responses with status code 429, which indicates you’ve exceeded the rate limit, by waiting for a time span given in a header namedRetry-After and resending the request.
The thing with 429 status code is that not all APIs are nice enough to use the HTTP convention and use the header Retry-After (if any at all) to let you know how much time you should wait before resending.
Even if the API is playing nice, the Retry-After value can be a date, seconds, milliseconds … so you’ll have to implement the retry mechanism case by case. also if you have no indication of how much time to wait, you just need to wait for a constant time which is very inefficient.
The rate limiter gives a nice abstraction for rate limiting, all you need to do is get the limits from the service’s API docs or experiment to get estimations, But you can never anticipate an external service behavior so you should at least catch a 429 error and retry

Server rate limiter

The server implementation is much more nuanced and complex than the client implementation.
For example, when trying to limit requests to protect from a DDoS,
a service may want to filter out the attackers and let real users still have access.
Also, in distributed systems, the solutions are never that simple.

AioHTTP

aiohttp Is a really nice and recommended async HTTP python library.
It manages a connection pool internally that you can leverage to your benefit.
You can control the number of concurrent connections by setting the size of the connection pool, and you can even do it per host, cool huh?

--

--