Cross-region RMQ message migration using Shovel Plugin

Ujjwal Gupta
SquadStack Engineering
4 min read · May 24, 2022

This post details SquadStack’s experience moving RabbitMQ (RMQ) messages from one AWS region to another.

Brief

For data compliance reasons, we at SquadStack migrated our existing infrastructure from one AWS region (Singapore) to another (Mumbai). Migrating RMQ was one of the major parts of that move.

The approach we followed was a traditional one:

  1. Create a new private RMQ cluster in the new region.
  2. Point all new message requests to the new RMQ cluster.
  3. Stop request ingestion on the old RMQ cluster.
  4. Let the old RMQ cluster deliver only its remaining messages (Unacked + Ready).
  5. Stop the old RMQ cluster once the remaining message count reaches 0.
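
Step 5 depends on knowing when the old cluster is actually empty. Below is a minimal sketch of that check, assuming the RabbitMQ management plugin is enabled on the old cluster. The function names, host, and credentials are hypothetical; `messages_ready` and `messages_unacknowledged` are the per-queue counters the management API returns from `GET /api/queues`.

```python
import base64
import json
import urllib.request


def remaining_messages(queues):
    """Sum Ready + Unacked messages over the queue objects
    returned by the management API's GET /api/queues."""
    return sum(
        q.get("messages_ready", 0) + q.get("messages_unacknowledged", 0)
        for q in queues
    )


def safe_to_stop(queues):
    """The old cluster can be stopped only when nothing is left to deliver."""
    return remaining_messages(queues) == 0


def fetch_queues(base_url, user, password):
    """Fetch all queues from the management API.
    base_url, user, and password are placeholders for your own setup."""
    request = urllib.request.Request(base_url.rstrip("/") + "/api/queues")
    token = base64.b64encode(f"{user}:{password}".encode()).decode()
    request.add_header("Authorization", "Basic " + token)
    with urllib.request.urlopen(request, timeout=10) as response:
        return json.load(response)
```

For example, `safe_to_stop(fetch_queues("http://old-rmq.example.internal:15672", "admin", "secret"))` would return `True` only once every queue has drained. As the next section shows, in our case this never happened on its own.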

Looks simple, right? But it’s not as simple as it seems.

Problems we faced with the traditional approach

  1. Infinitely queued tasks: Some tasks kept re-queuing to the same queue again and again, so the old cluster’s message count never reached 0. This became a deadlock for us.
  2. High latency: As part of the infra migration, we had already moved our RDS to the Mumbai region, so queries from tasks executed via Singapore’s RMQ cluster were timing out.
  3. Extra cost overhead: We were running two RMQ clusters, one in the Mumbai region (the primary) and another in the Singapore region, with dedicated Celery servers for each. This added overhead to our infra budget.

So, how did we overcome these problems 🤔? The answer is…

Shovel Plugin as a Message Transferer

The Shovel plugin is a core RMQ plugin that unidirectionally moves messages from a source to a destination. It is useful whenever you need to reliably and continually move messages from a source (typically a queue) in one cluster to a destination (an exchange, topic, etc.) in another cluster.

The source and destination can be in the same cluster (typically in different vhosts) or distinct ones. Shovels support AMQP 0.9.1 and AMQP 1.0 sources and destinations. The source and destination do not have to use the same protocol, so it is possible to move messages from an AMQP 1.0 broker to RMQ or vice versa.

You can read more about the Shovel plugin and its benefits in the RabbitMQ Shovel plugin documentation.

Steps to enable shovel plugin for RMQ cluster

To enable the shovel plugin, we need to follow the steps described below:

Make your destination RMQ cluster publicly accessible.

  1. Create a server in a public subnet.
  2. Install Nginx: sudo apt-get install nginx
  3. Check that Nginx installed successfully: nginx -v
  4. Note the brokers’ private IPs.
  5. Configure a TCP proxy using an Nginx stream upstream in /etc/nginx/nginx.conf:
# Top-level block (outside http {}); requires the Nginx stream module
stream {
    # List of upstream AMQP connections
    upstream stream_amqp {
        least_conn;
        server 10.0.0.24:5672;   # Broker IP and AMQP default port
        server 10.0.0.108:5672;  # Broker IP and AMQP default port
        server 10.0.0.124:5672;  # Broker IP and AMQP default port
    }
    # AMQP definition
    server {
        listen 12345;              # the port to listen on this server
        proxy_pass stream_amqp;    # forward traffic to upstream group
        proxy_timeout 3s;          # close a connection after 3s without traffic
        proxy_connect_timeout 3s;  # give up connecting to a broker after 3s
    }
}
File: nginx.conf

6. Reload Nginx: nginx -s reload

7. Add an inbound rule to the RMQ security group that allows port 5672 with the public server’s IP as the source.

8. Make sure the connection between publisher and consumer is successful.

  • Install rabbitpy: pip install rabbitpy
  • Create a Python script to test the connection. Use the public server’s IP in the connection URL.
import rabbitpy


def publish(channel):
    body = "Testing Connection..."
    message = rabbitpy.Message(channel, body, {'content_type': 'text/plain'})
    # With publisher confirms enabled, publish() returns True once the broker confirms
    response = message.publish('rabbitpy-tests', 'ha-routing-key', mandatory=True)
    if response:
        print('Message delivered successfully')
    else:
        print('Message delivery failed')


# Created a new admin user "test" with password "test"
# The IP below is the IP of the proxy
# url_pattern: "amqp://username:password@public_host_addr:proxy_port/vhost"
if __name__ == '__main__':
    url = "amqp://test:test@10.0.100.198:12345/%2f"  # RMQ connection string
    try:
        with rabbitpy.Connection(url) as connection:
            with connection.channel() as channel:
                # Enable publisher confirms
                channel.enable_publisher_confirms()
                publish(channel)
    except rabbitpy.exceptions.MessageReturnedException as ex:
        print("Failed to publish %s" % ex)
    except KeyboardInterrupt:
        print('exiting...')

File: publisher_test.py

Output

$ python publisher_test.py
Message delivered successfully
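
If the publisher test fails, it helps to know whether the problem is in the network path (security group, Nginx) or in AMQP itself (credentials, vhost). The sketch below is a plain TCP reachability check using only the standard library. It does not speak AMQP, and the `port_is_open` helper name is ours, not part of any RMQ tooling.

```python
import socket


def port_is_open(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, unreachable hosts, and timeouts
        return False


if __name__ == "__main__":
    # Proxy address and listen port taken from the publisher test above
    print(port_is_open("10.0.100.198", 12345))
```

If this prints `False`, check the security group rules and the Nginx `listen` port before debugging the AMQP URL.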

Make Shovel Connection using RMQ Management Portal

  1. Go to your source RMQ management dashboard and log in as an admin user.
  2. Select the Admin tab and go into Shovel Management settings.
  3. Add a new Shovel with config:
NAME: Test Shovel Plugin
SOURCE: AMQP 0.9.1
URI: amqp://user:password@Source_address:5672/%2f
QUEUE: test_shovel_plugin_sour # Source Queue name that you want to migrate
DESTINATION: AMQP 0.9.1
URI: amqp://admin_user:admin_password@destination_address:12345/%2f
QUEUE: test_shovel_plugin_dest # Destination Queue name where you want to migrate

Shovel Config

4. Check Shovel Status and make sure that the connection is successful.

5. The last step is to remove all the workers from the source queue and make sure that the consumer count is exactly 1. That one remaining consumer is the shovel itself.

6. Ta-da! Message migration completed 👏🏻
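
The same shovel can also be described as a dynamic shovel parameter for the management HTTP API instead of the dashboard form. The sketch below only builds the request path and JSON body and checks a status listing; it sends nothing. The helper names are hypothetical, while the `src-*` / `dest-*` keys, the `amqp091` protocol value, and the `state` field are the ones the Shovel plugin uses, to the best of our knowledge. The URIs reuse the placeholders from the config above.

```python
import json
from urllib.parse import quote


def shovel_definition(src_uri, src_queue, dest_uri, dest_queue):
    """Request body for a dynamic shovel between two AMQP 0.9.1 queues."""
    return {
        "value": {
            "src-protocol": "amqp091",
            "src-uri": src_uri,
            "src-queue": src_queue,
            "dest-protocol": "amqp091",
            "dest-uri": dest_uri,
            "dest-queue": dest_queue,
        }
    }


def shovel_parameter_path(vhost, name):
    """Path to PUT the definition to; vhost and name are percent-encoded."""
    return "/api/parameters/shovel/{}/{}".format(
        quote(vhost, safe=""), quote(name, safe="")
    )


def not_running(shovels):
    """Names of shovels whose state (from GET /api/shovels) is not 'running'."""
    return [s["name"] for s in shovels if s.get("state") != "running"]


if __name__ == "__main__":
    body = shovel_definition(
        "amqp://user:password@Source_address:5672/%2f",
        "test_shovel_plugin_sour",
        "amqp://admin_user:admin_password@destination_address:12345/%2f",
        "test_shovel_plugin_dest",
    )
    print(shovel_parameter_path("/", "Test Shovel Plugin"))
    print(json.dumps(body, indent=2))
```

Keeping the definition in code makes it easier to recreate the shovel for each queue that needs migrating, and `not_running` gives a scriptable version of the status check in step 4.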

Extra Points to look for

  1. Make sure no other consumer is connected to the source queue while migrating. Otherwise, those consumers will compete with the shovel for messages, and the messages they take won’t reach the destination queue.
  2. Make sure you revoke public access post message migration.
  3. Pass patterns while providing the source queue name to the shovel configuration. Example: test_*
  4. Make sure the URIs in the shovel configuration end with the vhost. For the default vhost “/”, that means appending %2f. The vhost is a single path segment, so any “/” characters that appear in the vhost name must be percent-encoded. Example: amqp://test:test@dest_add:5672/%2f
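
To avoid hand-encoding the vhost (point 4), the URI can be assembled with the standard library. This is a small sketch; `amqp_uri` is a hypothetical helper, and `urllib.parse.quote` emits the uppercase form `%2F`, which is equivalent to `%2f`.

```python
from urllib.parse import quote


def amqp_uri(user, password, host, port, vhost="/"):
    """Build an AMQP URI with the credentials and vhost percent-encoded."""
    return "amqp://{}:{}@{}:{}/{}".format(
        quote(user, safe=""),
        quote(password, safe=""),
        host,
        port,
        quote(vhost, safe=""),  # safe="" so "/" is encoded too
    )


if __name__ == "__main__":
    print(amqp_uri("test", "test", "dest_add", 5672))
```

Encoding the password the same way also protects against characters such as “@” or “/” breaking the URI.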

Thanks for reading this article. If you enjoyed this article, please give a few claps so that it reaches more people who could benefit from it!
