Talking to RabbitMQ with Python and Kombu

Matt Pease
Published in Python Pandemonium
Oct 14, 2016

This article is the first in a series delving into RabbitMQ and how to interact with it using Python and Kombu. I will cover the basics of connecting to RabbitMQ and building simple consumers and producers.

What is RabbitMQ?

RabbitMQ is open source message broker software that allows you to write messages to and read messages from queues using the Advanced Message Queuing Protocol (AMQP).

At a basic level an application sends a message to a RabbitMQ server and the server routes that message to a queue. Then another application listening to that queue receives that message and does whatever it needs to with it.

An application that sends messages is called a producer, and an application reading messages is called a consumer.

Within a RabbitMQ server it is the exchanges that do the routing of the messages. So a producer will tell the server which exchange it wants to use, and the exchange figures out which queue to put the message on (depicted below).

An exchange can have access to many queues. So how does an exchange know which queue(s) to route the message to? Well, there are four types of exchange (direct, fanout, topic and headers) and each type routes messages in a different way. I will use a direct exchange in this article and explain the others later in the series.

With a direct exchange, when a queue is declared (created) it is bound to the exchange with a routing key. A routing key is a string such as “blue”, “bob” or “whateveryouwant”. When a producer sends a message it sends a routing key with it. The exchange takes the routing key of the message and matches it against the routing keys of the queues bound to it. If a queue’s routing key matches, the exchange adds the message to that queue. If several queues’ routing keys match, the message gets added to all of them.

In the example depicted below there are three queues bound to the exchange: A, B and C. Queues A and B are bound to the exchange with routing key BOB and queue C is bound with routing key BLUE. The producer sends a message with a routing key of BOB and the exchange only puts the message on queues A and B because queue C’s routing key does not match.
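
The routing rule in this example can be modelled in a few lines of plain Python. This is only a toy sketch to illustrate the matching logic, not how RabbitMQ or Kombu implement it; the `bindings` table and the `route` function are names made up for the illustration.

```python
# Toy model of a direct exchange: each queue is bound with one routing key.
# These names are illustrative only and are not part of Kombu or RabbitMQ.
bindings = {"A": "BOB", "B": "BOB", "C": "BLUE"}
queues = {name: [] for name in bindings}


def route(message, routing_key):
    """Append the message to every queue whose routing key matches exactly."""
    matched = [name for name, key in bindings.items() if key == routing_key]
    for name in matched:
        queues[name].append(message)
    return matched


delivered_to = route("Hello there!", "BOB")
print(delivered_to)  # ['A', 'B']
print(queues["C"])   # []
```

A message whose routing key matches no binding is simply not placed on any queue in this model.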

Now, as mentioned above, consumers read the messages from queues and in order to do this a consumer must specify which queue it is listening to, or declare a new queue and then listen to that queue. Note that consumers can listen to more than one queue at the same time.

So using the example above we can specify a consumer for each queue like so:

Consumer D and Consumer E would receive a message to process. The final thing to say about consumers, for the moment, is that they have to acknowledge that they have processed the message, otherwise RabbitMQ does not know that it can then remove the message.
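
To see why the acknowledgement matters, here is a toy model of a queue that remembers delivered messages until they are acked. It is a rough sketch of the idea only: `ToyQueue` and its methods are invented for this illustration and do not exist in Kombu or RabbitMQ.

```python
from collections import deque


class ToyQueue:
    """Illustrative stand-in for a broker queue; not a Kombu class."""

    def __init__(self):
        self.ready = deque()  # messages waiting to be delivered
        self.unacked = {}     # delivered but not yet acknowledged
        self._next_tag = 1

    def publish(self, body):
        self.ready.append(body)

    def deliver(self):
        """Hand the next message to a consumer and remember it until acked."""
        body = self.ready.popleft()
        tag = self._next_tag
        self._next_tag += 1
        self.unacked[tag] = body
        return tag, body

    def ack(self, tag):
        """The consumer has finished, so the message can be forgotten."""
        del self.unacked[tag]

    def consumer_lost(self):
        """If the consumer goes away, unacked messages return to the queue."""
        for tag in sorted(self.unacked, reverse=True):
            self.ready.appendleft(self.unacked.pop(tag))


q = ToyQueue()
q.publish("first")
q.publish("second")

tag, body = q.deliver()
q.ack(tag)            # "first" has been processed and is removed for good

q.deliver()           # "second" is delivered but never acknowledged
q.consumer_lost()
print(list(q.ready))  # ['second']
```

The unacknowledged message ends up back on the queue, ready for another consumer, which is the behaviour the acknowledgement is there to support.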

Now On To The Code

Let’s build a producer using Python 3 and Kombu. Kombu is a Python messaging library that supports AMQP.

To install Kombu you can use pip:

pip install kombu

First off we need to establish a connection to a RabbitMQ server. To do that Kombu uses a Connection class which you can import straight from the kombu package:

from kombu import Connection

We can then instantiate that class and pass it the url of the RabbitMQ server:

rabbit_url = "amqp://localhost:5672/"
conn = Connection(rabbit_url)

Using that connection we need to create a channel. It is the channel that is used when sending messages:

channel = conn.channel()

We then need to specify the exchange we want to use. Kombu has the Exchange class for this. We will create a direct exchange called “example-exchange”:

from kombu import Connection, Exchange

exchange = Exchange("example-exchange", type="direct")

We then need to create a producer. Kombu has the Producer class for this. We need to pass it the exchange and the channel we have created. It also needs the routing key that will be sent with each message:

from kombu import Connection, Exchange, Producer

producer = Producer(exchange=exchange, channel=channel, routing_key="BOB")

We are going to need to create a queue to send a message to. A producer does not need a queue in order to send a message but we will create one for this example. Kombu has the Queue class for this. We need to specify the name of the queue, the exchange and the routing key that the queue will be bound to the exchange with.

from kombu import Connection, Exchange, Producer, Queue

queue = Queue(name="example-queue", exchange=exchange, routing_key="BOB")

We need to bind the queue to the connection, so that it has a channel to work with, and then declare it (this actually creates the queue on the server and binds it to the exchange).

queue.maybe_bind(conn)
queue.declare()

Lastly, we need to actually send a message using the producer:

producer.publish("Hello there!")

And that is the producer done. The code all together is:

from kombu import Connection, Exchange, Producer, Queue

rabbit_url = "amqp://localhost:5672/"
conn = Connection(rabbit_url)
channel = conn.channel()
exchange = Exchange("example-exchange", type="direct")
producer = Producer(exchange=exchange, channel=channel, routing_key="BOB")
queue = Queue(name="example-queue", exchange=exchange, routing_key="BOB")
queue.maybe_bind(conn)
queue.declare()
producer.publish("Hello there!")

To check that the queue has been created and has one message on it, you can use a tool that comes with RabbitMQ called rabbitmqctl and run the list_queues command:

rabbitmqctl list_queues

You should then see something like:

Listing queues …
example-queue 1

Okay, let’s build a consumer. The connection, exchange and queue are all the same as before, except in this instance we don’t need to bind or declare the queue as that has already been done by the producer:

from kombu import Connection, Exchange, Queue, Consumer

rabbit_url = "amqp://localhost:5672/"
conn = Connection(rabbit_url)
exchange = Exchange("example-exchange", type="direct")
queue = Queue(name="example-queue", exchange=exchange, routing_key="BOB")

When a consumer receives a message it calls a function to process the message. This function is called a callback. We need to write this callback function and it needs two parameters: body and message. Initially it will just print out the body and acknowledge that the message has been processed:

def process_message(body, message):
    print("The body is {}".format(body))
    message.ack()

Now we need to create the consumer. It requires the connection, the queue it is going to listen to, the callback function and what type of message to accept (such as plain text or JSON). The Consumer class is a context manager and can therefore be used with the with statement:

with Consumer(conn, queues=queue, callbacks=[process_message], accept=["text/plain"]):

Finally we need to tell the connection to start delivering messages to the consumers. This is done with the drain_events function.

conn.drain_events(timeout=2)

The drain_events function will wait up to 2 seconds for a message to arrive and raise a timeout error (socket.timeout) if none does. If the timeout argument is not specified it will wait until there is a message to consume. I’ll cover this more in the next article.

That is the consumer done. The code all together looks like:

from kombu import Connection, Exchange, Queue, Consumer

rabbit_url = "amqp://localhost:5672/"
conn = Connection(rabbit_url)
exchange = Exchange("example-exchange", type="direct")
queue = Queue(name="example-queue", exchange=exchange, routing_key="BOB")

def process_message(body, message):
    print("The body is {}".format(body))
    message.ack()

with Consumer(conn, queues=queue, callbacks=[process_message], accept=["text/plain"]):
    conn.drain_events(timeout=2)

When this code is run it will output something like:

The body is Hello there!

You can then use rabbitmqctl to check that the queue is now empty.

The code for both the basic producer and consumer can be found here.

In the next article I explore how to build robust consumers.

Matt Pease
Software engineer and architect. Interested in Python, designing software solutions and Lindy Hop.