Celery with Flask

Sadman Amin
4 min read · Aug 8, 2021


Often, our applications run tasks that hold up the natural flow of a request. A tedious task such as sending emails to hundreds or thousands of users can take a noticeable amount of time.

Improvements in globally available bandwidth and latency have led users to expect faster page loads. Even with time-consuming activities like the one above, developers should keep the user experience responsive. How can this superpower be achieved? Celery saves the day!

What is Celery?

Celery is an open-source asynchronous task queue that allows work to be completed in the background, independent of the typical request/response cycle. It lets you send emails to your users in the background while your application handles other work.

It follows the Producer-Consumer model, a classic multi-process synchronization pattern. In this model, a producer and a consumer (or worker, in this scenario) share a common task queue. The producer creates a task and adds it to the queue; the consumer picks the task up from the queue and completes it.

Components of Celery

As mentioned earlier, Celery follows a model with a producer and a consumer. Based on that, Celery has four components.

  1. Celery Client — It acts as the producer that adds tasks to the background task queue.
  2. Celery Worker — It is the consumer that fetches tasks from the queue and processes them. Celery supports multiple workers running simultaneously.
  3. Message Broker — It's used as the communicator between the client and the workers through a message queue. The most commonly used brokers are RabbitMQ and Redis.
  4. Result Backend — Once a Celery worker finishes processing a background job, it uses the result backend to store the task's status and result. If any error occurs during processing, it is recorded there as well. Redis, AMQP, and SQLAlchemy are some of the popular tools used to store such results.

Putting it all together: the Celery client is the producer that adds new tasks to the queue via the message broker. Celery workers then consume tasks from the queue, again via the message broker. Once a task is processed, its result is stored in the result backend.

Getting started with Celery in Flask

Setting Up

Integrating Celery with Flask is simple enough that no extension is required. Before writing any Flask code, we need to set up our message broker and result backend. For this blog, we will use Redis for both. You can set up and run Redis directly on your operating system or from a Docker container. Open your terminal and run the following commands:

$ sudo apt install docker.io
$ docker run -p 6379:6379 --name some-redis -d redis

This downloads the official Redis Docker image from Docker Hub and runs it on port 6379 in the background. Keep it running and move to a new terminal.

Create a new directory for this project named celery-with-flask and set up a virtual environment inside it.

$ mkdir celery-with-flask
$ cd celery-with-flask/
$ python -m venv celenv
$ source celenv/bin/activate

Now we need to install the dependencies for this project, and for that we need a requirements.txt file. Create the file with the following dependencies

Flask
celery==4.4.7
redis==3.5.3

and install them

pip install -r requirements.txt

We are ready to cook now!

Code

Create a new file called app.py and initialize Flask and Celery clients as follows:
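The original embedded snippet isn't reproduced here, so below is a minimal sketch of app.py following the common Flask + Celery 4.x pattern, assuming Redis on localhost:6379 serves as both the broker and the result backend:

# app.py
from flask import Flask
from celery import Celery

app = Flask(__name__)
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0'
app.config['CELERY_RESULT_BACKEND'] = 'redis://localhost:6379/0'

# create the Celery client and point it at the Redis broker and result backend
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL'],
                backend=app.config['CELERY_RESULT_BACKEND'])
celery.conf.update(app.config)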

Now that our Flask application is ready, we can add a function that will run in the background. Any function that you want to run as a background task needs to be decorated with the celery.task decorator. For example:
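Here is a simple sketch of such a task; the name my_background_task and the sleep are placeholders that simply match the call shown below:

import time

# a placeholder task; the sleep stands in for slow work such as sending an email
@celery.task
def my_background_task(a, b):
    time.sleep(5)
    return a + b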

Then the Flask application can request the execution of this background task as follows:

task = my_background_task.delay(10, 20)

The delay() method is a shortcut to the more powerful apply_async() call. Here is the equivalent call using apply_async():

task = my_background_task.apply_async(args=[10, 20])

The delay() or apply_async() call sends a new message to the message broker; the worker process then picks the task up from the queue and executes it.
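apply_async() also accepts scheduling options that delay() does not expose, for example a countdown in seconds (the value 60 here is only an illustration):

# run the task no earlier than 60 seconds from now
task = my_background_task.apply_async(args=[10, 20], countdown=60)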

Let's create a meaningfully heavy function that we want to run in the background. Finding the nth Fibonacci number recursively is a well-known algorithm, but it takes a noticeable amount of time for numbers larger than 30. Let's see whether we can find the 35th Fibonacci number (which takes 5–6 seconds to compute) without blocking our process, using the following find_fibonacci_async function.
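The original snippet isn't shown here, so the following is a minimal sketch of what the task could look like. It uses the naive recursive algorithm (deliberately slow) and assumes 1-indexed numbering, which matches the logged result 5702887 for find_fibonacci_async(35):

# in app.py
@celery.task
def find_fibonacci_async(n):
    # naive recursion on purpose, so the task takes a few seconds for n > 30
    def fib(k):
        if k < 2:
            return k
        return fib(k - 1) + fib(k - 2)
    # 1-indexed: the 1st Fibonacci number is 0, the 2nd is 1, ...
    return fib(n - 1)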

In a new terminal window, navigate to your project directory, activate the virtual environment, and then run:

(celenv)$ celery worker -A app.celery --loglevel=info

Open another terminal in your project directory with the virtual environment activated and start the Flask shell:

(celenv)$ FLASK_APP=app.py flask shell

Let’s send some tasks to the Celery worker to find Fibonacci numbers:

>>> from app import find_fibonacci_async
>>> task = find_fibonacci_async.delay(35)

The call returns instantly, but the worker process picks the task up from the queue and starts processing it in the background.

Switch to the Celery worker terminal and you will see the following log:

[2021-08-08 21:26:31,941: INFO/MainProcess] Received task: app.find_fibonacci_async[963ca2ec-87a1-4309-b6b3-9a3e83fcdae6]  
[2021-08-08 21:26:35,814: INFO/ForkPoolWorker-2] Task app.find_fibonacci_async[963ca2ec-87a1-4309-b6b3-9a3e83fcdae6] succeeded in 3.8694669000105932s: 5702887

task is an AsyncResult instance which can be used to check the task state along with the return value or exception details.

Add two new tasks, then print task.state and task.result a few times:

>>> task2 = find_fibonacci_async.delay(36)
>>> task3 = find_fibonacci_async.delay(37)
>>> print('Task2 -',task2.state,task2.result,'Task3',task3.state,task3.result)
Task2 - PENDING None Task3 PENDING None
>>> print('Task2 -',task2.state,task2.result,'Task3',task3.state,task3.result)
Task2 - SUCCESS 9227465 Task3 PENDING None
>>> print('Task2 -',task2.state,task2.result,'Task3',task3.state,task3.result)
Task2 - SUCCESS 9227465 Task3 PENDING None
>>> print('Task2 -',task2.state,task2.result,'Task3',task3.state,task3.result)
Task2 - SUCCESS 9227465 Task3 SUCCESS 14930352

If any error occurs while the task is running, the task state becomes FAILURE and the exception is stored in result.
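As a quick, hypothetical illustration (continuing in the same Flask shell), a task that raises an exception ends up in the FAILURE state and its result holds the exception instead of a return value:

>>> bad = find_fibonacci_async.delay('oops')   # hypothetical bad input
>>> bad.state    # becomes 'FAILURE' once the worker hits the TypeError
>>> bad.result   # holds the exception instance instead of a return value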

This blog was highly inspired by TestDriven.io and Miguel Grinberg's blog.

Sadman Amin

Backend Developer with a deep interest in DevOps pursuing higher studies in Cybersecurity