Celery throttling — setting rate limit for queues

In this article I'm going to show how to control the throughput of a queue in distributed task queue based systems, or, in simpler terms, how to set its rate limit. As an example, I'll take Python and my favorite Celery + RabbitMQ kit, although the algorithm I use does not depend on these tools and can be implemented on any other stack.

Magomed Aliev · Analytics Vidhya · Mar 25, 2020


So what’s the problem?

First, a few words about the problem I'm trying to solve. The fact is that 99.9% of Internet services restrict access to their resources: they won't let you hit them with 100/1000 req/s and will threaten to return 403 or 500 in response. Greedy, aren't they? Sometimes even your own DB can act as such a service… well, you can't trust anyone nowadays, so you need to limit yourself somehow.

Of course, if we have only one process, then there is no problem, but we work with Celery. That means we may have not only N processes (hereinafter referred to as workers) but also M servers, and synchronizing all of this is not so trivial.

What’s in the box

The first thing you come across when looking for how to configure throttling in Celery is the built-in rate_limit parameter of the Task class. It sounds like what you need, but, digging a little deeper, we notice that:

You cannot set the rate limit on a task group.

This is inconvenient, because access to some limited resource is often spread between different tasks.

# let's say we have a rate limit on GitHub API calls of 60 req/min,
# so we have to divide the calls between all the tasks
@app.task(rate_limit='30/m')
def get_github_api1():
    pass

@app.task(rate_limit='30/m')
def get_github_api2():
    pass

This is a per-worker rate limit, not a global one.

Of course, you can divide the limit again, taking into account the number of workers. But this works really badly for uneven load. For example, suppose that during one minute we call get_github_api1() 60 times and don't call get_github_api2() at all. Only 30 calls of get_github_api1() will be processed, although the real limit is 60. In addition, every time a new task that needs access to this resource appears, you have to recalculate all the limits everywhere. In general, the feature is certainly useful, but only for the simplest cases.

The solution

Token bucket

The solution to the problem for me was Token Bucket, an algorithm used to control channel bandwidth in computer and telecommunication networks. Briefly: to pass the channel's limit check, a data packet must take a token from a store, while tokens arrive at the store with a certain frequency. That is, the bandwidth of the channel is limited by the token issue rate, and that rate is exactly what we get to regulate. In our case, instead of a data packet we have a task, and RabbitMQ queues will act as the token store.
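Before wiring this into Celery, here is a minimal single-process sketch of the algorithm in plain Python, just to make the mechanics concrete (the class and its parameters are illustrative, not part of the final solution):

import time

class TokenBucket:
    # a toy in-process bucket: refills at `rate` tokens/sec, holds at most `capacity`
    def __init__(self, rate, capacity):
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity
        self.updated = time.monotonic()

    def acquire(self):
        # refill proportionally to the time elapsed since the last call
        now = time.monotonic()
        self.tokens = min(self.capacity, self.tokens + (now - self.updated) * self.rate)
        self.updated = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False  # no token: the caller should wait or retry

bucket = TokenBucket(rate=1, capacity=2)  # roughly 60 req/min with a burst of 2
if bucket.acquire():
    print('allowed to make the call')

The catch is that such an object lives inside one process; with N workers on M servers the bucket has to be shared, and that is where RabbitMQ comes in.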

Writing some code

Well, let's write some code. Create a main.py file with the basic settings:

from celery import Celery
from kombu import Queue

app = Celery('Test app', broker='amqp://guest@localhost//')

# 1 queue for tasks and 1 queue for tokens
app.conf.task_queues = [
    Queue('github'),
    # I limited the queue length to 2, so that tokens do not accumulate;
    # otherwise this could lead to a breakdown of our rate limit
    Queue('github_tokens', max_length=2),
]

# this task will play the role of our token
# it will never be executed, we will just pull it as a message from the queue
@app.task
def token():
    return 1

# setting up a constant issue of our token
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # we will issue 1 token per second,
    # which means the rate limit for the github queue is 60 tasks per minute
    sender.add_periodic_task(1.0, token.signature(queue='github_tokens'))

Do not forget to launch RabbitMQ; I prefer to do this with Docker:

docker run -d --rm --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3-management

Now let's run celery beat, a special Celery scheduler process that is responsible for dispatching periodic tasks:

celery -A main beat --loglevel=info

After that, messages will appear in the console once a second:

[2020-03-22 22:49:00,992: INFO/MainProcess] Scheduler: Sending due task main.token() (main.token)
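If you want to double-check that tokens really land in the queue (and are capped at 2 by max_length), you can ask RabbitMQ directly, using the container name from the docker run command above:

docker exec rabbit rabbitmqctl list_queues name messages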

Well, we have set up token issuing for our 'bucket'. Now all we have to do is learn how to pull tokens. Let's improve the code we wrote earlier for the GitHub requests. Add these lines to main.py:

# function for pulling tokens from the queue
def rate_limit(task, task_group):
    # acquiring a broker connection from the pool
    with task.app.connection_for_read() as conn:
        # getting a token
        msg = conn.default_channel.basic_get(task_group + '_tokens', no_ack=True)
        # received None: the queue is empty, no tokens
        if msg is None:
            # retry the task after 1 second
            task.retry(countdown=1)

# Added some prints for logging.
# max_retries=None makes the tasks retry until they succeed.
@app.task(bind=True, max_retries=None)
def get_github_api1(self):
    rate_limit(self, 'github')
    print('Called Api 1')


@app.task(bind=True, max_retries=None)
def get_github_api2(self):
    rate_limit(self, 'github')
    print('Called Api 2')

Now let's check how it works. In addition to the beat process, start a worker with a concurrency of 8:

celery -A main worker -c 8 -Q github

And create a separate little script to run these tasks, call it producer.py:

from main import get_github_api1, get_github_api2

tasks = [get_github_api1, get_github_api2]

for i in range(100):
    # launching tasks one by one, alternating between them
    tasks[i % 2].apply_async(queue='github')

Start it with python producer.py and look at the workers' logs:

[2020-03-23 13:04:15,017: WARNING/ForkPoolWorker-3] Called Api 2
[2020-03-23 13:04:16,053: WARNING/ForkPoolWorker-8] Called Api 2
[2020-03-23 13:04:17,112: WARNING/ForkPoolWorker-1] Called Api 2
[2020-03-23 13:04:18,187: WARNING/ForkPoolWorker-1] Called Api 1
... (96 more lines)

Despite the fact that we have 8 worker processes, tasks are executed approximately once per second. If there is no token at the moment a task reaches a worker, the task is rescheduled. Also, as you may have noticed, we actually throttle not a queue but a logical group of tasks, which can even be located in different queues. This makes our control even more detailed and granular.
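To illustrate that, here is a hypothetical variation on the code above (the task names are made up): both tasks pull from the same github_tokens queue, so together they never exceed 60 calls/min even when routed to different Celery queues.

# these two tasks can be routed to *different* Celery queues,
# yet they share the same 'github_tokens' bucket
@app.task(bind=True, max_retries=None)
def fetch_repos(self):
    rate_limit(self, 'github')
    print('Called from the github queue')

@app.task(bind=True, max_retries=None)
def fetch_issues(self):
    rate_limit(self, 'github')
    print('Called from another queue')

# fetch_repos.apply_async(queue='github')
# fetch_issues.apply_async(queue='other')  # 'other' must be declared and consumed too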

Putting it all together

Of course, the number of such task groups is limited only by the capabilities of the broker. Let's put the whole code together, expanding and 'beautifying' it:

from celery import Celery
from kombu import Queue
from queue import Empty
from functools import wraps

app = Celery('hello', broker='amqp://guest@localhost//')

task_queues = [
    Queue('github'),
    Queue('google'),
]

# per-minute rates
rate_limits = {
    'github': 60,
    'google': 100,
}

# generating token queues for all groups with limits defined in the dict above
task_queues += [Queue(name + '_tokens', max_length=2) for name in rate_limits]

app.conf.task_queues = task_queues

@app.task
def token():
    return 1

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # setting up automatic token issuing for all limited groups
    for name, limit in rate_limits.items():
        sender.add_periodic_task(60 / limit, token.signature(queue=name + '_tokens'))

# I really like decorators ;)
def rate_limit(task_group):
    def decorator_func(func):
        @wraps(func)
        def function(self, *args, **kwargs):
            with self.app.connection_for_read() as conn:
                # Here I used another, higher-level method.
                # We get a complete queue interface, but in return
                # lose some performance, because additional work
                # is done under the hood.
                with conn.SimpleQueue(task_group + '_tokens', no_ack=True, queue_opts={'max_length': 2}) as queue:
                    try:
                        # Another advantage is that we can use a blocking call.
                        # It can be more convenient than calling retry() all the time,
                        # although it depends on the specific case.
                        queue.get(block=True, timeout=5)
                        return func(self, *args, **kwargs)
                    except Empty:
                        self.retry(countdown=1)
        return function
    return decorator_func

# much more beautiful and readable with decorators, agree?
@app.task(bind=True, max_retries=None)
@rate_limit('github')
def get_github_api1(self):
    print('Called github Api 1')

@app.task(bind=True, max_retries=None)
@rate_limit('github')
def get_github_api2(self):
    print('Called github Api 2')

@app.task(bind=True, max_retries=None)
@rate_limit('google')
def query_google_api1(self):
    print('Called Google Api 1')

@app.task(bind=True, max_retries=None)
@rate_limit('google')
def query_google_api2(self):
    print('Called Google Api 2')

Thus, the total number of calls in the google group will not exceed 100/min, and in the github group 60/min. Note that setting up such throttling took less than 50 lines of code. Could it be any simpler?
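To try it out, a small producer in the spirit of producer.py above could exercise both groups at once (the imports assume the task names from the final code; remember to start the worker with -Q github,google so it consumes both task queues). With the 60 / limit beat interval, github gets one token per second and google one token every 0.6 seconds:

from main import get_github_api1, query_google_api1

for i in range(100):
    get_github_api1.apply_async(queue='github')    # drained at no more than 60/min
    query_google_api1.apply_async(queue='google')  # drained at no more than 100/min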

Moving further

Well, that's how it works: no external instruments or services, only the resources of our broker. But why stop there? Using this algorithm wisely, you can go further and create much more complex and flexible strategies. For example, some tasks could take more than one token (possibly even from different queues, if they access several services), which gives us the concept of a task's 'weight', as sketched below. Or we could expand the size of our token 'bucket', allowing tokens to accumulate and thereby compensate for periods of inactivity. In general, the room for manoeuvre is huge and limited only by your imagination and engineering skills.
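For instance, here is a rough sketch of the 'weight' idea, building on the decorator from the final code (the weight parameter and its handling are my own assumption, not something from the article):

# hypothetical extension: a task that consumes several tokens at once
def rate_limit(task_group, weight=1):
    def decorator_func(func):
        @wraps(func)
        def function(self, *args, **kwargs):
            with self.app.connection_for_read() as conn:
                with conn.SimpleQueue(task_group + '_tokens', no_ack=True,
                                      queue_opts={'max_length': 2}) as queue:
                    try:
                        # a 'heavy' task must collect `weight` tokens before running
                        for _ in range(weight):
                            queue.get(block=True, timeout=5)
                        return func(self, *args, **kwargs)
                    except Empty:
                        # caveat: tokens already pulled before the timeout are lost,
                        # which makes the limit slightly stricter than configured
                        self.retry(countdown=1)
        return function
    return decorator_func

# counts as two ordinary calls against the github limit
@app.task(bind=True, max_retries=None)
@rate_limit('github', weight=2)
def heavy_github_task(self):
    print('Called heavy github task')

And growing the bucket is just a matter of raising max_length on the token queue (both in task_queues and in queue_opts), which lets unused tokens accumulate up to that cap.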

Thank you for reading!

P.S. Share your solutions, maybe together we will find an even better one ;)
