Python + RabbitMQ

Alphonso Global
5 min readJul 22, 2019

--

Python

In one of our Python applications, there was a requirement for a pub-sub pattern. In which the publisher will publish message and there can be multiple subscribers, listening to different types of messages.

For this, we used RabbitMQ open source message broker. It is a message broker, which accepts and forwards messages. RabbitMQ supports multiple protocols for different types of applications. The one we chose for our project is AMQP, as it’s the core protocol supported by RabbitMQ, is very robust, and is recommended by the RabbitMQ team. RabbitMQ is lightweight and easy to deploy on premises and in the cloud.

We used Pika, which is a Python client recommended by the RabbitMQ team.

Install Pika –

pip install pika — upgrade

Terms used –

  • Producer — The program which sends message (publishes)
  • Consumer — The program which receives message (consumes)
  • Queue — Queue is a buffer, messages are stored inside queue. Queue lives inside RabbitMQ.
  • Exchange — In RabbitMQ, producer never sends message directly to queue, it always needs to go through an exchange. Exchange receives messages from producer on one side and then forwards them to single or multiple queues based on the type of exchange.
  • Bindings — Relationship between exchange and queue is called binding.

Exchange Types –

  • Direct — Sends a message to the queue whose binding key matches.
  • Topic — Sends the message to queues based on the pattern.
  • Headers — Routes messages to queues based on message header values instead of routing key.
  • Fanout — Broadcasts a message to all the queues.

In our application, we get data from different exchanges for different indexes. When new data is received we have to perform actions based on the type of exchange and type of index. So we have to subscribe based on exchange and index type. To implement that we used ‘Topic’ exchange.

Messages sent to a topic exchange can’t have an arbitrary routing_key — it must be a list of words, delimited by dots. E.g. in case of our application we used keys in the form of exchange.index i.e. ‘nse.nifty’, ‘nse.banknifty’ etc. There can be as many words in the routing key as you like, up to the limit of 255 bytes.

The binding key must also be in the same form. The logic behind the topic exchange is — a message sent with a particular routing key will be delivered to all the queues that are bound with a matching binding key. However there are two important special cases for binding keys:

* (star) can substitute for exactly one word.

# (hash) can substitute for zero or more words.

Following diagram explains the message flow in our application.

python

P — Publisher (sends message when data is available with particular routing key)

S — Subscribers (listening for particular message using queues bound with specific keys)

NSE Queue — It is interested in all the message with ‘nse.*’ key i.e. all the data from nse exchange

NYSE Queue — Interested in all the data from nyse exchange (‘nyse.*’ key)

That means message with routing key ‘nse.nifty’ or ‘nse.banknifty’ will be delivered to ‘NSE’ queue.

Following code gives example of the ‘Publisher’ program in our application.

Publisher –

#publisher.py
import pika
class Publisher:
def __init__(self, config):
self.config = config
def publish(self, routing_key, message):
connection = self.create_connection()
# Create a new channel with the next available channel number
or pass in a channel number to use channel =
connection.channel()
# Creates an exchange if it does not already exist, and if
the exchange exists,
# verifies that it is of the correct and expected class.
channel.exchange_declare(exchange=self.config[‘exchange’],
exchange_type=’topic’)
#Publishes message to the exchange with the given routing key
channel.basic_publish(exchange=self.config[‘exchange’],
routing_key=routing_key, body=message)
print(“ [x] Sent message %r for %r” % (message,routing_key))
# Create new connection
def create_connection(self):
param = pika.ConnectionParameters(host=self.config[‘host’],
port self.config[‘port’])
return pika.BlockingConnection(param)
config = { ‘host’: ‘localhost’,’port’: 5672, ‘exchange’ : ‘my_exchange’}
publisher = Publisher(config)
publisher.publish(‘nse.nifty’, ‘New Data’)

channel.exchange_declare method creates an exchange if it does not already exist, and if the exchange exists, verifies that it is of the correct and expected class.

channel.basic_publish method publishes the message for the given exchanges and routing_key.

python publisher.py

This will publish the message ‘New Data’ with routing key ‘nse.nifty’ to exchange.

Following code gives an example of ‘Subscriber’ in our application.

Subscriber –

#subscriber.py
import pika
import sys
class Subscriber:
def __init__(self, queueName, bindingKey, config):
self.queueName = queueName
self.bindingKey = bindingKey
self.config = config
self.connection = self._create_connection()
def __del__(self):
self.connection.close()
def _create_connection(self):
parameters=pika.ConnectionParameters(host=self.config['host'],
port = self.config['port'])
return pika.BlockingConnection(parameters)
def on_message_callback(self, channel, method, properties, body):
binding_key = method.routing_key
print(“received new message for -” + binding_key
def setup(self):
channel = self.connection.channel()
channel.exchange_declare(exchange=self.config['exchange'],
exchange_type='topic')
# This method creates or checks a queue
channel.queue_declare(queue=self.queueName)
# Binds the queue to the specified exchang
channel.queue_bind(queue=self.queueName,exchange=self.config
[‘exchange’],
routing_key=self.bindingKey)
channel.basic_consume(queue=self.queueName,
on_message_callback=self.on_message_callback, auto_ack=True)
print(‘ [*] Waiting for data for ‘ + self.queueName + ‘. To
exit press CTRL+C’)
try:
channel.start_consuming()
except KeyboardInterrupt:
channel.stop_consuming()
config = { ‘host’: ‘localhost’, ‘port’: 5672, ‘exchange’ : ‘my_exchange’}
if len(sys.argv) < 2:
print(‘Usage: ‘ + __file__ + ‘ <QueueName> <BindingKey>’)
sys.exit()
else:
queueName = sys.argv[1]
#key in the form exchange.*
key = sys.argv[2]
subscriber = Subscriber(queueName, key, config)
subscriber.setup()

channel.queue_declare creates a queue on particular channel, if it does not already exist. Creating a queue using queue_declare is idempotent ‒ we can run the command as many times as we like, and only one will be created.

channel.queue_bind method binds the queue with queueName to a specific exchange using specified binding key.

channel.basic_consume method tells the broker to start a consumer process, which checks for messages on a specified queue, and then registers a callback function that should be executed in the Subscriber class when a message is available and has been delivered to the client.

To receive messages from ‘NSE’ queue (for all the keys of the format ‘nse.*’ ) run:

python subscriber.py nse_queue nse.*

To receive messages from ‘NYSE’ queue run:

python subscriber.py nyse_queuse nyse.*

For more such Articles Do visit https://alphonsoglobal.com/blogs/

--

--

Alphonso Global

Delivering the difference through world-class software solutions and products.