Async Architecture with FastAPI, Celery, and RabbitMQ

[Figure: FastAPI with RabbitMQ and Celery architecture flow]

The flow of a request through this architecture works as follows:
  1. The client sends a request to our FastAPI application.
  2. The FastAPI app sends the task message to the message broker.
  3. Celery workers consume the messages from the message broker. After a task finishes, the worker saves the result to the Result Backend and updates the task status.
  4. After sending the task to the message broker, the FastAPI app can also monitor the status of the task via the Result Backend.
  5. Flower can also monitor the Celery app's tasks by processing the messages on the message broker.
Some typical use cases for this kind of asynchronous processing are:
  1. Sending out emails as background tasks in an app.
  2. Processing uploaded images in the background.
  3. Offline training of ML models.
  4. Periodic tasks like report generation or web scraping.
In this tutorial we will cover the following steps:
  1. Setup and Installation
  2. Set up Message Broker
  3. Add Celery
  4. Add Celery Tasks
  5. Add APIRouter
  6. Start the Application and Celery Worker Server
  7. Test the Application
  8. Monitor the Tasks

1. Setup and Installation

First, create a project folder and start a pipenv shell inside it. Then declare all the dependencies used in our application and install them; a sketch of these commands is shown below.
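The exact package list is not reproduced in this post, so the one below is an assumption based on the libraries used in the rest of the tutorial:

    # Create the project folder and enter a pipenv-managed virtualenv
    mkdir fastapi-celery-rabbitmq && cd fastapi-celery-rabbitmq
    pipenv shell

    # Install the dependencies (package list assumed from this tutorial)
    pipenv install fastapi uvicorn celery flower httpx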
  • Pipfile contains the names of all the dependencies we just installed.
  • Pipfile.lock specifies, based on the dependencies present in Pipfile, which exact versions should be used, avoiding the risk of automatically upgrading interdependent packages and breaking your project's dependency tree.

2. Set up Message Broker

  1. To install the RabbitMQ server using Homebrew, just execute the command below:

    brew install rabbitmq

[Screenshot: RabbitMQ server]
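Once installed, the server can typically be started in the background via Homebrew services (this step is an assumption, not from the original post):

    brew services start rabbitmq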

3. Add Celery

main.py
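The original gist is not embedded here, so below is a minimal sketch of main.py consistent with the description that follows; the module paths (config.celery_utils, routers.universities) and the port are assumptions.

    # main.py: a minimal sketch, assuming the module layout described in this post
    import uvicorn
    from fastapi import FastAPI

    from config.celery_utils import create_celery
    from routers import universities


    def create_app() -> FastAPI:
        current_app = FastAPI(title="Async Architecture with FastAPI, Celery, and RabbitMQ")

        # Attach a Celery app instance and register the universities router
        current_app.celery_app = create_celery()
        current_app.include_router(universities.router)
        return current_app


    app = create_app()
    celery = app.celery_app  # exposed so `celery -A main.celery worker` can find it

    if __name__ == "__main__":
        # Run the API with Uvicorn when invoked as `python main.py`
        uvicorn.run("main:app", host="0.0.0.0", port=9000, reload=True)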
  • In the code above we create a factory function called create_app(), which can be called at any time and returns a FastAPI app instance for us to use. In this function we call create_celery() to create a Celery app instance. We also integrate the universities routing module, which will be explained further as we go ahead in this tutorial.
  • The code in the __main__ block is required to run the FastAPI application using Uvicorn. FastAPI is the framework we have used to build our API, and Uvicorn is the server we will use to serve the requests.
Celery Config
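Again, the original gist is not embedded, so this is a minimal sketch of the Celery configuration described below; the broker/backend URLs and the "queue:task" naming convention used for routing are assumptions.

    # config/celery_config.py: a sketch of the settings described below
    from functools import lru_cache
    from kombu import Queue


    def route_task(name, args, kwargs, options, task=None, **kw):
        # Helper that resolves the route per task at runtime: tasks named
        # "<queue>:<task_name>" go to <queue>, everything else to the default.
        if ":" in name:
            queue, _ = name.split(":")
            return {"queue": queue}
        return {"queue": "celery"}


    class BaseConfig:
        # RabbitMQ (installed in step 2) acts as the broker and, via rpc://,
        # as the result backend as well
        CELERY_BROKER_URL: str = "amqp://guest:guest@localhost:5672//"
        CELERY_RESULT_BACKEND: str = "rpc://"

        # All queues used by the application, plus the default celery queue
        CELERY_TASK_QUEUES = (
            Queue("celery"),
            Queue("universities"),
            Queue("university"),
        )

        # Route tasks dynamically instead of configuring each task by hand
        CELERY_TASK_ROUTES = (route_task,)


    @lru_cache()
    def get_settings():
        # Built once and cached; consumed by create_celery() during startup
        return BaseConfig()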
  • The above file contains all the configuration required by Celery to run. We define the broker URL and the result backend; here we use RabbitMQ, which we installed in step 2, as both the broker and the result backend.
  • We then define all the queues that we will use in our application, along with the default celery queue. Instead of manually configuring routing rules at each task level, we use CELERY_TASK_ROUTES to configure the routing rules dynamically.
  • We have added a helper method to fetch the route for each task at runtime.
  • Finally, we define a function to load this Celery configuration during startup.
Celery Utils
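A sketch of celery_utils.py follows; create_celery() is referenced from main.py above, while get_task_info() is an assumed helper for the task-status endpoint described in step 5.

    # config/celery_utils.py: a minimal sketch, assuming the config module above
    from celery import current_app as current_celery_app
    from celery.result import AsyncResult

    from config.celery_config import get_settings


    def create_celery():
        # Load the Celery configuration (keys prefixed with CELERY_) at startup
        celery_app = current_celery_app
        celery_app.config_from_object(get_settings(), namespace="CELERY")
        return celery_app


    def get_task_info(task_id: str) -> dict:
        # Fetch the status and, once ready, the result from the Result Backend
        task_result = AsyncResult(task_id)
        return {
            "task_id": task_id,
            "task_status": task_result.status,
            "task_result": task_result.result if task_result.ready() else None,
        }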

4. Add Celery Tasks

tasks.py
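No description accompanies this gist in the post as extracted, so the sketch below is reconstructed from how the tasks are used in step 5; the external universities API and the "queue:task" task names are assumptions.

    # celery_tasks/tasks.py: a sketch; get_all_universities_task is used in step 5
    from typing import List

    import httpx  # assumed HTTP client for the external API
    from celery import shared_task

    UNIVERSITIES_API = "http://universities.hipolabs.com/search"  # assumed endpoint


    def fetch_universities(country: str) -> dict:
        # Call the external API for a single country
        response = httpx.get(UNIVERSITIES_API, params={"country": country})
        response.raise_for_status()
        return {country: response.json()}


    @shared_task(name="universities:get_all_universities_task")
    def get_all_universities_task(countries: List[str]) -> dict:
        # One task that fetches the universities of every requested country
        data: dict = {}
        for country in countries:
            data.update(fetch_universities(country))
        return data


    @shared_task(name="university:get_university_task")
    def get_university_task(country: str) -> dict:
        # One country per task, so a Celery group can fan these out in parallel
        return fetch_universities(country)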

5. Add APIRouter

universities_router.py
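The router gist is likewise not embedded; the sketch below is assembled from the four endpoint descriptions that follow, with the paths and the request model being assumptions.

    # routers/universities.py: a sketch matching the endpoint descriptions below
    from celery import group
    from fastapi import APIRouter
    from pydantic import BaseModel

    from celery_tasks.tasks import (fetch_universities, get_all_universities_task,
                                    get_university_task)
    from config.celery_utils import get_task_info

    router = APIRouter(prefix="/universities", tags=["universities"])


    class Country(BaseModel):
        # Assumed request model: a list of country names
        countries: list[str]


    @router.post("/")
    def get_universities(country: Country) -> dict:
        # Plain synchronous endpoint: calls the external API directly, no Celery
        data: dict = {}
        for cnt in country.countries:
            data.update(fetch_universities(cnt))
        return data


    @router.post("/async")
    def get_universities_async(country: Country) -> dict:
        # Submit the long-running task to the broker and return its ID right away
        task = get_all_universities_task.apply_async(args=[country.countries])
        return {"task_id": task.id}


    @router.post("/parallel")
    def get_universities_parallel(country: Country) -> dict:
        # One subtask per country, grouped and executed in parallel
        tasks = [get_university_task.s(cnt) for cnt in country.countries]
        job = group(tasks)        # a group is lazy; nothing runs until we apply it
        result = job.apply_async()
        rets = result.get()       # block until every subtask has completed
        data: dict = {}
        for ret in rets:
            data.update(ret)
        return data


    @router.get("/task/{task_id}")
    def get_task_status(task_id: str) -> dict:
        # Status/result of a task submitted via one of the endpoints above
        return get_task_info(task_id)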
  • get_universities: a simple API which takes a list of countries as input and calls an external API to get the result for each one. The results for all the countries are combined and returned to the caller. This API is added just to check the external API integration; it has nothing to do with Celery.
  • get_universities_async: this API demonstrates how we can use Celery to perform long-running tasks asynchronously. When we call this API, it submits a task message to the broker and returns the task ID. Consumers can then use the other API to check the status of the submitted task or to get the task result. As we can see, we just need to call the get_all_universities_task method defined earlier in step 4; the Celery task is invoked simply by adding .apply_async() after the function name. This tells Celery to add a new task to the queue; internally it submits the task to the universities queue. A worker process listening to this queue will pick up the message, process the request, and save the result in the Result Backend.
  • get_universities_parallel: this API demonstrates how we can use Celery to split a large task into smaller subtasks and execute them in parallel. When we call this API, we create one task for each country provided as input, then group these tasks using a Celery group. A Celery group is lazy, so we must call it to take action and evaluate the group. We then execute the group and wait for it to complete; once all the subtasks have completed, the group is complete and we get the response. Finally, we collect the group's response and return it to the caller. This is a synchronous flow that uses Celery for parallel execution of the subtasks.
  • get_task_status: this API is used to get the status and result of the asynchronous tasks submitted using one of the APIs described earlier.

6. Start the Application and Celery Worker Server

Start the FastAPI application:

    python main.py

Once it is up, the interactive Swagger UI is available at the application's /docs endpoint.

[Screenshot: Swagger UI]

(If you are starting from a requirements.txt file instead of a Pipfile, pipenv install -r requirements.txt will import the dependencies.)

Next, start the Celery worker server, subscribed to both of our queues:

    celery -A main.celery worker --loglevel=info -Q universities,university

[Screenshot: Celery worker startup log]

7. Test the Application

Using the Swagger UI, we can exercise each of the endpoints:

[Screenshots: get-universities request and response]
[Screenshots: async request and response, which returns the task ID]
[Screenshots: get-task-status request and response]
[Screenshots: parallel request and response]
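The same flow can also be driven from the command line; the sketch below assumes the endpoint paths and port from the earlier sketches:

    # Submit a long-running task and note the returned task ID
    curl -X POST http://localhost:9000/universities/async \
         -H "Content-Type: application/json" \
         -d '{"countries": ["india", "australia"]}'

    # Poll the status endpoint with the task ID from the previous response
    curl http://localhost:9000/universities/task/<task_id>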

8. Monitor the Tasks

We use Flower, a web-based tool, to monitor the Celery tasks in real time. Start it with:

    celery -A main.celery flower --port=5555

Then open http://localhost:5555 in a browser to see the dashboard.

[Screenshots: Flower dashboard and task list]

Conclusion

In this post we saw how Celery and RabbitMQ can be combined with FastAPI to offload long-running work to background workers, split a large task into parallel subtasks with a Celery group, track task status through the Result Backend, and monitor everything with Flower.

