Using Celery and MQTT to launch long-running tasks with feedback to the user
While doing work for the VRM (Victron Remote Management) platform, I worked with an interesting combination of tools for launching long-running tasks with feedback to the user.
A few common ways to handle long(er) running tasks:
- Starting the task with an asynchronous request that returns when the task is complete.
- The first option, but sending the request through a websocket and receiving status updates through that same websocket.
- Launching the task with a request that creates a job to be handled asynchronously by a worker; the request returns as soon as the job has been added to the queue.
- The third option, with status updates stored in a database and the browser polling for the current status.
Some of these methods work fine for shorter tasks or for tasks that are less sensitive to disruption, but none of them covers longer tasks that need status updates without polling and that effectively cannot be disrupted.
One way to achieve this, using mostly the same tools as the options above, combines Celery, Redis and MQTT:
Celery is a task queue. Basically, Celery allows you to execute tasks asynchronously (or synchronously). When you submit a task, Celery lets the connected Celery workers know that a task is available; one of the workers then takes the task out of the queue and starts executing it.
Each task has an id, and the worker places the result of the task in the result backend used by Celery (Redis in this case). The result can then be fetched from Celery/Redis if required.
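To make the submit / execute / fetch-by-id cycle concrete, here is a minimal in-memory sketch. It is not Celery itself: the class `InMemoryTaskQueue` and the `add` function are hypothetical stand-ins, with a `queue.Queue` playing the role of the task queue and a plain dict playing the role of the Redis result backend.

```python
import queue
import threading
import uuid


class InMemoryTaskQueue:
    """Toy stand-in for Celery (task queue) plus Redis (result backend)."""

    def __init__(self):
        self._pending = queue.Queue()  # plays the role of the task queue
        self._results = {}             # plays the role of the result backend
        self._done = {}                # task id -> threading.Event

    def submit(self, func, *args):
        """Enqueue a task and return its id immediately, without running it."""
        task_id = str(uuid.uuid4())
        self._done[task_id] = threading.Event()
        self._pending.put((task_id, func, args))
        return task_id

    def work(self):
        """Worker loop: take tasks off the queue until a None sentinel arrives."""
        while True:
            item = self._pending.get()
            if item is None:
                break
            task_id, func, args = item
            self._results[task_id] = func(*args)  # store result under the task id
            self._done[task_id].set()

    def stop(self):
        """Ask the worker loop to exit once it reaches the sentinel."""
        self._pending.put(None)

    def get_result(self, task_id, timeout=5.0):
        """Block until the worker has stored the result, then return it."""
        if not self._done[task_id].wait(timeout):
            raise TimeoutError(f"task {task_id} did not finish in time")
        return self._results[task_id]


def add(a, b):
    return a + b


tasks = InMemoryTaskQueue()
worker = threading.Thread(target=tasks.work, daemon=True)
worker.start()

task_id = tasks.submit(add, 2, 3)   # returns right away with an id
result = tasks.get_result(task_id)  # fetched later, using only the id
tasks.stop()
worker.join()
```

The point of the sketch is that the submitter only ever holds the task id; the work itself happens in a separate worker, which is what lets the web request return quickly.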
Redis is a key-value store. It is often used as a cache backend because of its high performance, and since it was already available on the server running the VRM backend, it was an easy choice over RabbitMQ, which is also commonly used with Celery. Celery uses Redis to distribute the tasks to the workers and to return the result of a task.
MQTT is a lightweight messaging protocol in which you subscribe and publish to paths (called topics in MQTT terminology). The exchange of messages is facilitated by a (message) broker. Publishers connect to the broker and send messages to paths; subscribers connect to the broker and subscribe to paths. The subscribers to a certain path receive a message whenever one is published to that path. Using MQTT in a browser is usually done over a websocket connection to a broker. The MQTT broker in use here is Mosquitto, exposed over a websocket using websockify; another popular MQTT broker is HiveMQ.
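The publish/subscribe model can be illustrated with a tiny in-memory broker. This is only a sketch of the idea, not an MQTT implementation: `InMemoryBroker` and the path names are hypothetical, and it matches paths exactly without any of MQTT's networking or delivery guarantees.

```python
from collections import defaultdict


class InMemoryBroker:
    """Toy broker: delivers each published message to subscribers of that path."""

    def __init__(self):
        self._subscribers = defaultdict(list)  # path -> list of callbacks

    def subscribe(self, path, callback):
        """Register a callback to be invoked for messages on this exact path."""
        self._subscribers[path].append(callback)

    def publish(self, path, message):
        """Deliver the message to every subscriber of this path, if any."""
        for callback in list(self._subscribers.get(path, [])):
            callback(path, message)


broker = InMemoryBroker()
received = []

# The subscriber only cares about one path (hypothetical name).
broker.subscribe("tasks/42/status", lambda path, msg: received.append(msg))

broker.publish("tasks/42/status", "started")  # delivered to the subscriber
broker.publish("tasks/99/status", "started")  # nobody subscribed; dropped
```

Publisher and subscriber never talk to each other directly; both only know the broker and the path, which is why the worker and the browser can live on different machines.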
Requirements and conclusion
The only requirement for the backend/API is that you can submit tasks to Celery with it. You can implement this yourself, but chances are that there is an existing library for the programming language in use. The VRM backend is PHP, so to submit tasks we use the celery-php library.
As noted, the only requirement for the web application backend is that it can submit tasks to Celery. With that covered, here is an overview of what happens in the graph above:
- The backend creates a job for the task the user wants to run and returns a message to the browser that it did so successfully.
- A Celery worker executes the task and sends status updates to a specific path over MQTT.
- The browser is connected to the MQTT broker and is subscribed to the path where the status updates are sent.
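The worker side of these steps can be sketched as follows. Everything here is an assumption made for illustration: the path layout `tasks/<id>/status`, the JSON payload shape, and the names `status_path` and `run_task` do not come from the VRM code. The `publish` argument is any callable taking a path and a payload; in a real setup it might wrap an MQTT client's publish call, and `run_task` might be registered as a Celery task.

```python
import json


def status_path(task_id):
    """Hypothetical path layout: one status path per task id."""
    return f"tasks/{task_id}/status"


def run_task(task_id, steps, publish):
    """Run each step in order, publishing a status update after each one."""
    path = status_path(task_id)
    publish(path, json.dumps({"state": "started", "progress": 0}))
    for done, step in enumerate(steps, start=1):
        step()  # the actual long-running work
        progress = round(100 * done / len(steps))
        publish(path, json.dumps({"state": "running", "progress": progress}))
    publish(path, json.dumps({"state": "finished", "progress": 100}))


# Stand-in for the browser: record every update sent to the status path.
updates = []


def record(path, payload):
    updates.append((path, json.loads(payload)))


# Two dummy steps that do nothing, in place of real work.
run_task("abc123", [lambda: None, lambda: None], record)
```

Passing `publish` in as a parameter keeps the task logic independent of the transport, so the same function can be exercised without a broker running.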
This way the task is launched with a short request, because the request returns after launching the task instead of executing it. Status updates are sent by a Celery worker that could be running on a different server; thanks to MQTT, the browser receives the status updates regardless. If the user reloads the page, the browser can still show the current status of the task as soon as a new status message is received.
What is described above is implemented in a system for updating the firmware of devices from VRM. There is no control over the process after it is launched, which is desirable in this case: you don’t want a user to be able to interrupt the firmware update process. A Celery worker is responsible for executing the firmware update, which means that it runs outside of the web requests/PHP part of the application.