Celery Asynchronous Task Queues with Flower & FastAPI

Azzan Amin
TheLorry Data, Tech & Product
Jan 2, 2021

Let’s implement the amazing Celery Distributed Task Queue with FastAPI and monitor the background tasks (workers) using Flower.


Most modern data science and enterprise-grade applications include computationally expensive tasks that can take some time to complete. Such workloads hurt the user experience: users spend time waiting for the task to finish, unable to execute any other process or interact with the system.

Examples of such long-running tasks include sending emails, heavy mathematical computations, image processing, batch geocoding, or calculating the best possible delivery routes for 1,000 different bulky parcels at TheLorry.

The idea behind asynchronous processing, or running tasks in the background, is to place time-consuming work in a separate unit that executes apart from the main process. The tool we will be using in this article to manage asynchronous tasks is Celery. Celery requires a message broker, which is used for sending and receiving messages. A message is any kind of information; in Celery's case, a message describes a background task that we want to run. The message broker stores our task until it is picked up by one of the Celery workers.

This is where the implementation of a Task Queue becomes of utmost importance.

So, What is Task Queue?

Task queues let applications perform work, called tasks, asynchronously outside of a user request. If an app needs to execute work in the background, it adds tasks to task queues. The tasks are executed later, by worker services.

The Task Queue service is designed for asynchronous work.

Google Cloud Docs

In short, a Task Queue performs long-running jobs in the background asynchronously. Users get a fast response from the system without being affected by the workload, and the work becomes more efficient and effective since users can issue many requests at a time.

Wow! Task queue is indeed really awesome.

In this article, we will introduce you to an amazing task queue built in Python called Celery. Then, we will walk you through how to implement Celery with FastAPI and how to monitor the Celery background tasks (workers) using the Flower monitoring tool.

Let’s go!

Understanding the Celery Building Blocks

Celery is a distributed task queue that helps execute lots of processes/messages in the background asynchronously with real-time processing. Celery consists of three main components:

  1. Celery Client
  2. Message Broker
  3. Celery Worker

The diagram below shows a simplified view of how these components interact with each other. In our case, we will be using FastAPI as our Celery client and RabbitMQ as the message broker.

Interaction of Celery Components

Here is the explanation:

  • The Celery client runs inside the FastAPI app and issues messages/background jobs to RabbitMQ.
  • RabbitMQ is the message broker that mediates messages between clients and workers.
  • After receiving a message from the client, RabbitMQ delivers the task to a Celery worker.
  • A Celery worker executes the background tasks, which is what gives our web server requests their asynchronicity.
  • There can be multiple workers performing/completing many tasks at a time.
  • Celery ensures that each worker process executes one task at a time and that each task is delivered to only one worker.

Hopefully, this simple explanation helps you grasp the gist of how the Celery components work together.

Without further ado, let’s implement celery in our FastAPI project.

Building the FastAPI with Celery

The sample project we created in this walkthrough tutorial is based on FastAPI. FastAPI is a modern, fast (high-performance) web framework for building APIs with Python 3.6+ based on standard Python type hints. If you're interested in learning more about this cool framework, you can read this article written by its awesome author, Sebastián Ramírez.

If you only need to perform simple background tasks that are not compute-intensive and can run in the same process, you might be better served by a simpler tool like FastAPI's Background Tasks, as sketched below.
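For reference, here is a minimal sketch of that simpler approach using FastAPI's built-in BackgroundTasks (the write_notification function and its message are made up for illustration):

from fastapi import BackgroundTasks, FastAPI

app = FastAPI()

# Hypothetical task: any plain function can be queued as a background task
def write_notification(email: str, message: str):
    with open("log.txt", "a") as log:
        log.write(f"notification for {email}: {message}\n")

@app.post("/send-notification/{email}")
def send_notification(email: str, background_tasks: BackgroundTasks):
    # Runs after the response has been sent, in the same process as the app
    background_tasks.add_task(write_notification, email, message="some notification")
    return {"message": "Notification will be sent in the background"}

Unlike Celery, this runs inside the same process as FastAPI itself, so it is only suitable for light work.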

Take it from us, FastAPI 🤩 is the most efficient way of creating APIs in Python.

Let’s get it rolling!

1. Requirements

To install FastAPI, use this command:

pip install fastapi

Then, install the celery package:

pip install celery

We will also need to install an ASGI server to run our FastAPI app (Hypercorn is another alternative):

pip install uvicorn

2. Set Up the Message Broker

One of the easiest ways to run RabbitMQ on our local machine is by using Docker. But before that, make sure Docker is installed on your machine. If you do not have it yet, please follow this link to get it installed: Get Docker

Docker is hotter than hot because it makes it possible to get far more apps running on the same old servers and it also makes it very easy to package and ship programs. Docker enables developers to easily pack, ship, and run any application as a lightweight, portable, self-sufficient container, which can run virtually anywhere.

Once done, just run the command below to spin up the RabbitMQ image through docker in the terminal/cmd.

docker run -d --name some-rabbit -p 4369:4369 -p 5671:5671 -p 5672:5672 -p 15672:15672 rabbitmq:3
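One note on the ports: the plain rabbitmq:3 image does not have the management plugin enabled, so mapping port 15672 will not give you the web UI. If you also want RabbitMQ's management interface on http://localhost:15672, use the -management variant of the image instead:

docker run -d --name some-rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management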

Nice! Your RabbitMQ is now up and running.

3. Create the Celery Worker Task

Now that the message broker is running, we can create the workers, because Celery workers listen to the message broker in order to execute the queued tasks. In this part, we will create the task for the Celery worker. From our project root, create a file celery_worker.py with:

from time import sleep
from celery import Celery
from celery.utils.log import get_task_logger

# Initialize Celery with RabbitMQ as the message broker
celery = Celery('tasks', broker='amqp://guest:guest@127.0.0.1:5672//')

# Create logger - enables displaying messages on the task logger
celery_log = get_task_logger(__name__)

# Create Order - runs asynchronously on a Celery worker
# Example of a long-running task
@celery.task
def create_order(name, quantity):
    # 5 seconds per 1 order
    complete_time_per_item = 5

    # Keeps increasing depending on the item quantity being ordered
    sleep(complete_time_per_item * quantity)

    # Display log
    celery_log.info("Order Complete!")
    return {"message": f"Hi {name}, Your order has completed!",
            "order_quantity": quantity}

The code above shows a task that creates an item order based on the client's request. Each order takes 5 seconds, and the time keeps increasing with the order quantity. This is purposely done to give an example of a simple long-running task. You can also customize the Celery task to your own preference to mimic a long-running task.
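One thing worth noting: we did not configure a result backend, so the dictionary returned by create_order is effectively discarded. If you ever need to read a task's return value from the client side, here is a minimal sketch, assuming the RPC result backend running over the same RabbitMQ instance:

# Sketch: initialize Celery with a result backend so return values can be fetched
celery = Celery(
    'tasks',
    broker='amqp://guest:guest@127.0.0.1:5672//',
    backend='rpc://',  # assumption: RPC result backend over RabbitMQ
)

# delay() returns an AsyncResult that we can block on
result = create_order.delay("Harry", 1)
print(result.get(timeout=30))  # waits until the worker finishes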

4. Create the Model and App

Create a BaseModel “Order” using pydantic inside a file called model.py:

from pydantic import BaseModel

# Order class model for the request body
class Order(BaseModel):
    customer_name: str
    order_quantity: int

Then, create a new file called main.py:

from fastapi import FastAPI
from celery_worker import create_order
from model import Order

# Create FastAPI app
app = FastAPI()

# Create order endpoint
@app.post('/order')
def add_order(order: Order):
    # Use the delay() method to queue the Celery task
    create_order.delay(order.customer_name, order.order_quantity)
    return {"message": "Order Received! Thank you for your patience."}

From the code above, we can see that the Celery task is called simply by adding .delay() after the function name. This tells Celery to add a new task to the queue.
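By the way, .delay() is just a convenient shortcut for the more general apply_async() method, which accepts extra execution options. For instance, this sketch queues the same task but asks the worker to wait 10 seconds before starting (the countdown value is arbitrary):

# Equivalent call with apply_async(), plus a 10-second delay before execution
create_order.apply_async(
    args=[order.customer_name, order.order_quantity],
    countdown=10,
)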

Awesome! Our code is now complete, and we are ready to run FastAPI with Celery.

5. Running FastAPI and Celery Worker server

To run the FastAPI app:

uvicorn main:app --reload
FastAPI Running

Visit http://localhost:8000/docs to see the running FastAPI in Swagger Docs.

We can start our worker server by using the following command:

celery -A celery_worker.celery worker --loglevel=info
Celery Worker Running
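By default, the worker starts a pool of child processes (one per CPU core) and spreads the queued tasks among them. If you want to control how many tasks run in parallel, pass the --concurrency flag:

celery -A celery_worker.celery worker --loglevel=info --concurrency=4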

Great! Now everything is running. Let’s try some tests on the API and analyze the result.

6. Test and Analyze the Celery

Once we visit http://localhost:8000/docs, we can try our endpoint by inserting a request body input to test it. This is an example input for the request body:

Input for Request Body
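If you prefer the terminal over the Swagger UI, an equivalent request with curl looks like this (the values match the worker log shown later in this article):

curl -X POST http://localhost:8000/order -H "Content-Type: application/json" -d '{"customer_name": "Harry", "order_quantity": 1}'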

Click on ‘Execute’ to get the response from the endpoint and we will see this result:

Response body

Yeayy! Our order is received.

The API sends us an immediate response, without making us wait, because the work is handled asynchronously by Celery.

We can also send several requests at the same time and the API will enqueue them all in the Celery task queue. New requests will not interrupt the previous tasks, since RabbitMQ, as the broker, ensures that communication between the client and the workers works properly.
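To see this in action, here is a quick sketch that fires five orders in a row (it assumes the third-party requests package is installed; the customer names are made up):

import requests

# Each POST returns immediately while the workers chew through the queue
for i in range(1, 6):
    response = requests.post(
        "http://localhost:8000/order",
        json={"customer_name": f"Customer {i}", "order_quantity": i},
    )
    print(response.json())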

The interesting part is that our long-running tasks are executed in the background by separate worker processes. This offloads the heavy work from our main thread to other parts of the application architecture, leading to more efficient and faster completion of multiple long-running tasks in our FastAPI app.

Absolutely Powerful!

Now, we can check our celery worker logs in our console.

This is the example result that we will get from the logs:

[2021-01-02 04:07:52,510: INFO/MainProcess] Received task: celery_worker.create_order[34baa50b-d38e-41a9-963d-36c5bc7e384f]
[2021-01-02 04:07:57,525: INFO/MainProcess] celery_worker.create_order[34baa50b-d38e-41a9-963d-36c5bc7e384f]: Order Complete!
[2021-01-02 04:07:57,527: INFO/MainProcess] Task celery_worker.create_order[34baa50b-d38e-41a9-963d-36c5bc7e384f] succeeded in 5.01600000000326s: {'message': 'Hi Harry, Your order has completed!', 'order_quantity': 1}

Our order is now completed! And yes, it finished in 5 seconds, since we requested only one order. This means all our Celery components are working fine, as they delivered the response successfully. Cool!

But have you ever wondered:

  • Is there any efficient method to monitor the background tasks?

The answer is yes! We can use the Flower monitoring tool to monitor all our Celery workers. Let's see how we can set up Flower.

7. Set Up and Test the Flower Monitoring Tool

Setting up Flower in our project is very easy. First, install the package:

pip install flower

Then, run the flower server on our local machine:

celery -A celery_worker.celery flower --broker=amqp://localhost//

And that's it! Our Flower monitoring tool is now running on our local server. Visit http://localhost:5555/ to see it.
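Flower listens on port 5555 by default; if that port is already taken on your machine, you can pick another one with the --port option:

celery -A celery_worker.celery flower --port=5556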

Flower Monitoring Tool

Okay, let's add a few tasks to Celery through our FastAPI app.

Then, go back to the Flower server and click on the “Tasks” tab on the navigation bar to monitor our celery workers.

All right, here is the example output:

Celery Worker Tasks Details in Flower

Nice! We can now monitor the detailed logs of our workers in a much more structured way, thanks to the Flower monitoring tool.

You can find the complete code of the project here.

Good luck, thank me later!

Summary

Celery + Flower + FastAPI + Docker is a really powerful combo, isn't it?!

We have just shown you how to add Celery to your own FastAPI project to improve overall performance and efficiency, and to better handle background jobs for long-running tasks.

In this article, we discussed the implementation of an asynchronous task queue using Celery with FastAPI. Along the way, we also learned about the Celery components, spinning up RabbitMQ using Docker, building a FastAPI app with Celery, and monitoring Celery using Flower.

The Flower monitoring tool is also very useful for monitoring the Celery workers: it gives a structured view that makes the worker processes and log details easy to understand.

We hope this article helps you understand the importance of background tasks, task queues, and Docker, and shows you how to implement Celery in your own cool FastAPI projects.

Peace! ✌️
