Develop Your own API Rate Limiter in FastAPI — Part I

Arjun
8 min read · Oct 11, 2023


An API rate limiter is a crucial component in many web applications and APIs to manage and control the rate at which clients or users can make requests to the API. Its primary purpose is to prevent abuse, ensure fair usage, protect the API server from overload, and maintain a high quality of service for all users. Rate limiting is often implemented to:

  1. Protect Resources
  2. Ensure Fairness
  3. Mitigate DDoS Attacks
  4. Support Billing and Monetization

An API rate limiter is just one part of API Management Services. As someone who has used MuleSoft and Azure API Management for managing APIs, I have always wondered what it takes to develop our own API rate limiter.

Many thanks to John Crickett, who is making engineers like me and many others curious about how to solve real-world challenges through his platform codingchallenges.fyi. This post is an attempt to solve the Write Your Own API Rate Limiter challenge from codingchallenges.fyi.

Step 1

In this step, let's build two simple APIs, /limited and /unlimited, using FastAPI, a Python framework. As the endpoint names suggest, one has a rate limit and the other doesn't.

api.py

from fastapi import FastAPI, Request
from rate_limiter import RateLimitFactory
from limiting_algorithms import RateLimitExceeded

app = FastAPI()

# One rate limiter instance per client IP address
ip_addresses = {}


@app.get("/limited")
def limited(request: Request):
    client = request.client.host
    try:
        if client not in ip_addresses:
            ip_addresses[client] = RateLimitFactory.get_instance("FixedCounterWindow")
        if ip_addresses[client].allow_request(client):
            return "This is a limited use API"
    except RateLimitExceeded as e:
        # RateLimitExceeded is an HTTPException, so FastAPI renders it as a 429 response
        raise e


@app.get("/unlimited")
def unlimited(request: Request):
    return "Free to use API limitless"

Let’s look at the import statements here.

  • from rate_limiter import RateLimitFactory
  • from limiting_algorithms import RateLimitExceeded

We are importing classes from two separate files, rate_limiter.py and limiting_algorithms.py.

Our code structure will have the following pattern.

  • api.py will have the endpoints, while
  • rate_limiter.py is simply a factory class. We use this factory class to dynamically create instances of the other classes. Why have we done this? We will get the answer shortly.
  • limiting_algorithms.py is where we are going to write the different rate limiting algorithms like TokenBucket, FixedCounterWindow, SlidingWindow, SlidingWindowCounter etc.

rate_limiter.py

from limiting_algorithms import FixedCounterWindow, TokenBucket, SlidingWindow, SlidingWindowCounter


class RateLimitFactory:
    @staticmethod
    def get_instance(algorithm: str = None):
        if algorithm == "TokenBucket":
            return TokenBucket()
        elif algorithm == "FixedCounterWindow":
            return FixedCounterWindow()
        elif algorithm == "SlidingWindow":
            return SlidingWindow()
        else:
            # SlidingWindowCounter is the default; we implement it in Part II
            return SlidingWindowCounter()

From the above code, we see that the get_instance method accepts an algorithm name as input and returns the corresponding object as its output. The concept here is that there may exist various rate-limiting algorithms that we can implement, and based on the specific algorithm we wish to employ, we can dynamically generate an instance of the object using this factory class.

It’s important to note that in real-time API limiters, a single, fixed rate-limiting algorithm is not always employed. Each algorithm is suitable for different consumption patterns and can be switched as needed.
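As a quick illustration of that flexibility (this snippet is my own aside, and the RATE_LIMIT_ALGORITHM variable name is hypothetical), the algorithm could be chosen from configuration rather than hard-coded:

import os
from rate_limiter import RateLimitFactory

# Pick the algorithm from an environment variable, falling back to TokenBucket
algorithm = os.getenv("RATE_LIMIT_ALGORITHM", "TokenBucket")
limiter = RateLimitFactory.get_instance(algorithm)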

Also, see how we are storing client IP addresses in the ip_addresses dictionary in /limited:

ip_addresses = {}

client = request.client.host
try:
    if client not in ip_addresses:
        ip_addresses[client] = RateLimitFactory.get_instance("TokenBucket")

Here, we are tracking the API limit for every IP address, creating an instance of the class TokenBucket per IP address. This way, we can handle traffic from each end user independently.
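A minimal sketch of that idea in isolation (the IP strings are made up for illustration):

from rate_limiter import RateLimitFactory

ip_addresses = {}

# Each distinct client gets its own, independent rate limiter
for client in ("10.0.0.1", "10.0.0.2"):
    if client not in ip_addresses:
        ip_addresses[client] = RateLimitFactory.get_instance("TokenBucket")

# Draining one client's bucket has no effect on the other client's
assert ip_addresses["10.0.0.1"] is not ip_addresses["10.0.0.2"]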

Step 2

Image Credits: https://www.krakend.io/docs/throttling/token-bucket/

In this step, we will create our initial rate limiting algorithm, the Token Bucket. To enhance your understanding, I recommend revisiting Step 2 of the challenge and reading this section carefully.

As per this step, we will employ a token bucket with a capacity of 10. One token will be added to the bucket every second, and for each API call, one token will be removed. If an API call is made when the bucket is empty, we will respond with a 429 Too Many Requests error. It’s important to note that we won’t track actual tokens; instead, we’ll use a simple counter that increments and decrements.

When delving further into this problem, a couple of complexities arise:

  1. Adding Tokens Every Second: How can we ensure that tokens are added to the bucket (incrementing the counter) every second? This entails implementing a mechanism to increment the counter at a regular interval.
  2. Atomic Rate Limiting Operations: We need to guarantee that rate limiting operations are atomic. If one API call updates the counter while another API call reads the old value, the result is inconsistent rate limiting behavior. Therefore, we must address this concern to maintain a consistent and reliable rate limiting system (see the sketch after this list).
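To see why the second point matters, here is a small standalone sketch (not part of the limiter code) of an unlocked read-modify-write; depending on thread scheduling, some of the decrements can be lost:

import threading

tokens = 0

def consume():
    global tokens
    for _ in range(100_000):
        # Not atomic: each -= is a separate load, subtract and store,
        # so two threads can read the same value and overwrite each other
        tokens -= 1

threads = [threading.Thread(target=consume) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Should print -400000; an unlocked run may print a different (wrong) value
print(tokens)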

Let’s take a close look at the implementation below.

limiting_algorithms.py

from datetime import datetime
import threading

from fastapi import HTTPException


class RateLimit:
    def __init__(self):
        self.interval = 60
        self.limit_per_interval = 60
        # Shared by every algorithm subclass so check-and-update stays atomic
        self.lock = threading.Lock()


class RateLimitExceeded(HTTPException):
    def __init__(self, detail="Rate limit exceeded"):
        super().__init__(status_code=429, detail=detail)


class TokenBucket(RateLimit):
    def __init__(self):
        super().__init__()
        self.total_capacity = 10
        self.token_interval = 1
        self.tokens_per_interval = 1
        self.tokens = 10
        self.last_updated = datetime.now()

    def allow_request(self, ip):
        with self.lock:
            # Lazily refill: one token per elapsed second, capped at capacity
            curr = datetime.now()
            gap = (curr - self.last_updated).total_seconds()
            tokens_to_add = gap * self.tokens_per_interval
            self.tokens = min(self.total_capacity, tokens_to_add + self.tokens)
            self.last_updated = curr

            if self.tokens >= 1:
                self.tokens -= 1
                return True
            raise RateLimitExceeded()

Note how we have RateLimit, a base class, and RateLimitExceeded, a custom exception class that maps to an HTTP 429 response.

We create a threading.Lock() in the RateLimit base class so that every algorithm's check-and-update operations are consistent and atomic.

The initial value of self.tokens is set to 10. Each time an API call is made, this value is reduced. However, to restore tokens (one per second, up to the maximum of 10), we don't rely on a dedicated scheduler or timer.

Instead, we capture the timestamp of the last API request made. Then, we calculate the time difference between the current time and the time when the last API call was executed. This time gap represents the total seconds between the two API calls. For instance, if there is a 5-second gap between the previous API call and the current one, we increment the self.tokens value by 5. If there are no time gaps between API calls, we don't make any changes. It's essential to note that the upper limit for the self.tokens value remains at 10.

gap = (curr - self.last_updated).total_seconds()
tokens_to_add = gap * self.tokens_per_interval
self.tokens = min(self.total_capacity, tokens_to_add + self.tokens)

If a token is available, we consume it and return True; otherwise, we raise a RateLimitExceeded error.
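To convince ourselves the bucket behaves as described, here is a small, hypothetical driver script (not part of the post's code) that fires twelve back-to-back requests; since the calls happen within a fraction of a second, only the ten initial tokens are available:

from limiting_algorithms import TokenBucket, RateLimitExceeded

bucket = TokenBucket()

# The bucket starts full with 10 tokens, so requests 11 and 12
# should be rejected before a full second has passed
for i in range(1, 13):
    try:
        bucket.allow_request("127.0.0.1")
        print(f"request {i}: allowed")
    except RateLimitExceeded:
        print(f"request {i}: rejected (429 Too Many Requests)")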

Step 3

Let's implement a fixed window counter algorithm in this step.

class FixedCounterWindow(RateLimit):
    def __init__(self):
        super().__init__()
        self.counter = 0
        # Truncate to the start of the current minute
        self.curr_time = datetime.now().time().replace(second=0, microsecond=0)

    def allow_request(self, ip):
        with self.lock:
            curr = datetime.now().time().replace(second=0, microsecond=0)
            if curr != self.curr_time:
                # A new minute has begun, so reset the window
                self.curr_time = curr
                self.counter = 0

            if self.counter >= self.limit_per_interval:
                raise RateLimitExceeded()
            self.counter += 1
            return True

We employ a 60-second time window, which begins at the 0th second and 0th microsecond. This choice is why we use the replace() function in our code. When an API call is initiated, we record the current time and commence incrementing the counter. As long as that minute is ongoing, we continue to increment the counter. If the count surpasses the set limit, we raise a RateLimitExceeded exception. However, when the minute concludes and the next minute begins, we reset our counter back to zero.
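For example, the truncation looks like this (the timestamps are illustrative):

from datetime import datetime

now = datetime.now()                                  # e.g. 14:23:41.527014
window = now.time().replace(second=0, microsecond=0)  # becomes 14:23:00
print(now.time(), "->", window)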

While this algorithm has its merits, it does have a notable limitation. Consider a scenario where, within a minute, the first 30 seconds have no API calls, the next 30 seconds have 60 API calls, and then in the next minute, the first 30 seconds witness another 60 API calls. Even though our API is designed to handle 60 API calls in a minute, when we examine the last 30 seconds of the previous minute and the first 30 seconds of the current minute together, we encounter a total of 120 API calls. Unfortunately, our current rate limiter does not account for this. Consequently, we require a mechanism that operates on a sliding window basis rather than a fixed window to address this limitation.

Step 4

In this step, we are implementing a sliding window algorithm.

Image Credits: https://hechao.li/2018/06/25/Rate-Limiter-Part1/

class SlidingWindow(RateLimit):
    def __init__(self):
        super().__init__()
        self.logs = []
        self.limit_per_interval = 60
        self.interval = 60

    def allow_request(self, ip):
        with self.lock:
            curr = datetime.now()
            # Evict timestamps that have fallen out of the 60-second window
            while len(self.logs) > 0 and (curr - self.logs[0]).total_seconds() > self.interval:
                self.logs.pop(0)

            if len(self.logs) >= self.limit_per_interval:
                raise RateLimitExceeded()

            self.logs.append(curr)
            return True

This approach is elegantly simple yet highly effective. As we can observe, we maintain a log list that captures timestamps for all successful API calls. The key concept here is to keep a list containing only the timestamps from the last 60 seconds of API calls. The objective is to determine the number of API calls that occurred within the most recent 60 seconds. When a new API call is made, the first step is to remove all log entries older than 60 seconds. Once this cleanup is completed, we check the length of the log list. If it exceeds or equals our limit (e.g., 60 API calls per minute), we raise an exception; otherwise, we add the current timestamp to the logs. It’s important to note that we utilize threading.Lock() to ensure consistent results across all API calls.

While this approach is highly effective, it becomes less scalable, especially when dealing with high limits and intervals. Imagine the storage and removal of 10⁴ or 10⁶ log entries for every API call in such cases.
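Within this in-memory design, one small mitigation (my own aside, not part of the challenge) is to swap the list for collections.deque, whose popleft() evicts old entries in O(1) instead of the O(n) pop(0) on a list; the memory footprint, however, stays the same. A sketch of that hypothetical variant:

from collections import deque
from datetime import datetime

from limiting_algorithms import RateLimitExceeded, SlidingWindow


class SlidingWindowDeque(SlidingWindow):  # hypothetical variant, not in the post's repo
    def __init__(self):
        super().__init__()
        self.logs = deque()

    def allow_request(self, ip):
        with self.lock:
            curr = datetime.now()
            while self.logs and (curr - self.logs[0]).total_seconds() > self.interval:
                self.logs.popleft()  # O(1) eviction instead of O(n)

            if len(self.logs) >= self.limit_per_interval:
                raise RateLimitExceeded()

            self.logs.append(curr)
            return True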

So far, we have been storing the data for each IP address directly in a variable. However, this is not how real-world APIs store their data. Modern APIs can be horizontally scaled to a great extent, thanks to technologies like Docker, Kubernetes etc., which make scaling possible with a few lines of configuration. The most important part, though, is designing our APIs so that they remain correct at that scale. If we store data in a variable and run multiple load-balanced instances of the same API, each server will have its own copy of the data, which can cause inconsistency in the system. Hence, we need a central store for the rate limiting data from each IP address even though we are running our app on multiple servers. The complete code is available on GitHub.

In the following post (Part II), we are going to simulate a real-world API rate limiter by centralizing the storage in a Redis cache and also improve our sliding window algorithm slightly by introducing a counter. This covers Step 5 and Step 6 of the challenge. In the next post, we will also learn to use Postman to run performance tests.

Thanks for reading this far. If you find this post useful, please leave a clap or two, and if you have suggestions or feedback, please feel free to comment; it would mean a lot to me!

Please feel free to connect with me on LinkedIn or X (formerly Twitter).
