Rethinking our queuing strategy

Federico Alvarez
Ordergroove Engineering
4 min read · Jan 31, 2020

We use Celery for asynchronous task execution. Recently, we ran into a performance challenge: we had to run many kinds of tasks with different execution times, priorities and throughput requirements. More concretely, our application needed to populate message templates (SMS or email) that are then sent to customers.

As you might expect, different types of messages require very different information. For example, to send a welcome message you might only need the customer’s name, but to send a reminder email for an upcoming order you would probably need order details such as the total, products, shipping info, etc.

Problem(s)

Celery’s default setup uses a single queue that all workers consume from. In our case, the diversity of message types meant that tasks took very different amounts of time to finish, and with all of them routed through the same queue it was very hard to predict overall performance.

This was our biggest concern. At a certain time of day we needed to send thousands of reminder messages, all of which were put in the queue at once. The queue would fill up, and other messages (e.g. welcome SMS) got caught in traffic, waiting for thousands of reminders to be processed. This was hurting the experience of users, who expected an SMS right after they opted into a program.

Initial queuing strategy with single queue

Another problem we faced was that some tasks needed to be throttled (rate limited), and that also slowed down the non-throttled tasks in the same queue.

Solution

We decided to have separate queues and consumers for the tasks that were causing the bottleneck. This way we could route tasks based on their performance so they don’t affect others. Slower tasks could run at their own pace without hurting overall performance and everyone would be happy. Well… almost…

Another problem was yet to be solved: different clients have different needs, and some of them required tasks that make requests to endpoints on their own platforms. In other words, we needed to isolate task execution as much as possible to prevent those clients from affecting others.

Final queuing strategy with dedicated queues and workers

Too many promises and nice diagrams but no code to prove the concept? This looks like black magic… Let’s dive into the implementation.

Implementation

First we have to configure the task_queues and task_routes Celery settings in order to implement our routing logic. In case you’re not a Celery expert (don’t worry, nobody is), task_queues is the list of queues the workers will consume from and task_routes is the list of routers used to decide the destination of a task.

Our task_queues and task_routes need to look like this:

app.conf.task_queues = dict(
    {
        'app.task1': {
            'exchange': 'app',
            'routing_key': 'app.task1'
        },
        'app.task2': {
            'exchange': 'app',
            'routing_key': 'app.task2'
        },
    },
    # Client specific declaration
    **dynamic_routing_configs['task_queues']
)

app.conf.task_routes = dict(
    {
        'task1': {
            'queue': 'app.task1'
        },
        'task2': {
            'queue': 'app.task2'
        },
    },
    # Client specific declaration
    **dynamic_routing_configs['task_routes']
)
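To see what the merge above produces, here is a minimal, Celery-free sketch. The contents of dynamic_routing_configs and the resolve_queue helper are hypothetical and only mimic an exact-name route lookup; the fallback name 'celery' is Celery’s default queue name.

```python
# Hypothetical per-client entries; in the article these are generated at startup.
dynamic_routing_configs = {
    'task_queues': {
        'app.throttled_task.ABC123': {
            'exchange': 'app',
            'routing_key': 'app.throttled_task.ABC123',
        },
    },
    'task_routes': {
        'client.ABC123.throttled_task': {'queue': 'app.throttled_task.ABC123'},
    },
}

# Same pattern as above: static entries first, client entries unpacked on top.
task_routes = dict(
    {
        'task1': {'queue': 'app.task1'},
        'task2': {'queue': 'app.task2'},
    },
    **dynamic_routing_configs['task_routes']
)


def resolve_queue(task_name, routes, default_queue='celery'):
    """Illustrative stand-in for a route lookup: exact name match, else default."""
    return routes.get(task_name, {}).get('queue', default_queue)


print(resolve_queue('task1', task_routes))
print(resolve_queue('client.ABC123.throttled_task', task_routes))
print(resolve_queue('unrouted_task', task_routes))
```

Note that dict(mapping, **extra) only works when every key in extra is a string, which holds here because task and queue names are strings. If a client entry reuses a static key, the client entry wins.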

You might have noticed the magic at the end of each dictionary, dynamic_routing_configs. We came up with a way to build the per-client routes dynamically and inject them into the Celery configs as shown above.

CLIENT_CONFIGS = {
    'ABC123': {  # client identifier
        'throttled_task': {
            'queue': 'app.throttled_task.ABC123',
            'num_of_nodes': 2
        },
        'other_task': {
            'queue': 'app.other_task.ABC123'
        }
    }
}


def generate_dynamic_routing_configs():
    dynamic_queues = {}
    dynamic_routes = {}
    for task_key in celery_app.tasks.keys():
        # Client tasks are named 'client.<client id>.<task name>'
        if task_key.startswith('client.'):
            _, client, task_name = task_key.split('.')
            client_settings = CLIENT_CONFIGS.get(client, {})
            task_settings = client_settings.get(task_name, {})
            queue_name = task_settings.get('queue')
            if not queue_name:
                # No dedicated queue configured for this task: skip it
                continue
            dynamic_queues[queue_name] = {
                'exchange': CELERY_TASK_DEFAULT_EXCHANGE,
                'routing_key': queue_name
            }

            dynamic_routes[task_key] = {'queue': queue_name}
    return {
        'task_queues': dynamic_queues,
        'task_routes': dynamic_routes
    }
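To try the idea without a running Celery app, the sketch below takes the task names and the exchange as parameters instead of reading them from celery_app.tasks and CELERY_TASK_DEFAULT_EXCHANGE. The function name build_dynamic_routing, the list of registered task names and the client XYZ789 are made up for illustration.

```python
CLIENT_CONFIGS = {
    'ABC123': {
        'throttled_task': {'queue': 'app.throttled_task.ABC123', 'num_of_nodes': 2},
        'other_task': {'queue': 'app.other_task.ABC123'},
    }
}


def build_dynamic_routing(task_names, client_configs, exchange='app'):
    """Build per-client queues and routes from 'client.<id>.<task>' task names."""
    queues, routes = {}, {}
    for task_key in task_names:
        parts = task_key.split('.')
        # Only handle names shaped exactly like 'client.<id>.<task>'.
        if len(parts) != 3 or parts[0] != 'client':
            continue
        _, client, task_name = parts
        queue_name = client_configs.get(client, {}).get(task_name, {}).get('queue')
        if not queue_name:
            continue  # unknown client or no dedicated queue for this task
        queues[queue_name] = {'exchange': exchange, 'routing_key': queue_name}
        routes[task_key] = {'queue': queue_name}
    return {'task_queues': queues, 'task_routes': routes}


# Hypothetical registry contents, standing in for celery_app.tasks.keys().
registered = [
    'task1',
    'client.ABC123.throttled_task',
    'client.ABC123.other_task',
    'client.XYZ789.other_task',  # client without an entry in CLIENT_CONFIGS
]
configs = build_dynamic_routing(registered, CLIENT_CONFIGS)
print(sorted(configs['task_queues']))
print(sorted(configs['task_routes']))
```

Only the two ABC123 tasks end up with a dedicated queue and route; task1 and the XYZ789 task are left to the static configuration.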

Now that we have defined the queues and routes, we need to tell the workers which queues they should consume from. To accomplish this we used one of Celery’s signals, celeryd_after_setup, which is sent after the worker instance is set up, but before it calls run.

from celery.signals import celeryd_after_setup


@celeryd_after_setup.connect
def setup_task_routing_queue(sender, instance, **kwargs):
    # sender is the worker's node name, e.g. 'celery1@hostname'
    sender_name = sender.split('@')[0]
    # Deselect all throttled queues,
    # they should have dedicated workers
    instance.app.amqp.queues.deselect(throttled_queues)
    # set workers celery1, celery2,...celery[N] to a specific queue
    # where N is the number of throttled queues
    config = throttled_worker_config.get(sender_name)
    if config:
        instance.app.amqp.queues.select(config['queue'])

Here, throttled_worker_config is a dynamically generated mapping from each worker’s name to its dedicated queue. For example:

{
    'celery1': {
        'queue': 'app.throttled_task.ABC123'
    },
    'celery2': {
        'queue': 'app.throttled_task.ABC123'
    },
    'celery3': {
        'queue': 'app.task1.ABC123'
    }
}
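The article does not show how this mapping is generated, so the following is only one possible sketch. It assumes that every queue in CLIENT_CONFIGS gets dedicated workers, that num_of_nodes is the number of workers for that queue (defaulting to 1 when absent), and that workers are named celery1, celery2, and so on. The function name build_throttled_worker_config is hypothetical.

```python
CLIENT_CONFIGS = {
    'ABC123': {
        'throttled_task': {'queue': 'app.throttled_task.ABC123', 'num_of_nodes': 2},
        'other_task': {'queue': 'app.other_task.ABC123'},
    }
}


def build_throttled_worker_config(client_configs, worker_prefix='celery'):
    """Assign consecutively numbered workers to each client queue (assumed scheme)."""
    config = {}
    worker_index = 1
    # Dicts keep insertion order, so workers are numbered in declaration order.
    for client_settings in client_configs.values():
        for task_settings in client_settings.values():
            queue_name = task_settings.get('queue')
            if not queue_name:
                continue
            # Assumption: num_of_nodes defaults to one dedicated worker.
            for _ in range(task_settings.get('num_of_nodes', 1)):
                config[f'{worker_prefix}{worker_index}'] = {'queue': queue_name}
                worker_index += 1
    return config


throttled_worker_config = build_throttled_worker_config(CLIENT_CONFIGS)
for worker_name, entry in throttled_worker_config.items():
    print(worker_name, entry['queue'])
```

With this input, celery1 and celery2 are assigned to app.throttled_task.ABC123 and celery3 to app.other_task.ABC123, which has the same shape as the mapping shown above.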

In this case, if we run the daemon with 6 workers, 3 of them will be dedicated to client ABC123 and the other 3 will consume from the remaining, non-throttled queues defined in task_queues.
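The resulting split can be modelled in plain Python. This sketch is not the worker code itself: queues_for_worker is a hypothetical helper that mirrors the deselect and select steps of the signal handler, the hostname worker-host is invented, and throttled_queues is assumed to be the set of queues that appear in throttled_worker_config.

```python
def queues_for_worker(node_name, all_queues, throttled_worker_config):
    """Return the queues a worker would consume from under the scheme above."""
    worker_name = node_name.split('@')[0]
    # Assumption: the throttled queues are exactly those with dedicated workers.
    throttled_queues = {entry['queue'] for entry in throttled_worker_config.values()}
    dedicated = throttled_worker_config.get(worker_name)
    if dedicated:
        # Dedicated workers consume from their own queue only.
        return [dedicated['queue']]
    # Everyone else consumes from every queue that is not throttled.
    return [queue for queue in all_queues if queue not in throttled_queues]


all_queues = ['app.task1', 'app.task2', 'app.throttled_task.ABC123', 'app.task1.ABC123']
throttled_worker_config = {
    'celery1': {'queue': 'app.throttled_task.ABC123'},
    'celery2': {'queue': 'app.throttled_task.ABC123'},
    'celery3': {'queue': 'app.task1.ABC123'},
}

for index in range(1, 7):
    node_name = f'celery{index}@worker-host'
    print(node_name, queues_for_worker(node_name, all_queues, throttled_worker_config))
```

Workers celery1 to celery3 each end up with a single client queue, while celery4 to celery6 share app.task1 and app.task2.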

Finally, we wanted to hide the routing logic from the clients of this app so they don’t have to worry about which queue to send a message to. We solved it by adding a single entry point, a separate worker/queue pair that handles the internal routing, so users send their messages to this queue and don’t need to know when or where they are processed.

Final queuing strategy for the example above
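The entry point’s internals are not described here, so the sketch below is only a guess at what such a dispatcher might do: pick the client-specific task when one is registered, otherwise fall back to the shared task. The names resolve_task_name and dispatch, the message fields and the registered task names are all hypothetical. The send argument is any callable with a (name, kwargs=...) shape; in a real app, Celery’s app.send_task could play that role.

```python
def resolve_task_name(client_id, task_name, registered_tasks):
    """Prefer 'client.<id>.<task>' when it exists, else use the shared task."""
    client_task = f'client.{client_id}.{task_name}'
    return client_task if client_task in registered_tasks else task_name


def dispatch(message, registered_tasks, send):
    """Entry point: callers never name a queue, they only describe the message."""
    task_name = resolve_task_name(message['client_id'], message['type'], registered_tasks)
    # The configured task_routes then decide which queue this task lands on.
    send(task_name, kwargs={'payload': message['payload']})
    return task_name


# Stand-in for a real sender: records calls instead of publishing them.
sent = []


def fake_send(name, kwargs=None):
    sent.append((name, kwargs))


registered_tasks = {'task1', 'task2', 'client.ABC123.task1'}
dispatch({'client_id': 'ABC123', 'type': 'task1', 'payload': 'hello'}, registered_tasks, fake_send)
dispatch({'client_id': 'XYZ789', 'type': 'task1', 'payload': 'hi'}, registered_tasks, fake_send)
print([name for name, _ in sent])
```

The first message goes to the ABC123-specific task and the second to the shared task1, without either caller knowing which queue is involved.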

Conclusion

Celery is a powerful tool with many hidden gems like these that allow complex routing logic, and that logic can make a huge impact on your app’s performance. That’s why it’s very important to design your queuing strategy with the whole picture in mind: all of your tasks running together.
