RabbitMQ Thread Model

Ryan Zheng
Published in The Startup, Aug 18, 2020

This article assumes a basic understanding of RabbitMQ (queues, consumers, producers, etc.). To communicate with a RabbitMQ server, a client needs a connection. The design of a connection is much the same in any framework: it has to wrap a network socket in one way or another, and for AMQP that is a TCP socket. In amqp-client (the RabbitMQ Java client), SocketFrameHandler is the class that wraps the TCP Socket.

SocketFrameHandler provides the functionality of reading from and writing to the socket.

The AMQConnection class represents the client-side connection used to communicate with the server. One of its fields is _frameHandler, which means that each connection wraps exactly one TCP socket.

A frame is the basic unit of data in the AMQP protocol, and frames are sent and received through channels. A channel is only a logical concept: each channel has an id, and multiple channels can be created on one connection. Before a frame is sent over the connection, the id of its channel is written into the frame header.

Figure: AMQP frame structure (from https://www.brianstorti.com/speaking-rabbit-amqps-frame-structure/)
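
The general AMQP 0-9-1 frame layout can be sketched in Java as follows: a 1-byte frame type, a 2-byte channel id, a 4-byte payload size, the payload itself, and a closing frame-end octet 0xCE. This is a minimal sketch for illustration only; SimpleFrame, toBytes and fromBytes are hypothetical names, not the amqp-client Frame API, and the payload is treated as opaque bytes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical, simplified model of an AMQP 0-9-1 frame (not the amqp-client Frame class).
final class SimpleFrame {
    static final int FRAME_END = 0xCE; // every AMQP 0-9-1 frame ends with this octet

    final int type;       // 1 = method, 2 = content header, 3 = content body, 8 = heartbeat
    final int channel;    // id of the channel the frame belongs to; 0 is used for the connection itself
    final byte[] payload;

    SimpleFrame(int type, int channel, byte[] payload) {
        this.type = type;
        this.channel = channel;
        this.payload = payload;
    }

    // Layout: type (1 byte), channel (2 bytes), payload size (4 bytes), payload, frame-end (1 byte).
    void writeTo(DataOutputStream out) throws IOException {
        out.writeByte(type);
        out.writeShort(channel);
        out.writeInt(payload.length);
        out.write(payload);
        out.writeByte(FRAME_END);
    }

    // Reads one frame and rejects it if the frame-end octet is missing.
    static SimpleFrame readFrom(DataInputStream in) throws IOException {
        int type = in.readUnsignedByte();
        int channel = in.readUnsignedShort();
        byte[] payload = new byte[in.readInt()];
        in.readFully(payload);
        if (in.readUnsignedByte() != FRAME_END) {
            throw new IOException("missing frame-end octet");
        }
        return new SimpleFrame(type, channel, payload);
    }

    byte[] toBytes() {
        try {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(buf);
            writeTo(out);
            out.flush();
            return buf.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    static SimpleFrame fromBytes(byte[] bytes) {
        try {
            return readFrom(new DataInputStream(new ByteArrayInputStream(bytes)));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because the channel id travels in every frame header, the receiving side can route frames from many channels that share one socket.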

Although frames are sent and received through channels, every frame ultimately has to travel over the connection (its TCP socket). So each channel holds a reference to its connection as one of its fields.
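
The ownership chain described so far (a channel holds its connection, the connection holds one frame handler, the frame handler holds the socket) can be sketched with a few hypothetical classes. They are named after the real ones but heavily simplified: the frame header is reduced to a channel id and a payload size, and the frame handler writes to any OutputStream, where a real client would use the socket's output stream.

```java
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;

// Hypothetical classes mirroring the chain channel -> connection -> frame handler -> socket.
final class OwnershipSketch {

    // Stand-in for SocketFrameHandler; a real one would wrap socket.getOutputStream().
    static final class FrameHandler {
        private final DataOutputStream out;

        FrameHandler(OutputStream socketOut) {
            this.out = new DataOutputStream(socketOut);
        }

        void writeFrame(int channelId, byte[] payload) throws IOException {
            synchronized (out) { // one frame at a time on the shared stream
                out.writeShort(channelId); // simplified header: channel id and payload size only
                out.writeInt(payload.length);
                out.write(payload);
                out.flush();
            }
        }
    }

    // Stand-in for AMQConnection: it owns exactly one frame handler, hence one socket.
    static final class Connection {
        private final FrameHandler frameHandler;

        Connection(FrameHandler frameHandler) {
            this.frameHandler = frameHandler;
        }

        Channel createChannel(int id) {
            return new Channel(id, this);
        }

        void writeFrame(int channelId, byte[] payload) throws IOException {
            frameHandler.writeFrame(channelId, payload);
        }
    }

    // Stand-in for a channel: just an id plus a reference to its connection.
    static final class Channel {
        final int id;
        private final Connection connection;

        Channel(int id, Connection connection) {
            this.id = id;
            this.connection = connection;
        }

        void send(byte[] payload) {
            try {
                connection.writeFrame(id, payload); // the frame always ends up on the connection's socket
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }
}
```

Note that the frame handler has to exist before the connection can be built, and the connection before any channel: construction follows the ownership chain.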

When a channel sends a frame, it has to go through the connection. Now imagine that two channels are created on the same connection, and each channel is driven by its own thread.

Figure: two channels on the same connection sending messages from two threads

Suppose channel A sends a frame while channel B also sends one. Because both channels use the same connection (TCP socket), the two frames cannot be written at the same time. They have to be serialized; otherwise their bytes would interleave on the socket and the messages would be corrupted. That is why writeFrame in SocketFrameHandler synchronizes on the DataOutputStream. The same goes for reading, which synchronizes on the DataInputStream.

SocketFrameHandler.java:
public void writeFrame(Frame frame) throws IOException {
    // Only one thread at a time may write a frame to the socket's output stream
    synchronized (this._outputStream) {
        frame.writeTo(this._outputStream);
    }
}
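
To see why the synchronized block matters, the following hypothetical demo lets two threads, each standing in for one channel, write frames to one shared stream. Each frame is written byte by byte inside synchronized (out), following the same idea as SocketFrameHandler.writeFrame, and reading the stream back checks that no frame was split by the other thread. SerializedWrites, runDemo and the simplified frame format (channel id, size, payload) are our own, not part of amqp-client.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;

// Hypothetical demo: two threads, one per channel, share one stream the way two channels
// share one connection. Holding the stream's monitor for a whole frame keeps it contiguous.
final class SerializedWrites {

    static void writeFrame(DataOutputStream out, int channelId, byte[] payload) throws IOException {
        synchronized (out) { // same idea as SocketFrameHandler.writeFrame
            out.writeShort(channelId);
            out.writeInt(payload.length);
            for (byte b : payload) {
                out.writeByte(b); // byte by byte, so any interleaving would land mid-frame
            }
        }
    }

    // Runs two writer threads, then parses the result; true means every frame is intact.
    static boolean runDemo(int framesPerThread) {
        ByteArrayOutputStream wire = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(wire);
        Thread a = new Thread(() -> writeMany(out, 1, framesPerThread));
        Thread b = new Thread(() -> writeMany(out, 2, framesPerThread));
        a.start();
        b.start();
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }

        DataInputStream in = new DataInputStream(new ByteArrayInputStream(wire.toByteArray()));
        try {
            for (int i = 0; i < 2 * framesPerThread; i++) {
                int channelId = in.readUnsignedShort();
                int size = in.readInt();
                if (size != 64) return false; // every frame in this demo carries 64 bytes
                byte[] payload = new byte[size];
                in.readFully(payload);
                for (byte x : payload) {
                    if (x != channelId) return false; // bytes must come from the channel named in the header
                }
            }
            return in.available() == 0;
        } catch (IOException e) {
            return false; // stream ended mid-frame
        }
    }

    private static void writeMany(DataOutputStream out, int channelId, int count) {
        byte[] payload = new byte[64];
        Arrays.fill(payload, (byte) channelId);
        try {
            for (int i = 0; i < count; i++) {
                writeFrame(out, channelId, payload);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Removing the synchronized block makes interleaving possible, although whether it actually shows up in a given run depends on thread scheduling.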

To communicate with the server, a connection has to be created first, and ConnectionFactory is the class that creates it. Inside ConnectionFactory, the relevant steps look like this:

// create the FrameHandler, which owns the TCP socket
FrameHandlerFactory fhFactory = this.createFrameHandlerFactory();
FrameHandler handler = fhFactory.create(addr, clientProvidedName);
// create the Connection around that FrameHandler
AMQConnection conn = this.createConnection(params, handler, this.metricsCollector);

Recall that the TCP socket lives inside the FrameHandler, and that the FrameHandler is a field of AMQConnection. That is why the FrameHandler has to be created before the AMQConnection can be constructed.

After an AMQConnection is created, start() is called on it. start() launches a separate thread for this connection that waits for incoming frames on the socket. This thread is usually called the IO thread.

Figure: the IO thread reading frames (readFrame)

As said earlier, channels are used to send and receive frames, and every frame header contains a channel id field. So after the IO thread reads a frame from the socket through SocketFrameHandler, the connection delivers it to the channel on this connection with the matching id by calling that channel's handleFrame(frame) method.
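
The IO thread's read-and-dispatch loop can be sketched as follows. This is a hypothetical, simplified model loosely following the description of AMQConnection.readFrame, not the amqp-client implementation: the frame header is reduced to a channel id and a payload size, channels just record the payloads they receive, and end-of-stream stands in for the socket being closed. IoThreadSketch, openChannel and encode are illustrative names.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical sketch of one connection's IO thread: read a frame, find the channel whose id is
// in the header, and hand the frame to that channel's handleFrame.
final class IoThreadSketch {

    static final class Channel {
        final int id;
        final List<String> received = new CopyOnWriteArrayList<>();

        Channel(int id) {
            this.id = id;
        }

        void handleFrame(byte[] payload) {
            received.add(new String(payload, StandardCharsets.UTF_8));
        }
    }

    private final Map<Integer, Channel> channels = new ConcurrentHashMap<>();
    private final DataInputStream in; // a real connection would read from its socket

    IoThreadSketch(DataInputStream in) {
        this.in = in;
    }

    Channel openChannel(int id) {
        Channel ch = new Channel(id);
        channels.put(id, ch);
        return ch;
    }

    // Starts the single IO thread for this connection.
    Thread start() {
        Thread io = new Thread(this::mainLoop, "io-thread");
        io.start();
        return io;
    }

    private void mainLoop() {
        try {
            while (true) {
                int channelId = in.readUnsignedShort();  // simplified header: channel id ...
                byte[] payload = new byte[in.readInt()]; // ... and payload size
                in.readFully(payload);
                Channel ch = channels.get(channelId);
                if (ch != null) {
                    ch.handleFrame(payload); // dispatch by channel id
                } // this sketch silently drops frames for unknown channel ids
            }
        } catch (EOFException closed) {
            // end of stream stands in for the socket being closed; the loop ends
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Helper for trying the sketch without a socket: encodes (channel id, text) frames.
    static byte[] encode(int[] channelIds, String[] texts) {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        try {
            for (int i = 0; i < channelIds.length; i++) {
                byte[] p = texts[i].getBytes(StandardCharsets.UTF_8);
                out.writeShort(channelIds[i]);
                out.writeInt(p.length);
                out.write(p);
            }
            out.flush();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buf.toByteArray();
    }
}
```

With three connections there would simply be three IoThreadSketch instances, each with its own thread, stream and channel table.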

Now imagine that we start three connections. There will then be three IO threads, one per connection, each reading frames from its own socket.

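In the AMQP protocol, a complete command can span multiple frames. A command that carries content, such as a message delivery, consists of a method frame, a content header frame, and zero or more content body frames, so each frame holds only part of one AMQCommand. The channel uses the AMQCommand class to process the received frames and assemble them into a command, which is then dispatched to a thread pool where the message is consumed. CommandAssembler is the class that does the actual assembly of frames into a command.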

Figure: assembling frames into a command
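
The assembly step can be illustrated with a small state machine: expect a method frame; if the method carries content, expect a content header that declares the body size; then collect body frames until that many bytes have arrived. The sketch below is hypothetical and is not the CommandAssembler source. The state names are our own, a method frame's first payload byte simply says whether content follows, and the header payload is only the 8-byte body size, whereas real frames carry encoded methods and properties.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

// Hypothetical frame-to-command assembler with simplified payloads (see the lead-in).
final class AssemblerSketch {
    static final int METHOD = 1, HEADER = 2, BODY = 3; // AMQP 0-9-1 frame type codes

    enum State { EXPECTING_METHOD, EXPECTING_HEADER, EXPECTING_BODY, COMPLETE }

    private State state = State.EXPECTING_METHOD;
    private long remainingBody;
    private final ByteArrayOutputStream body = new ByteArrayOutputStream();

    // Feeds one frame; returns true once a whole command has been assembled.
    boolean handleFrame(int type, byte[] payload) {
        switch (state) {
            case EXPECTING_METHOD:
                require(type == METHOD, type);
                // simplified: first payload byte == 1 means "content follows"
                state = payload.length > 0 && payload[0] == 1 ? State.EXPECTING_HEADER : State.COMPLETE;
                break;
            case EXPECTING_HEADER:
                require(type == HEADER, type);
                remainingBody = ByteBuffer.wrap(payload).getLong(); // simplified: header = body size
                state = remainingBody == 0 ? State.COMPLETE : State.EXPECTING_BODY;
                break;
            case EXPECTING_BODY:
                require(type == BODY, type);
                body.write(payload, 0, payload.length);
                remainingBody -= payload.length;
                if (remainingBody < 0) throw new IllegalStateException("body longer than declared");
                if (remainingBody == 0) state = State.COMPLETE;
                break;
            case COMPLETE:
                throw new IllegalStateException("command already complete");
        }
        return state == State.COMPLETE;
    }

    byte[] body() {
        return body.toByteArray();
    }

    private static void require(boolean ok, int type) {
        if (!ok) throw new IllegalStateException("unexpected frame type " + type);
    }
}
```

A command without content completes on its method frame, while a delivery completes only when the last body byte arrives, which is why the channel cannot hand a message to a consumer frame by frame.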

In RabbitMQ, one connection can carry multiple channels. There is no fixed relationship between channels and queues, and multiple consumers can subscribe to the same queue.

Figure: connections, channels, exchange, queues, and consumers

In the setup above, connection#1 has two channels, and both are used for reading and writing messages. In RabbitMQ, a Channel is not thread-safe and should not be shared between threads, so it is better to give each thread its own channel.

Consider the following case:

Suppose we call channel#0.basicConsume("queueB") on connection#2 to register consumer#3. Does RabbitMQ deliver queueB's messages to consumer#3 over connection#2 as well? Or is it possible that RabbitMQ uses a different connection, such as connection#1, to deliver them?

Answer: RabbitMQ delivers the messages over connection#2, never over connection#1, because a consumer is registered on a channel and a channel is bound to its connection. When the connection receives a frame, AMQConnection.readFrame looks up the channel whose id matches the channel id in the frame header. Channel ids are only unique within a single connection: if the frame arrived on connection#1 instead, its readFrame would either find no channel with that id or find an unrelated channel that happens to share it.
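
The point that channel ids are scoped to a connection can be shown with a few lines of hypothetical code: each connection hands out ids from its own counter and keeps its own id-to-channel table, so the same id on two connections names two unrelated channels. ChannelIdScope and its methods are illustrative names, not amqp-client classes.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: channel ids are allocated and looked up per connection.
final class ChannelIdScope {

    static final class Channel {
        final String connectionName;
        final int id;

        Channel(String connectionName, int id) {
            this.connectionName = connectionName;
            this.id = id;
        }
    }

    static final class Connection {
        final String name;
        private final AtomicInteger nextId = new AtomicInteger(1); // id 0 is reserved for the connection itself
        private final Map<Integer, Channel> channels = new ConcurrentHashMap<>();

        Connection(String name) {
            this.name = name;
        }

        Channel createChannel() {
            Channel ch = new Channel(name, nextId.getAndIncrement());
            channels.put(ch.id, ch);
            return ch;
        }

        // What the IO thread does with an incoming frame's channel id: look it up locally.
        Channel lookup(int channelId) {
            return channels.get(channelId);
        }
    }
}
```

A frame tagged with channel id 2 means something on a connection that has opened two channels, but the lookup returns nothing on a connection that has opened only one.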

Figure: workflow for starting a connection

The figure above shows the workflow after a connection is created and started with amqp-client.jar. Every connection maintains a ConsumerWorkService, backed by a thread pool that runs the tasks the connection's IO thread enqueues. Each task calls the consumer's handleDelivery method to process a message, and users normally implement handleDelivery themselves, for example by extending DefaultConsumer.
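
The hand-off from the IO thread to the consumer thread pool can be sketched like this: the IO thread only enqueues a task and goes back to reading the socket, while handleDelivery runs on a pool thread. To keep one channel's deliveries in order while sharing the pool, this sketch chains CompletableFutures per channel. That technique is our own choice, not how ConsumerWorkService is implemented, and DeliveryHandler is a hypothetical, simplified stand-in for the client's Consumer interface.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch of the hand-off from the IO thread to a consumer thread pool.
final class WorkServiceSketch {

    // Simplified stand-in for Consumer.handleDelivery; the real method takes more arguments.
    interface DeliveryHandler {
        void handleDelivery(String consumerTag, byte[] body);
    }

    private final ExecutorService pool = Executors.newFixedThreadPool(4);
    // Last queued task per channel id; new tasks are chained after it to keep per-channel order.
    private final Map<Integer, CompletableFuture<Void>> tails = new ConcurrentHashMap<>();

    // Called on the IO thread: it only enqueues the work and returns to reading the socket.
    void dispatch(int channelId, String consumerTag, byte[] body, DeliveryHandler handler) {
        Runnable task = () -> {
            try {
                handler.handleDelivery(consumerTag, body);
            } catch (RuntimeException e) {
                // swallowed here so that a failing handler does not stall later deliveries
            }
        };
        tails.compute(channelId, (id, tail) ->
                (tail == null ? CompletableFuture.<Void>completedFuture(null) : tail)
                        .thenRunAsync(task, pool));
    }

    // Waits for every queued delivery to run, then stops the pool's threads.
    void shutdown() {
        CompletableFuture.allOf(tails.values().toArray(new CompletableFuture<?>[0])).join();
        pool.shutdown();
    }
}
```

Keeping slow work out of the IO thread matters because that one thread reads frames for every channel on the connection; if it ran handleDelivery itself, one slow consumer would delay all of them.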

Lastly, RabbitMQ recommends the following:

  • Use one connection per process, or one connection for publishing and another for consuming
  • Use one channel per thread
  • Don’t use too many queues. RabbitMQ runs each queue as its own Erlang process, so a very large number of queues adds scheduling and memory overhead on the broker.
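
The "one channel per thread" rule can be applied with a ThreadLocal, as in this hypothetical helper: each thread lazily opens its own channel the first time it asks for one. With the Java client, the supplier would wrap a call to connection.createChannel() on one shared connection (that call throws IOException, so it would need wrapping); the helper is kept generic here so the sketch runs without a broker.

```java
import java.util.function.Supplier;

// Hypothetical helper for "one channel per thread": each thread lazily opens its own channel
// on first use, so no channel object is ever shared between threads.
final class ChannelPerThread<C> {
    private final ThreadLocal<C> channel;

    // openChannel would typically wrap connection.createChannel() on one shared connection.
    ChannelPerThread(Supplier<C> openChannel) {
        this.channel = ThreadLocal.withInitial(openChannel);
    }

    C get() {
        return channel.get();
    }
}
```

Channels opened this way live as long as their threads, so with a thread pool they should be closed explicitly when the pool shuts down.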

If anything here is wrong, please leave a comment.


I am a software developer who is keen to know how things work