Async Architecture with FastAPI, Celery, and RabbitMQ
Async IO is suitable when we want to perform small background tasks or to run work concurrently within the same process. However, when it comes to heavy background computations or complex tasks, we would ideally not run them as part of the same process. To execute such tasks in a separate process, we need specialized tools such as Celery.
Celery is a distributed task queue that simplifies the management of task distribution and processing. Task queues are used as a mechanism to distribute work across threads or machines. A task queue’s input is a unit of work called a task. Dedicated worker processes constantly monitor task queues for new work to perform.
As a task-queueing system, Celery works well with long-running processes as well as small repeatable tasks processed in batches. The types of problems Celery handles are common asynchronous tasks.
Celery communicates via messages, usually using a broker to mediate between clients and workers. To initiate a task, the client adds a message to the queue, and the broker then delivers that message to a worker. RabbitMQ and Redis are the broker transports fully supported by Celery. In our example, we will use RabbitMQ as the broker transport.
RabbitMQ is the most widely deployed open-source message broker. RabbitMQ is lightweight and easy to deploy on-premises and in the cloud. It supports multiple messaging protocols. RabbitMQ can be deployed in distributed and federated configurations to meet high-scale, high-availability requirements. RabbitMQ is the default broker for Celery so it doesn’t require any additional dependencies or initial configuration, other than the URL location of the broker instance we want to use.
There are several tools available to monitor and inspect Celery clusters. Flower is a real-time web application monitoring and administration tool for Celery.
The following figure shows a simplified diagram of how various components interact. Here, we are using FastAPI as our Celery Client and RabbitMQ as a message broker and Result Backend.
- The client sends a request to our FastAPI application.
- FastAPI app sends the task message to the message broker.
- Celery workers consume the messages from the message broker. After the task finishes, it saves the result to the Result Backend and updates the task status.
- After sending the task to the message broker, the FastAPI app can also monitor the status of the task from the Result Backend.
- Flower can also monitor the Celery app's tasks by processing messages on the message broker.
In this tutorial, we will see how we can integrate Celery into a FastAPI application to perform asynchronous tasks without blocking the user's requests. The example we will use is deliberately trivial; it is purely for demonstration.
Some of the use cases where the application of Celery with FastAPI can be useful:
- Sending out emails as background tasks in an app.
- Processing the uploaded images in the background.
- Offline training of ML models.
- Periodic tasks like report generation or web scraping.
The following steps illustrate how we can use Celery along with FastAPI to perform asynchronous tasks:
- Setup and Installation
- Set up Message Broker
- Add Celery
- Add Celery Tasks
- Add APIRouter
- Start the Application and Celery Worker Server
- Test the Application
- Monitor the Tasks
We require Python 3.9.2 with Pipenv and Git installed. Pipenv is a package and virtual environment manager that uses pip under the hood. It provides more advanced features like version locking and dependency isolation between projects.
1. Setup and Installation
Once the prerequisites are in place we can begin creating our application.
To begin, create a folder called fastapi-celery-rabbitmq-application in any directory on the disk for our project, then navigate into the project folder.
Activate Virtual Environment
Once we are inside the project folder, execute the following commands to activate the VirtualEnv.
The virtual environment will now be activated, which will provide the required project isolation and version locking.
Next, we will install all the required dependencies using Pipenv present in requirements.txt.
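The activation and installation steps above can be sketched as the following shell commands (a sketch of the standard Pipenv workflow; `pipenv shell` creates and activates the project's virtual environment, and the install command imports the pinned dependencies from requirements.txt):

```shell
cd fastapi-celery-rabbitmq-application
pipenv shell                          # create & activate the virtual environment
pipenv install -r requirements.txt    # install the pinned dependencies
```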
After we execute the above commands, the required dependencies will be installed.
We can now see two files created inside our project folder:
- Pipfile contains the names of all the dependencies we just installed.
- Pipfile.lock pins, based on the dependencies present in Pipfile, the specific versions that should be used, avoiding the risk of automatically upgrading interdependent packages and breaking the project's dependency tree.
Note: Here, we have installed all the dependencies with specific versions, which worked at the time of writing this tutorial. If we don't specify a version, the latest version of each dependency will be installed, which might not be compatible with the others.
The next step is to set up a Message Broker.
2. Set up Message Broker
Celery requires a solution to send and receive messages; usually, this comes in the form of a separate service called a message broker. There are several choices available; RabbitMQ is one of them.
One of the easiest ways to run RabbitMQ on our local machine is with Homebrew, a popular package manager for macOS. The RabbitMQ formula is available from Homebrew's core tap out of the box.
1. To install the RabbitMQ server using Homebrew, execute the command below.
2. The server can then be started with rabbitmq-server in the foreground, or with brew services start rabbitmq to have it run under launchd in the background.
3. Once we start the server, we can log in with the default credentials (Username: guest, Password: guest) at http://localhost:15672.
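The Homebrew commands for the install and start steps are typically:

```shell
brew install rabbitmq           # install the server from the core tap
brew services start rabbitmq    # run it in the background via launchd
```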
For other installation options for RabbitMQ, check the below link:
Documentation: Table of Contents
Our message broker is up and running. Let’s configure Celery now.
3. Add Celery
We have already installed all the dependencies required for Celery to work. Let's now configure it. In the root directory of the project, let's create a file named main.py.
- In the above code we have created a factory function called create_app(), which can be called at any time and returns a FastAPI app instance for us to use. In this function, we call create_celery() to create a Celery app instance. We have also integrated the universities routing module, which will be explained later in this tutorial.
- The code at the bottom of the file is required to run the FastAPI application using Uvicorn. FastAPI is the framework that we have used to build our API, and Uvicorn is the server that we will use to serve the requests.
Now let's start with the Celery configuration by adding a configuration file.
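The configuration might look like the sketch below (the URLs, queue names, and task names are assumptions consistent with the rest of this tutorial; Celery accepts such a router function via its task_routes setting):

```python
# A sketch of the Celery configuration described below. Broker/backend
# URLs, queue names, and task names are illustrative.
broker_url = "amqp://guest:guest@localhost:5672//"  # RabbitMQ as broker
result_backend = "rpc://"                           # RabbitMQ as result backend

# All the queues used by the application, plus the default "celery" queue.
task_queues = ("celery", "universities", "university")

# Instead of configuring routing on each task, map task names to queues.
CELERY_TASK_ROUTES = {
    "universities:get_all_universities_task": "universities",
    "university:get_university_task": "university",
}


def route_task(name, args=None, kwargs=None, options=None, task=None, **kw):
    # Celery calls this router for every task it is about to send and uses
    # the returned queue; unknown task names fall back to the default queue.
    return {"queue": CELERY_TASK_ROUTES.get(name, "celery")}
```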
- The above file contains all the configuration required by Celery to run. First we define the broker URL and the result backend. Here we are using RabbitMQ, which we installed in step 2, as both the broker and the result backend.
- Next we define all the queues which we will use in our application, along with the default celery queue. Instead of manually configuring routing rules at each task level, we use CELERY_TASK_ROUTES to configure the routing rules dynamically.
- We have also added a helper method to fetch the route for each task at runtime, and a function to load the Celery configuration during startup.
Celery App Instance
Let's now add one more file, config/celery_utils.py, that will contain the code to create the Celery app instance.
The above file defines a create_celery factory function that configures and then returns a Celery app instance. It reads all the Celery-related configuration defined earlier, and we also override some of Celery's default settings. This function is called from main.py while creating the FastAPI app instance. We have also defined one more helper function, get_task_info, to get the status and result of the asynchronous tasks submitted via Celery.
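That helper only needs to read three attributes from the Celery result object, so a duck-typed sketch looks like this (in the real file it would wrap celery.result.AsyncResult(task_id); the dict keys are illustrative):

```python
def get_task_info(task_result) -> dict:
    # Collapse an AsyncResult-like object into a JSON-friendly dict that
    # the status endpoint can return directly to the caller.
    return {
        "task_id": task_result.task_id,
        "task_status": task_result.status,
        "task_result": task_result.result,
    }
```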
4. Add Celery Tasks
We now have all the Celery-related configuration in place. Let us add some tasks that we want to execute asynchronously, in a file celery_tasks/tasks.py which will contain all such tasks.
In the above file we have defined two tasks:
- get_all_universities_task is used to get all the universities for the list of countries provided as input.
- get_university_task is used to get the universities for a given country.

Here, we have used shared_task to make our code reusable: instead of creating a new Celery instance, it relies on the current_app instance set up inside the create_celery method. The @shared_task decorator returns a proxy that always uses the task instance in current_app, which lets us create tasks that can be used by any app. Since Celery has built-in support for retrying failed tasks, we have enabled it for our tasks; with the retry option, we can let exceptions bubble up and let the decorator handle them.
5. Add APIRouter
Now, let's add some entry points to test our application. We will start with a simple sync entry point that calls an external API and returns the results to the client. Here, we are using the same free API that I used in one of my earlier tutorials; it's listed in Apipheny. We will use the Universities List API to get a list of universities for a specified country. We will not validate the response, as we are just simulating a scenario where our application performs some long-running work to complete a request, such as calling multiple APIs, doing some processing on top of the results, and then returning them.
Let's add a dedicated file for handling just universities requests as a module.
We want the path operations related to universities separated from the rest of the code to keep it organized, while still being part of the same FastAPI application. We can create the path operations for any module using APIRouter. We have already included this router in our application in step 3. Here we have added 4 different endpoints to test our application.
- get_universities: a simple API which takes a list of countries as input and calls an external API to get the result. The result for each country is combined and then returned to the caller. This API is added just to check the external API integration; it has nothing to do with Celery.
- get_universities_async: demonstrates how we can use Celery to perform long-running tasks asynchronously. When we call this API, it submits a task message to the broker and returns the task ID. Consumers can then use the other API to check the status of the submitted task or to get its result. As we can see, we just need to call the get_all_universities_task method defined in step 4; the Celery task is invoked simply by appending .apply_async() to the function name. This tells Celery to add a new task to the queue, internally submitting it to the universities queue. A worker process listening on this queue will pick up the message, process the request, and save the result in the result backend.
- get_universities_parallel: demonstrates how we can use Celery to split a large task into smaller subtasks and execute them in parallel. When we call this API, we create one task for each country provided as input and group these tasks using a Celery group. A Celery group is lazy, so we must call it to take action and evaluate it. We then execute the group and wait for it to complete; once all the subtasks finish, we collect the group's response and return it to the caller. This is a synchronous flow which uses Celery for parallel execution of subtasks.
- get_task_status: gets the status and result of the asynchronous tasks submitted using one of the APIs described above.
Let us have a look at the code which fetches the universities. Here we are using the HTTP client httpx to send the HTTP request. HTTPX is a fully featured HTTP client for Python 3, which provides both sync and async APIs, with support for HTTP/1.1 and HTTP/2. In this code we use the sync API to get the universities data: we call the universities API, convert the response to JSON, map each JSON object to the Pydantic model University, and finally return a dictionary which contains the country name as key and the list of universities as value.
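The mapping step can be illustrated with a canned payload, using a stdlib dataclass in place of the Pydantic University model and skipping the live HTTP call (the field names are assumptions based on the Universities List API):

```python
from dataclasses import dataclass


@dataclass
class University:
    # Stand-in for the Pydantic model; field names follow the API payload.
    name: str
    country: str
    web_pages: list


# Canned payload in place of the live call to the universities API.
SAMPLE_RESPONSE = [
    {"name": "Anadolu University", "country": "Turkey",
     "web_pages": ["https://www.anadolu.edu.tr/"]},
]


def map_universities(country: str, payload: list) -> dict:
    # Map each JSON object to the model and key the result by country name.
    universities = [
        University(name=u["name"], country=u["country"], web_pages=u["web_pages"])
        for u in payload
    ]
    return {country: universities}
```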
Now our code is complete to run the FastAPI with Celery.
6. Start the Application and Celery Worker Server
We have already added the code required to start the application in step 3.
We have already installed Uvicorn, which will be our server. We can start the application by executing the below command:
Once the application starts successfully, we can navigate to http://localhost:9000/docs, which brings up the Swagger UI, where we can see the endpoints defined earlier.
Let's now start the Celery worker server. Open a new terminal, go to the project root directory, and install all the dependencies using the below command:
pipenv install -r requirements.txt
Once all the dependencies are installed, we can execute the below command:
celery -A main.celery worker --loglevel=info -Q universities,university
The Celery worker will listen to the custom queues universities and university used in our application.
Everything is up and running now! Let’s test our application.
7. Test the Application
Let us test our universities endpoint with some sample countries to validate the external API integration. Here, we are getting all the universities for “Turkey” and “India”. We are not validating the response or data.
As we can see above, we got the universities list for the requested countries. Here, we are using the Universities List API to get a list of universities for the specified country.
Once we have tested that the external API interaction is working fine, let’s test the asynchronous task execution using Celery. Here, we will try to get the list of universities for the same set of countries provided as input.
Once we submit the request, we just get the task_id as an acknowledgment response. The request is processed asynchronously by Celery in a separate process. We can check the status of the task using the other API. With this architecture, we can handle a large number of requests: we can enqueue user requests and process them as per resource availability.
Let’s check the status of the asynchronous task submitted above. We can use the below API with task_id as input.
Let’s now test the scenario where we want to split the user request into multiple subtasks, process them in parallel using Celery and then return the results.
This has the same request and response as the first API we tested earlier. However, it executes faster because we fetch the universities of each country in parallel using Celery. Here, we got the response in 0.58 sec, compared to 1.67 sec with the first API.
8. Monitor the Tasks
So far, we have used Celery to submit tasks for execution in the background. It would be good if we could see the progress of these tasks in a GUI. The easiest way to monitor the Celery app is via Flower, with which we can easily check the progress of all the tasks. We already have all the dependencies installed; to run the Flower server on our local machine, execute the below command from the project root directory:
celery -A main.celery flower --port=5555
We can now see our Flower monitoring tool running locally at http://localhost:5555.
In the Dashboard, we can see the summary of all the tasks executed via Celery.
Once the tasks are submitted, we can also see the progress of each task by clicking on the Tasks tab in the Flower navigation bar.
For example, we can see the state of each task as it goes from In Progress to Success or Failed. We can also see when the task started, its runtime and its response, and so on.
In this tutorial, we saw how to integrate Celery with a FastAPI application. Using Celery alongside FastAPI not only improves overall performance and efficiency but also helps with handling background jobs for long-running tasks. We also saw how to monitor the tasks submitted via Celery using Flower.
If you would like to refer to the full code, do check:
GitHub - sumanentc/fastapi-celery-rabbitmq-application: Sample FastAPI Application to demonstrate…
References & Useful Readings
Celery - Distributed Task Queue - Celery 5.2.6 documentation
Groups, Chords, Chains and Callbacks - Celery 2.6.0rc4 documentation
Flower - Celery monitoring tool - Flower 1.0.1 documentation
How to use the @shared_task decorator for class based tasks
RabbitMQ Documentation: Table of Contents