Async Architecture with FastAPI, Celery, and RabbitMQ

Suman Das · Published in Crux Intelligence · 14 min read · May 10, 2022
FastAPI With RabbitMQ and Celery

In one of my earlier tutorials, we saw how to optimize the performance of a FastAPI application using Async IO. To learn more, refer to Concurrency with FastAPI.

Async IO is suitable when we want to perform small background tasks or run work concurrently. However, when it comes to heavy background computations or complex tasks, we would ideally not run them as part of the same process. To execute such complex tasks in a separate process, we need specialized tools such as Celery.

Celery is a distributed task queue that simplifies the management of task distribution and processing. Task queues are used as a mechanism to distribute work across threads or machines. A task queue’s input is a unit of work called a task. Dedicated worker processes constantly monitor task queues for new work to perform.

As a task-queueing system, Celery works well with long-running processes and with small, repeatable tasks executed in batches; these are the common kinds of asynchronous work Celery handles.

Celery communicates via messages, usually using a broker to mediate between clients and workers. To initiate a task, the client adds a message to the queue, and the broker then delivers that message to a worker. RabbitMQ and Redis are the broker transports fully supported by Celery. In our example, we will use RabbitMQ as the broker transport.

RabbitMQ is the most widely deployed open-source message broker. RabbitMQ is lightweight and easy to deploy on-premises and in the cloud. It supports multiple messaging protocols. RabbitMQ can be deployed in distributed and federated configurations to meet high-scale, high-availability requirements. RabbitMQ is the default broker for Celery so it doesn’t require any additional dependencies or initial configuration, other than the URL location of the broker instance we want to use.

There are several tools available to monitor and inspect Celery clusters. Flower is a real-time web application monitoring and administration tool for Celery.

The following figure shows a simplified diagram of how various components interact. Here, we are using FastAPI as our Celery Client and RabbitMQ as a message broker and Result Backend.

FastAPI with Celery Flow
  1. The client sends a request to our FastAPI application.
  2. FastAPI app sends the task message to the message broker.
  3. Celery workers consume the messages from the message broker. After the task finishes, it saves the result to the Result Backend and updates the task status.
  4. After sending the task to the message broker, the FastAPI app can also monitor the status of the task from the Result Backend.
  5. Flower can also monitor the Celery app's tasks by processing messages on the message broker.

In this tutorial, we will see how to integrate Celery into a FastAPI application to perform asynchronous tasks without blocking users' requests. The example we will use is trivial and intended purely for demonstration.

Some of the use cases where combining Celery with FastAPI is useful:

  1. Sending out Emails as Background tasks in an app.
  2. Processing the uploaded images in the background.
  3. Offline training of ML models.
  4. Periodic tasks like report generation or web scraping.

The following steps illustrate how we can use Celery along with FastAPI to perform asynchronous tasks:

  1. Setup and Installation
  2. Set up Message Broker
  3. Add Celery
  4. Add Celery Tasks
  5. Add APIRouter
  6. Start the Application and Celery Worker Server
  7. Test the Application
  8. Monitor the Tasks

Prerequisites

We require Python 3.9.2 with Pipenv and Git installed. Pipenv is a package and virtual-environment manager that uses pip under the hood. It provides more advanced features like version locking and dependency isolation between projects.

1. Setup and Installation

Once the prerequisites are in place we can begin creating our application.

To begin with our application, create a folder called fastapi-celery-rabbitmq-application in any directory on the disk for our project.

create project folder

Navigate to the project folder.
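On a Unix-like shell, these two steps come down to:

mkdir fastapi-celery-rabbitmq-application
cd fastapi-celery-rabbitmq-application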

Activate Virtual Environment

Once we are inside the project folder, execute the following command to activate the virtual environment.

Start pipenv shell
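With Pipenv installed, that is a single command:

pipenv shell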

The virtual environment will now be activated, which will provide the required project isolation and version locking.

Install Dependencies

Next, we will install all the required dependencies listed in requirements.txt using Pipenv.

all the dependencies used in our application
install all the dependencies
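The requirements.txt file pins specific versions; judging by what this tutorial uses, it includes at least fastapi, uvicorn, celery, flower, httpx, and pydantic. The install command is:

pipenv install -r requirements.txt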

After we execute the above commands, the required dependencies will be installed.

We can see now two files, which have been created inside our project folder, namely, Pipfile and Pipfile.lock.

  • Pipfile contains all the names of the dependencies we just installed.
  • Pipfile.lock specifies, based on the dependencies present in Pipfile, which specific versions should be used, avoiding the risk of automatically upgrading interdependent packages and breaking your project's dependency tree.

Note: Here, we have installed all the dependencies with the specific versions that worked on my machine while writing this tutorial. If we don't specify a version, the latest version of that dependency is installed, which might not be compatible with the others.

The next step is to set up a Message Broker.

2. Set up Message Broker

Celery requires a solution to send and receive messages; usually this comes in the form of a separate service called a message broker. There are several choices available, and RabbitMQ is one of them.

One of the easiest ways to run RabbitMQ on our local machine is with Homebrew, a popular package manager for macOS. The RabbitMQ formula is available from Homebrew's core tap out of the box.

  1. To install the RabbitMQ server using Homebrew, just execute the below command:

brew install rabbitmq

2. The server can then be started with rabbitmq-server in the foreground, or with brew services start rabbitmq to have it run under launchd in the background.

3. Once we start the server, we can log in to the management UI with the default credentials (username: guest, password: guest) at http://localhost:15672.

RabbitMQ Server

For other installation options for RabbitMQ, check the official RabbitMQ documentation.

Our message broker is up and running. Let’s configure Celery now.

3. Add Celery

We have already installed all the dependencies required for Celery to work; let's now configure it. In the root directory of the project, create a file named main.py.

main.py
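A minimal sketch of how main.py could look, pieced together from the description below and from later steps (port 9000 and the main.celery attribute match the commands used in step 6; the exact code is an approximation):

import uvicorn
from fastapi import FastAPI

from config.celery_utils import create_celery
from routers import universities


def create_app() -> FastAPI:
    # Factory function: builds the FastAPI app, attaches the Celery
    # app instance, and wires in the universities routing module.
    current_app = FastAPI(title="FastAPI with Celery and RabbitMQ")
    current_app.celery_app = create_celery()
    current_app.include_router(universities.router)
    return current_app


app = create_app()
celery = app.celery_app  # exposed so the worker can be started with -A main.celery

if __name__ == "__main__":
    uvicorn.run("main:app", host="0.0.0.0", port=9000, reload=True)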
  • In the above code we create a factory function called create_app(), which can be called at any time and returns a FastAPI app instance for us to use. In this function, we call create_celery() to create a Celery app instance. We also include the universities routing module, which is explained further ahead in this tutorial.
  • The block at the bottom of the file runs the FastAPI application using Uvicorn. FastAPI is the framework that we have used to build our API, and Uvicorn is the server that we will use to serve the requests.

Celery Configuration

Now let's start with the Celery configuration by adding a file config/celery_config.py.

Celery Config
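A sketch of config/celery_config.py consistent with the description below (the amqp:// and rpc:// URLs assume the local RabbitMQ installed in step 2, and the queue names match the worker command used in step 6):

import os
from functools import lru_cache

from kombu import Queue


def route_task(name, args, kwargs, options, task=None, **kw):
    # Dynamic router: tasks named "<queue>:<task>" go to <queue>;
    # everything else falls back to the default "celery" queue.
    if ":" in name:
        queue, _ = name.split(":")
        return {"queue": queue}
    return {"queue": "celery"}


class BaseConfig:
    CELERY_BROKER_URL: str = os.environ.get("CELERY_BROKER_URL", "amqp://guest:guest@localhost:5672//")
    CELERY_RESULT_BACKEND: str = os.environ.get("CELERY_RESULT_BACKEND", "rpc://")

    CELERY_TASK_QUEUES = (
        Queue("celery"),        # default queue
        Queue("universities"),  # custom queues used by our tasks
        Queue("university"),
    )

    CELERY_TASK_ROUTES = (route_task,)


@lru_cache()
def get_settings() -> BaseConfig:
    # Loads the Celery configuration once, during startup.
    return BaseConfig()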
  • The above file contains all the configuration required by Celery to run. We define the broker URL and the Result Backend; here RabbitMQ, which we installed in step 2, serves as both.
  • We define all the queues used in our application, along with the default celery queue. Instead of manually configuring routing rules at each task level, we use CELERY_TASK_ROUTES to configure the routing rules dynamically.
  • A small helper method fetches the route for each task at runtime.
  • Finally, we define a function to load the Celery configuration during startup.

Celery App Instance

Let’s now add one more file config/celery_utils.py that will contain the below code to create the celery app instance.

Celery Utils
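A sketch of what config/celery_utils.py could contain; it relies only on documented Celery APIs (current_app, config_from_object, AsyncResult), and the specific overridden settings are illustrative:

from celery import current_app as current_celery_app
from celery.result import AsyncResult

from config.celery_config import get_settings


def create_celery():
    # Configure the shared current_app instance from our settings;
    # the CELERY_ namespace maps CELERY_BROKER_URL to broker_url, etc.
    celery_app = current_celery_app
    celery_app.config_from_object(get_settings(), namespace="CELERY")
    # Override a few defaults.
    celery_app.conf.update(task_track_started=True)
    celery_app.conf.update(result_expires=200)
    celery_app.conf.update(result_persistent=True)
    celery_app.conf.update(worker_prefetch_multiplier=1)
    return celery_app


def get_task_info(task_id: str) -> dict:
    # Look up the task status in the Result Backend and include the
    # result once the task has finished.
    task_result = AsyncResult(task_id)
    response = {"task_id": task_id, "task_status": task_result.status}
    if task_result.ready():
        response["task_result"] = task_result.result
    return response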

The above file defines a create_celery factory function that configures and then returns a Celery app instance. It reads all the Celery-related configuration defined earlier, and it also overrides some of Celery's defaults. This function is called from main.py while creating the FastAPI app instance.

We also define a helper function, get_task_info, to fetch the status and result of asynchronous tasks submitted via Celery.

4. Add Celery Tasks

We now have all the Celery-related configuration in place. Let us add the tasks that we want to execute asynchronously in a file celery_tasks/tasks.py.

tasks.py
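A sketch of celery_tasks/tasks.py; the "<queue>:<task>" naming convention ties into the route_task helper from step 3, and get_universities_for_country is the fetch helper shown in the next step (its module path, universities/universities.py, is an assumption):

from celery import shared_task

from universities.universities import get_universities_for_country


@shared_task(bind=True, autoretry_for=(Exception,), retry_backoff=True,
             retry_kwargs={"max_retries": 5}, name="universities:get_all_universities_task")
def get_all_universities_task(self, countries: list[str]) -> dict:
    # One task that fetches the universities for every requested country.
    data: dict = {}
    for country in countries:
        data.update(get_universities_for_country(country))
    return data


@shared_task(bind=True, autoretry_for=(Exception,), retry_backoff=True,
             retry_kwargs={"max_retries": 5}, name="university:get_university_task")
def get_university_task(self, country: str) -> dict:
    # One task per country; used by the parallel (group) flow.
    return get_universities_for_country(country)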

In the above file we have defined two tasks, get_all_universities_task and get_university_task. get_all_universities_task gets all the universities for the list of countries provided as input. get_university_task gets the universities for a single country.

Here, we use shared_task to make our code reusable: instead of creating a new Celery instance, it relies on the current_app instance configured inside create_celery. The @shared_task decorator returns a proxy that always uses the task instance in current_app, which lets us create tasks that can be used by any app. Since Celery has built-in support for retrying failed tasks, we enable it for our tasks; with the retry options in place, we can let exceptions bubble up and let the decorator handle them.

5. Add APIRouter

Now, let’s add some entry points to test our application. We will start with a simple Sync entrypoint. This entry point will call some external APIs and return the results to the client. Here, we are using the same free API which I have used in one of my earlier tutorial. It’s listed in Apipheny. We will use the Universities List API to get a list of universities for a specified country. For this tutorial, we are fetching the list of universities provided as input to the API. We will not be validating the response, as we are just simulating the scenario where in our application we are performing some long running tasks to complete the request like we need to call multiple APIs, do certain processing on top of that and then return the results.

Let’s add a dedicated file for handling just universities requests as a module at /routers/universities.py.

universities_router.py
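A sketch of routers/universities.py covering the four endpoints described below (the request model and URL paths are assumptions):

from celery import group
from fastapi import APIRouter
from pydantic import BaseModel

from celery_tasks.tasks import get_all_universities_task, get_university_task
from config.celery_utils import get_task_info
from universities.universities import get_universities_for_country


class Country(BaseModel):
    countries: list[str]


router = APIRouter(prefix="/universities", tags=["universities"])


@router.post("/")
def get_universities(country: Country) -> dict:
    # Plain synchronous flow: fetch each country one after the other.
    data: dict = {}
    for cnt in country.countries:
        data.update(get_universities_for_country(cnt))
    return data


@router.post("/async")
def get_universities_async(country: Country) -> dict:
    # Submit one Celery task for the whole request; only the task id
    # is returned, the work happens in a worker process.
    task = get_all_universities_task.apply_async(args=[country.countries])
    return {"task_id": task.id}


@router.post("/parallel")
def get_universities_parallel(country: Country) -> dict:
    # Fan out one subtask per country and run them in parallel.
    tasks = [get_university_task.s(cnt) for cnt in country.countries]
    job = group(tasks)        # a group is lazy until it is applied
    result = job.apply_async()
    responses = result.get()  # block until every subtask has finished
    data: dict = {}
    for res in responses:
        data.update(res)
    return data


@router.get("/task/{task_id}")
def get_task_status(task_id: str) -> dict:
    # Status/result lookup for a previously submitted task.
    return get_task_info(task_id)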

We want the path operations related to universities separated from the rest of the code to keep it organized, while remaining part of the same FastAPI application. We can create the path operations for any module using APIRouter. We have already included this router in our application in step 3. Here we have added four different endpoints to test our application.

  • get_universities: a simple API that takes a list of countries as input and calls an external API to get the results. The result for each country is combined and returned to the caller. This API is added just to check the external API integration; it has nothing to do with Celery.
  • get_universities_async: demonstrates how we can use Celery to perform long-running tasks asynchronously. When we call this API, it submits a task message to the broker and returns the task ID. Consumers can then use the status API to check on the submitted task or fetch its result. As we can see, we just need to call the get_all_universities_task method defined in step 4; appending .apply_async() to the function name tells Celery to add a new task to the queue. It internally submits the task to the universities queue, and a worker process listening to that queue picks up the message, processes the request, and saves the result in the Result Backend.
  • get_universities_parallel: demonstrates how we can use Celery to split a large task into smaller subtasks and execute them in parallel. When we call this API, we create one task per country provided as input, then group these tasks using a Celery group. A Celery group is lazy, so we must call it to take action and evaluate it. We execute the group, wait for all its subtasks to complete, then collect the combined response and return it to the caller. This is a synchronous flow that uses Celery only for the parallel execution of subtasks.
  • get_task_status: returns the status and result of an asynchronous task submitted using one of the APIs described above.

Let us have a look at the code which fetches the universities:
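A sketch of the helper, assumed to live in universities/universities.py so that both the router and the Celery tasks can import it; the hipolabs URL is the free Universities List API:

import json
from typing import List, Optional

import httpx
from pydantic import BaseModel


class University(BaseModel):
    # Subset of the fields returned by the universities API.
    name: Optional[str] = None
    country: Optional[str] = None
    alpha_two_code: Optional[str] = None
    web_pages: List[str] = []


def get_universities_for_country(country: str) -> dict:
    # Synchronous httpx call to the external universities API.
    url = f"http://universities.hipolabs.com/search?country={country}"
    response = httpx.get(url)
    response_json = json.loads(response.text)
    universities = []
    for university in response_json:
        # Validate each record with the Pydantic model, then keep it as
        # a plain dict so Celery can JSON-serialize task results.
        universities.append(University.parse_obj(university).dict())
    # Country name as key, list of universities as value.
    return {country: universities}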

We use the HTTP client httpx to send the request. HTTPX is a fully featured HTTP client for Python 3 that provides both sync and async APIs, with support for HTTP/1.1 and HTTP/2.

Here, we use the sync API to get the universities data: we call the universities API, convert the response to JSON, map each JSON object to the Pydantic model University, and finally return a dictionary with the country name as key and the list of universities as value.

Now our code is complete to run the FastAPI with Celery.

6. Start the Application and Celery Worker Server

We have already added the code in step 3 required to start the application.

FastAPI is the framework we used to build our API, and Uvicorn, which we have already installed, is the server that will serve the requests. We can start the application by executing the below command:

python main.py

Once the application starts successfully, we can navigate to http://localhost:9000/docs. The system will bring up a page that looks something like the one below, with the endpoints defined earlier visible in the Swagger UI.

Swagger-ui

Let’s now start the Celery worker server. We need to open a new terminal and go the project root directory. Then we need to install all the dependencies using the below command:

pipenv install -r requirements.txt

Once all the dependencies are installed then we can execute the below command:

celery -A main.celery worker --loglevel=info -Q universities,university

The Celery worker will listen to the custom queues used in our application: universities and university.

Celery-worker

Everything is up and running now! Let’s test our application.

7. Test the Application

Let us test our universities endpoint with some sample countries to validate the external API integration. Here, we are getting all the universities for “Turkey” and “India”. We are not validating the response or data.

get-universities-request
get-universities-response

As we can see above, we got the universities list for the requested countries. Here, we are using the Universities List API to get a list of universities for the specified country.

Once we have tested that the external API interaction works fine, let's test asynchronous task execution using Celery. Here, we will try to get the list of universities for the same set of countries provided as input.

async-request
async-response

Once we submit the request, we just get a task_id back as an acknowledgment; the request is processed asynchronously by Celery in a separate process. We can check the status of the task using the status API. With this architecture, we can handle a large number of requests: we enqueue the user requests and process them according to the resources available to our application.
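For example, with curl (the path comes from the hypothetical router sketch in step 5; the returned task_id will differ on every run):

curl -X POST http://localhost:9000/universities/async -H 'Content-Type: application/json' -d '{"countries": ["turkey", "india"]}'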

Let’s check the status of the asynchronous task submitted above. We can use the below API with task_id as input.

get task status request
get task status response

Let’s now test the scenario where we want to split the user request into multiple subtasks, process them in parallel using Celery and then return the results.

parallel-request
parallel-response

This has the same request and response as the first API we tested. However, it executes faster because we fetch the universities for each country in parallel using Celery: here we got the response in 0.58 sec, compared to 1.67 sec with the first API.

8. Monitor the tasks

So far we have been using Celery to submit tasks for background execution. It would be good if we could also watch their progress in a GUI. The easiest way to monitor a Celery app is via Flower, which lets us easily check the progress of all the tasks. We already have all the dependencies installed; we just need to start Flower. To run the Flower server on our local machine, execute the below command from the project root directory:

celery -A main.celery flower --port=5555
flower

We can now see our Flower monitoring tool running on our local server at http://localhost:5555/.

Flower-dashboard

In the Dashboard, we can see the summary of all the tasks executed via Celery.

Once the tasks are submitted, we can also see the progress of each task by clicking on the tasks tab on the Flower navigation bar.

For example, we can see the state of each task as it goes from In Progress to Success or Failed. We can also see when the task started, its runtime and its response, and so on.

Celery-tasks

Conclusion

In this tutorial, we saw how to integrate Celery with a FastAPI application. Using Celery alongside FastAPI not only improves overall performance and efficiency but also helps handle long-running background jobs. We also saw how to monitor the tasks submitted via Celery using Flower.

If you would like to refer to the full code, do check the accompanying code repository.

References & Useful Readings

--

--