Pierre B.
Jan 21, 2018 · 2 min read

In this post, I will walk you through how we monitor our Celery workers. We use Bucky to collect metrics and push them into Graphite, which is then queried by Grafana.

Celery monitoring

Event collection

We collect metrics by simply subscribing to Celery signals. We mostly use four signals:

  • task_prerun: before execution
  • task_postrun: after execution
  • task_success: on success
  • task_failure: on failure

Pretty straightforward.

```python
import statsd
from celery.signals import (
    task_prerun,
    task_postrun,
    task_success,
    task_failure,
)
import myapp

statsd_conn = statsd.StatsClient(
    host=myapp.config['STATSD_HOST'],
    port=myapp.config['STATSD_PORT'],
)

# Metric paths below are illustrative; adapt them to your naming scheme

@task_prerun.connect
def task_prerun_handler(task_id, task, *args, **kwargs):
    statsd_conn.incr('celery.{}.prerun'.format(task.name))

@task_postrun.connect
def task_postrun_handler(task_id, task, *args, **kwargs):
    statsd_conn.incr('celery.{}.postrun'.format(task.name))

@task_success.connect
def task_success_handler(sender, result, **kwargs):
    statsd_conn.incr('celery.{}.success'.format(sender.name))

@task_failure.connect
def task_failure_handler(sender, task_id, exception, **kwargs):
    statsd_conn.incr('celery.{}.failure'.format(sender.name))
```

You then just need a Bucky instance to push metrics to your Graphite database.
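For reference, a minimal Bucky configuration for this setup might look like the sketch below. The option names follow Bucky's default configuration file, so verify them against the version you run:

```python
# bucky.conf: accept statsd UDP packets and forward them to Graphite
statsd_enabled = True
statsd_ip = "0.0.0.0"      # listen on all interfaces
statsd_port = 8125

graphite_ip = "127.0.0.1"  # carbon-cache host
graphite_port = 2003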

Displaying task metrics

Below is an example of a dashboard showing the metrics of a set of tasks.

Task monitoring

This dashboard carries three pieces of information:

  • is it working?
  • timeline of events
  • task success and failure volumes

The timeline of events is simply the count of each signal plotted on a graph panel; the volumes are singlestat panels showing the same signal counts.
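Assuming the counters land under paths like stats.celery.&lt;task&gt;.&lt;signal&gt; (a placeholder naming scheme, since the actual prefix depends on your statsd/Bucky setup), the panel targets could be sketched as:

```
# Graph panel: one series per signal
alias(sumSeries(stats.celery.*.prerun), 'received')
alias(sumSeries(stats.celery.*.success), 'succeeded')
alias(sumSeries(stats.celery.*.failure), 'failed')

# Singlestat panels: total count over the selected time range
sumSeries(stats.celery.*.success)
sumSeries(stats.celery.*.failure)
```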

Is it working?

Using diffSeries, simply compute the difference between the number of tasks that succeeded (success signal) and the number of tasks received (prerun signal). Map values to text, i.e. OK = 0 and NOK &lt; 0, and map values to background colors, i.e. green = 0 and red &lt; 0. See Grafana's value-to-text mapping and coloring options.
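Assuming counters under stats.celery.&lt;task&gt;.&lt;signal&gt; (placeholder paths, depending on your statsd/Bucky prefix), the singlestat target could be:

```
diffSeries(
  sumSeries(stats.celery.*.success),
  sumSeries(stats.celery.*.prerun)
)
```

The expression sits at 0 when every task received has succeeded, and drops below 0 when tasks fail or never finish, which maps directly onto the OK/NOK text and color thresholds.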

Broker monitoring

In addition to monitoring tasks, we also monitor workers using broker metrics.

Event collection

We have a homemade script that uses the Celery event receiver together with the Tornado IO loop. Below is a simplified version.

```python
import threading
from functools import partial

import statsd
from celery import Celery
from tornado.ioloop import IOLoop

import myapp

io_loop = IOLoop.instance()
celery = Celery(broker=myapp.config['BROKER_URL'])
statsd_conn = statsd.StatsClient(
    host=myapp.config['STATSD_HOST'],
    port=myapp.config['STATSD_PORT'],
)


def task_handler(event):
    # One counter per task state, e.g. celery.task.state.task-succeeded
    path = '{}.task.state.{}'.format('celery', event['type'])
    statsd_conn.incr(path)


def trash(event):
    # Drop events we are not interested in
    pass


def event_dispatcher(event):
    handlers = {
        'task-sent': task_handler,
        'task-received': task_handler,
        'task-started': task_handler,
        'task-succeeded': task_handler,
        'task-failed': task_handler,
        'task-rejected': task_handler,
        'task-revoked': task_handler,
        'task-retried': task_handler,
    }
    func = handlers.get(event['type'], trash)
    # Process the event on the Tornado IO loop
    io_loop.add_callback(partial(func, event))


def capture():
    while True:
        try:
            with celery.connection() as connection:
                recv = celery.events.Receiver(connection, handlers={
                    '*': event_dispatcher,
                })
                recv.capture(limit=None, timeout=None, wakeup=True)
        except (KeyboardInterrupt, SystemExit):
            break


# Capture broker events in a background thread, dispatch on the IO loop
threading.Thread(target=capture, daemon=True).start()
io_loop.start()
```

Displaying worker metrics

Below is an example of a dashboard showing the metrics of a set of workers.

Broker monitoring

Our monitoring screens carry very valuable information, which is why we put TV screens on the walls: they spread that information across the office to both technical and non-technical people.

Grafana is a beautiful tool and has helped us tremendously in monitoring asynchronous tasks, micro-services, databases and servers.

MeilleursAgents Engineering

MeilleursAgents Engineering Teams (Product, Web & Data Teams)

Thanks to Pierrick

Written by Pierre B., Engineering manager @MeilleursAgents
