Celery + Django: best practices and getting started

Mika, from Palmy Investing
5 min read · Apr 12, 2023


Hello everyone, this is a quick overview of good practices for using Celery with Django. I will use Celery v5.2.7 for this small post.

First of all, if you are new to Celery, you may ask: where do I write my code/tasks? There are several ways to structure a Celery project, but I do it this way:

core/
    __init__.py
    tasks.py
    settings.py
    celery.py

app_one/
    tasks.py

app_two/
    tasks.py

Here, core is your root app containing all the setup (you can find it by searching for settings.py). I've added two new files there:

  • celery.py -> this holds the Celery configuration
  • tasks.py -> a tasks.py file is where the Celery tasks that belong to the current app are defined (see app_one, app_two)

core/__init__.py

from celery import app as celery_app

# Ensure the Celery app is loaded whenever Django starts
__all__ = ("celery_app",)

core/celery.py

Here you will store all the configuration. If you want to use Celery beat, which we won't cover in this post, this is also the place for the beat config.

import os

from celery import Celery

# The worker needs to know which Django settings module to use
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "core.settings")

app = Celery("core")
app.config_from_object("core.settings", namespace="CELERY")

# Look for tasks.py modules in these apps
app.autodiscover_tasks(["app_one", "app_two"])

About this config:

  • app creates an instance of the Celery base class
  • config_from_object tells Celery where to look for the configuration. It searches core.settings for all variables that start with CELERY (we get to that in a second)
  • autodiscover_tasks tells Celery where to look for your tasks.py files (in our example in app_one and app_two)

We can now add some config to our settings.py; only variables prefixed with CELERY_ will be picked up:

# core/settings.py

CELERY_TIMEZONE = "UTC"
CELERY_BROKER_URL = "redis://"
CELERY_RESULT_BACKEND = "redis://"
CELERY_TASK_SERIALIZER = "json"
CELERY_RESULT_EXPIRES = 600
CELERY_ACCEPT_CONTENT = ("application/json", "application/x-yaml")

CELERY_WORKER_MAX_MEMORY_PER_CHILD = None  # in kilobytes
CELERY_WORKER_CANCEL_LONG_RUNNING_TASKS_ON_CONNECTION_LOSS = True

Please note that:

  • Each setting has to be configured individually for your app. There is no built-in setup, only defaults which won't fit everyone
  • Change broker_url to point at your broker if you decide to use e.g. RabbitMQ (see the sketch after this list)
  • Change the accepted content to only "application/json" if you don't need YAML or another format for serializing and deserializing
  • Change worker_max_memory_per_child if you want to make sure a worker child is replaced once it exceeds n kilobytes (e.g. 1_200_000 ≈ 1.2 GB)
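
To make those variations concrete, here is a minimal sketch of how they could look in settings.py. The RabbitMQ URL and the memory value are placeholder examples, not recommendations:

# core/settings.py (hypothetical variations of the config above)

# Swap the broker to a local RabbitMQ instance (host and credentials are placeholders)
CELERY_BROKER_URL = "amqp://guest:guest@localhost:5672//"

# Accept JSON only, if you don't need YAML
CELERY_ACCEPT_CONTENT = ("application/json",)

# Replace a worker child once it uses roughly 1.2 GB of resident memory
# (the value is in kilobytes)
CELERY_WORKER_MAX_MEMORY_PER_CHILD = 1_200_000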

Next step: tasks.py

Now open one of your tasks.py files. Let's create a simple task:

from celery import shared_task


@shared_task()
def add(a: int, a2: int) -> int | None:
    """Add two integers and return the result"""
    return a + a2 if isinstance(a, int) and isinstance(a2, int) else None

This task is equivalent to the following code, except that it can run asynchronously on a Celery worker.

def add(a: int, a2: int) -> int | None:
    """Add two integers and return the result"""
    return a + a2 if isinstance(a, int) and isinstance(a2, int) else None

So as you can see, the only difference is the @shared_task() decorator. But that changes once your tasks get more complex: you will start using Celery concepts inside the task body itself, and it becomes easy to tell that it is a Celery task even without seeing the decorator.
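
For example, here is a hedged sketch (not part of the article's code) of what such a "Celery-aware" task could look like: with bind=True the task can inspect its own request and retry itself.

# app_one/tasks.py (illustrative sketch only)

import random

from celery import shared_task


@shared_task(bind=True, max_retries=3)
def flaky_add(self, a: int, a2: int) -> int:
    """Like add(), but uses Celery features (self.request, self.retry) in the body"""
    print(f"Task {self.request.id}, attempt {self.request.retries + 1}")
    if random.random() < 0.3:  # simulate an occasional transient failure
        raise self.retry(countdown=5)  # retry in 5 seconds, at most max_retries times
    return a + a2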

Let's call the task. But where?

# app_one/views.py

from django.http import HttpResponse

from app_one.tasks import add


def quick_math(request, a, a2):
    """A Django view illustrating how to call tasks"""

    # .s() creates a signature.
    # This won't start anything, because it doesn't call the task. It simply
    # bundles the task together with its arguments.
    not_evaluated = add.s(a, a2)

    # This is how you'd evaluate it
    evaluated = not_evaluated.delay()

    # Use apply_async((task_arguments), **options).
    # The options are the same ones you can define inside the task
    # decorator, so you can configure each call individually.

    # E.g. task is not the same as task_task:
    # task_task ignores the result, so you can't read it anymore, while
    # task can still .get() the result of the addition.
    task = add.apply_async((a, a2))
    task_task = add.apply_async((a, a2), ignore_result=True)

    # delay() is a shortcut for apply_async()
    task_2 = add.delay(a, a2)

    # .get() blocks until the result is available
    task_result = task.get()

    # Same for task_2, which was started with delay()
    task_2_result = task_2.get()

    return HttpResponse(task_result)

So your options are: apply_async, delay, or first building a signature with .s() (or the Signature class itself) and then starting it. With .get() you wait until the result is available and then receive it.
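
As a quick summary sketch (the argument values are just examples), all of the following end up running the same task:

from celery import signature

from app_one.tasks import add

sig = add.s(4, 2)                                          # signature shortcut
sig_by_name = signature("app_one.tasks.add", args=(4, 2))  # Signature built from the task name

async_result = sig.apply_async()     # or sig.delay(), or add.delay(4, 2)
print(async_result.get(timeout=10))  # blocks until the result is available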

Workflow -> Canvas

Since most of the time you want to handle multiple tasks, we need a better example for that case:

# app_one/tasks.py

@shared_task()
def divide_by(number: int, n: int) -> float:
    """Divide number by n"""
    return number / n


# app_one/views.py

from celery import chain
from django.http import HttpResponse

from app_one.tasks import add, divide_by


def calculate(request, a, a2, d):
    """Start the tasks with the given args"""

    # We simply use a chain().
    # A chain calls task1 -> task2 and returns the final result.
    # It passes the result of add.s(a, a2) to divide_by.s(d) as the
    # argument for the parameter 'number'.
    chain_together = chain(
        add.s(a, a2), divide_by.s(d)
    )()

    result = chain_together.get()

    return HttpResponse(result)
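
chain is only one of the canvas primitives; group and chord follow the same pattern. Here is a hedged sketch (the total callback task is hypothetical, not part of the article's code) of how they could be combined with the same add task:

# app_one/tasks.py (hypothetical callback task for the chord example)

@shared_task()
def total(results: list[int]) -> int:
    """Sum the results produced by a chord header"""
    return sum(results)


# app_one/views.py (hypothetical sketch)

from celery import chord, group
from django.http import HttpResponse

from app_one.tasks import add, total


def calculate_many(request):
    """Illustrative only: run several additions in parallel"""

    # group() runs all signatures in parallel and collects their results
    grouped = group(add.s(i, i) for i in range(5))()
    parallel_results = grouped.get()  # -> [0, 2, 4, 6, 8]

    # chord() is a group plus a callback that receives the list of results
    summed = chord(add.s(i, i) for i in range(5))(total.s()).get()  # -> 20

    return HttpResponse(f"{parallel_results} / {summed}")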

I recommend checking out the Celery workflow (canvas) docs BEFORE writing your tasks, because there are a few pitfalls to watch out for:

  • Avoid launching synchronous subtasks. Don't use .get() or apply() inside a task to wait for a result; that is only done inside views.py
  • Don't pass objects to a task that are a) expensive to serialize and b) may already have changed by the time the task runs. Pass ids or other lookups instead, fetch the objects with the ORM inside the task, and avoid serializing them
  • Use ignore_result if the task is not interactive, i.e. there is no user waiting for a response and no parent task waiting for the child to finish
  • Set time limits. A task can theoretically run forever. Make sure you've read about soft_time_limit and time_limit
  • Retry tasks where retries are a good fit, especially for API requests, and use retry_backoff if that suits your use case (see the sketch after this list)
  • Set rate limits for tasks, e.g. don't start the API-call task more than 100 times per minute. This avoids 429 responses
  • Find the correct settings for the server you run on. Use the eventlet pool for network-I/O-bound tasks and the default prefork pool for CPU-bound tasks
  • Read the docs (https://docs.celeryq.dev/en/stable/userguide/tasks.html?highlight=best%20practice#tips-and-best-practices)
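
To tie several of these tips together, here is a hedged sketch of a task that follows them. The Ticker model, its fields, and the API URL are made up for illustration, and the requests library is assumed to be installed:

# app_one/tasks.py (illustrative sketch only)

import requests
from celery import shared_task


@shared_task(
    autoretry_for=(requests.RequestException,),  # retry failed API requests
    retry_backoff=True,                          # exponential backoff between retries
    max_retries=5,
    soft_time_limit=30,                          # SoftTimeLimitExceeded raised after 30s
    time_limit=60,                               # the worker kills the task after 60s
    rate_limit="100/m",                          # start at most 100 of these per minute
    ignore_result=True,                          # nobody waits for the return value
)
def refresh_quote(ticker_id: int) -> None:
    """Fetch fresh data for a ticker; only the id is passed, not the ORM object"""
    from app_one.models import Ticker  # hypothetical model

    # Look the object up inside the task instead of serializing it
    ticker = Ticker.objects.get(pk=ticker_id)

    response = requests.get(
        f"https://example.com/api/quotes/{ticker.symbol}", timeout=10
    )
    response.raise_for_status()

    ticker.last_price = response.json()["price"]
    ticker.save(update_fields=["last_price"])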

I hope this worked well for you. I will keep writing about data science, Celery, Django and finance-related topics.

My website/Programming background: https://palmy-investing.com/


Mika, from Palmy Investing

Independent author. Web tech topics; will cover more niche topics once fun/time is available.