Building Robust RabbitMQ Consumers With Python and Kombu: Part 2

Matt Pease
Published in Python Pandemonium
3 min read, Oct 21, 2016

In my previous article I delved into how to build robust RabbitMQ consumers from scratch using Kombu. In this article I am going to show an alternative way to do the same using the ConsumerMixin class that Kombu provides.

By a robust consumer I mean a consumer that quickly recovers when the connection to RabbitMQ is disrupted or dies completely, and this includes utilising the heartbeat features of the AMQP protocol. Out of the box the ConsumerMixin class does exactly that and more.

Let’s start straight away by importing the class:

from kombu.mixins import ConsumerMixin

We need to create a class that inherits from ConsumerMixin. Let’s call it Worker:

class Worker(ConsumerMixin):

The ConsumerMixin class expects the descendant class to have a connection attribute, which is an instance of the kombu Connection class. Let’s pass it into the constructor along with a list of the queues we want it to consume from. Each queue is an instance of the kombu Queue class:

class Worker(ConsumerMixin):
    def __init__(self, connection, queues):
        self.connection = connection
        self.queues = queues

We need to specify the function that will get called when a Consumer receives a message. I’m going to call this function on_message, and it takes two parameters, body and message:

class Worker(ConsumerMixin):
    def __init__(self, connection, queues):
        self.connection = connection
        self.queues = queues

    def on_message(self, body, message):
        print('Got message: {0}'.format(body))
        message.ack()

For this example I will just print out the message and call message.ack() which lets the RabbitMQ server know we have dealt with the message.
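In a real consumer the handler usually does more than print, and that work can fail. Here is a minimal sketch of one way to deal with that: acknowledge only after the work succeeds, and reject the message otherwise. The process function is a hypothetical stand-in for the real work, and the handler is written as a plain function so the snippet stands alone (inside Worker it would take self as its first argument). reject() and requeue() are methods on kombu's Message class; which one suits a failed message depends on your application.

```python
def process(body):
    """Hypothetical stand-in for real work; raises ValueError on bad input."""
    if not body:
        raise ValueError('empty message body')
    print('Processed: {0}'.format(body))


def on_message(body, message):
    """Ack only once the work has succeeded; otherwise reject the message."""
    try:
        process(body)
    except ValueError as exc:
        print('Could not process message: {0}'.format(exc))
        # reject() asks the broker not to redeliver this message.
        # If the failure is likely to be temporary, message.requeue()
        # puts it back on the queue instead.
        message.reject()
    else:
        message.ack()
```

Acknowledging after the work, rather than before it, means a message that was being handled when the consumer crashed is redelivered instead of lost.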

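Another responsibility of the descendant class is to implement the get_consumers function, which returns a list of the Consumers the worker will use. Each consumer is an instance of the kombu Consumer class. Kombu passes in a Consumer factory that is already bound to the channel, so we only need to supply the queues and the callbacks: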

class Worker(ConsumerMixin):
    def __init__(self, connection, queues):
        self.connection = connection
        self.queues = queues

    def get_consumers(self, Consumer, channel):
        return [Consumer(queues=self.queues,
                         callbacks=[self.on_message])]

    def on_message(self, body, message):
        print('Got message: {0}'.format(body))
        message.ack()

This class is now complete and all we need to do now is use it. First, let’s import the extra classes we are going to need to create a connection, exchange and queue:

from kombu import Connection, Exchange, Queue

Next, let’s create instances of the kombu Exchange and Queue classes:

exchange = Exchange("example-exchange", type="direct")
queues = [Queue("example-queue", exchange, routing_key="BOB")]

Now let’s establish a connection to RabbitMQ using the Connection as a context manager:

with Connection(rabbit_url, heartbeat=4) as conn:

Notice that we are saying that we want to use AMQP heartbeats here (see part 1 for more details). Next we create an instance of the Worker class and pass in the connection and the Queue list we have created:

with Connection(rabbit_url, heartbeat=4) as conn:
    worker = Worker(conn, queues)

Lastly, we need to call the run function on the worker. The ConsumerMixin supplies run, and it will begin the consuming process:

with Connection(rabbit_url, heartbeat=4) as conn:
    worker = Worker(conn, queues)
    worker.run()

With that we have a robust consumer! The code looks like this:

from kombu import Connection, Exchange, Queue
from kombu.mixins import ConsumerMixin

rabbit_url = "amqp://localhost:5672/"

class Worker(ConsumerMixin):
    def __init__(self, connection, queues):
        self.connection = connection
        self.queues = queues

    def get_consumers(self, Consumer, channel):
        return [Consumer(queues=self.queues,
                         callbacks=[self.on_message])]

    def on_message(self, body, message):
        print('Got message: {0}'.format(body))
        message.ack()

exchange = Exchange("example-exchange", type="direct")
queues = [Queue("example-queue", exchange, routing_key="BOB")]

with Connection(rabbit_url, heartbeat=4) as conn:
    worker = Worker(conn, queues)
    worker.run()

The run function loops continuously until the process is terminated or an unrecoverable error occurs. While looping, it consumes messages from the queues supplied and calls the on_message function for each message that arrives on those queues. It also repeatedly calls the connection’s heartbeat_check function to ensure the connection to RabbitMQ is good; if it is not, it re-establishes the connection and continues consuming.
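To make that behaviour concrete, here is a simplified model of such a loop in plain Python. It is a sketch of the idea only, not Kombu's actual source: FakeConnection is a hypothetical stand-in for a broker connection, and ConnectionLost, run_loop and its max_iterations parameter are names invented for this illustration. The pattern is to wait briefly for an event, check the heartbeat whenever the wait times out, and reconnect when the connection is lost.

```python
import socket


class ConnectionLost(Exception):
    """Hypothetical error raised when the broker connection drops."""


class FakeConnection(object):
    """Hypothetical stand-in for a broker connection, driven by a script."""

    def __init__(self, script):
        self.script = list(script)  # e.g. ['first', 'timeout', 'lost']
        self.heartbeats = 0
        self.reconnects = 0

    def drain_events(self, timeout):
        event = self.script.pop(0)
        if event == 'timeout':
            raise socket.timeout()
        if event == 'lost':
            raise ConnectionLost()
        return event

    def heartbeat_check(self):
        self.heartbeats += 1

    def reconnect(self):
        self.reconnects += 1


def run_loop(conn, on_message, max_iterations):
    """Consume events, check heartbeats on idle waits, reconnect on loss."""
    for _ in range(max_iterations):
        try:
            body = conn.drain_events(timeout=1)
        except socket.timeout:
            # Nothing arrived in time: check that the connection is alive.
            conn.heartbeat_check()
        except ConnectionLost:
            # The connection died: re-establish it and carry on consuming.
            conn.reconnect()
        else:
            on_message(body)


received = []
conn = FakeConnection(['first', 'timeout', 'lost', 'second'])
run_loop(conn, received.append, max_iterations=4)
print(received, conn.heartbeats, conn.reconnects)
# prints: ['first', 'second'] 1 1
```

The loop is bounded by max_iterations only so that the example terminates; a real consumer keeps going until it is told to stop.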

This code and other kombu examples can be found here.

As you can see, the ConsumerMixin class makes it easy to create a robust consumer.

Matt Pease is a software engineer and architect, interested in Python, designing software solutions and Lindy Hop.