Asynchronous Processing in Celery

Lewis Kabui
Published in The Andela Way
Jul 19, 2018

What is Celery (Take 1)?

The Celery website describes it as a “simple, flexible, and reliable distributed system to process vast amounts of messages, while providing operations with the tools required to maintain such a system”.

I know. This description is a mouthful. Especially so for novice users. To make things easier, we are going to define Celery afresh to match the context in this article.

What is Celery (Take 2)?

Celery is a nifty Python package with features that enable us to implement:

  • time consuming operations in an asynchronous manner.
  • periodic operations in a cron-esque manner.

Celery has many more bells and whistles that are outside the scope of this article. For now, we will limit our focus to the features we listed above.

For the curious amongst us, here is a link to everything Celery.

Asynchronous Processing

You have probably come across a piece of functionality that required more than a few seconds to execute to completion. Web users today will barely tolerate operations that last more than three seconds on any website.

Asynchronous processing helps us reduce the user wait time and improve responsiveness.

The high level concept of asynchronous processing involves two steps:

  1. isolate the time consuming lines of code and place them in a separate block, for example a function.
  2. ensure that the isolated code from (1) above is executed separately from the main process.
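The two steps above can be sketched with nothing but the standard library; here concurrent.futures stands in for the separate execution mechanism that Celery will later provide (the function name and the 0.2 second delay are illustrative):

```python
import time
from concurrent.futures import ThreadPoolExecutor

# step 1: isolate the time consuming code in a separate block
def time_consuming_operation():
    time.sleep(0.2)  # stands in for a slow piece of code

# step 2: execute the isolated code separately from the main flow
executor = ThreadPoolExecutor(max_workers=1)
future = executor.submit(time_consuming_operation)

print("the main flow continues without waiting")

future.result()    # later, block until the result is ready
executor.shutdown()
```

Celery applies the same two ideas, but the "separate execution" happens in worker processes that can even live on a different machine.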

To drive the point home, let’s see how an app would execute a time consuming task synchronously and then see how we can use Celery for asynchronous processing.

Synchronous Processing

As you might have guessed, synchronous operations are all executed within the main process. Every line of code, regardless of the duration it takes to execute, will be processed in a sequential manner from top to bottom.

Let’s make this clearer with an example.

Create a directory called simple-celery in your projects folder. This is where all our code will reside.

mkdir simple-celery

Navigate into the simple-celery folder.

cd simple-celery

Create a file called main.py inside simple-celery and include the following lines.

# main.py
import time

print("Let's begin!")

# time.sleep simulates a time consuming section of code that takes
# 20 seconds to complete.
time.sleep(20)

print("... and that's the end of our really short app.")

Run this code with the following command.

python main.py

Unsurprisingly, this takes twenty seconds to execute to completion.

The flow of execution in synchronous processing is from top to bottom. The import and print statements take a negligible amount of time to execute.

time.sleep(20) is the time consuming operation in our app. We need to encapsulate that line of code into a function which we can then call asynchronously.
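As a first step, the sleep can be wrapped in a plain function. This version is still synchronous — the caller blocks for the full duration — but the slow code is now isolated and ready to be handed off later; the short 0.1 second delay here is purely for illustration:

```python
import time

def sleep_synchronously(seconds):
    """Simulates a time consuming operation that blocks its caller."""
    time.sleep(seconds)

start = time.perf_counter()
sleep_synchronously(0.1)   # the main process waits right here
elapsed = time.perf_counter() - start

print(f"blocked for roughly {elapsed:.1f} seconds")
```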

Installing Celery

Celery is an open source Python package; it is not part of the Python standard library. As such, we need to install it, but before we do that, we need to make sure that we have a Python environment management mechanism in place.

This link walks through the virtualenv environment management tool. virtualenv is sufficient (and the most beginner friendly) for our article, so we’ll use it.

If you decide to use other environment management tools, for example pipenv and virtualenvwrapper, you will have to adjust some of the commands we use in this article accordingly.

With pip installed, open your terminal and install virtualenv.

pip install virtualenv

Then create and activate the virtual environment.

# create the virtual environment
virtualenv simple-env
# activate the virtual environment
source simple-env/bin/activate

When your virtual environment is activated, you should see its name in brackets at the beginning of the prompt.

(simple-env)user@localhost$

We can now install Celery.

pip install celery

Message Brokers


Celery cannot work in isolation; it requires a message broker service, which it uses to send and receive messages.

A message can be any kind of information; in Celery’s case, a message is a task. Messages are managed by the message broker service via queues.

The message broker queue will store a message until it is consumed by one of Celery’s worker processes. Once consumed, the message is removed from the queue.
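As a rough mental model — not Celery's actual implementation — the broker behaves like a queue that producers put messages on and workers take messages off:

```python
from queue import Queue

# a toy stand-in for a message broker queue
broker_queue = Queue()

# the producer (our app) publishes a message describing a task
broker_queue.put({"task": "sleep_asynchronously", "args": []})

# a worker consumes the message...
message = broker_queue.get()
broker_queue.task_done()

# ...and once consumed, the message is no longer on the queue
print(broker_queue.empty())  # prints True
```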

RabbitMQ is the message broker we will use for this article. Download, installation and start instructions for various platforms can be found here.

Run the following command to start the RabbitMQ server (if your installation already runs it as a background service, you can skip this step).

rabbitmq-server

The Asynchronous Celery Solution

With everything setup, we will create an alternate version of the synchronous app. Here, time.sleep(20) will be executed asynchronously.

Below is our improved asynchronous app in a file named task.py.

# task.py
import time

from celery import Celery

app = Celery("task", broker="pyamqp://guest@localhost//")

# extract the time consuming portion of code and place it into
# a separate block
@app.task
def sleep_asynchronously():
    """
    This function simulates a task that takes 20 seconds to
    execute to completion.
    """
    time.sleep(20)


print("Let's begin!")

# this is how the sleep_asynchronously task is invoked to
# execute asynchronously
sleep_asynchronously.delay()

print("... and that's the end of our really short app.")

We start by creating a Celery application called app. During instantiation, we pass the “task” argument; this is the name of the module, i.e. task.py. The broker keyword argument specifies that we connect to RabbitMQ as the guest user.

time.sleep(20) is placed inside a function named sleep_asynchronously, which is decorated with @app.task. The decorator turns the function into a Celery task, which we can then run asynchronously using the delay method.

The Test Run

To run the asynchronous app, type the following command inside the simple-celery folder and hit enter:

celery -A task worker --loglevel=info

The -A option allows us to specify the module we want to run. We provide it with the value task.

The worker argument allows us to run workers. Workers are the processes that consume messages/tasks from the message broker queues.

The --loglevel option allows us to specify how verbose the console output should be. We chose info; alternative values include DEBUG, WARNING, ERROR and CRITICAL.

From the resulting console output, we can see that the two print statements executed almost immediately after we hit the enter key. The key takeaway here is that there was no significant delay between the two print statements.

Let's begin!
... and that's the end of our really short app.

In addition to the lines above, we can see a lot of Celery related output on the console. Of concern to us are the following lines;

[2018-05-14 18:59:39,808: INFO/MainProcess] Received task: task.sleep_asynchronously[82ec5b8f-d383-4795-8e48-8496413398f3]

This is the point at which the sleep_asynchronously function was invoked.

Twenty seconds later, we can see the following;

[2018-05-14 19:00:01,072: INFO/ForkPoolWorker-4] Task task.sleep_asynchronously[82ec5b8f-d383-4795-8e48-8496413398f3] succeeded in 20.002166117s: None

Celery is kind enough to let us know when an asynchronous task has completed execution.

From the output above, you may also have noticed that the sleep_asynchronously function was executed by ForkPoolWorker-4. The numerical suffix on your terminal may be different.

A Celery ForkPoolWorker is a worker process that executes independently of the main process. It runs sleep_asynchronously (does the heavy lifting) so that the main process is not blocked by the time consuming time.sleep(20) statement.

These workers consume the messages from the message broker queues. Once the messages are consumed, they are taken off the queue.

Conclusion

That does it for the asynchronous implementation of operations using Celery.

We have covered how a simple app looks with a naive synchronous implementation. We also saw how we could improve the same app with an asynchronous implementation using Celery.

Asynchronous implementations are used in real life to improve responsiveness when performing processor intensive tasks such as generating PDF reports from a large data set.

Another ideal use case for asynchronous operations is executing external API calls. Whenever we rely on external APIs, for example to send emails, we are no longer in control of how long an operation takes to execute to completion. In some cases, the operation may never complete, e.g. if the API servers are unreachable or down. Handling external API calls asynchronously shields your users from such long delays and errors.

In the external API scenario above, Celery comes with the added advantage that a message is not removed from the queue until it has been consumed and acknowledged successfully (this is a RabbitMQ feature). With the appropriate configuration (for example late acknowledgement and task retries), the worker processes will keep attempting the task until it succeeds once the external API is back online.
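Conceptually, that retry loop looks like the plain Python sketch below; the call_external_api function is hypothetical and is set up to fail twice before succeeding:

```python
import time

def call_external_api(attempt):
    """Hypothetical external API call that fails twice, then succeeds."""
    if attempt < 3:
        raise ConnectionError("API servers unreachable")
    return "email sent"

# the message stays on the queue until it is consumed successfully
result = None
attempt = 0
while result is None:
    attempt += 1
    try:
        result = call_external_api(attempt)
    except ConnectionError:
        time.sleep(0.01)  # back off briefly, then retry

print(result)  # the task eventually succeeds
```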

All the code in this article is available on GitHub.

The next part of this series will cover periodic tasks using Celery.

Stay tuned!

