Testing Celery Tasks

Recently I came across the problem of writing unit tests for Celery tasks. Writing unit tests for celery tasks can be painful since they are asynchronous and long. I will be explaining how to write unit tests for Celery tasks and will try to address different cases for this. For this tutorial we will be using python’s native unittest library.

Prerequisites

  • You should know the basics of Celery to follow the tutorial.
  • If you know basics of writing tests then its awesome but not mandatory for this tutorial.

Environment Setup

  • This tutorial is written for python 2.x and should work or python 3.x also.
  • You must have working installation of Celery, RabbitMQ and Redis (redis because we want to store the results)

That was all let’s dive into the tutorial

We will be creating a very simple celery task which adds two numbers

Open your favourite text editor and paste the following code and save it as tasks.py

from celery import Celery
from time import sleep
BROKER_URL = ‘amqp://guest@localhost:5672//’
REDIS = ‘redis://localhost/0’
app = Celery(‘test’, broker=BROKER_URL, backend=REDIS)
@app.task(name=”add”)
def add(x, y):
return x+y

Now let’s write some tests for this tasks.

We have to test whether our task returns the correct value for given two numbers or not. So we will write one test case class to test all the possible tests for this tasks.

Now create a file named tests.py in the same directory as tasks.py and paste the following code.

from tasks import add, app
import unittest
class TestAddTask(unittest.TestCase):
def setUp(self):
self.task = add.apply_async(args=[3, 5])
self.results = self.task.get()

Let’s see what we did back up there. We inherited the unittest.TestCase class and defined a setUp method. This method is used to initialize the tests and set necessary variables for testing. Here we called our add task and with arguments 3, 5 and we called get method on it to get the result. This get method call wait till the task finishes. When the tasks finishes we got the results and test initialization is completed. Now let’s test the output of this task.

Let’s check our task’s state whether it returned SUCCESS or FAILURE

In the above mentioned TestAddTask class we now define a method to check state of our task

def test_task_state(self):
self.assertEqual(self.task.state, ‘SUCCESS’)

In the above mentioned TestAddTask class now we define a method to test our output.

def test_addition(self):
self.assertEqual(self.results, 8)

Complete test case class

from tasks import add, app
import unittest
class TestAddTask(unittest.TestCase):
def setUp(self):
self.task = add.apply_async(args=[3, 5])
self.results = self.task.get()
  def test_task_state(self):
self.assertEqual(self.task.state, ‘SUCCESS’)
  def test_addition(self):
self.assertEqual(self.results, 8)

Now let’s run the tests.

Start celery worker: Inside the directory where tasks.py is run the following command.

celery -A tasks worker — loglevel=INFO

Run the tests: Inside the directory where tests.py is run the following command

python -m unittest discover

You should see an output something like below

➜ python -m unittest discover
..
— — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — —
Ran 2 tests in 5.353s
OK

Voila we successfully ran the tests on celery tasks.

What if my tasks are on remote machine and I can’t import them?

Don’t worry just replace apply_async function with the following statement

self.task = app.send_task(‘add’, args=[3, 5])

Now let’s take an example where the task does not return any results

Let’s write a task which downloads an image and writes it to a file. Update tasks.py with following task

import requests
@app
.task(name=’download-image’, ignore_results=True)
def download_image(url):
r = requests.get(url)
with open(‘image.jpg’, ‘wb’) as f:
f.write(r.content)

Let’s see how should we test this task. Write the following code in the tests.py

import os
class TestDownloadTask(unittest.TestCase):
def setUp(self):
self.task = app.send_task(‘download-image’, args= [‘https://www.math.ust.hk/~masyleung/Teaching/CAS/MATLAB/image/images/cameraman.jpg'])
self.results = self.task.get()
  def test_task_state(self):
self.assertEqual(self.task.state, ‘SUCCESS’)
  def test_download(self):
self.assertEqual(os.path.exists(‘image.jpg’), True)

Here we check that our task has wrote the image successfully or not by using os library.

Check out more about Testing Celery Tasks at official docs. Please comment if you find something wrong.