RabbitMQ inside docker container

alex_ber
Published in Geek Culture
7 min read, Dec 31, 2020

In the previous story I’ve described how to install RabbitMQ on Windows.

Here I will describe how to get RabbitMQ with the management UI running in a Docker container.

The easiest way is to use the Docker image rabbitmq:3.8.9-management-alpine (I prefer the distribution based on Alpine Linux). The Dockerfile for the Alpine-based image can be found here https://github.com/docker-library/rabbitmq/blob/888638927482f86af6e88bebb67423926cb1112f/3.8/alpine/management/Dockerfile

This image can’t move messages from one queue to another, because the plugins rabbitmq_shovel and rabbitmq_shovel_management are not enabled. A modified Dockerfile fixes this.

I have changed only line 4:

RUN rabbitmq-plugins enable --offline rabbitmq_management rabbitmq_shovel rabbitmq_shovel_management

I have added rabbitmq_shovel rabbitmq_shovel_management.

Let’s suppose you’ve built this docker image with name rabbitmq-i.

Now, your code will typically run in another Docker container. The best practice for establishing a network connection between 2 Docker containers is to create a network. See https://www.middlewareinventory.com/blog/docker-connect-containers-together/ for more details.

docker network create rabbitnet

Now, start docker container based on rabbitmq-i docker image by:

docker run -d --hostname rabbitmq --name rabbitmq -p 15672:15672 -p 5672:5672 --network rabbitnet  rabbitmq-i

-d — start it in detached mode (in background, without attaching the console to the process’s standard input, standard output, and standard error).

--hostname: the container’s host name. RabbitMQ uses it internally, for example as part of the name of your RabbitMQ cluster, as shown in the management UI.

--name: the name of the Docker container. Other containers on the same network use this name to reach it. It is best to keep it the same as the hostname.

-p 15672:15672 -p 5672:5672: port 5672 is published for client (AMQP) connections to RabbitMQ and port 15672 for the management UI. I’m not using TLS, so these are the plain ports. If you want to use TLS, you should publish port 5671 for AMQP and port 15671 for the management UI instead. Quote from documentation:

TLS Peer Verification: Who Do You Say You Are?

As mentioned in the Certificates and Keys section, TLS has two primary purposes: encrypting connection traffic and providing a way to verify that the peer can be trusted (e.g. signed by a trusted Certificate Authority) to mitigate against Man-in-the-Middle attacks, a class of attacks where an attacker impersonates a legitimate trusted peer (usually a server). This section will focus on the latter.

© https://www.rabbitmq.com/ssl.html

--network rabbitnet: this is the crucial part of the command. It attaches the container based on rabbitmq-i to the internal network named rabbitnet.

Code Example

Now, suppose that you have another Docker container with pika==1.1.0 installed. Then you can have the following blocking message producer:
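A minimal sketch of such a producer, pieced together from the walkthrough in this section, might look like this. The host rabbitmq and the queue name local come from the text; the message body and the function names prepare_channel and publish are placeholders of mine, and the line numbers cited in the walkthrough refer to the original listing, not to this sketch.

```python
def prepare_channel(connection):
    """Open a channel and turn on publisher confirms once."""
    channel = connection.channel()
    channel.confirm_delivery()
    return channel


def publish(channel, body, queue="local"):
    """Publish one message to `queue` through the default exchange."""
    # Optional: create the queue from code instead of the management UI.
    # channel.queue_declare(queue=queue, durable=True)
    channel.basic_publish(
        exchange="",        # empty name selects the default exchange
        routing_key=queue,  # for the default exchange this is the queue name
        body=body,
        mandatory=True,     # ask the broker to return unroutable messages
    )


def main():
    import pika  # assumed: pika==1.1.0, imported here so the helpers load without it

    parameters = pika.ConnectionParameters(host="rabbitmq")
    # The context manager closes the connection when the block exits.
    with pika.BlockingConnection(parameters) as connection:
        channel = prepare_channel(connection)
        publish(channel, b"Hello from the producer")
```

Calling main() from the container’s entry point sends a single message; the helpers take the channel as an argument so they can be exercised with a stub as well.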

When you run this Docker container, you should add --network rabbitnet to the docker run command so that it can communicate with the rabbitmq container (which is also attached to rabbitnet, see above).

Let’s go over the code:

In line 8: tutorials often use URLParameters. URLParameters and ConnectionParameters both extend the Parameters class, and BlockingConnection expects a Parameters instance to be passed to its __init__ method, so you can use either of them. I’m using the default value for everything except host, which I set to rabbitmq. Because this container is attached (through rabbitnet) to the same network as the container named rabbitmq, which is based on the rabbitmq-i image, that name resolves and our TCP connection reaches the container running RabbitMQ.
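To make the two options concrete, here is a small sketch. The helper names amqp_url and make_parameters are hypothetical; the guest/guest credentials, port 5672 and virtual host / are the defaults pika assumes when you pass only a host.

```python
from urllib.parse import quote


def amqp_url(host="rabbitmq", port=5672, user="guest", password="guest", vhost="/"):
    """Build an AMQP URL; the virtual host has to be percent-encoded."""
    return "amqp://{}:{}@{}:{}/{}".format(
        quote(user, safe=""),
        quote(password, safe=""),
        host,
        port,
        quote(vhost, safe=""),
    )


def make_parameters(use_url=False):
    import pika  # assumed: pika==1.1.0

    if use_url:
        # Style often seen in tutorials.
        return pika.URLParameters(amqp_url())
    # Style used in this article: defaults for everything except the host.
    return pika.ConnectionParameters(host="rabbitmq")
```

Either return value can be passed to pika.BlockingConnection.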

Note: This same code will also work with RabbitMQ installed on a Windows machine. You only need to modify the C:\Windows\System32\drivers\etc\hosts file. Add the following line to it:

127.0.0.1 rabbitmq

Now, your calls to the “rabbitmq” hostname will reach your localhost (where you have RabbitMQ installed).

In line 10: we create the simplest BlockingConnection with the connection parameters above. See Comparing Message Publishing with BlockingConnection and SelectConnection, Connecting to RabbitMQ with Callback-Passing Style, and Asynchronous publisher example for alternative approaches (using Pika). We use a context manager to close the connection automatically at the end.

In line 12 — we’re getting channel from connection above.

In line 15 (not in use) — we can create Queue programmatically. Personally, I prefer to do it in management UI, but you can do it in the code also. Quote from docstring:

Declare queue, create if needed. This method creates or checks a
queue. When creating a new queue the client can specify various
properties that control the durability of the queue and its contents, and the level of sharing for the queue.

Use an empty string as the queue name for the broker to auto-generate one. Retrieve this auto-generated queue name from the returned `spec.Queue.DeclareOk` method frame.
:param str queue: The queue name; if empty string, the broker will
create a unique queue name
:param bool passive: Only check to see if the queue exists and raise
`ChannelClosed` if it doesn't
:param bool durable: Survive reboots of the broker
:param bool exclusive: Only allow access by the current connection
:param bool auto_delete: Delete after consumer cancels or disconnects
:param dict arguments: Custom key/value arguments for the queue
:returns: Method frame from the Queue.Declare-ok response
:rtype: `pika.frame.Method` having `method` attribute of type
`spec.Queue.DeclareOk`

© https://github.com/pika/pika/blob/master/pika/adapters/blocking_connection.py
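As a rough illustration of the docstring above, the two helpers below declare a named durable queue and a broker-named temporary queue. The function names are hypothetical, and the channel is assumed to be a pika BlockingChannel.

```python
def declare_durable_queue(channel, name="local"):
    """Create the queue if needed and return the name the broker reports."""
    result = channel.queue_declare(queue=name, durable=True)
    # `result` is a pika.frame.Method; `.method` is the Queue.DeclareOk frame.
    return result.method.queue


def declare_temporary_queue(channel):
    """Let the broker generate a unique name for an exclusive queue."""
    result = channel.queue_declare(queue="", exclusive=True)
    return result.method.queue
```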

In line 18: we turn on publisher confirms, a RabbitMQ-specific (proprietary) extension to AMQP that ensures the message was delivered to the broker. For more details see https://www.rabbitmq.com/confirms.html

In lines 20–25 — we’re actually sending message to the RabbitMQ. basic_publish method publishes a message to a specific exchange. The message will be routed to queues as defined by the exchange configuration and distributed to any active consumers when the transaction, if any, is committed.

Empty exchange name means that default exchange should be used.

routing_key — The routing key to bind on. For default exchange, queue name can be supplied, so the message will be sent to the appropriate queue. Here, we’re sending to the local queue.

body — The message body.

properties (unused) — BasicProperties of the message.

  • delivery_mode=1 means TRANSIENT_DELIVERY_MODE. The message is not guaranteed to be stored on disk and it will disappear when the broker restarts.
  • delivery_mode=2 means PERSISTENT_DELIVERY_MODE. You should also declare the queue as durable (see queue_declare above; you can also do it in the management UI). In this case the message is stored on disk and survives a broker restart.

How Persistence Works

First, some background: both persistent and transient messages can be written to disk. Persistent messages will be written to disk as soon as they reach the queue, while transient messages will be written to disk only so that they can be evicted from memory while under memory pressure. Persistent messages are also kept in memory when possible and only evicted from memory under memory pressure. The “persistence layer” refers to the mechanism used to store messages of both types to disk.

https://www.rabbitmq.com/persistence-conf.html
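A short sketch of publishing a persistent message follows. The two constants mirror the values defined in pika.spec; publish_persistent and its make_properties hook are hypothetical, and the hook exists only so the function can be tried without pika installed.

```python
TRANSIENT_DELIVERY_MODE = 1   # same values as the constants in pika.spec
PERSISTENT_DELIVERY_MODE = 2


def publish_persistent(channel, body, queue="local", make_properties=None):
    """Publish `body` so that it survives a broker restart (queue must be durable)."""
    if make_properties is None:
        import pika  # assumed: pika==1.1.0

        make_properties = pika.BasicProperties
    properties = make_properties(delivery_mode=PERSISTENT_DELIVERY_MODE)
    channel.basic_publish(
        exchange="",
        routing_key=queue,
        body=body,
        properties=properties,
    )
```

Persistence needs both halves: a durable queue and a persistent delivery mode on each message.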

mandatory — This flag tells the server how to react if the message cannot be routed to a queue. If this flag is set, the server will return an unroutable message with a Return method. If this flag is zero, the server silently drops the message.

See https://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.publish
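With publisher confirms enabled on a BlockingChannel (confirm_delivery() called beforehand), pika reports a returned mandatory message by raising UnroutableError, and a negative acknowledgement by raising NackError. A minimal sketch, assuming that setup; publish_checked is a hypothetical name, and the stand-in classes only let the snippet load where pika is not installed.

```python
try:
    from pika.exceptions import NackError, UnroutableError
except ImportError:  # stand-ins so the sketch loads without pika installed
    class UnroutableError(Exception):
        pass

    class NackError(Exception):
        pass


def publish_checked(channel, body, queue="local"):
    """Return True if the broker accepted and routed the message."""
    try:
        channel.basic_publish(
            exchange="", routing_key=queue, body=body, mandatory=True
        )
        return True
    except UnroutableError:
        # The message could not be routed to any queue and was returned.
        return False
    except NackError:
        # The broker did not take responsibility for the message.
        return False
```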

Basically, this code uses the push model: RabbitMQ “pushes” messages to this code, and every message from the local queue is processed in the on_message() callback.

Note: In PyCharm Ctrl+C doesn't work for me; I was unable to send a SIGINT signal to interrupt the channel.start_consuming() method. Killing the process leaves the connection/channel open in RabbitMQ, so be sure to close it manually. It is better to run this code from the CLI.

See https://pika.readthedocs.io/en/stable/examples/blocking_consume.html for details.
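A minimal push-style consumer along these lines might look as follows. The queue name local and the on_message name come from the text; consume_forever and the print call are placeholders, and the channel is assumed to be a pika BlockingChannel.

```python
def on_message(channel, method_frame, properties, body):
    """Called by pika for every message delivered from the queue."""
    print("received:", body)
    channel.basic_ack(delivery_tag=method_frame.delivery_tag)


def consume_forever(channel, queue="local"):
    channel.basic_consume(queue, on_message)
    try:
        channel.start_consuming()  # blocks, dispatching messages to on_message
    except KeyboardInterrupt:
        channel.stop_consuming()   # Ctrl+C from a terminal ends the loop cleanly
```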

Basically, this code uses the pull model: it “pulls” messages from RabbitMQ. This code has full control over when it stops processing messages. It can also tell when the queue is empty.

Note: I use Twisted==20.3.0. You need to install some extra packages to make this code work.

See https://pika.readthedocs.io/en/stable/examples/blocking_consumer_generator.html
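Stripped of the Twisted parts, the pull style boils down to iterating over channel.consume(). The sketch below stops once the queue has been idle for inactivity_timeout seconds; drain_queue is a hypothetical name, and treating one idle timeout as "queue is empty" is a simplifying assumption.

```python
def drain_queue(channel, queue="local", inactivity_timeout=1.0):
    """Consume until the queue stays idle for `inactivity_timeout` seconds."""
    bodies = []
    for method_frame, properties, body in channel.consume(
        queue, inactivity_timeout=inactivity_timeout
    ):
        if method_frame is None:
            break  # the timeout elapsed with no message: treat the queue as empty
        bodies.append(body)
        channel.basic_ack(method_frame.delivery_tag)
    channel.cancel()  # end the generator's consumer
    return bodies
```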

I will not go through every line, only the interesting ones.

In lines 74–76 we’re putting at least 2 messages on the queue (so that it is not empty).

In lines 12–13 we have 2 flags: finished and finished_defered. These variables are used for communication between different threads (the details are provided below).

Note: Python stores all objects on a heap, in main memory. So you don’t need to use volatile in Python. See https://stackoverflow.com/a/53780395/1137529 for details.

finished: the queue has been fully populated, that is, the producers have put all items on the queue.

finished_defered: the queue has been fully processed, that is, all items were consumed from the queue.

In lines 51–52

#installSignalHandlers
Thread(target=reactor.run, args=(False,)).start()

we’re starting Twisted reactor in the dedicated Thread.

In general, in a Twisted application the reactor should be started in the main thread. Because we’re not starting it in the main thread, we can’t install signal handlers, so we pass False (the installSignalHandlers argument). This is a non-standard way of doing it: I don’t want to write a Twisted application here, I only want to use the Twisted event loop to consume items from the queue.

In line 60:

threads.deferToThread(defer_pika_queue_consume, channel)

This code runs defer_pika_queue_consume(channel) in a separate thread taken from the thread pool of the running Twisted reactor.

In lines 62–63 (main() function) we have

global finished
finished = True

Here we model the producers having finished filling the queue. The on_message callback reads this flag: from now on, if the queue is empty, it should stop consuming. By default, if the queue is empty, it waits for new messages to arrive.

In lines 65–66 (main() function) we have

while not finished_defered:
    time.sleep(20)

The main() function waits until the on_message() callback signals that all items from the queue were consumed, so that we can close the connection/channel to the queue.

Note:

  1. Here we’re using a spin-lock implementation of CountDownLatch.
  2. This implementation is not Mutex-based (not Lock/RLock-based). See https://stackoverflow.com/q/10236947/1137529 for the details.
  3. We can’t use connection.sleep() here because we have reactor event-loop in different thread.
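The waiting pattern described in these notes can be shown in isolation with the standard library only. In this sketch the worker thread stands in for the queue consumer, the short sleeps are arbitrary values chosen for the demonstration, and finished_defered keeps the article’s spelling.

```python
import threading
import time

finished_defered = False  # set by the worker, polled by the main thread


def worker():
    global finished_defered
    time.sleep(0.05)          # stands in for consuming the queue
    finished_defered = True   # signal the main thread that we are done


def wait_for_worker(poll_interval=0.01):
    threading.Thread(target=worker).start()
    while not finished_defered:  # poll the flag instead of blocking on a lock
        time.sleep(poll_interval)
    return finished_defered
```

A threading.Event would do the same job without polling; the flag-and-sleep loop is shown because it is what the article uses.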

In on_message() callback:

In line 32

for method_frame, properties, body in channel.consume("local", inactivity_timeout=10.0):

We’re passing inactivity_timeout so that the consume() call returns after the timeout and we can check the finished flag. If it is True and the queue is empty, we should finish consuming items from the queue.

In lines 39–44: once the finished flag is True, we call channel.get_waiting_message_count() to check whether we have processed the last item from the queue. This avoids waiting on the queue. A variant that checks the finished flag only after an inactivity timeout is also correct.

But it is less optimal: it needs at least one blocking call before it can break out of the loop.

In the provided example, if the finished flag was set before the last item was processed, we can break out of the loop without ever blocking.
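The difference between the two variants can be sketched as follows. This is one plausible reading of the description, not the original listing: the function names, the is_finished callable (standing in for the global finished flag) and the handle callback are all hypothetical, and the channel is assumed to be a pika BlockingChannel.

```python
def consume_until_drained(channel, is_finished, handle, queue="local",
                          inactivity_timeout=10.0):
    """Variant from the article: may stop without ever waiting for a timeout."""
    for method_frame, properties, body in channel.consume(
        queue, inactivity_timeout=inactivity_timeout
    ):
        if method_frame is not None:
            handle(body)
            channel.basic_ack(method_frame.delivery_tag)
        # Producers are done and nothing is waiting in the consumer: stop now.
        if is_finished() and channel.get_waiting_message_count() == 0:
            break
    channel.cancel()


def consume_until_idle(channel, is_finished, handle, queue="local",
                       inactivity_timeout=10.0):
    """Less optimal variant: the flag is checked only after an idle timeout."""
    for method_frame, properties, body in channel.consume(
        queue, inactivity_timeout=inactivity_timeout
    ):
        if method_frame is None:
            if is_finished():
                break  # reached only after one blocking wait that timed out
            continue
        handle(body)
        channel.basic_ack(method_frame.delivery_tag)
    channel.cancel()
```

Note that get_waiting_message_count() reports the messages the consumer generator can return without blocking, which is not necessarily the queue depth on the broker.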
