Django: Handling Periodic Tasks With Celery
What is Celery Beat? It combines Celery, a well-known task delegation tool, with a nifty scheduler called Beat. It can help you manage even the most tedious of tasks.
As we know, the work of software developers is filled with generating periodic reports, handling vast imports or exports, backups, frequent API requests, or simply sending batches of emails. Wouldn't it be a developer's paradise to have all these tasks automated and perfectly scheduled? It would, and luckily it's possible with the help of Celery Beat.
Getting Started
To get Celery on wheels, you'll need a broker to send and receive messages. My choice is Redis 4.0.10, which I also run in a container. An alternative to Redis is RabbitMQ; you can read more about it in the official Celery documentation.
Redis has a reputation for being far easier to install and administer, which makes the choice simple. With Redis installed, you just call redis-server to run it. That's it. Here's a quick test:
$ redis-cli ping
PONG
In the next step, you need to ensure that your virtual environment or container is equipped with the packages celery==4.2.0 and redis==2.10.6. pip is handy to get them in place. At a later stage, you'll also use the benefits of django_celery_beat==1.1.1.
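For example, with pip inside your environment:
$ pip install celery==4.2.0 redis==2.10.6 django_celery_beat==1.1.1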
How to use Celery Beat?
This is my project structure:
proj/
    manage.py
    proj/
        __init__.py
        apps.py
        celery.py
        tasks.py
        settings.py
    my_app/
        __init__.py
        apps.py
        tasks.py
Remember to have all apps.py files set up. Additionally, each app's __init__.py should point to its default config:
default_app_config = 'my_app.apps.MyAppConfig'
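A minimal apps.py to match might look like this (MyAppConfig is the class referenced above):
# my_app/apps.py
from django.apps import AppConfig

class MyAppConfig(AppConfig):
    name = 'my_app'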
An exception is __init__.py in your project folder, which stores Celery configuration:
from .celery import app as celery_app
__all__ = ['celery_app',]
With that part of the configuration in place, the next step is to move on to celery.py:
import os
from celery import Celery

# Set the default Django settings module
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

app = Celery('proj')

# Celery will apply all configuration keys with the defined namespace
app.config_from_object('django.conf:settings', namespace='CELERY')

# Load tasks from all registered apps
app.autodiscover_tasks()
The other way of configuring Celery, besides config_from_object, is assigning config values directly within the app. Check out the documentation for further details.
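A minimal sketch of that alternative, setting Celery's newer lowercase option names directly on the app instance:
# proj/celery.py, instead of config_from_object:
app.conf.update(
    broker_url='redis://localhost:6379',
    timezone='Europe/Warsaw',
    enable_utc=False,
)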
With the basics taken care of, let's move on to settings.py and set up Celery Beat's specifics. The simplest config will look like this:
CELERY_BROKER_URL = 'redis://localhost:6379'
# If time zones are active (USE_TZ = True), define your local time zone
CELERY_TIMEZONE = 'Asia/Kolkata'
# So Celery doesn't take UTC by default
CELERY_ENABLE_UTC = False
# We're going to have our tasks rolling soon, so this will be handy
CELERY_BEAT_SCHEDULE = {}
A new lowercase settings standard was introduced in the latest versions of Celery. However, there's no rush to update, as the well-adopted uppercase format is still supported.
Scheduling tasks in Celery Beat
Well done so far — now it’s time to move to tasks.py in your app module.
from celery import task
from celery import shared_task

# We can have either a registered task...
@task(name='summary')
def send_import_summary():
    # Magic happens here
    pass

# ...or a shared task
@shared_task
def send_notification():
    print("Here I'm!")
No matter if a task is registered or shared, Celery is able to discover it. If the tasks you write will serve in reusable apps that can't depend on the project itself, the app instance can't be imported directly. In this case, the @shared_task decorator is the right way to ensure you'll have everything in place.
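Before scheduling anything, you can sanity-check a task by hand from the Django shell; with a worker running, the message lands in the worker's log:
>>> from my_app.tasks import send_notification
>>> send_notification.delay()  # queued and picked up by the worker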
With the task functions ready, let's move back to settings.py to add a schedule:
from celery.schedules import crontab

CELERY_BROKER_URL = 'redis://localhost:6379'
CELERY_TIMEZONE = 'Europe/Warsaw'

# Let's make things happen
CELERY_BEAT_SCHEDULE = {
    'send-summary-every-hour': {
        'task': 'summary',
        # There are 4 ways we can handle time, read further
        'schedule': 3600.0,
        # If your task takes arguments, pass them here:
        # 'args': ("We don't need any",),
    },
    # Executes every Friday at 4pm
    'send-notification-on-friday-afternoon': {
        'task': 'my_app.tasks.send_notification',
        'schedule': crontab(minute=0, hour=16, day_of_week=5),
    },
}
In the example above, you’ve used two methods of handling time:
- simple time representation in seconds to declare an interval between repetitions;
- crontab, which works as ‘run every’ and lets you specify an exact time, day of week/month, and month in the year.
The well-known Python timedelta can also be applied here as a more readable way to express an interval (a sketch follows the solar example below). The other fancy option which Celery Beat brings up is task execution according to the position of the sun:
from celery.schedules import solar
# [...]
'schedule': solar('sunset', -37.81753, 144.96715),
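And the timedelta variant mentioned above, as a minimal sketch:
from datetime import timedelta
# [...]
'schedule': timedelta(hours=1),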
Time to run your first worker!
Settings are done and dusted. Let’s give them a try.
$ celery -A proj beat -l INFO # For deeper logs use DEBUG
Beat can also be embedded inside a regular Celery worker with the -B parameter. However, it's not recommended for production use:
$ celery -A proj worker -B -l INFO
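For production, it's better to run the worker and Beat as two separate processes:
$ celery -A proj worker -l INFO
$ celery -A proj beat -l INFO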
The default scheduler stores its data in a local celerybeat-schedule file and relies on hardcoded settings. But what if you would also like to let app admins, or even users, create or simply adjust schedules?
Here comes django_celery_beat
Celery allows using custom scheduler classes. Why not keep the schedule in a database and display it conveniently in the admin interface? That's precisely what this extension does, without any hassle for you or other users.
At this moment you have django_celery_beat==1.1.1 installed, so let’s add it to INSTALLED_APPS in settings.py.
INSTALLED_APPS = (
...,
'django_celery_beat',
)
Django Celery Beat uses its own models to store all schedule-related data, so let it build new tables in your database by applying migrations:
$ python manage.py migrate
The last step is to tell Beat to read from the custom scheduler, django_celery_beat.schedulers:DatabaseScheduler. To do so, you'll need to rerun it:
$ celery -A proj beat -l INFO --scheduler django_celery_beat.schedulers:DatabaseScheduler
...
LocalTime -> 2018-07-13 09:29:25
Configuration ->
. broker -> redis://redis:6379//
. loader -> celery.loaders.app.AppLoader
. scheduler -> django_celery_beat.schedulers.DatabaseScheduler
. logfile -> [stderr]@%DEBUG
. maxinterval -> 5.00 seconds (5s)
[2018-07-13 09:29:25,212: INFO/MainProcess] beat: Starting...
[2018-07-13 09:29:25,212: INFO/MainProcess] Writing entries...
Voila! Let’s have a look at the admin panel:
All settings that were previously rooted in the code are now available at hand, so you can quickly click through all the 'boring stuff' or outsource it to someone else. Simple as that.
Let’s add a new periodic task:
Here you go!
[2018-07-13 10:25:03,120: DEBUG/MainProcess] DatabaseScheduler: Fetching database schedule
[2018-07-13 10:25:03,231: WARNING/ForkPoolWorker-1] Here I'm!
[2018-07-13 10:25:03,232: INFO/ForkPoolWorker-1] Task my_app.tasks.send_notification succeeded in 0.0005994100356474519s: None
<ModelEntry: Random my_app.tasks.send_notification(*[], **{}) <freq: 30.00 seconds>>
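If you prefer code over clicking, the same schedule can be created programmatically through django_celery_beat's models; a minimal sketch matching the entry above:
from django_celery_beat.models import PeriodicTask, IntervalSchedule

# Run my_app.tasks.send_notification every 30 seconds
schedule, _ = IntervalSchedule.objects.get_or_create(
    every=30,
    period=IntervalSchedule.SECONDS,
)
PeriodicTask.objects.create(
    interval=schedule,
    name='Random',
    task='my_app.tasks.send_notification',
)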
Although you may have the impression that the configuration takes a lot of effort, what matters is not to miss any of the initial setup steps, so that your Celery is sound and discovers all tasks. That will certainly save you from further debugging!
And one more tip: if you work with a database, don't pass Django model objects to Celery tasks. Instead, pass the object's primary key and fetch the object in its latest state straight from the database. That helps avoid situations where the object was changed in the meantime and then overwritten by the next task execution.
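A minimal sketch of that tip, using a hypothetical Report model:
from celery import shared_task
from my_app.models import Report  # hypothetical model

@shared_task
def send_report(report_id):
    # Fetch the latest state straight from the database
    report = Report.objects.get(pk=report_id)
    ...

Then call it with send_report.delay(report.pk) rather than passing the instance itself.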