A dockerized pub-sub message queue implementation in Django by using RabbitMQ and Celery

Cagri Aslanbas
Nov 21, 2019 · 7 min read
Photo by Tim Evans on Unsplash

Dealing with RabbitMQ message queues in Django can become a nightmare if you want to publish/subscribe to messages asynchronously. Celery might be the first thing that comes to mind when talking about non-blocking tasks in Python, but another nightmare starts when you want to publish/subscribe to messages via a customized message queue (custom exchange name/type, custom queue name/type, etc.). I’m going to show, step by step, how to publish and subscribe to a custom RabbitMQ message queue by using Celery’s default producer pool.

Basic knowledge of Python and Django views/urls is required to fully understand this post.


Let’s start

But first of all: what is the publish-subscribe pattern? Wikipedia¹ provides a clear definition:

In software architecture, publish–subscribe is a messaging pattern where senders of messages, called publishers, do not program the messages to be sent directly to specific receivers, called subscribers, but instead categorize published messages into classes without knowledge of which subscribers, if any, there may be. Similarly, subscribers express interest in one or more classes and only receive messages that are of interest, without knowledge of which publishers, if any, there are.


What is RabbitMQ and Celery?

RabbitMQ is a well-known message broker that forwards published messages and stores them until the client acknowledges that it has consumed (received) the message, or until the message’s time to live (TTL) expires. RabbitMQ forwards and stores the messages in message queues. There can be multiple separate message queues, and each client can subscribe to one or more of them.

Publishers do not send messages directly to queues, but to exchanges, which then route each message to the corresponding message queue. There are multiple types of exchanges; the one we will be using is the direct exchange. The logic behind direct exchanges is that you provide a routing key with the message when publishing, and the exchange forwards the message to the queue(s) bound with that exact routing key.
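To make this concrete, here is a minimal standalone kombu sketch of the direct-exchange idea (the connection URL, exchange and queue names here are placeholders for illustration, not part of the project we build below):

import kombu

# Illustration only: a direct exchange delivers a message to every queue
# whose binding (routing) key equals the routing key used when publishing.
with kombu.Connection('amqp://guest:guest@localhost:5672//') as conn:
    channel = conn.channel()
    exchange = kombu.Exchange('demo.direct', type='direct', channel=channel)
    exchange.declare()
    queue = kombu.Queue('demo.queue', exchange=exchange,
                        routing_key='demo.key', channel=channel)
    queue.declare()
    producer = conn.Producer()
    producer.publish({'hello': 'direct'}, exchange=exchange, routing_key='demo.key')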

Celery is a widely used Python library for handling asynchronous task/job queues. It runs its own separate worker process that executes these tasks. Additionally, with the external Python library django-celery-results, each fired Celery task can be logged in the database. Celery is commonly used with RabbitMQ or Redis as its task/job broker. We will be using RabbitMQ as the message queue today.
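As a quick illustration of the Celery model (the add task below is made up for this example and is not part of the project we build later): calling a task’s .delay() sends the call to the broker, and a worker process picks it up and runs it asynchronously.

from celery import shared_task

@shared_task
def add(x, y):
    # Runs inside the Celery worker process, not in the caller's process.
    return x + y

# Somewhere in the web app:
# add.delay(2, 3)  # returns immediately; the worker computes the result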


Our flow

For the sake of simplicity, we will not create separate publisher and subscriber applications. Instead, our Django app will both publish a message and subscribe to our RabbitMQ message queue. Message publishing will be handled by a Celery task, which will be invoked by a simple HTTP GET request. Our flow diagram looks like this:

Our flow diagram for today

Starting a Django project

Okay, everything is good up to now. It’s time to create a Django project and a Django app. First, we need to install Django:

> pip install django
> django-admin startproject mypubsub
> cd mypubsub

Okay, let’s make sure our Django app starts (as we don’t have a database yet, you can ignore the migration warnings):

> python manage.py runserver 8080

If it runs correctly, then create a new app api:

> python manage.py startapp api

Setting up Docker and docker-compose

Perfect, now it’s time to dockerize our app. Before doing that, we need to create our requirements.txt, which lists the pip libraries to install in our Docker container. The requirements.txt will be:

celery==4.3.0
Django==2.2.7
django-celery-results==1.1.2
kombu==4.6.6
psycopg2==2.8.4

Kombu is a messaging library maintained by the Celery project. We will be using Kombu for publishing and subscribing to message queues. The other libraries’ names speak for themselves.

Docker and docker-compose need to be installed; you can follow the official installation guides for each.

If you are good to go, let’s start with our first file, the Dockerfile.

  • Dockerfile
FROM python:3.6.7-alpine
ENV PYTHONUNBUFFERED 1

# Copy requirements from local to container
COPY ./requirements.txt /requirements.txt

# Install python and postgres dependencies under a virtual package
RUN apk add --update --no-cache postgresql-client
RUN apk add --update --no-cache --virtual .tmp-build-deps \
    gcc libc-dev linux-headers postgresql-dev musl-dev
RUN pip install --upgrade pip -r /requirements.txt

# Delete virtual packages as we installed our dependencies
RUN apk del .tmp-build-deps

# Copy and set our project folder from local to container
RUN mkdir /app
WORKDIR /app
COPY ./ /app

# Copy the wait-for script from local to container
COPY ./wait-for /bin/wait-for
RUN chmod -R 777 /bin/wait-for

# Run as a non-root user
RUN adduser -D user
USER user

Briefly speaking, this Dockerfile creates a Python environment, installs every pip package listed in requirements.txt along with the Linux build dependencies they need, and then copies the whole mypubsub Django project source code into the container. The wait-for script will be used to wait for other docker-compose services that are not yet ready.
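The wait-for script is not generated by Django; it ships with the project source (see the GitHub repository at the end). If you want to write your own, a minimal sketch along these lines should work on Alpine/busybox images, which provide nc:

#!/bin/sh
# Minimal wait-for sketch: block until the given host:port accepts TCP connections.
# Usage: wait-for host:port [command...]
hostport="$1"; shift
host="${hostport%%:*}"
port="${hostport##*:}"

until nc -z "$host" "$port" > /dev/null 2>&1; do
  echo "waiting for $host:$port ..."
  sleep 1
done

# If a follow-up command was given, run it once the port is reachable.
if [ "$#" -gt 0 ]; then
  exec "$@"
fi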

  • docker-compose.yml

Our docker-compose file will consist of four services: the Django app, the Celery worker, a Postgres database, and RabbitMQ. In order not to collide with other instances you might already be running locally, we will map their exposed ports as follows:

  • Django: 8080:8080
  • Postgres: 54320:5432
  • RabbitMQ: 5682:5672
  • RabbitMQ-UI: 15682:15672
version: "3"services:
backend:
build:
context: .
image: backend-image
container_name: backend
hostname: backend
restart: on-failure
ports:
- "8080:8080"
volumes:
- .:/app
command: >
sh -c "wait-for db:5432 && wait-for rabbit:5672 &&
python manage.py migrate &&
python manage.py runserver 0.0.0.0:8080"
stdin_open: true
tty: true
depends_on:
- db
- rabbit
networks:
- shared_network
rabbit:
image: rabbitmq:3-management
container_name: rabbit
hostname: rabbit
environment:
- RABBITMQ_DEFAULT_USER=admin
- RABBITMQ_DEFAULT_PASS=admin
- RABBITMQ_DEFAULT_VHOST=/
ports:
- "5682:5672"
- "15682:15672"
networks:
- shared_network
worker:
image: backend-image
restart: on-failure
command: >
sh -c "wait-for backend:8080 &&
celery worker -B -l info -A mypubsub.celery -s /tmp/celerybeat-schedule"
depends_on:
- backend
- rabbit
- db
networks:
- shared_network
db:
image: postgres:10-alpine
container_name: db
hostname: db
environment:
- POSTGRES_DB=app
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgres
ports:
- "54320:5432"
networks:
- shared_network
networks:
shared_network:
driver: bridge

All of these services are connected to each other via shared_network. The db and rabbit services start first, then backend, and finally worker. The reason we use the wait-for script in addition to the depends_on feature is to make sure the backend and worker services only start once the db and rabbit services are fully operational.

Final steps

Before we are ready to go, we need to add the Celery logic to our Django project. We will start by adding the Celery-related settings.

  • settings.py
INSTALLED_APPS = [
    ...,
    'django_celery_results',
    ...,
]

# ...

# Celery properties
CELERY_BROKER_URL = 'amqp://admin:admin@rabbit:5672//'
CELERY_RESULT_BACKEND = 'django-db'

We add django_celery_results to INSTALLED_APPS, which will log each of our Celery tasks in the database. Then we set the RabbitMQ broker URL and tell Celery to use the database for task results.
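One detail worth noting (in case you have not already done it after startapp): the api app itself should also be listed in INSTALLED_APPS, otherwise app.autodiscover_tasks() will not find api/tasks.py. A minimal sketch:

INSTALLED_APPS = [
    ...,
    'django_celery_results',
    'api',
    ...,
]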

  • mypubsub/__init__.py
from __future__ import absolute_import, unicode_literals

# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app

__all__ = ('celery_app',)
  • mypubsub/celery.py

One of the trickiest parts of this post is this file. First, we will start with the default Celery setup:

from __future__ import absolute_import, unicode_literals
import os
import kombu
from celery import Celery, bootsteps
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'mypubsub.settings')
app = Celery('mypubsub')
app.config_from_object('django.conf:settings', namespace='CELERY')
# Load task modules from all registered Django app configs.
app.autodiscover_tasks()

Now we will do 2 things:

  • Declare our RabbitMQ exchange and queue on a connection from Celery’s default connection pool. We will later use a producer from Celery’s producer_pool to publish messages to this queue. As a side note, declaring exchanges and queues creates them if they don’t exist, or loads them if they do. Since we did not create either of these manually, our first call to these declarations will create our exchange and queue.
# setting publisher
with app.pool.acquire(block=True) as conn:
    exchange = kombu.Exchange(
        name='myexchange',
        type='direct',
        durable=True,
        channel=conn,
    )
    exchange.declare()

    queue = kombu.Queue(
        name='myqueue',
        exchange=exchange,
        routing_key='mykey',
        channel=conn,
        message_ttl=600,
        queue_arguments={
            'x-queue-type': 'classic'
        },
        durable=True
    )
    queue.declare()
  • Create a custom message consumer for the subscription, which will acknowledge each message right away, and register it with our Celery app.
# setting consumer class
class MyConsumerStep(bootsteps.ConsumerStep):

    def get_consumers(self, channel):
        return [kombu.Consumer(channel,
                               queues=[queue],
                               callbacks=[self.handle_message],
                               accept=['json'])]

    def handle_message(self, body, message):
        print('Received message: {0!r}'.format(body))
        message.ack()


# Register the custom consumer
app.steps['consumer'].add(MyConsumerStep)
  • api/tasks.py

In order to access Celery’s producer_pool and publish a message to our message queue, we need to define a Celery task to be fired.

from __future__ import absolute_import, unicode_literals

from celery import shared_task

from mypubsub.celery import app


@shared_task
def publish_message(message):
    with app.producer_pool.acquire(block=True) as producer:
        producer.publish(
            message,
            exchange='myexchange',
            routing_key='mykey',
        )

As can be seen, this simple celery task acquires a producer from the pool and publishes a message to our exchange with our routing_key.
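Once the stack is up (see “Running our project” below), you can also fire the task manually from a Django shell inside the backend container as an optional quick check:

> docker-compose exec backend python manage.py shell
>>> from api.tasks import publish_message
>>> publish_message.delay({'hello': 'shell'})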

  • api/views.py

We will create a very basic Django function-based view, which will fire our Celery task.

from django.http import HttpResponse

from api import tasks


def my_pub_view(request):
    # Fire the Celery task asynchronously; the worker will publish the message.
    tasks.publish_message.delay({'hello': 'world'})
    return HttpResponse(status=201)
  • api/urls.py
from django.urls import path

from api import views

app_name = 'api'

urlpatterns = [
    path('publish', views.my_pub_view, name='publish'),
]
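For http://localhost:8080/api/publish to resolve, the project-level mypubsub/urls.py also has to include the api app’s URLs. A minimal version (assuming the default file generated by startproject) looks like this:

from django.contrib import admin
from django.urls import include, path

urlpatterns = [
    path('admin/', admin.site.urls),
    path('api/', include('api.urls')),
]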

That’s all! Let’s try it.

Running our project

We run our project by using these docker-compose commands:

> docker-compose build
> docker-compose up

If you see that your database, RabbitMQ broker, Django app and Celery worker are all up successfully, then open your browser and go to http://localhost:8080/api/publish, which should produce a log like this in your terminal:

worker_1   | [...] Received message: {'hello': 'world'}
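If you prefer the command line over the browser, the same check can be done with curl (assuming it is installed on your host):

> curl -i http://localhost:8080/api/publish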

I hope that worked like a charm!

Conclusion

Celery is one of the most common and useful libraries in Python, and it saves developers a lot of time when it comes to asynchronous tasks. Therefore, making use of Celery’s machinery to publish and subscribe to custom RabbitMQ message queues is a good idea and can make your messaging logic more robust.

I couldn’t find any thorough and complete source showing an implementation of the pub/sub pattern using Django, Postgres, RabbitMQ, Celery and Docker, so I decided to write this article. I hope it helps.

The full source code can be found in my GitHub repository.


About me

My name is Cagri Aslanbas. I’m a senior software developer working at the Arcelik Innovations Directorate in Istanbul, Turkey.

I would also like to thank my colleague Cagatay Barin for his valuable contributions to this post.


[1]: Wikipedia. Publish-subscribe pattern https://en.wikipedia.org/wiki/Publish%E2%80%93subscribe_pattern
