Connect Celery and RabbitMQ with Django

Usama Mehmood
Published in DiveDeepAI

4 min read · Feb 27, 2023



Intro

While working with a web framework such as Django or Flask, you may have tasks that take a long time to complete. Such tasks can block the REST API: another request arrives, or you need to fetch data, but that minute-long task is still running. For such problems, multi-threading comes into play: the long-running task is sent to another thread while your API continues to serve requests. This technique is especially important for Machine Learning workloads, where you might be fine-tuning a model in the background while serving predictions at the same time. Popular platforms use tools like Celery and RabbitMQ to handle thousands of requests simultaneously.
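As a minimal illustration of the idea (plain Python threading, no Celery yet), offloading a slow function keeps the caller responsive:

```python
import threading
import time

results = []

def long_task():
    # stand-in for a slow job, e.g. fine-tuning a model
    time.sleep(0.2)
    results.append("done")

worker = threading.Thread(target=long_task)

start = time.time()
worker.start()    # returns immediately; the task runs in the background
elapsed = time.time() - start
print(f"start() returned after {elapsed:.4f}s")

worker.join()     # wait for the background task before exiting
print(results)
```

A bare thread works for simple cases; Celery builds on the same idea but adds queuing, retries, and separate worker processes.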

What is Celery

To solve this problem in a simple way, Celery comes into play. Celery is a distributed task queue that handles vast amounts of work asynchronously. It schedules tasks and runs them in separate worker processes. It is an open-source project and can easily be configured in your Django or Flask applications.

What is RabbitMQ

We have seen how Celery solves this problem, but how will Celery communicate with the main application to report the status of a task? Did it fail? Did it succeed? This is where a message broker comes into the scene. A message broker lets independent processes communicate by passing messages. Celery supports many different message brokers, such as Redis, RabbitMQ, or even AWS SQS.
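Conceptually, a broker is just a queue sitting between producers and consumers. A tiny stdlib sketch, with `queue.Queue` standing in for RabbitMQ and a thread standing in for a Celery worker:

```python
import queue
import threading

broker = queue.Queue()   # stands in for RabbitMQ
results = []

def worker():
    # a consumer, playing the role of a Celery worker
    msg = broker.get()
    results.append(f"processed {msg}")
    broker.task_done()

threading.Thread(target=worker).start()

broker.put("task-1")     # the producer publishes a message
broker.join()            # blocks until the consumer acknowledges it
print(results)
```

A real broker adds persistence, routing, and delivery guarantees on top of this basic hand-off.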

Combining Celery and RabbitMQ

With a few simple steps, you can have both of them running and keep your application free of significant delays. Returning results quickly is very important for user experience, so this combination is worth adding to your technology stack.

Implementation

Assuming you already have a working Django project, let’s add Celery to it:

pip install celery

Next, we need to install RabbitMQ on the machine. RabbitMQ requires Erlang, so you will need to install Erlang first. Head over to their websites and install them according to your OS.

In our settings.py file, at the bottom, add the following lines:

CELERY_BROKER_URL = 'amqp://test:test@localhost:5672/'
CELERY_TASK_SERIALIZER = 'json'

Here, we told Celery the RabbitMQ URL it should connect to. Note that test:test is the username and password for the RabbitMQ service; the default for both is guest. You can always create new users, but if you are trying this for the first time, the guest account will do. The serializer setting will be explained later.
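If you do want a dedicated user rather than guest, RabbitMQ ships with the rabbitmqctl CLI. A sketch of creating the test user assumed in the URL above (run with admin privileges on the machine hosting RabbitMQ):

```shell
# create the "test" user referenced in CELERY_BROKER_URL
rabbitmqctl add_user test test
# grant it configure/write/read permissions on the default vhost "/"
rabbitmqctl set_permissions -p / test ".*" ".*" ".*"
```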

Next, in the same mainapp package, you need to create a new file called celery.py:

from __future__ import absolute_import, unicode_literals
import os

# needed on Windows so the prefork pool plays well with multiprocessing
os.environ.setdefault('FORKED_BY_MULTIPROCESSING', '1')

from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'mainapp.settings')

app = Celery('mainapp')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

As you can see in the above code, you only need to substitute your project’s name in ‘mainapp.settings’ and Celery(‘mainapp’). If your project is called ‘mywebsite’, it would be ‘mywebsite.settings’ and Celery(‘mywebsite’).

Your mainapp package should already have an __init__.py file (in the same directory as celery.py). Add the following code to it:

from .celery import app as celery_app

__all__ = ['celery_app']

All boilerplate configuration has been done. Now we can create tasks and send data to them.

import time

from celery import shared_task

@shared_task
def add():
    time.sleep(10)
    print("now done")

You can create a function and add the @shared_task decorator on top of it, which marks it as a Celery task that can run asynchronously. We created an add function that simply waits for 10 seconds, standing in for a big task.

from rest_framework.views import APIView
from rest_framework.response import Response
from .tasks import add  # assuming add is defined in tasks.py

class test(APIView):
    def post(self, request, format=None):
        add.delay()
        return Response({"data": "task going on"})

We have created an API view that accepts a POST request. To call the add task via Celery, we do add.delay(). You can also pass arguments inside the parentheses; they will get JSON-serialized, as specified in the configuration. Therefore, you cannot pass class instances, only data in a JSON-serializable format.
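Because of the json serializer configured earlier, anything passed to delay() must survive json.dumps. A quick stdlib check (the helper and the Order class are my own illustrations, not a Celery API):

```python
import json

def is_json_serializable(value):
    # mirrors what the "json" task serializer requires of task arguments
    try:
        json.dumps(value)
        return True
    except TypeError:
        return False

class Order:
    """A stand-in for an arbitrary class instance."""

ok = is_json_serializable({"order_id": 7, "items": ["a", "b"]})
bad = is_json_serializable(Order())   # class instances are rejected
print(ok, bad)
```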

This code sends the add function to Celery, which runs it in the background, and a response is sent back to the user immediately, without waiting for the 10-second delay inside add. Pretty useful for big calculations or queries that you might perform in the background while the user calls other endpoints.
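Note that the task is only picked up once a Celery worker process is running. From the project root, something like the following starts one (assuming the project is named mainapp, as above):

```shell
# start a worker that consumes tasks from RabbitMQ
celery -A mainapp worker --loglevel=info
```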

Celery Signals

Can we find out when a task completes? This is where Celery signals come in. Celery exposes a set of signals that fire at different points in a task’s life cycle. Here is an example:

from celery.signals import task_postrun

@task_postrun.connect
def task_postrun_handler(task_id=None, task=None, retval=None, *args, **kwargs):
    print("This message is called after the task ended")

A handler decorated with @task_postrun.connect will run whenever any task ends. Similarly, there are task_prerun, task_failure, and other signals. These can come in handy for making the application more robust. The example above fires when ANY task in the whole application finishes; to restrict it to a specific shared_task function, you can pass that task as the sender. For more details, check out the Celery documentation on signals.

Conclusion

Hopefully, this article has helped you get started with Celery and cleared up any confusion. It will come in handy as your application grows, or if you are working on Machine Learning tasks behind web APIs. If you have any queries, please let me know in the comments below or email me at usama.mehmood@divedeepai.com.
