Django, Scheduled Tasks & Queues (Part 2)

You can find part 1 here.

This post will only scratch the surface of Celery; there are lots of options and configuration settings you can use to tailor it to your needs.

Go with the flow

Let's break down how our application flow currently works and how we need to change it. Say we have a bid form submission and we want to trigger an email after the submission; if we put that into a flow chart we have the following:

Green items are what we currently have: lots of reads and writes to the database. Using a system like Celery allows us to avoid those extra reads and writes and offload that work to our broker and Celery workers, which results in a much cleaner, leaner application flow.

To get this working we will need to do the following:

  • Set up a broker
  • Install Celery
  • Configure our tasks
  • Replace our existing code
  • Test

Let's get started.

Broker

A broker in this context exists to "mediate between clients and workers". Celery does not come with a broker by default; however, it does come with support for:

  • RabbitMQ
  • Redis
  • Amazon SQS
  • Zookeeper

We will be using RabbitMQ for our example, so let's get to it.

I will be assuming we are using a Linux-based system to host and run our broker and Celery.

To install RabbitMQ, open a terminal with sudo rights and enter the following command:

sudo apt install rabbitmq-server

This will install RabbitMQ and all the required dependencies, and it will also start the service. At this point we actually have RabbitMQ acting as a broker; however, we will add some extra security to our instance.

Let's start by removing the guest user:

sudo rabbitmqctl delete_user guest

Now we need to create a user and assign the correct permissions. Let's add our user by entering:

sudo rabbitmqctl add_user username password
sudo rabbitmqctl set_user_tags username administrator
sudo rabbitmqctl set_permissions -p / username ".*" ".*" ".*"

The three ".*" patterns grant the user configure, write, and read permissions on the default virtual host; for a full breakdown please review the documentation here. Below you can see I have created a user called crunchy with the password waffles.

We now have a functioning, secured RabbitMQ server. We can test it by enabling the management plugin and using a cURL command to hit the management API:

sudo rabbitmq-plugins enable rabbitmq_management
curl -i -u username:password http://localhost:15672/api/whoami

So now we have a RabbitMQ server set up, ready to accept our task queue.

You can now access the management console by visiting your-server-ip:15672 in a browser; this allows you to see connections, queues, and other management-related information and menus.

Celery

With part 1 of the setup done, we now need to install and configure Celery so we can start building tasks.

One main advantage for us as Python/Django developers is that Celery is built on Python, so we simply need to make sure our virtualenv is active and enter:

source /path/to/venv/bin/activate
pip install celery

We have a few pieces to put together before we can start building out tasks. First we need to point Celery at our broker; in settings.py we add the following:

CELERY_BROKER_URL = 'amqp://crunchy:waffles@192.168.0.38//'

This setting is read by our Celery instance. The URL takes the form amqp://user:password@host//, where the trailing // refers to the default virtual host. In the same package as our settings.py we need to create a file called celery.py:
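A minimal version of this file, following the standard Celery-with-Django layout (the project name project_name below stands in for your own), looks like this:

import os

from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'project_name.settings')

app = Celery('project_name')

app.config_from_object('django.conf:settings', namespace='CELERY')

app.autodiscover_tasks()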

Let's break down what this is doing:

  • Line 5, we set our application's settings module for our Celery instance
  • Line 7, we create an instance of Celery and pass our project name into it, binding the instance to the variable app
  • Line 9, we load configuration from an object made up of the values in our settings.py beginning with the prefix CELERY_
  • Line 11, we autodiscover tasks; we just need to make a tasks.py file in each of our applications

So with that we now have Celery installed and configured, so let's build a task to test. In our crunchy_waffles_app I made a file called tasks.py, and here I have built out a simple task called add:
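The task itself is only a few lines; a minimal sketch matching the description below:

from celery import shared_task


@shared_task
def add(x, y):
    # A deliberately simple task: return the sum of two numbers.
    return x + y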

We have imported shared_task from celery; this allows us to create a task without an instance of Celery, so we can reuse these tasks elsewhere.

The function we will test is a simple addition: it takes two numbers and returns the sum.

Let's get our Celery instance started up; we do so by typing:

celery -A project_name worker -l info

If you are on Windows, to bypass a bug I would suggest you use the following as a workaround for testing:

celery -A project_name worker --pool=solo -l info

This will output lots of detail to the console; let's quickly look at an important part:

As part of the Celery boot process it will load all the tasks found by the autodiscovery we declared in the celery.py file.

So we have the instance running and connected to our RabbitMQ server. We can now test our add function: from a terminal with our virtualenv active, open a Python console and type:

from crunchy_waffles_app.tasks import add
add.delay(1,3)

When we press enter on our add function we might expect the answer 4; however, we have just called this function asynchronously, and that's not the result we get back.

Instead we get back an instance of the AsyncResult class; to get the value 4 we need to unwrap it. This is simply the following amendment:

result = add.delay(1,3)
result.result

We bind our function call to a variable, in this case result, and then access the result property of the AsyncResult class.
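AsyncResult exposes a few other handy members too. For example, ready() tells us whether the task has finished, and get() blocks until the result is available (note that retrieving results requires a result backend, which we configure later in this post):

result = add.delay(1, 3)
result.ready()          # False until the worker has finished
result.get(timeout=10)  # waits for the task, then returns 4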

For a full list of the available properties and methods of the AsyncResult class, see the Celery documentation here.

Replacing Emails

It's at this point that we need to begin replacing our existing email alert code with a task.

Let's go back into tasks.py and make a new task. The task will do the following:

  • Take a bid id as a parameter
  • Process the bid, marking its email as sent
  • Send the email using the Mailgun API

For this example I will be using Mailgun as my email service provider. You can find out about them here. (We will not be running through setting this up.)

Let's see the code and files, and then we can run through it:

First, let's look at the file crunchy_emails.py. This is a wrapper class so I can easily send emails using the Mailgun API rather than the SMTP method. With a few tweaks this would make a nice application-wide email handler.
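A sketch of what that wrapper might look like, using the requests library (the settings names MAILGUN_DOMAIN and MAILGUN_API_KEY, and the convention of returning a string on failure and parsed JSON on success, are assumptions made to match how the task below uses it):

import requests

from django.conf import settings


class EmailService:
    # Thin wrapper around the Mailgun HTTP API.
    API_URL = 'https://api.mailgun.net/v3/{domain}/messages'

    def send(self, to, subject, body):
        response = requests.post(
            self.API_URL.format(domain=settings.MAILGUN_DOMAIN),
            auth=('api', settings.MAILGUN_API_KEY),
            data={
                'from': settings.DEFAULT_FROM_EMAIL,
                'to': to,
                'subject': subject,
                'text': body,
            },
        )
        if response.status_code != 200:
            # On failure, return the error details as a string.
            return response.text
        # On success, return the parsed JSON payload (a dict).
        return response.json()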

In the signals file we edited previously, we remove the email block we had in before; with one line of code we now hand emails off to Celery and RabbitMQ.
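Assuming the post_save signal wiring from part 1 (the model and handler names here are illustrative), the handler shrinks to this:

from django.db.models.signals import post_save
from django.dispatch import receiver

from .models import Bid
from .tasks import send_bid_email


@receiver(post_save, sender=Bid)
def bid_saved(sender, instance, created, **kwargs):
    if created:
        # Hand the work off to Celery/RabbitMQ with a single call.
        send_bid_email.delay(instance.id)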

Our task takes the id of a bid object; Celery tasks will not take models or other non-serializable objects as parameters. We get the object, call EmailService inside a try/except, and check whether the response is a string; if it is, we raise an exception, otherwise we update the bid object.
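Put together, the task might look like this (the task name, subject line, and Bid fields such as email and email_sent are illustrative assumptions):

from celery import shared_task

from .crunchy_emails import EmailService
from .models import Bid


@shared_task
def send_bid_email(bid_id):
    bid = Bid.objects.get(pk=bid_id)
    try:
        response = EmailService().send(
            to=bid.email,
            subject='Your bid has been received',
            body='Thanks for your bid!',
        )
    except Exception as exc:
        # Any error inside EmailService fails the task loudly.
        raise Exception('Email failed to send: {}'.format(exc))
    if isinstance(response, str):
        # EmailService returns a string when Mailgun reports a failure.
        raise Exception(response)
    # All good: record that the email went out.
    bid.email_sent = True
    bid.save()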

Let's spin up Celery and place a bid on an object.

Side note: if you want a nice mail app, check out Astro. As you can see, it has fired an email across to my target account. If we check the bid object in the admin we can see it has been updated as expected.

So that's all good, but what about storing task details and data, such as task failures, issues, etc.? We could write our own package to do this; however, the Celery ecosystem already has one called Django Celery Results, so let's get it set up.

pip install django-celery-results

Let's add it to our installed apps:

INSTALLED_APPS = (
    ...,
    'django_celery_results',
)

Next we need to migrate, as it comes with some tables it needs to use:

python manage.py migrate django_celery_results

When that has completed, we need to make one more addition to our settings.py:

CELERY_RESULT_BACKEND = 'django-db'

We are configuring Celery to use a result backend to store task results. You can choose from a few different backends depending on your needs and requirements; as we are using Django, we will use the Django database backend.

After this, start up the Celery worker again, run the task a few times, and then go to the admin panel. You will see a new application in the admin panel called DJANGO_CELERY_RESULTS; if we go into that we can view the tasks, and if we pick a task we can see its details and outcome:

At this point we have replaced our scheduled cron job with a real-time task management system. Our customers receive their emails immediately, and our CTO is happy with the new structure as it means they can monitor in real time and scale the load over a few nodes.

Our CTO is so impressed with Celery that he wants us to replace the rest of our cron jobs with it. In part 3 we will run through scheduling tasks with Celery and what is available to get that done.
