Building Robust RabbitMQ Consumers With Python and Kombu: Part 1

Matt Pease
Published in Python Pandemonium
Oct 16, 2016

This article covers building a RabbitMQ consumer with Python and Kombu that can check whether its connection to the RabbitMQ server is still good and recover if it is not. This involves looking at the functionality Kombu provides for ensuring connections, as well as Kombu’s implementation of the heartbeat part of the AMQP specification.

In my previous article I covered the basics of RabbitMQ and I created a basic consumer which looked like this:

from kombu import Connection, Exchange, Queue, Consumer

rabbit_url = "amqp://localhost:5672/"
conn = Connection(rabbit_url)
exchange = Exchange("example-exchange", type="direct")
queue = Queue(name="example-queue", exchange=exchange, routing_key="BOB")

def process_message(body, message):
    print("The body is {}".format(body))
    message.ack()

consumer = Consumer(conn, queues=queue, callbacks=[process_message], accept=["text/plain"])
consumer.consume()
conn.drain_events(timeout=2)

After the line consumer.consume() we have a consumer that is ready to go, but it is the conn.drain_events(timeout=2) that actually triggers the consuming.

At the moment, the consumer attempts to retrieve one message and then stops. To keep consuming messages we need to call drain_events in a loop, such as a while loop:

while True:
    conn.drain_events(timeout=2)

We have a choice when calling drain_events. If we do not specify the timeout argument, like so:

conn.drain_events()

Then it will wait indefinitely for a message to appear on the queue before returning. If we do specify the timeout argument, say timeout=2, then drain_events waits up to 2 seconds for a message to appear on the queue, and if none appears in that time a socket.timeout error is raised.
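To see the timeout behaviour in isolation, here is a small sketch that uses a plain local socket pair instead of RabbitMQ. It is only a stand-in: wait_for_data is a hypothetical helper, not part of kombu, but the socket.timeout it catches is the same exception that drain_events raises when the timeout expires.

```python
import socket


def wait_for_data(sock, timeout):
    """Return received bytes, or None if nothing arrives within `timeout` seconds."""
    sock.settimeout(timeout)
    try:
        return sock.recv(1024)
    except socket.timeout:
        # Nothing arrived in time; the socket itself is still usable.
        return None


# Two connected sockets in the same process stand in for consumer and broker.
consumer_side, broker_side = socket.socketpair()

empty = wait_for_data(consumer_side, 0.1)     # nothing has been sent yet
broker_side.sendall(b"hello")
received = wait_for_data(consumer_side, 0.1)  # data is now waiting

consumer_side.close()
broker_side.close()
print(empty, received)
```

The point to take away is that a timeout is not a failure of the connection: the second call succeeds on the very same socket.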

I believe calling drain_events with a timeout is better, as it gives you the opportunity to check whether the connection is good and do something about it if it is not before you attempt to consume again. Without the timeout argument, where it indefinitely waits for a message, if the network fails for whatever reason then the consumer may not be able to recover, or not even be aware that the connection is now bad.

So given we are specifying a timeout, and a socket.timeout error is raised if the queue is empty, we need to handle that error: it is an expected condition, not one that should stop us from trying to consume again. To do that we add a try-except block:

import socket

while True:
    try:
        conn.drain_events(timeout=2)
    except socket.timeout:
        pass  # This will do for now

Now we have a consumer that will on every iteration of the while loop call drain_events and handle the socket.timeout error when it occurs. However, if the connection itself is compromised due to network issues (Don’t rely on the network) then this code will fail: currently this consumer creates the connection and from that point onwards assumes the connection is good!

Let’s assume that at some point the connection will die. How are we going to recover from that? If the connection has died, then the next time conn.drain_events(timeout=2) is called it will raise an error that is not a socket.timeout, which means execution will drop out of our while loop. So if we have broken out of the while loop because a connection error was raised, we should try to revive the connection to the rabbit server. Firstly, let’s move our while loop into a function like so:

def consume():
    while True:
        try:
            conn.drain_events(timeout=2)
        except socket.timeout:
            pass  # This will do for now

Now I am going to make the assumption that if consume() was called then it was done so after a connection error was raised (this will not be true the first time it is called, but it will work anyway) and so we need to revive the connection before we enter the while loop.

Let’s look at how to revive a connection. We are basically going to create a copy of the original connection and use that instead. The two functions that will help us here are clone() and ensure_connection(), both supplied by the kombu Connection class. clone() creates a copy of the connection with the same connection settings, and ensure_connection() tries to establish the connection, retrying if necessary, and raises an error if it cannot. Let’s create a new function called establish_connection:

def establish_connection():
    revived_connection = conn.clone()
    revived_connection.ensure_connection(max_retries=3)
    channel = revived_connection.channel()

We have now created a new connection, based on the old one, called revived_connection, and we have created a channel using it. The consumer we created earlier is still using the old connection. To change that we call the revive() and consume() functions on the consumer itself like so:

def establish_connection():
    revived_connection = conn.clone()
    revived_connection.ensure_connection(max_retries=3)
    channel = revived_connection.channel()
    consumer.revive(channel)
    consumer.consume()
    print("connection revived!")
    return revived_connection
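To make the idea behind ensure_connection(max_retries=3) more concrete, here is a stand-alone sketch of a retry loop. It is not kombu's implementation: ensure, flaky_connect and the sleep parameter are all hypothetical names, and kombu's own retry counting and back-off intervals may differ from what is shown.

```python
import time


def ensure(connect, max_retries=3, interval_start=2, interval_step=2, sleep=time.sleep):
    """Call connect() until it succeeds, allowing up to max_retries retries.

    Hypothetical helper: it illustrates the retry idea only.
    """
    interval = interval_start
    for attempt in range(max_retries + 1):  # one initial try plus the retries
        try:
            return connect()
        except ConnectionError:
            if attempt == max_retries:
                raise  # out of retries, let the caller deal with it
            sleep(interval)
            interval += interval_step  # wait a little longer each time


attempts = []


def flaky_connect():
    """Pretend broker that refuses the first two connection attempts."""
    attempts.append(1)
    if len(attempts) < 3:
        raise ConnectionRefusedError("broker not ready")
    return "connected"


# sleep is replaced with a no-op so the demonstration finishes instantly.
result = ensure(flaky_connect, max_retries=3, sleep=lambda seconds: None)
print(result, len(attempts))
```

If every attempt fails, the last error propagates to the caller, which is why the code that calls establish_connection still needs to be prepared for connection errors.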

So now establish_connection creates a new connection and channel and then shifts the consumer across to use them. Adding this to the consume function:

def consume():
    new_conn = establish_connection()
    while True:
        try:
            new_conn.drain_events(timeout=2)
        except socket.timeout:
            pass  # This will do for now

Now we need to call the consume function, and I am going to create a run function to do that:

def run():
    while True:
        try:
            consume()
        except conn.connection_errors:
            pass

Note that the consume() function is called within a try-except block that ignores connection errors. The kombu Connection class has a connection_errors attribute that holds a tuple of connection error classes, and it is these errors that we can ignore because the consume() function establishes a new connection when it is called.
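One detail worth spelling out is why socket.timeout is caught inside consume() rather than left to run(). In Python 3, socket.timeout is a subclass of OSError, so if the tuple of connection errors contains OSError (an assumption here; print conn.connection_errors to check what your transport reports), an uncaught timeout would look like a connection failure and trigger a needless reconnect. The sketch below uses a hypothetical stand-in tuple rather than kombu's real one.

```python
import socket

# Assumed stand-in for conn.connection_errors; the real tuple depends on the transport.
ASSUMED_CONNECTION_ERRORS = (ConnectionError, OSError)


def classify(exc):
    """Show which handler wins when the timeout handler comes first."""
    try:
        raise exc
    except socket.timeout:
        return "idle"       # empty queue, keep the connection
    except ASSUMED_CONNECTION_ERRORS:
        return "reconnect"  # the connection really is broken


timeout_result = classify(socket.timeout())
reset_result = classify(ConnectionResetError("peer went away"))
# Without the first handler, the timeout would match the second one.
timeout_matches_tuple = isinstance(socket.timeout(), ASSUMED_CONNECTION_ERRORS)
print(timeout_result, reset_result, timeout_matches_tuple)
```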

We can then run our new consumer by calling the run function:

run()
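To watch the run/consume structure recover without needing a broker, here is a simulation in which a scripted fake connection times out, drops and delivers messages in a fixed order. Everything in it (FakeConnection, StopConsuming, the script list) is hypothetical scaffolding for the demonstration, and the built-in ConnectionError stands in for conn.connection_errors.

```python
import socket


class StopConsuming(Exception):
    """Hypothetical signal used only to end this demonstration."""


class FakeConnection:
    """Hypothetical stand-in for a broker connection that follows a script."""

    def __init__(self, script):
        self.script = script  # shared list of outcomes, consumed left to right

    def drain_events(self, timeout=None):
        outcome = self.script.pop(0)
        if outcome == "timeout":
            raise socket.timeout()
        if outcome == "drop":
            raise ConnectionResetError("connection lost")
        if outcome == "stop":
            raise StopConsuming()
        return outcome  # anything else counts as a delivered message


def consume(connect, handled):
    conn = connect()  # always start from a fresh connection
    while True:
        try:
            handled.append(conn.drain_events(timeout=2))
        except socket.timeout:
            pass  # idle queue, try again


def run(connect, handled):
    reconnects = 0
    while True:
        try:
            consume(connect, handled)
        except StopConsuming:
            return reconnects
        except ConnectionError:
            reconnects += 1  # loop round and call consume() again


script = ["msg-1", "timeout", "drop", "msg-2", "drop", "msg-3", "stop"]
connections = []


def connect():
    connections.append(FakeConnection(script))
    return connections[-1]


handled = []
reconnects = run(connect, handled)
print(handled, len(connections), reconnects)
```

All three messages are handled even though the connection drops twice along the way, because each drop sends control back to run(), which simply starts consume() again.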

This increases the robustness of the consumer, but we can do better! Currently, it can take quite some time to detect that a connection is dead. To enable speedy detection, AMQP has the concept of heartbeats. This is a quote from the RabbitMQ documentation:

Network can fail in many ways, sometimes pretty subtle (e.g. high ratio packet loss). Disrupted TCP connections take a moderately long time (about 11 minutes with default configuration on Linux, for example) to be detected by the operating system. AMQP 0–9–1 offers a heartbeat feature to ensure that the application layer promptly finds out about disrupted connections (and also completely unresponsive peers). Heartbeats also defend against certain network equipment which may terminate “idle” TCP connections.

In this case a heartbeat is a message from the application (our consumer) to the server (rabbit server) and vice versa to signal that the connection is still good. To use heartbeats we have to ask for them when the connection is set up:

conn = Connection(rabbit_url, heartbeat=10)

By specifying heartbeat=10 we are telling the server that we want to exchange heartbeats at 10 second intervals. Having done this, we need to call the heartbeat_check function on the connection at least twice per interval, so every 5 seconds or less. It is worth noting that successful actions against the rabbit server act as heartbeats too, so we only need to call heartbeat_check when no other activity is happening. A good place to do this is when a socket.timeout error is raised, as this is when no message was available to consume. So our consume() function becomes:

def consume():
    new_conn = establish_connection()
    while True:
        try:
            new_conn.drain_events(timeout=2)
        except socket.timeout:
            new_conn.heartbeat_check()
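The timing constraint between the heartbeat value and the drain_events timeout is easy to get wrong when either number changes, so here is a small arithmetic sketch. The helper names are hypothetical and the "twice per interval" rule is taken from the explanation above rather than from kombu's source.

```python
def max_check_interval(heartbeat, checks_per_interval=2):
    """Longest allowed gap, in seconds, between heartbeat checks."""
    return heartbeat / checks_per_interval


def validate_drain_timeout(heartbeat, drain_timeout):
    """Raise if an idle consumer would check heartbeats too rarely."""
    limit = max_check_interval(heartbeat)
    if drain_timeout > limit:
        raise ValueError(
            "drain timeout of {}s exceeds the {}s limit".format(drain_timeout, limit)
        )
    return limit


# The values used in this article: heartbeat=10 and timeout=2.
limit = validate_drain_timeout(heartbeat=10, drain_timeout=2)
print(limit)
```

With heartbeat=10 the limit is 5 seconds, so the 2 second timeout leaves comfortable headroom; raising the timeout above 5 would break the rule.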

The heartbeat_check function sends heartbeats to the rabbit server. If the rabbit server does not receive timely heartbeats it will close the connection, and the next time new_conn.drain_events(timeout=2) is called it will raise a socket closed error, which will break out of the while loop. Control then returns to the run function, which ignores the socket closed error and calls consume again (which establishes a new connection).
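As a rough model of how one side decides that its peer has gone quiet, here is a sketch of a heartbeat monitor. HeartbeatMonitor is a hypothetical class, not something kombu or RabbitMQ exposes, and the threshold of two missed intervals is an assumption based on my reading of the RabbitMQ heartbeat guide.

```python
class HeartbeatMonitor:
    """Hypothetical model of one side's view of its peer."""

    def __init__(self, interval, now=0.0, allowed_misses=2):
        self.interval = interval              # negotiated heartbeat, in seconds
        self.allowed_misses = allowed_misses  # assumed threshold before giving up
        self.last_seen = now

    def traffic_seen(self, now):
        """Record any frame from the peer; ordinary traffic counts as a heartbeat."""
        self.last_seen = now

    def peer_is_dead(self, now):
        """True once the peer has been silent for more than the allowed intervals."""
        return now - self.last_seen > self.interval * self.allowed_misses


monitor = HeartbeatMonitor(interval=10)
monitor.traffic_seen(now=8)                     # a message or heartbeat at t=8
alive_at_20 = not monitor.peer_is_dead(now=20)  # 12s of silence is tolerated
dead_at_29 = monitor.peer_is_dead(now=29)       # 21s of silence is not
print(alive_at_20, dead_at_29)
```

Passing the clock in as an argument keeps the model easy to test; real code would read a monotonic clock instead.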

Now we have a robust consumer that can quickly identify and recover from connection errors!

The complete code looks like this:

from kombu import Connection, Exchange, Queue, Consumer
import socket

rabbit_url = "amqp://localhost:5672/"
conn = Connection(rabbit_url, heartbeat=10)
exchange = Exchange("example-exchange", type="direct")
queue = Queue(name="example-queue", exchange=exchange, routing_key="BOB")

def process_message(body, message):
    print("The body is {}".format(body))
    message.ack()

consumer = Consumer(conn, queues=queue, callbacks=[process_message], accept=["text/plain"])
consumer.consume()

def establish_connection():
    revived_connection = conn.clone()
    revived_connection.ensure_connection(max_retries=3)
    channel = revived_connection.channel()
    consumer.revive(channel)
    consumer.consume()
    return revived_connection

def consume():
    new_conn = establish_connection()
    while True:
        try:
            new_conn.drain_events(timeout=2)
        except socket.timeout:
            new_conn.heartbeat_check()

def run():
    while True:
        try:
            consume()
        except conn.connection_errors:
            print("connection revived")

run()

In the next article I will cover how you can do the same thing but using the ConsumerMixin class that kombu supplies instead.
