Retrying Asynchronous Tasks With Celery


Writing resilient code that can handle task failure is important for keeping modern systems working. We’ll be going over how to retry asynchronous tasks with Celery in Python, a combination commonly used in Django applications.

Say we are writing code to query an external service and want to ensure that our code responds appropriately to request throttling, or any other potential failure.

We could set up a simple task like so:

from proj.celery import app

@app.task
def celery_task(*args, **kwargs):
    service_result = query_external_service(*args, **kwargs)
    return service_result

In order to handle task failure, we might identify or create an exception that is raised when the task fails. We handle the exception by placing the task back on the queue to be retried by a Celery worker. Passing bind=True to the decorator gives the task access to itself as self, which is where the retry method lives. This might look something like the following:

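from proj.celery import app
from proj.exceptions import Exception  # placeholder for the error raised on failure

@app.task(bind=True)
def celery_task(self, *args, **kwargs):
    try:
        service_result = query_external_service(*args, **kwargs)
    except Exception as exc:
        # Put the task back on the queue to be retried by a worker.
        raise self.retry(exc=exc)
    return service_result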

We may want to customize the way that the task is retried. For example, we may know that the external service we are calling resets the request quota for our token every 30 seconds; in this case, we will not want to retry our task until 30 seconds have passed. Note: It is important that we retry tasks rather than placing time delays inside our exception handlers, so that we do not tie up the workers running our tasks and prevent them from completing other tasks on the queue.

Conveniently, we can pass a countdown argument to the retry method to allow for delay before retrying the task:

from proj.celery import app
from proj.exceptions import Exception

@app.task(bind=True)
def celery_task(self, *args, **kwargs):
    try:
        service_result = query_external_service(*args, **kwargs)
    except Exception as exc:
        # Wait 30 seconds before the task is retried.
        raise self.retry(exc=exc, countdown=30)
    return service_result

We may also want to be more strategic about retry delays. Celery tracks the number of times that a task has been retried in self.request.retries, which starts at 0 on the first run. We can use it to set up a retry strategy, such as an exponential delay:

from proj.celery import app
from proj.exceptions import Exception

@app.task(bind=True)
def celery_task(self, *args, **kwargs):
    try:
        service_result = query_external_service(*args, **kwargs)
    except Exception as exc:
        # The delay doubles with each retry: 1, 2, 4, 8, ... seconds.
        raise self.retry(exc=exc, countdown=2 ** self.request.retries)
    return service_result
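
An uncapped exponential delay grows quickly, so it can help to cap it and to add some randomness so that many failed tasks do not all retry at the same moment. The helper below is a minimal sketch of that idea in plain Python. The names retry_countdown, cap, and jitter are hypothetical, chosen for illustration, and are not part of Celery; the 300-second cap is an assumed value.

```python
import random


def retry_countdown(retries, base=2, cap=300, jitter=False, rng=random):
    """Return a delay in seconds for the given retry count.

    Hypothetical helper, not part of Celery: `retries` is meant to be
    self.request.retries, and `cap` is an assumed upper bound in seconds.
    """
    delay = min(cap, base ** retries)
    if jitter:
        # "Full jitter": pick a random delay between 0 and the capped value.
        return rng.uniform(0, delay)
    return delay


# Delays for the first few retries, then the cap taking over.
print([retry_countdown(n) for n in (0, 1, 2, 3, 10)])
```

Inside a bound task, the result could be passed along as self.retry(countdown=retry_countdown(self.request.retries)).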

We will want to ensure that our tasks cannot be retried endlessly: if a task has not succeeded after a certain number of attempts, it should fail. Something may be permanently preventing the task from completing, and in that case we do not want to keep placing it back on the queue. For this, Celery provides a max_retries argument (3 by default) that can be passed to the task decorator:

from proj.celery import app
from proj.exceptions import Exception

@app.task(bind=True, max_retries=10)
def celery_task(self, *args, **kwargs):
    try:
        service_result = query_external_service(*args, **kwargs)
    except Exception as exc:
        # Once max_retries is exceeded, retry raises instead of re-queueing.
        raise self.retry(exc=exc, countdown=2 ** self.request.retries)
    return service_result

This gives base-2 exponential delays with a maximum of 10 retry attempts. Once that limit is exceeded, the task is no longer placed back on the queue; it fails, and the issue causing the failure can be reviewed.
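
To make that schedule concrete, the arithmetic can be worked through in plain Python. This is a small sketch rather than Celery code; backoff_delays is a hypothetical helper that mirrors countdown=2 ** self.request.retries, relying on the retry counter starting at 0 on the first run.

```python
def backoff_delays(max_retries, base=2):
    """Delays produced by countdown=base ** self.request.retries.

    self.request.retries is 0 on the first run, so the first retry
    waits base ** 0 seconds.
    """
    return [base ** retries for retries in range(max_retries)]


delays = backoff_delays(10)
total_wait = sum(delays)
print(delays)      # delay before each of the 10 retries, in seconds
print(total_wait)  # total time spent waiting across all retries
```

With 10 retries the delays run from 1 to 512 seconds and add up to 1023 seconds, so a task that never succeeds gives up after roughly 17 minutes of waiting, not counting execution time or time spent in the queue.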

Maintaining state across Celery retries can be another challenge. Say, for instance, that a task pages through a resource and one of its requests fails after several pages have already been fetched successfully. In this case, we can pass arguments into the retry method so that the task is retried with the arguments we specify:

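from proj.celery import app
from proj.exceptions import Exception

@app.task(bind=True, max_retries=10)
def celery_task(self, *args, **kwargs):
    try:
        service_result = query_external_service(*args, **kwargs)
    except Exception as exc:
        # args and kwargs given here replace the originals on the retried task,
        # so they can be updated before this call to carry state forward.
        raise self.retry(
            args=args, kwargs=kwargs, exc=exc,
            countdown=2 ** self.request.retries,
        )
    return service_result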

This will allow task arguments to be modified so retrying the task on failure does not cause loss of state.
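
The pagination case can be sketched without Celery to show what state needs to travel with the retry. Everything here is hypothetical and only for illustration: Throttled stands in for the service's failure, fetch_pages for the task body, and flaky_fetch for a service that throttles one request. In a real bound task, the dictionary returned on failure is what would be passed as kwargs to self.retry.

```python
class Throttled(Exception):
    """Hypothetical error standing in for the external service's failure."""


def fetch_pages(fetch_page, start_page, last_page):
    """Fetch pages in order; return retry kwargs on failure, None on success."""
    page = start_page
    try:
        while page <= last_page:
            fetch_page(page)
            page += 1
    except Throttled:
        # In a bound Celery task, this is where the updated state would be
        # handed to retry, e.g. raise self.retry(kwargs=..., countdown=...).
        return {"start_page": page, "last_page": last_page}
    return None


# Simulate a service that throttles the first request for page 3.
fetched = []
failed_once = set()


def flaky_fetch(page):
    if page == 3 and page not in failed_once:
        failed_once.add(page)
        raise Throttled()
    fetched.append(page)


retry_kwargs = fetch_pages(flaky_fetch, start_page=1, last_page=5)
# The "retry" resumes from the failed page instead of starting over.
result = fetch_pages(flaky_fetch, **retry_kwargs)
```

The second call picks up at page 3, so pages 1 and 2 are not requested again.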

For a complete overview of asynchronous programming in Python with Django, check out my book:

https://www.amazon.com/Asynchronous-Django-Josh-Dwernychuk-ebook/dp/B074SG7WT6/

Happy Asynchronous Programming!