Setting Up a Task Queue Using Celery and RabbitMQ

In this post you’ll learn the absolute basics of using Celery.

You’ll learn about:

  • Installing a message transport (broker).
  • Installing Celery and creating a task.
  • Starting the worker and calling tasks.
  • Adding a new user and vhost in RabbitMQ.
  • Celery Flower

Celery: Distributed Task Queue

Celery is an asynchronous task queue/job queue based on distributed message passing. It is focused on real-time operation, but supports scheduling as well.

The execution units, called tasks, are executed concurrently on one or more worker servers using multiprocessing, Eventlet, or gevent. Tasks can execute asynchronously (in the background) or synchronously (blocking until the result is ready).

Choosing a Broker

Celery requires a solution to send and receive messages; usually this comes in the form of a separate service called a message broker. In this post we use RabbitMQ.

Installing RabbitMQ

$ sudo apt-get install rabbitmq-server

Installing Celery

Celery is on the Python Package Index (PyPI), so it can be installed with standard Python tools like pip:

$ pip install celery

Application

The first thing you need is a Celery instance. We call this the Celery application or just app for short. As this instance is used as the entry-point for everything you want to do in Celery, like creating tasks and managing workers, it must be possible for other modules to import it.

# tasks.py
from celery import Celery

app = Celery('tasks', backend='rpc://', broker='pyamqp://')

@app.task
def sentence_length(sentence):
    # Return the number of characters in the sentence.
    return len(sentence)

Run the worker by executing our program with the worker argument:

$ celery -A tasks worker --loglevel=info

In production you’ll want to run the worker in the background as a daemon. To do this you need to use the tools provided by your platform, or something like supervisord.
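As a concrete example, a supervisord program entry for the worker might look like the sketch below. The program name, working directory, and user are assumptions — adjust them to your environment:

```ini
; Sketch of a supervisord entry for the Celery worker.
; "celery_worker", /home/user/app, and the user name are placeholders.
[program:celery_worker]
command=celery -A tasks worker --loglevel=info
directory=/home/user/app
user=krishna
autostart=true
autorestart=true
stdout_logfile=/var/log/celery/worker.log
redirect_stderr=true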

Calling the task

To call our task you can use the delay() method. This is a handy shortcut to the apply_async() method, which gives greater control over task execution (see Calling Tasks in the Celery documentation):

>>> from tasks import sentence_length
>>> result = sentence_length.delay('sentence')

Calling a task returns an AsyncResult instance. This can be used to check the state of the task, wait for the task to finish, or get its return value (or if the task failed, to get the exception and traceback).

Results are not enabled by default. In order to do remote procedure calls or keep track of task results in a database, you will need to configure Celery to use a result backend.

We already did this when creating the app:

backend='rpc://'

Getting the result

The ready() method returns whether the task has finished processing or not:

>>> result.ready()
False
>>> result.get()
8

In case the task raised an exception, get() will re-raise the exception, but you can override this by specifying the propagate argument:

>>> result.get(propagate=False)

“guest” user can only connect via localhost

By default, the guest user is prohibited from connecting to the broker remotely; it can only connect over a loopback interface (i.e. localhost). Any other users you create will not (by default) be restricted in this way.

The recommended way to address this in production systems is to create a new user or set of users with the permissions to access the necessary virtual hosts.

Creating a new non-administrative user

$ sudo rabbitmqctl add_user krishna krishnadey

This command instructs the RabbitMQ broker to create a (non-administrative) user named “krishna” with (initial) password “krishnadey”.

Creating a new virtual host

$ sudo rabbitmqctl add_vhost sample_host

This command instructs the RabbitMQ broker to create a new virtual host called “sample_host”.

Making the new user an administrator

$ sudo rabbitmqctl set_user_tags krishna administrator

This command instructs the RabbitMQ broker to ensure the user named “krishna” is an administrator.

Setting user permissions

$ sudo rabbitmqctl set_permissions -p sample_host krishna ".*" ".*" ".*"

This command instructs the RabbitMQ broker to grant the user named “krishna” access to the virtual host called “sample_host”, with configure, write, and read permissions on all resources.

Updating the app with the new user and virtual host

app = Celery('tasks', backend='rpc://', broker='pyamqp://krishna:krishnadey@localhost:5672/sample_host')
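Hardcoding credentials in the broker URL is fine for a demo, but a common pattern is to read them from the environment. A minimal sketch — the RABBITMQ_* variable names are assumptions, not a Celery convention:

```python
# Sketch: building the broker URL from environment variables,
# falling back to the demo credentials used in this post.
import os

user = os.environ.get('RABBITMQ_USER', 'krishna')
password = os.environ.get('RABBITMQ_PASS', 'krishnadey')
vhost = os.environ.get('RABBITMQ_VHOST', 'sample_host')
broker_url = f'pyamqp://{user}:{password}@localhost:5672/{vhost}'
print(broker_url)
```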

Celery Flower

Flower is a web-based tool for monitoring and administering Celery clusters.

Installing Flower

$ pip install flower

To make sure the broker details and graphs are visible in Flower, enable the RabbitMQ management plugin:

$ sudo rabbitmq-plugins enable rabbitmq_management 
$ sudo service rabbitmq-server restart 
$ sudo rabbitmqctl set_user_tags krishna administrator

If you run Flower as a systemd service, restart its unit:

$ sudo systemctl restart flowerd

Running Flower

$ celery -A tasks flower --port=5555 --address=0.0.0.0 --basic_auth=admin:admin_pass --broker=amqp://krishna:krishnadey@localhost:5672/sample_host