Direct reply-to in RabbitMQ

I’m new to RabbitMQ and in the process of converting a system to a message-based architecture. The main motivation is to make the system easier to run in a distributed environment. The first task is to replace the REST communication between a few interconnected components with messaging. That probably sounds familiar to everyone.

What I had been doing was following the tutorial from here: for each client, create an exclusive queue that lives as long as the client, as in the picture below:

Image Source: Rabbitmq.com

It worked well for me, until I found a better solution.

It turns out that RabbitMQ has native support for the request/response pattern. As a client, you need to:

  • Consume from the special queue name amq.rabbitmq.reply-to in no-ack mode. If you try to consume from it without no-ack mode, the consume call simply fails.
  • When you send a message, set its reply-to property to amq.rabbitmq.reply-to.

When RabbitMQ delivers the message to the server, it magically replaces the reply-to property with a generated, random-looking value. The server can reply to the client by publishing a message to the default exchange, using the value of the reply-to property as the routing key.
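To make that flow concrete, here is a toy in-memory model of what the broker does. It is only a sketch for illustration, not RabbitMQ's implementation: the `ToyBroker` class, its method names, the `client_id` argument and the format of the generated address are all made up.

```python
import uuid

REPLY_TO = "amq.rabbitmq.reply-to"


class ToyBroker:
    """Hypothetical in-memory stand-in for the broker side of direct reply-to."""

    def __init__(self):
        self._client_address = {}   # client id -> generated reply address
        self._reply_consumers = {}  # generated reply address -> client callback

    def consume_reply_to(self, client_id, callback, no_ack):
        # The pseudo-queue can only be consumed in no-ack mode.
        if not no_ack:
            raise ValueError("amq.rabbitmq.reply-to must be consumed in no-ack mode")
        # Made-up address format; the real broker generates its own value.
        address = "%s.%s" % (REPLY_TO, uuid.uuid4().hex)
        self._client_address[client_id] = address
        self._reply_consumers[address] = callback

    def deliver_request(self, client_id, body, properties):
        """Return (body, properties) the way the server would receive them."""
        delivered = dict(properties)
        if delivered.get("reply_to") == REPLY_TO:
            # Swap the well-known name for the client's generated address.
            delivered["reply_to"] = self._client_address[client_id]
        return body, delivered

    def publish_default_exchange(self, routing_key, body):
        # Default exchange: the routing key selects the reply consumer directly.
        self._reply_consumers[routing_key](body)


broker = ToyBroker()
replies = []
broker.consume_reply_to("client-1", replies.append, no_ack=True)

# The client publishes with the well-known name; the server sees a generated one.
body, props = broker.deliver_request("client-1", "Hello World", {"reply_to": REPLY_TO})

# The server replies through the default exchange using that generated value.
broker.publish_default_exchange(props["reply_to"], "result")
```

The point of the model is that no reply queue is ever created: the generated reply-to value acts as a direct address back to the consumer that registered it.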

A simplified version using Kombu looks like this:

Server-side

from kombu import Connection, Exchange, Producer, Queue, Consumer

conn = Connection("amqp://localhost:5672")
exchange = Exchange("rpc", type="direct")
request_queue = Queue(name="rpc", exchange=exchange, routing_key="request")
request_queue.maybe_bind(conn)
request_queue.declare()


def process_message(body, message):
    print("Request: %s" % body)
    message.ack()
    # send reply to client: no exchange is given, so this publishes to the
    # default exchange with the generated reply_to value as the routing key
    producer = Producer(channel=conn, routing_key=message.properties['reply_to'])
    producer.publish("result")


with Consumer(conn, request_queue, callbacks=[process_message]):
    # handles a single request, then exits
    conn.drain_events()

Client-side

from kombu import Connection, Exchange, Producer, Queue, Consumer

conn = Connection("amqp://localhost:5672")
exchange = Exchange("rpc", type="direct")


def process_message(body, message):
    # no message.ack() here: the reply consumer runs in no-ack mode
    print("Response: %s" % body)


reply_queue = Queue(name="amq.rabbitmq.reply-to")

# start consuming replies before publishing the request
with Consumer(conn, reply_queue, callbacks=[process_message], no_ack=True):
    producer = Producer(exchange=exchange, channel=conn, routing_key="request")
    properties = {
        "reply_to": "amq.rabbitmq.reply-to",
    }
    producer.publish("Hello World", **properties)
    # wait for the reply
    conn.drain_events()

That’s it. No temporary queue. One side note: the reply-to address is tied to the client's channel (in the code above, the connection's default channel), so if you make multiple calls from the same client you will need a request id to keep track of which reply belongs to which call. Overall, though, it looks much cleaner to me. And I learned something new 😀.
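As a rough sketch of that request-id bookkeeping, the client could keep a small table of calls that are still waiting for a reply. The `PendingCalls` class and its method names are hypothetical, and carrying the id in the standard AMQP `correlation_id` property is my assumption about how you would wire it up; it also assumes the server copies that property onto its reply.

```python
import uuid


class PendingCalls:
    """Hypothetical helper that matches replies to outstanding requests."""

    def __init__(self):
        self._pending = {}  # request id -> original request body

    def new_request(self, body):
        """Register a call and return (request_id, publish properties)."""
        request_id = uuid.uuid4().hex
        self._pending[request_id] = body
        properties = {
            "reply_to": "amq.rabbitmq.reply-to",
            "correlation_id": request_id,
        }
        return request_id, properties

    def match_reply(self, properties):
        """Return the request body this reply answers, or None if unknown."""
        return self._pending.pop(properties.get("correlation_id"), None)


calls = PendingCalls()
id_a, props_a = calls.new_request("first")
id_b, props_b = calls.new_request("second")

# Replies may come back in any order; the id tells them apart.
matched_b = calls.match_reply({"correlation_id": id_b})
matched_a = calls.match_reply({"correlation_id": id_a})
duplicate = calls.match_reply({"correlation_id": id_a})
```

With the Kombu client above, the properties returned by `new_request` would be passed to `producer.publish(body, **properties)`, and the reply callback would call `match_reply(message.properties)`. On the server side, the reply would need something like `producer.publish("result", correlation_id=message.properties.get("correlation_id"))`.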
