Tutorial to run tasks with Celery and Flask.

Amandeep Singh
Aug 19, 2023


What is Celery?

Celery is a robust distributed task queue system, widely used for managing and executing tasks in the background. It’s a versatile tool that can handle both straightforward asynchronous tasks and intricate multi-step workflows on a scheduled basis.

Before we move forward, let’s learn about the important components of Celery.

To run a Celery task, you need two pieces of configuration: the broker URL and the backend URL.

The broker URL serves as a vital communication link between your application and Celery workers. Acting as a queue, it stores tasks from your application for asynchronous processing. For instance, when your application encounters resource-intensive tasks such as sending emails or data processing, it enqueues these tasks in the broker. Celery workers then retrieve and execute these tasks, ensuring that your application remains responsive and efficient.

Meanwhile, the backend URL plays the role of a repository for completed task results. After a Celery worker processes a task, its output is stored in the backend. This capability allows your application to subsequently retrieve task results, ensuring a seamless user experience. In this tutorial, we will use Redis but there are different options for both the broker and backend.

Redis is often preferred as a Celery broker and backend thanks to its fast in-memory storage, natural fit for task queuing, and reliable result storage.

While Redis offers simplicity and efficiency, other options like RabbitMQ, Amazon SQS, or Apache Kafka also stand as viable choices based on factors such as scalability and integration with your environment. The chosen option significantly influences how tasks are managed and results are stored, providing flexibility to tailor Celery to your specific requirements.
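Broker and backend URLs are plain connection strings. As a rough sketch (the hosts and ports here are placeholders), the common formats look like this:

# Redis as both broker and result backend (database 0 on the default port)
broker_url = "redis://localhost:6379/0"
result_backend = "redis://localhost:6379/0"

# RabbitMQ as the broker (pair it with a separate result backend)
broker_url = "amqp://guest:guest@localhost:5672//"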

A tutorial to run Celery with Flask:

Step 1: Installing Redis:

First we need to install Redis on our system. Follow the installation instructions for your operating system in the official Redis documentation.
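If you just want the quick path (assuming Homebrew on macOS or apt on Debian/Ubuntu), installation is a one-liner:

# macOS (Homebrew)
brew install redis

# Debian/Ubuntu
sudo apt-get install redis-server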

Step 2: Adding all the dependencies to run a Celery task

Make a new folder with any name of your choice. In this case we can name it CeleryTutorial.

Let’s create a virtual environment and install the dependencies.

# Create the virtual environment
python3 -m venv venv

# Activate the virtual environment
source venv/bin/activate

# Install Celery with Redis support (pulls in the redis client used by the broker/backend)
pip install "celery[redis]"

# Install Flask
pip install Flask

Step 3: Configuring Celery in Flask.

Create a Python file named config.py and add the following code to it.

from celery import Celery, Task
from flask import Flask

def celery_init_app(app: Flask) -> Celery:
    class FlaskTask(Task):
        """Run every task inside the Flask application context."""
        def __call__(self, *args: object, **kwargs: object) -> object:
            with app.app_context():
                return self.run(*args, **kwargs)

    celery_app = Celery(app.name, task_cls=FlaskTask)
    celery_app.config_from_object(app.config["CELERY"])
    celery_app.set_default()
    app.extensions["celery"] = celery_app
    return celery_app

In our celery_init_app function, we handle the initialization and configuration of Celery within our Flask app. We define a custom FlaskTask class that extends Celery's Task. This class ensures that tasks run within our app's context, allowing them to access app resources. We then create a Celery instance named celery_app, specifying the app's name and our FlaskTask class for task execution. We load Celery's configuration from our app's CELERY settings and set celery_app as the default Celery instance. Finally, we store celery_app in the app's extensions and return the configured instance.

In the same file, add this code.

def create_app() -> Flask:
    app = Flask(__name__)
    app.config.from_mapping(
        CELERY=dict(
            broker_url="redis://localhost",
            result_backend="redis://localhost",
            task_ignore_result=True,
        ),
    )
    app.config.from_prefixed_env()
    celery_init_app(app)
    return app

In the create_app factory pattern function, we start by creating a Flask app instance. We configure initial settings, including Celery-related configurations like the broker URL and result backend using Redis. To allow further configuration flexibility, we load additional settings from prefixed environment variables. Next, we initialize Celery by calling our celery_init_app function, passing the Flask app instance. Finally, we return the fully configured Flask app.
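Since from_prefixed_env is in play (assuming Flask 2.1+, where variables prefixed with FLASK_ are loaded and a double underscore descends into nested keys), you could, for instance, repoint Celery at another Redis instance without touching code:

# Hypothetical overrides for app.config["CELERY"]["broker_url"] / ["result_backend"]
export FLASK_CELERY__broker_url="redis://other-host:6379/0"
export FLASK_CELERY__result_backend="redis://other-host:6379/0"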

Step 4: Defining the tasks.

When using the Flask application factory pattern with Celery, the @celery_app.task decorator isn’t ideal as it lacks access to the celery_app object and can tie tasks to specific instances. Instead, use Celery’s @shared_task decorator. This decorator creates tasks that adapt to the “current app” context, similar to Flask’s app context. By setting celery_app.set_default(), tasks utilize the appropriate app context, enhancing compatibility with the factory pattern and improving testing and configuration flexibility.
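To make the app-context point concrete, here is a small hypothetical task (report_broker is our own example name, not part of the tutorial's files) that reads the app's configuration. It only works because the FlaskTask base from Step 3 runs it inside app.app_context(); outside that context, current_app would raise a RuntimeError:

from celery import shared_task
from flask import current_app

@shared_task
def report_broker() -> str:
    # current_app resolves because FlaskTask entered the app context for us
    return current_app.config["CELERY"]["broker_url"]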

Create a file named tasks.py and add the following code:

from config import create_app  #-Line 1
from celery import shared_task
from time import sleep

flask_app = create_app()  #-Line 2
celery_app = flask_app.extensions["celery"]  #-Line 3

@shared_task(ignore_result=False)  #-Line 4
def long_running_task(iterations) -> int:  #-Line 5
    result = 0
    for i in range(iterations):
        result += i
        sleep(2)
    return result  #-Line 6

Let’s break down this code:

  1. We import the create_app function from the config module, which holds the Flask and Celery configuration. (Line 1)
  2. On Line 2, we create a Flask app instance using the create_app() function and store it in the flask_app object.
  3. Similarly, on Line 3 we pull the celery_app instance out of the flask_app object's extensions.
  4. On Line 4, we use the @shared_task decorator to define the task, setting ignore_result=False because we need to save the result in Redis.
  5. On Line 5, we define a long-running task that loops for the requested number of iterations, sleeping for 2 seconds on each pass.
  6. The task returns a result computed from the input. (Line 6)
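As a quick sanity check (assuming Redis and the worker from Step 6 are already running), you can queue the task from a Python shell:

from tasks import long_running_task

r = long_running_task.delay(3)  # enqueue; a running worker picks it up
print(r.get(timeout=30))        # blocks until done; 0 + 1 + 2 = 3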

Step 5: Calling the tasks using Flask

Now we will make a new file app.py which will hold our Flask routes.

from tasks import flask_app, long_running_task  #-Line 1
from celery.result import AsyncResult  #-Line 2
from flask import request, jsonify

@flask_app.post("/trigger_task")
def start_task() -> dict[str, object]:
    iterations = request.args.get('iterations')
    result = long_running_task.delay(int(iterations))  #-Line 3
    return {"result_id": result.id}

@flask_app.get("/get_result")
def task_result() -> dict[str, object]:
    result_id = request.args.get('result_id')
    result = AsyncResult(result_id)  #-Line 4
    if result.ready():  #-Line 5
        # Task has completed
        if result.successful():  #-Line 6
            return {
                "ready": result.ready(),
                "successful": result.successful(),
                "value": result.result,  #-Line 7
            }
        else:
            # Task completed with an error
            return jsonify({'status': 'ERROR', 'error_message': str(result.result)})
    else:
        # Task is still pending
        return jsonify({'status': 'Running'})

if __name__ == "__main__":
    flask_app.run(debug=True)

  1. Line 1 imports the flask_app object and the long_running_task function from the tasks module. The flask_app object, created in the tasks module, holds the Flask application's configuration, allowing us to define routes on it.
  2. Line 2 imports the AsyncResult class from the celery.result module. AsyncResult is used to retrieve the result of a Celery task by its ID.
  3. Line 3 initiates the asynchronous execution of the long_running_task Celery task. The .delay() method schedules the task and returns an AsyncResult instance representing its state and result. The number of iterations is passed as an argument to the task.
  4. On Line 4, an AsyncResult instance is created from the result_id obtained in the query parameters. This lets you monitor the status of a specific task and retrieve its result by ID.
  5. Line 5 checks whether the task represented by the result instance is ready to be collected. The ready() method returns True once execution is complete and the result can be retrieved.
  6. Line 6 checks whether the task completed successfully, returning True if it ran without raising an exception.
  7. Line 7 assigns the task's result to the value key of the response dictionary using result.result.

Overall, this code demonstrates how to create routes in a Flask application to trigger and monitor long-running tasks using Celery. The long_running_task is triggered asynchronously, and its progress can be tracked and its result retrieved using the AsyncResult class. The responses provide information about the status of the task execution and its result.

Step 6: Let’s see how to run this.

  1. First we will start our Redis server. Use the command appropriate for your operating system.

redis-server

On macOS, the redis-server command starts the server directly; on Linux, Redis installed via a package manager may already be running as a service.

2. Once the Redis server is running, let’s start the Celery worker.

celery -A tasks worker --loglevel INFO

We point the worker at the tasks module with -A tasks, since that module defines the tasks we need to run and also carries the Celery configuration.

3. Start your Flask application.
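Since app.py launches the development server under its __main__ guard, running the file directly is enough:

python app.py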

Once the Flask application is running, we can exercise the APIs using Postman or any other tool of your choice; a couple of curl calls are sketched below.
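For example, with curl (assuming Flask's default port 5000; substitute the result_id returned by the first call):

# Kick off a task with 5 iterations; responds immediately with a result_id
curl -X POST "http://localhost:5000/trigger_task?iterations=5"

# Poll for the outcome using that result_id
curl "http://localhost:5000/get_result?result_id=<result_id>"

While the worker is still busy you will get {"status": "Running"}; once it finishes, the response includes the computed value.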

Happy coding!

Let’s connect on LinkedIn: aman-tech

If you enjoyed this tutorial, please click the 👏 button and share to help others find it! Feel free to leave a comment below.
