Simple Async Django App with Celery and RabbitMQ

Mahdi Ghelichi
6 min read · Jan 31, 2019


We will build a simple Django application that runs asynchronous tasks in the background using Celery and RabbitMQ. Celery is a powerful asynchronous task queue/job queue based on distributed message passing. RabbitMQ is an open-source message broker: software in which queues can be defined and to which applications can connect to transfer messages. Monitoring, canceling, and deleting background jobs are other capabilities of this basic app.

Prerequisites and installations:

This tutorial uses Python 3.6, Django 2.1, and Celery 4.2.1.
To install Django and Celery:

pip install django==2.1 celery==4.2.1

(If you are using multiple versions of Python and want these libraries installed only for a specific version, let's say 3.6, use this command instead:)

python3.6 -m pip install django==2.1 celery==4.2.1

To install RabbitMQ (Mac): brew install rabbitmq
The RabbitMQ binaries are installed in /usr/local/sbin, which you need to add to your PATH. Open your profile with nano ~/.bash_profile and add this line at the bottom: export PATH=$PATH:/usr/local/sbin.

Part 1. Basic setup

  1. First, cd to the directory where we want to store our code, then run django-admin startproject AsyncDjangoApp
    This will create an AsyncDjangoApp directory in our current directory.
  2. Then: cd AsyncDjangoApp.
  3. Now, to create our app, we run: python3.6 manage.py startapp App
  4. Inside the AsyncDjangoApp package (the one containing __init__.py), create an empty celery.py file.
  5. Inside the App package, create empty forms.py and tasks.py files. tasks.py is where we put the functions that need to run in the background. These are usually slow or time-consuming tasks, such as training machine learning models. forms.py contains the Django form that will receive input from the user.
  6. We create a templates directory that will contain our simple HTML pages.
  7. Run rabbitmq-server to start the broker.
  8. Our entire directory should now look like this:
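
A sketch of the expected layout, reconstructed from the steps above (Django also generates a few auxiliary files, such as admin.py and a migrations package, not listed here):

AsyncDjangoApp/
├── manage.py
├── AsyncDjangoApp/
│   ├── __init__.py
│   ├── celery.py
│   ├── settings.py
│   ├── urls.py
│   └── wsgi.py
├── App/
│   ├── __init__.py
│   ├── forms.py
│   ├── models.py
│   ├── tasks.py
│   ├── urls.py
│   └── views.py
└── templates/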

Part 2. Django setup

First, we need to set up settings.py: specify the templates configuration and add our App to the INSTALLED_APPS list:

import os

BASE_DIR = os.path.dirname(...)
TEMPLATES_DIR = os.path.join(BASE_DIR, 'templates')
...
INSTALLED_APPS = [
    ...
    'App',
]
...
TEMPLATES = [
    {
        ...
        'DIRS': [TEMPLATES_DIR, ],
        ...
    }
]
...

Second, we need to add the Celery configuration at the bottom of our settings file.
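
A minimal sketch, assuming RabbitMQ runs locally with its default guest credentials (adjust CELERY_BROKER_URL for your setup, and pick whichever result backend fits your deployment; one is needed for AsyncResult to report task state):

# Celery configuration (bottom of settings.py)
# The broker URL assumes a local RabbitMQ with default credentials.
CELERY_BROKER_URL = 'amqp://localhost'
# A result backend is required so AsyncResult can report state/progress.
CELERY_RESULT_BACKEND = 'rpc://'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'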

Part 3. Filling the blanks

Now it is time to fill in celery.py and forms.py, and to design our models and URL patterns.

celery.py:

from __future__ import absolute_import
import os
from celery import Celery

# Tell Celery where to find the Django settings module.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'AsyncDjangoApp.settings')

app = Celery('AsyncDjangoApp')
# Read all CELERY_-prefixed settings from settings.py.
app.config_from_object('django.conf:settings', namespace='CELERY')
# Discover tasks.py modules in all installed apps.
app.autodiscover_tasks()
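
Per the standard Celery/Django integration, AsyncDjangoApp/__init__.py should also import this app so that it is loaded whenever Django starts:

from __future__ import absolute_import

# Ensure the Celery app is loaded when Django starts,
# so our tasks can use it.
from .celery import app as celery_app

__all__ = ('celery_app',)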

Now run celery -A AsyncDjangoApp worker -l info (from inside the AsyncDjangoApp directory) to launch a Celery worker.

forms.py:

from django import forms


class JobForm(forms.Form):
    job_name = forms.CharField(label='Job name', required=True)

As we can see, our form contains only one field: the name we give to the job.

To keep track of our jobs, we need to store them in our database when they are created. A Django model is a class that represents a table or collection in our DB, where every attribute of the class is a field of that table or collection. We only need to store the task_id that Celery assigns to each task and the job_name that the user provides.

models.py:

from django.db import models


class Tasks(models.Model):
    task_id = models.CharField(max_length=50)
    job_name = models.CharField(max_length=50)

    def __str__(self):
        return f'{self.task_id} {self.job_name}'

AsyncDjangoApp/urls.py:

from django.contrib import admin
from django.conf.urls import include, url

urlpatterns = [
    url(r'^admin/', admin.site.urls),
    url(r'^App/', include('App.urls')),  # our application!
]

App/urls.py:

from django.conf.urls import url

from App import views

app_name = 'App'

urlpatterns = [
    url(r'^run/$', views.run, name='run'),
    url(r'^monitor/$', views.monitor, name='monitor'),
    url(r'^delete_job/(?P<task_id>.+)/$', views.delete_job,
        name='delete_job'),
    url(r'^cancel_job/(?P<task_id>.+)/$', views.cancel_job,
        name='cancel_job'),
]

delete_job and cancel_job are the endpoints for removing a job from our DB and for revoking/canceling a Celery task, respectively. The first two endpoints (run and monitor) are the main ones we will deal with today.

Part 4. Views and tasks

In our backend, a request to /App/run/ invokes views.run. Upon a GET call, this view returns an HTML page displaying JobForm and asking for input information (in this case only a job_name). Submitting this form results in a POST call, which triggers our Celery task and lands back on the same page while the task runs in the background.

views.py

from App.tasks import process
from App.forms import JobForm
from django.shortcuts import render
from django.views.decorators.http import require_http_methods


@require_http_methods(["GET", "POST"])
def run(request):
    if request.method == "POST":
        form = JobForm(request.POST)
        if form.is_valid():
            job_name = form.cleaned_data['job_name']
            # Queue the task in the background instead of running it inline.
            process.delay(job_name=job_name)
            return render(request, 'job.html',
                          context={'form': JobForm(),
                                   'message': f'{job_name} dispatched...'})
        # Re-display the bound form so validation errors are shown.
        return render(request, 'job.html', context={'form': form})
    return render(request, 'job.html', context={'form': JobForm()})

Our background task/function is called process. To run this Celery task in the background, we just call process.delay(*args, **kwargs) instead of the regular Python call process(*args, **kwargs). In this case, our process function only needs job_name as its keyword argument.
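
As a quick illustration (a hypothetical python3.6 manage.py shell session; it assumes a worker is running), .delay() returns an AsyncResult immediately instead of blocking until the task finishes:

# Hypothetical shell session, for illustration only.
result = process.delay(job_name='demo')
print(result.id)     # the Celery task_id that we store in our Tasks model
print(result.state)  # e.g. 'PENDING' or one of our custom states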

tasks.py

from AsyncDjangoApp.celery import app
from App.models import Tasks
from time import sleep
import random


@app.task(bind=True)
def process(self, job_name=None):
    # Store the Celery task id and the user-given name for later tracking.
    b = Tasks(task_id=self.request.id, job_name=job_name)
    b.save()

    # Simulate a slow, multi-step job with custom states and progress levels.
    self.update_state(state='Dispatching', meta={'progress': '33'})
    sleep(random.randint(5, 10))

    self.update_state(state='Running', meta={'progress': '66'})
    sleep(random.randint(5, 10))

    self.update_state(state='Finishing', meta={'progress': '100'})
    sleep(random.randint(5, 10))

Here, b = Tasks(…) creates and saves a DB record for the invoked Celery task, which we retrieve later for tracking and monitoring purposes.
We use a dummy function with random sleeps to simulate a long-running job. Each sleep can stand in for one step of a slow process. For instance, in data science, one could divide the process of training a model into three steps:
1. Pre-processing the dataset.
2. Training the algorithm.
3. Saving the model and reporting metrics.

We are also customizing Celery's task states (Dispatching, Running, Finishing) and providing additional information, such as the level of job progress.

Part 5. Adding HTML templates

Our app has only two pages: a page that asks for a job_name (using JobForm) and runs that job in the background, and a monitoring page. The first one is the job.html template rendered by views.run. Here we use Bootstrap 4 for basic styling (via its CDN in the <head> tag of our HTML).
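
A minimal sketch of job.html, assuming the Bootstrap 4 CDN and the form/message context used in views.run:

<!DOCTYPE html>
<html>
<head>
    <!-- Bootstrap 4 via CDN for basic styling -->
    <link rel="stylesheet"
          href="https://stackpath.bootstrapcdn.com/bootstrap/4.3.1/css/bootstrap.min.css">
</head>
<body>
<div class="container mt-5">
    {% if message %}
        <div class="alert alert-info">{{ message }}</div>
    {% endif %}
    <form method="post">
        {% csrf_token %}
        {{ form }}
        <button type="submit" class="btn btn-primary">Run</button>
    </form>
    <!-- Direct link to our job tracking page -->
    <a href="{% url 'App:monitor' %}">Monitor jobs</a>
</div>
</body>
</html>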

For the details of Django templates, I refer you to the official documentation. Note that the template provides a direct link to our job tracking page.
Now we apply the migrations and then run our app:
python3.6 manage.py makemigrations
python3.6 manage.py migrate
python3.6 manage.py runserver
This is how our page looks after typing in a job name and hitting Run:

Part 6. Job management/monitoring

For monitoring purposes, we query all stored Tasks, call Celery's AsyncResult(task_id) for each, and return a list containing four elements: job_name, state, progress, and task_id. The monitor.html page displays the level of job progress using basic Bootstrap progress bars.

views.py

from celery.result import AsyncResult
from App.models import Tasks
from django.shortcuts import render
from django.views.decorators.http import require_GET


def track_jobs():
    entries = Tasks.objects.all()
    information = []
    for item in entries:
        # Default to 100 (the max value of the Bootstrap progress bar)
        # for jobs that have already finished.
        progress = 100
        result = AsyncResult(item.task_id)
        # result.info is our meta dict while the task runs; on failure it
        # holds the raised exception, so check the type before reading it.
        if isinstance(result.info, dict):
            progress = result.info.get('progress', progress)
        information.append([item.job_name, result.state,
                            progress, item.task_id])
    return information


@require_GET
def monitor(request):
    info = track_jobs()
    return render(request, 'monitor.html', context={'info': info})


@require_GET
def cancel_job(request, task_id=None):
    result = AsyncResult(task_id)
    result.revoke(terminate=True)
    info = track_jobs()
    return render(request, 'monitor.html', context={'info': info})


@require_GET
def delete_job(request, task_id=None):
    a = Tasks.objects.filter(task_id=task_id)
    a.delete()
    info = track_jobs()
    return render(request, 'monitor.html', context={'info': info})

Canceling a job means getting the AsyncResult(task_id) object and calling its revoke() method; deleting a job simply means removing it from the DB. The following shows a snapshot of the monitoring page. The complete monitor.html template can be found here.
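
A minimal sketch of monitor.html, assuming the four-element lists ([job_name, state, progress, task_id]) built by track_jobs above:

<!DOCTYPE html>
<html>
<head>
    <link rel="stylesheet"
          href="https://stackpath.bootstrapcdn.com/bootstrap/4.3.1/css/bootstrap.min.css">
</head>
<body>
<div class="container mt-5">
    {% for job_name, state, progress, task_id in info %}
        <div class="mb-3">
            <span>{{ job_name }} ({{ state }})</span>
            <div class="progress">
                <div class="progress-bar" role="progressbar"
                     style="width: {{ progress }}%">{{ progress }}%</div>
            </div>
            <a href="{% url 'App:cancel_job' task_id %}">Cancel</a>
            <a href="{% url 'App:delete_job' task_id %}">Delete</a>
        </div>
    {% endfor %}
</div>
</body>
</html>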

The complete code can be found here.
