Live Streaming with RabbitMQ & SpringBoot Over Low or Intermittent Network

Fatih Işık
6 min readOct 4, 2019

Industrial Internet of Things applications like Oversea gas/oil platforms or Coastal surveillance applications has tendency ever-more distributed enterprises. Remote stations are generally connected to the master node via satellite link or low bandwidth, and its data is synchronized at regular intervals to the master node.

Offshore Oil Platform

Mission critical measurements need to be transmitted as a live-stream, in another word receiving most recent measurements is essential for the master node. But, unreliable connectivity or limited bandwidth introduces noticeable delays. For example, current speed and location of detected an aircraft or a temperature of a drill is more important then 5 minutes before. In order to overcome this problem, pending messages on network should be dropped. This causes lost of storing historical measurement at master which is also required for replay scenario and machine learning.

Lets visualize the problem, imagine remote station has connection up to 1 Mbyte\s and produces 1024 measurements in a second with size of a message approximately 2 Kbyte which means station produces 2 Mbyte\s. After 10 minutes starting data transfer to the master node, master would receive messages that produced 5 minutes before by remote station. Fragmentation and TCP headers will make this scenario more scary.

So how we can send most recent measurements to master on condition on low or intermittent bandwidth without losing and also without duplicating network. At first using any queuing solution seems not an option. But after considering AMQP/RabbitMQ following features it is not any hassle.

  • Time-to-live (TTL)
  • Dead-letter-queue(DLQ)
  • RabbitMQ shovel plugin

Time To Live — TTL

A message that has been in the queue for longer than the configured TTL is said to be dead. In our scenario, this feature is used to drop pending message that has been not transferred yet. Specify the optional x-message-ttl argument to set time-out in milliseconds.

Dead Letter Queue — DLQ

A messages in a queue is republished to an exchange when any of the following events occur:

In our scenario, TTL events drop pending messages to the DLQ without losing any message. DLQ is a kind of a queue that holds dead messages. To set the dead letter for a queue, specify the optional x-dead-letter-exchange, x-dead-letter-routing-key arguments when declaring the queue.

RabbitMQ Shovel Plugin

RabbitMQ Shovel plugin acts as a consumer for a queue and delivers source messages to the configured destination queue. Configurations are

  • Local RabbitMQ urls , credentials, source queue
  • Remote RabbitMQ urls , credentials, destination queue

Shovel can be easily defined dynamically with using HTTP api. After shovel configured, it can be checked following {url}/#/dynamic-shovels . If shovel being disconnected waits as long as given reconnect-delay configuration before reconnecting to the brokers at either end. For live and dead queue , shovel is the only consumer in our scenario.

Lets elaborate remote live queue, this queue is responsible to hold live messages, if a message has not been consumed after a period of time it should be dropped and directed to dead letter queue. As a result, remote application will have two message queues, only one of them will be transferred as a live message. Producer should send only to the live queue.

System Configuration

Master and remote applications must have their own RabbitMQ which means remote application will be connected to locally installed RabbitMQ and it will send messages to local queue. Management plugin should be enabled to observe status of each queue. RabbitMQ Shovel plugin must be enabled on remote station installation, check for instructions. https://www.rabbitmq.com/shovel.html

System Overview

Implementation of Remote Application

Add following Spring Boot dependency to use RabbitMQ features.

<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>${parent.version}</version>
</dependency>

Queue definitions are below

@Autowired
private RabbitProperties props;

@Autowired
private AppProperties appProperties;

@Autowired
private AmqpAdmin amqpAdmin;
@Bean
public Queue outgoingQueue() {
Map<String, Object> args = new HashMap<>();
// The default exchange
args.put("x-dead-letter-exchange", "");
// Route to the dead letter queue when the TTL occurs
args.put("x-dead-letter-routing-key", Constants.DEAD_LETTER_QUEUE);
// TTL 10 seconds
args.put("x-message-ttl", 10000);
args.put("x-max-priority", 10);
Queue queue = QueueBuilder.nonDurable(Constants.LIVE_QUEUE)
.withArguments(args).build();
amqpAdmin.declareQueue(queue);

RabbitMqUtil.createShovel(props, appProperties,
queue.getName(), Constants.MASTER_LIVE_QUEUE);
return queue;
}

@Bean
Binding binding() {
return BindingBuilder.bind(outgoingQueue()).to(exchange())
.with(Constants.ROUTING_KEY);
}

@Bean
DirectExchange exchange() {
return new DirectExchange(Constants.EXCHANGE);
}

/**
* dead queue definition
*
@return
*/
@Bean
public Queue deadLetterQueue() {
Map<String, Object> args = new HashMap<>();
Queue queue = QueueBuilder.durable(Constants.DEAD_LETTER_QUEUE)
.withArguments(args).build();
RabbitMqUtil.createShovel(props, appProperties,
queue.getName(), Constants.MASTER_DEAD_LETTER_QUEUE);
return queue;
}

Dead letter queue must be durable with no additional features. It holds time-out messages. Live data stream measurements, decided by design, will be sent to local live queue. Check queue definitions on {url}/#/queues .

Queue definitions features on Remote Station
Configured shovel

Test Scenario

A test application is created as a remote application. Network limiter is installed on master node in order to simulate low bandwidth. After starting application then call http://localhost:8080/api/producer/start . Producer sends messages to live queue.

rabbitTemplate.convertAndSend(Constants.EXCHANGE, Constants.ROUTING_KEY, data);

After a period of time messages on the live queue is dropped to the DLQ due to low bandwidth.

Then stop is called http://localhost:8080/api/producer/stop and cancel the network limiter to finish

Master Queue After Test Stopped
Test Logs

Total produced messages are 15592 on remote application, 14183 of them dropped to the DLQ. Also check master live queue, sequential messages have gaps at index number and this index numbers in between are in DLQ. In test application, a shovel is configured to transfer in parallel. Shovel has no information about network bandwidth so shovel for DLQ is unable to decide when to active itself. On the other hand, remote application might have statistics for total message produced in a time and using this information application can decide when to create or delete shovel for DLQ.

Message Details

--

--