Timed Periodic Tasks in Celery
In this second part of the series, we cover the implementation of periodic tasks using Celery.
In layman’s terms, periodic tasks are tasks that execute at pre-determined time intervals with minimal human intervention.
Celery has a scheduler called beat. This scheduler will place messages in a message broker queue when the time is right. In the first part of this article, we mentioned that messages can be any kind of information and that messages are equivalent to tasks from the Celery perspective.
As beat places messages in message broker queues, they become ready for consumption by the next available Celery worker.
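The idea behind beat can be sketched in plain Python. This is a conceptual sketch only, not Celery’s actual implementation: a beat-like loop tracks when each task is next due and appends a message to a queue whenever that time arrives.

```python
def run_beat(schedule, queue, timeline):
    """schedule: {task_name: interval_seconds}.
    timeline: iterable of timestamps (in seconds) at which the scheduler wakes up."""
    next_due = {name: 0.0 for name in schedule}
    for now in timeline:
        for name, interval in schedule.items():
            if now >= next_due[name]:
                # in Celery, this is where beat publishes the task to the broker
                queue.append((now, name))
                next_due[name] = now + interval

queue = []
run_beat({"see_you": 10}, queue, timeline=range(0, 31))
print(queue)  # → [(0, 'see_you'), (10, 'see_you'), (20, 'see_you'), (30, 'see_you')]
```

Every ten simulated seconds, a see_you message lands on the queue, which is exactly the behaviour we are about to set up with Celery.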
To create periodic tasks, we start by defining them via the beat_schedule setting. Celery beat checks the beat_schedule setting to discover the tasks that need to run periodically.
Let’s clarify this with a simple example.
See you in ten!
We will create a task that prints a statement to the console every ten seconds.
We will be using the simple-celery folder from Part 1 of this article.
Inside your simple-celery folder, create a file called periodic.py and include the following:
# periodic.py
from celery import Celery

app = Celery('periodic', broker="pyamqp://guest@localhost//")

@app.task
def see_you():
    print("See you in ten seconds!")

app.conf.beat_schedule = {
    "see-you-in-ten-seconds-task": {
        "task": "periodic.see_you",
        "schedule": 10.0
    }
}
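As a side note, the schedule value need not be a float number of seconds; Celery also accepts a datetime.timedelta, which can read more clearly for longer intervals. A sketch of the equivalent configuration:

```python
from datetime import timedelta

from celery import Celery

app = Celery('periodic', broker="pyamqp://guest@localhost//")

app.conf.beat_schedule = {
    "see-you-in-ten-seconds-task": {
        "task": "periodic.see_you",
        "schedule": timedelta(seconds=10),  # equivalent to 10.0
    }
}
```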
Initial Run
Open your terminal and navigate to the simple-celery
directory. Once you get there, ensure that you have activated your virtual environment.
source simple-env/bin/activate
Run periodic.py using the following command.
celery -A periodic beat --loglevel=info
The beat command starts the Celery beat service. In the sections that follow, we shall refer to the command above as the “initial run”.
The console will produce multiple lines of output. Every ten seconds, we can see output that is similar to the sample below.
[2018-05-24 22:00:28,364: INFO/MainProcess] Scheduler: Sending due task see-you-in-ten-seconds-task (periodic.see_you)
Did you notice the ten-second timestamp difference between successive lines? From this output, we can tell that the periodic.see_you task is being sent somewhere every ten seconds.
Where is it being sent?
Let’s leave the initial run to continue running on its own terminal window while we revisit our old friend; the message broker.
Message Brokers
Remember message brokers from part 1 of this article?
To recap, we said that message brokers are used to manage messages via queues. Celery uses message brokers to send and receive messages.
We also said that messages can be any kind of information. From Celery’s perspective, messages are tasks.
The Celery beat service is sending the periodic.see_you task to a queue in the message broker every ten seconds. And we can verify this!
rabbitmqctl — RabbitMQ’s command line utility
rabbitmqctl is used to manage the RabbitMQ message broker from the command line. It should be available as long as RabbitMQ is installed. Among other things, rabbitmqctl can list the message queues on your computer.
Open a new terminal window/tab and run the following command.
rabbitmqctl list_queues
In your output, you should have a row that contains the queue name celery followed by an integer value. Celery uses the celery queue by default. The integer value represents the number of unprocessed messages/tasks in the celery queue. We will refer to this as the message count.
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
celery 18
This output simply means that my computer has a message count of 18, i.e. I have 18 unprocessed tasks in the celery queue.
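As a side note, list_queues also accepts the names of the columns you want. The sketch below assumes the standard queue info items; messages is the total, split between those ready for delivery and those delivered but not yet acknowledged.

```shell
# list queues with explicit columns
rabbitmqctl list_queues name messages messages_ready messages_unacknowledged
```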
The message count may vary on your computer depending on how long the initial run has been executing. The longer it has been running, the more messages it has added to the celery queue.
It is also possible that your message count varies because your celery queue contains messages from different Celery apps. If you suspect that this is the case and want to start afresh, you can discard all the messages in the celery queue and reset the message count to zero.
# discard all the messages in the celery queue
rabbitmqctl purge_queue celery

# verify the purge
rabbitmqctl list_queues

Timeout: 60.0 seconds ...
Listing queues for vhost / ...
celery 0
Our goal is to verify that the message count increases by one every time the initial run sends the periodic.see_you task to the celery queue.
So every time you see this kind of output on the initial run terminal …
[2018-05-24 22:00:28,364: INFO/MainProcess] Scheduler: Sending due task see-you-in-ten-seconds-task (periodic.see_you)
… you can verify that a message has been added to the celery queue.
rabbitmqctl list_queues
Timeout: 60.0 seconds ...
Listing queues for vhost / ...
celery 19
Now that we know where the task is being sent, we need to answer one more question.
Is the periodic.see_you task being executed?
We expected to see the following output at least once.
See you in ten seconds!
Celery Workers
Workers, as the name suggests, do the important work of consuming messages from a queue. Once they consume a message successfully, that message is taken off the queue by the message broker.
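Conceptually, the worker side pairs with the beat side. In this plain-Python sketch (again, not Celery’s actual implementation; the registry here is a stand-in for Celery’s task registry), the worker drains the queue and executes the matching task function for each message:

```python
def run_worker(queue, registry):
    """queue: list of task names; registry: {task_name: callable}."""
    results = []
    while queue:
        name = queue.pop(0)  # the broker hands the message to the worker
        results.append(registry[name]())  # the worker executes the task
    return results

# a stand-in registry mirroring the see_you task
registry = {"periodic.see_you": lambda: "See you in ten seconds!"}
queue = ["periodic.see_you", "periodic.see_you"]
print(run_worker(queue, registry))  # → ['See you in ten seconds!', 'See you in ten seconds!']
print(queue)  # → []  (the queue drains, just like the message count dropping to zero)
```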
You may have noticed that the Celery beat service is running on the MainProcess as it sends the periodic.see_you task to the message broker.
[2018-05-24 22:00:28,364: INFO/MainProcess] Scheduler: Sending due task see-you-in-ten-seconds-task (periodic.see_you)
The Celery beat service, via the MainProcess, adds tasks to the celery queue and trusts that worker processes will execute the tasks in the queue.
Since the initial run is running on the main process, we need to make sure that worker processes are in action so that the messages can be consumed. In order to do this, we need to run Celery workers on yet another terminal window/tab.
Open a new terminal, navigate to the simple-celery folder and run the following commands.
# activate the virtual environment
source simple-env/bin/activate

# run the celery worker
celery -A periodic worker --loglevel=info
There is a lot of output on the console when the workers run! The workers are consuming tasks off the celery queue.
...
...
[2018-06-07 22:02:29,049: INFO/MainProcess] Received task: periodic.see_you[130bd707-4518-410a-bdd5-bf802064264c]
[2018-06-07 22:02:29,050: INFO/MainProcess] Received task: periodic.see_you[910a97d8-b603-41cc-9588-c5123b519ce4]
...
...
Later on, we can see when the tasks are being executed.
...
...
[2018-06-07 22:02:29,170: WARNING/ForkPoolWorker-3] See you in ten seconds!
[2018-06-07 22:02:29,170: WARNING/ForkPoolWorker-2] See you in ten seconds!
...
...
And finally, we can see the point at which the tasks have finished execution.
...
...
[2018-06-07 22:02:29,171: INFO/ForkPoolWorker-2] Task periodic.see_you[0671bee0-aa8c-4c57-95d0-acd5edc5e5a1] succeeded in 0.00250419200165s: None
[2018-06-07 22:02:29,171: INFO/ForkPoolWorker-3] Task periodic.see_you[130bd707-4518-410a-bdd5-bf802064264c] succeeded in 0.00253255899952s: None
...
...
We have one more sanity check!
Every time a worker consumes a message successfully, that message is taken off the queue by the message broker. Let’s take a look at the message count again.
rabbitmqctl list_queues

Timeout: 60.0 seconds ...
Listing queues for vhost / ...
celery 0
This means that we have zero unprocessed messages in the celery queue.
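One convenience worth knowing: instead of two terminals, Celery can embed the beat scheduler inside the worker process with the -B option. The Celery documentation recommends this for development only, since running more than one embedded scheduler would send duplicate tasks.

```shell
# run a worker with an embedded beat scheduler (development convenience)
celery -A periodic worker -B --loglevel=info
```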
Conclusion
We have seen how to implement one form of periodic tasks in Celery.
Timed periodic tasks suit use cases that require a simple repetitive action, e.g. performing system backups every twelve hours.
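For instance, the backup example could be expressed with the same timed approach. The module and task names below are hypothetical:

```python
from celery import Celery

app = Celery('backups', broker="pyamqp://guest@localhost//")

app.conf.beat_schedule = {
    "twelve-hourly-backup": {
        "task": "backups.perform_backup",  # hypothetical task
        "schedule": 12 * 60 * 60.0,        # twelve hours, in seconds
    }
}
```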
However, some use cases are more complex and demand more finesse. If they were implemented with the timed approach, the end result would be an ugly hack. This is where Celery’s crontab feature comes in and excels.
We will cover the crontab approach in part 3 of this article.
All the code in this article is available on GitHub.