How to use RabbitMQ with python?

Ankit Kumar Singh
Analytics Vidhya
Published in
4 min readOct 20, 2020

The purpose of this blog is to make a quick “hello world” like application by using RabbitMQ as a test case.
RabbitMQ is an Open Source, light weight, easy cloud deployable and highly scalable messaging broker. It used by tech companies like Reddit, Stack, Trivago etc. This Blog is the quick get to go Guide for installing and using rabbitMQ in your own python based projects.

Installation

Setting up rabbitmq-server

To access RabbitMQ in python or what we call it as “a pure-Python AMQP 0–9–1 client for rabbitMQ”, there is a package(library) called pika which can be installed using pip. But first we need to install “rabbitmq-server” which will run as a system program at backend.
* Install erlang
* Install rabbitmq-server
* Enable rabbitmq-server as system program
* Start rabbitmq-server at backend
* Enable rabbitmq management plugin

sudo apt-get update && sudo apt-get upgrade
sudo apt-get install erlang
sudo apt-get install rabbitmq-server
sudo systemctl enable rabbitmq-server
sudo systemctl start rabbitmq-server
sudo rabbitmq-plugins enable rabbitmq_management
  • create an username and password to login to rabbitmq management
##user is the username and password is the new password
sudo rabbitmqctl add_user user password
##giving that user adiministraitve rights
sudo rabbitmqctl set_user_tags user administrator
  • granting administrator permissions
sudo rabbitmqctl set_permissions -p / user "." "." "."

This will start the rabbitmq-server. And you are now all set for accessing it using an AMQP rabbitmq client called pika in python.

Installing Pika

pip install pika

you’re all set for accessing and managing rabbitmq from python

Message Producer and Consumer “Application”

Simple Pictorial representation of an example

As described in this diagram we have producer and consumer of messages. So there are two sides of This design, One is Producer and other is Consumer we will see both ends and write a python script for that.

Producer

* credentials- In this we will define the username and password which is known by the rabbitmq-server(refer installation segment above)
* host- by default we use ‘localhost’ or 0.0.0.0 as the listening server, but it can have any other IP addresses on cloud that has rabbitmq-server listening
* port- this is by default 5672, but it should point to the port where our server is listening
* exchange- this can be assumed as a bridge name which needed to be declared so that queues can be accessed
* routing_key- this is a binding key corresponding to that key, we can set it to be any name
* basic_publish- this is the method which we call to send the message to the corresponding queue

#producer
import pika
#declaring the credentials needed for connection like host, port, username, password, exchange etc
credentials = pika.PlainCredentials(‘tester’,’secretPass’)
connection= pika.BlockingConnection(pika.ConnectionParameters(host=’localhost’, credentials= credentials))
channel= connection.channel()
channel.exchange_declare(‘test’, durable=True, exchange_type=’topic’)channel.queue_declare(queue= ‘A’)
channel.queue_bind(exchange=’test’, queue=’A’, routing_key=’A’)
channel.queue_declare(queue= ‘B’)
channel.queue_bind(exchange=’test’, queue=’B’, routing_key=’B’)
channel.queue_declare(queue= ‘C’)
channel.queue_bind(exchange=’test’, queue=’C’, routing_key=’C’)
#messaging to queue named C
message= ‘hello consumer!!!!!’
channel.basic_publish(exchange=’test’, routing_key=’C’, body= message)
channel.close()

We will use the same credentials that we created during the installation of rabbimq-server.

Exchange name: in my case dde
In dde Exchange: there are five queues declared

Consumer

#consumerimport pika
#declaring the credentials needed for connection like host, port, username, password, exchange etc
credentials = pika.PlainCredentials('tester','secretPass')
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost', port='5672', credentials= credentials))
channel = connection.channel()
channel.exchange_declare('test', durable=True, exchange_type='topic')
#defining callback functions responding to corresponding queue callbacks
def callbackFunctionForQueueA(ch,method,properties,body):
print('Got a message from Queue A: ', body)
def callbackFunctionForQueueB(ch,method,properties,body):
print('Got a message from Queue B: ', body)
def callbackFunctionForQueueC(ch,method,properties,body):
print('Got a message from Queue C: ', body)
#Attaching consumer callback functions to respective queues that we wrote above
channel.basic_consume(queue='A', on_message_callback=callbackFunctionForQueueA, auto_ack=True)
channel.basic_consume(queue='B', on_message_callback=callbackFunctionForQueueB, auto_ack=True)
channel.basic_consume(queue='C', on_message_callback=callbackFunctionForQueueC, auto_ack=True)
#this will be command for starting the consumer session
channel.start_consuming()

This often happens

Due to blocking connection if channel got connected for long time then server rejects the connection. To overcome this we can go through two approaches, One to change the connection type for long time, Other can be to make connection only when you want to send or receive something. Second one is better approach.

def startConsumer():
connection= pika.BlockingConnection(pika.ConnectionParameters(host=’localhost’))
#creating channels
channel= connection.channel()
#connecting queues to channels
channel.basic_consume(queue=’A’, on_message_callback=callbackFunctionForQueueA, auto_ack=True)
channel.basic_consume(queue=’B’, on_message_callback=callbackFunctionForQueueB, auto_ack=True)
channel.basic_consume(queue=’C’, on_message_callback=callbackFunctionForQueueC, auto_ack=True)
#Starting Threads for different channels to start consuming enqueued requests
Thread(target= channel.start_consuming()).start()
while True:
try:
startConsumer()
except:
continue

Conclusion

This blog was meant to be the minimal code required to get started with python and we can develop on top of it to harness more advanced features and utilize it for building a great project.

For further reading, pika has great documentation that we can read:- https://pika.readthedocs.io/en/stable/

--

--