Using Django 2 with Celery and SQS

Mark Gituma

Background

There are several tutorials on how to run Django with Celery to process asynchronous tasks, using AWS SQS as the message broker. However, when I implemented the recommended solutions I got unexpected results. These included ghost queues being created by Celery, and Celery being limited to running in the us-east-1 AWS region.

This tutorial focuses on deploying Django 2 with Celery using SQS in any AWS region that supports SQS, with an emphasis on predictability. It assumes some basic knowledge of Django and Celery. The codebase for this tutorial can be found in my GitHub account.

Note that the tutorial is designed to work on Ubuntu. Unfortunately, I wasn't able to get it running on my Mac due to a pycurl installation issue.

Environment Setup

Virtualenv

This article uses Django 2, which requires Python 3. To make sure the virtual environment uses the correct Python version, we use the virtualenvwrapper package and create the environment with the command:

$ mkvirtualenv --python=`which python3` <env_name>

Packages

The minimum packages required to run Django with Celery using SQS are as follows:

$ pip install boto3==1.5.2
$ pip install celery==4.1.0
$ pip install Django==2.0
$ pip install pycurl==7.43.0.1

The boto3 package provides an interface to AWS and is required by Celery in order to use SQS. As SQS uses long polling rather than the publish/subscribe pattern used by RabbitMQ or Redis, Celery's SQS transport also depends on the pycurl package. The pycurl library, in turn, requires the following system dependencies:

$ sudo apt-get install libcurl4-openssl-dev libssl-dev python3-dev
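Before moving on, it can help to confirm that the packages above are visible from the active virtualenv. The script below is a minimal sketch using only the standard library; the `missing_packages` helper is a hypothetical name, and the import names (`boto3`, `celery`, `django`, `pycurl`) are assumed to correspond to the pip packages installed above.

```python
import importlib.util


def missing_packages(module_names):
    """Return the module names that cannot be located in the current environment."""
    return [name for name in module_names if importlib.util.find_spec(name) is None]


# Import names of the packages installed above (assumed; Django imports as "django").
REQUIRED = ["boto3", "celery", "django", "pycurl"]

missing = missing_packages(REQUIRED)
if missing:
    print("Missing packages: %s" % ", ".join(missing))
else:
    print("All required packages can be found.")
```

Note that `find_spec` only locates a module without importing it, so a pycurl build that is broken against libcurl or OpenSSL will only show up when running `import pycurl` directly.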

Code Anatomy

<mysite>/<mysite>/settings.py

The settings required to configure Celery to use SQS are as follows:

import os  # already imported at the top of Django's default settings.py

# AWS Credentials
AWS_ACCESS_KEY_ID = os.environ.get('AWS_ACCESS_KEY_ID')
AWS_SECRET_ACCESS_KEY = os.environ.get('AWS_SECRET_ACCESS_KEY')

# Celery
BROKER_URL = "sqs://%s:%s@" % (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_SERIALIZER = 'json'
CELERY_DEFAULT_QUEUE = '<queue_name>'
CELERY_RESULT_BACKEND = None  # Disabling the results backend
BROKER_TRANSPORT_OPTIONS = {
    'region': 'us-west-2',   # AWS region of the SQS queue
    'polling_interval': 20,  # seconds between polls when the queue is empty
}

The BROKER_URL in the snippet above embeds the AWS credentials. Although this is fine for testing on a local development machine, it is not recommended in production, especially on AWS servers. When deploying to AWS, the broker should ideally be defined as:

BROKER_URL = "sqs://"

Explicitly defining AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY is not good practice, as it can lead to the AWS credentials accidentally being committed to source control. If the keys are compromised, the servers have to be updated manually with new credentials, which becomes tedious when several server instances are being managed. The best practice is to use AWS IAM roles, whose temporary access and secret keys are rotated automatically and made available to the instance securely. boto3 then resolves the credentials through its default credential chain (environment variables, configuration files or the EC2 instance metadata service), so nothing needs to be hard-coded.
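One way to reconcile the two broker definitions is to build BROKER_URL from the environment: embed the keys when both are set (local development) and fall back to `sqs://` otherwise, leaving boto3 to find the credentials itself. This is a minimal sketch with a hypothetical helper, `build_sqs_broker_url`. The keys are URL-encoded because AWS secret keys can contain characters such as `/` and `+` that would otherwise break the URL, as the Celery SQS documentation points out.

```python
import os
from urllib.parse import quote


def build_sqs_broker_url(access_key=None, secret_key=None):
    """Return an SQS broker URL, embedding URL-encoded keys only when both are given.

    Without keys the bare "sqs://" URL is returned, so boto3 resolves the
    credentials through its default chain (environment, config files, IAM role).
    """
    if access_key and secret_key:
        # safe="" makes quote() encode "/" as well, which can appear in secret keys.
        return "sqs://%s:%s@" % (quote(access_key, safe=""), quote(secret_key, safe=""))
    return "sqs://"


# Assumed usage in settings.py, replacing the hard-coded BROKER_URL.
BROKER_URL = build_sqs_broker_url(
    os.environ.get("AWS_ACCESS_KEY_ID"),
    os.environ.get("AWS_SECRET_ACCESS_KEY"),
)
```

With this approach the same settings file can be used on a developer machine with exported keys and on an EC2 instance with an IAM role.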

<mysite>/<mysite>/celery.py

The Celery configuration file is defined as follows:

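from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# Point Celery at the Django settings module (note the ".settings" suffix).
os.environ.setdefault('DJANGO_SETTINGS_MODULE', '<mysite>.settings')

app = Celery('django_sqs_example')
# Read the un-prefixed Celery settings (BROKER_URL etc.) from Django's settings.
app.config_from_object('django.conf:settings')
# Discover tasks.py modules in the installed Django apps.
app.autodiscover_tasks()

# Periodic task sent by celery beat.
app.conf.beat_schedule = {
    'display_time-30-seconds': {
        'task': 'demoapp.tasks.display_time',
        'schedule': 30.0,  # seconds, matching the entry name
    },
}


@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))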

Usually, a namespace is passed in as an optional argument when initializing the Celery class, e.g. app = Celery('<mysite>', namespace='<namespace>'). However, passing in the namespace argument causes Celery to ignore the SQS BROKER_URL defined above and look for the default amqp broker instead, most likely because with a namespace Celery only reads settings carrying that prefix (e.g. CELERY_BROKER_URL rather than BROKER_URL):

[2017-12-24 04:32:21,577: ERROR/MainProcess] consumer: Cannot connect to amqp://guest:**@127.0.0.1:5672//: [Errno 111] Connection refused.

A periodic task is also defined in beat_schedule; it is run with the celery beat command to show SQS working seamlessly with Celery.

<mysite>/<mysite>/__init__.py

Finally, the Celery app is imported in the __init__.py module, which ensures that Celery is loaded when Django starts.

from __future__ import absolute_import, unicode_literals
from .celery import app as celery_app
__all__ = ['celery_app']

<mysite>/demoapp/tasks.py

The demoapp/tasks.py file contains a simple task that prints the current time and returns True. It is run periodically by celery beat, as defined in the <mysite>/<mysite>/celery.py file.

from datetime import datetime
from celery import shared_task


@shared_task
def display_time():
    # Takes no arguments, since the beat schedule entry passes none.
    print("The time is %s :" % str(datetime.now()))
    return True

AWS Configuration

Now that the codebase has been set up, the AWS environment can be configured.

Least Privilege IAM Policy

AWS recommends the concept of least privilege access in order to keep applications secure. The following is an example of such a policy:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "sqs:DeleteMessage",
                "sqs:GetQueueUrl",
                "sqs:ChangeMessageVisibility",
                "sqs:DeleteMessageBatch",
                "sqs:SendMessageBatch",
                "sqs:ReceiveMessage",
                "sqs:SendMessage",
                "sqs:GetQueueAttributes",
                "sqs:ChangeMessageVisibilityBatch"
            ],
            "Resource": "arn:aws:sqs:<REGION>:<ACCOUNT_ID>:<QUEUE_NAME>"
        },
        {
            "Effect": "Allow",
            "Action": "sqs:ListQueues",
            "Resource": "*"
        }
    ]
}

The policy allows messages to be sent, received and deleted, but does not allow queues to be created or deleted. In my experience, if there is a configuration problem in the Django settings.py file, such as a CELERY_DEFAULT_QUEUE that doesn't exist in AWS, Celery will attempt to create a queue with that name in us-east-1. One of the reasons for least privilege access is therefore to get an explicit failure when a queue doesn't exist: the application must use an existing queue rather than create one on the fly.
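If the policy has to be produced for several environments, it can be generated rather than edited by hand. The following is a sketch with a hypothetical `build_sqs_policy` helper and placeholder region, account ID and queue name; it reproduces the two statements of the policy above.

```python
import json

# Message-level SQS actions from the policy above; no CreateQueue or DeleteQueue.
QUEUE_ACTIONS = [
    "sqs:DeleteMessage",
    "sqs:GetQueueUrl",
    "sqs:ChangeMessageVisibility",
    "sqs:DeleteMessageBatch",
    "sqs:SendMessageBatch",
    "sqs:ReceiveMessage",
    "sqs:SendMessage",
    "sqs:GetQueueAttributes",
    "sqs:ChangeMessageVisibilityBatch",
]


def build_sqs_policy(region, account_id, queue_name):
    """Return a least-privilege IAM policy document scoped to a single queue."""
    queue_arn = "arn:aws:sqs:%s:%s:%s" % (region, account_id, queue_name)
    return {
        "Version": "2012-10-17",
        "Statement": [
            {"Effect": "Allow", "Action": list(QUEUE_ACTIONS), "Resource": queue_arn},
            # Granted on "*", as in the policy above.
            {"Effect": "Allow", "Action": "sqs:ListQueues", "Resource": "*"},
        ],
    }


# Placeholder values for illustration only.
print(json.dumps(build_sqs_policy("us-west-2", "123456789012", "my-queue"), indent=2))
```

Because the queue ARN is built from the same region and queue name used in settings.py, a mismatch between the policy and the Celery configuration becomes harder to introduce.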

The policy can be attached to an AWS IAM role, and the role can be attached to an EC2 instance so that the relevant credentials are made available to the instance. Alternatively, the policy can be attached to an IAM user, whose access credentials (AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY) can then be passed to the settings.py file.

Create the AWS SQS Queue

As the user is responsible for making sure the AWS SQS queue exists, it has to be created beforehand, which is straightforward. Note, however, that the queue name must match the CELERY_DEFAULT_QUEUE variable in the settings.py file, including the queue_name_prefix if one is set in BROKER_TRANSPORT_OPTIONS.
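The queue can be created in the AWS console or scripted. The sketch below assumes boto3's SQS client, whose `create_queue` call returns a dict containing `QueueUrl`, and must be run with credentials that are allowed `sqs:CreateQueue`, not the least-privilege role above. The helper names are hypothetical, and the name is built as `queue_name_prefix` plus CELERY_DEFAULT_QUEUE; kombu may also rewrite some punctuation in queue names, so check the name it actually uses if yours contains characters other than letters, digits, hyphens and underscores.

```python
def expected_queue_name(default_queue, transport_options=None):
    """Name the SQS queue must have: queue_name_prefix (if any) + CELERY_DEFAULT_QUEUE."""
    prefix = (transport_options or {}).get("queue_name_prefix", "")
    return prefix + default_queue


def create_celery_queue(sqs_client, default_queue, transport_options=None):
    """Create the queue Celery expects and return its URL.

    sqs_client is assumed to be a boto3 SQS client. If a queue with this name
    and the same attributes already exists, SQS returns the existing queue URL.
    """
    name = expected_queue_name(default_queue, transport_options)
    response = sqs_client.create_queue(QueueName=name)
    return response["QueueUrl"]


# Example usage (needs credentials allowed to call sqs:CreateQueue; region assumed):
#   import boto3
#   client = boto3.client("sqs", region_name="us-west-2")
#   print(create_celery_queue(client, "<queue_name>", {"region": "us-west-2"}))
```

Passing the client in as an argument keeps the helper easy to exercise without touching AWS, and makes the region explicit at the call site.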

Testing

Celery beat

To test the celery beat functionality, open a terminal in the root project folder and run the celery -A <mysite> worker -l info command. This starts the worker process, which continuously polls SQS for messages. This is where most failures will happen if the queue doesn't exist or the credentials and permissions haven't been set up properly; these are easy to fix thanks to the explicit error messages. If it works, we should see something similar to:

[2017-12-27 17:30:28,570: INFO/MainProcess] Starting new HTTPS connection (1): us-west-2.queue.amazonaws.com
[2017-12-27 17:30:29,113: INFO/MainProcess] Connected to sqs://<AWS_ACCESS_KEY_ID>:**@localhost//

In another terminal, also in the root folder, run the celery -A <mysite> beat -l info command, which starts the periodic task scheduler. If it works, you should see output similar to:

[2017-12-27 17:31:51,131: INFO/MainProcess] beat: Starting...
[2017-12-27 17:31:51,196: INFO/MainProcess] Starting new HTTPS connection (1): us-west-2.queue.amazonaws.com
[2017-12-27 17:31:51,793: INFO/MainProcess] Scheduler: Sending due task display_time-30-seconds (demoapp.tasks.display_time)

Beat sends a message to the SQS queue, which is received by the Celery worker running in the first terminal. The worker then displays a result similar to:

[2017-12-27 17:34:54,262: INFO/MainProcess] Received task: demoapp.tasks.display_time[0da44d72-59fe-43d1-906c-02ceefce9c89]
[2017-12-27 17:34:54,402: WARNING/ForkPoolWorker-1] The time is 2017-12-27 17:34:54.402203 :
[2017-12-27 17:34:54,403: INFO/ForkPoolWorker-1] Task demoapp.tasks.display_time[0da44d72-59fe-43d1-906c-02ceefce9c89] succeeded in 0.001215843003592454s: True

Summary

We have seen a way to use Django 2 with Celery and SQS that should work in any region. However, the Django, Celery and AWS APIs are always changing; if you try the proposed solution and it doesn't work, please leave me a comment and I will update the tutorial to reflect the changes.

If you liked the post and want to be notified of new posts, follow me on Twitter @MarkGituma.

Founder of Dancelogue.com, a company using AI to understand and classify human movement in dance. https://www.linkedin.com/in/markgituma/
