Deep Dive into RabbitMQ with Spring Boot — Part 2: Retry & Parallel Processing

Sonali Mendis
8 min readApr 25, 2023

--

This is the final post of 2 part series on RabbitMQ with Spring Boot. In the previous post, I explained the spring version of RabbitMQ acknowledgement mode (Part 1). In this post, I’m hoping to explain how to tweak your RabbitMQ configuration to alter the retry behavior, and how to add multiple consumers to allow parallel processing, in your Spring Boot application.

This article is not for you if you are only interested in getting a basic RabbitMQ publisher/consumer pattern to work in your Spring Boot application.

Altering Retry Behavior

In spring boot default configuration for RabbitMQ, if a consumer throws an exception while processing the message, it will republish the message to the queue over and over again forever. It looks as follows in the management console.

This is not what we want in most of the cases. What we would generally expect is to retry only a predefined number of times to consume the message, if not publish it to the dead letter queue. It can be achieved fairly easily by adding the following configuration to your application.yml file if your message is just a simple String or is of a primitive data type. We will discuss later how to handle if your message is a complex object in this post.

spring:
rabbitmq:
host: localhost
port: 5672
listener:
simple:
retry:
enabled: true
initial-interval: 3s
max-attempts: 3
max-interval: 10s
multiplier: 2

If an exception thrown while consuming the message, it will be republished to the queue only 2 more times (making it total 3 attempts) and then will be sent to the dead letter queue. 2nd attempt will be after 3 seconds from the 1st failed attempt, because the initial interval is set to 3. The 3rd attempt will be after 3 x 2 = 6 seconds, where 3 is the current wait and 2 is the multiplier. Then the message will be sent to the dead letter queue. If max-attempts count was 4 or more consequent attempts will occur 10 seconds apart as 6 x 2 = 12 is greater than 10 where 6 is the current wait, 2 is the multiplier but the resulting 12 is greater than the max interval defined as 10 seconds which will be the wait after the 3rd attempt.

This will be shown in the management console as follows,

What if we don’t want to send the message to the dead letter queue after 3 attempts and wish to publish it to some other queue conditionally after 3 retries. To implement this, we need to set the retry configuration in code instead in the application.yml as shown below.

spring:
rabbitmq:
host: localhost
port: 5672
app:
rabbitmq:
direct-exchange: my.direct
request-queue: my.direct.request
response-queue: my.direct.response
retry-attempts: 3
backoff-interval: 1000
backoff-multiplier: 2
backoff-max-interval: 5000

Let’s configure a simple rabbit listener in RabbitMqConfig.java file with retry interceptor as below. I have added a Jackson2JsonMessageConverter and configured it to be used in the rabbit listener because I’m expecting to receiving a complex message of type MyMessage from the queue.

Jackson2JsonMessageConverter messageConverter(ObjectMapper mapper){
var converter = new Jackson2JsonMessageConverter(mapper);
converter.setCreateMessageIds(true); //create a unique message id for every message
return converter;
}

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
ConnectionFactory connectionFactory,
RetryOperationsInterceptor retryInterceptor,
ObjectMapper objectMapper) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(messageConverter(objectMapper));
factory.setAdviceChain(retryInterceptor);
return factory;
}

@Bean
public RetryOperationsInterceptor messageRetryInterceptor(){
return RetryInterceptorBuilder.StatelessRetryInterceptorBuilder
.stateless()
.maxAttempts(properties.getRetryAttempts())
.backOffOptions(
properties.getBackoffInterval(),
properties.getBackoffMultiplier(),
properties.getBackoffMaxInterval()
)
.recoverer(new RejectAndDontRequeueRecoverer())
.build();
}

Properties values will be read from application.yml with the prefix app.rabbitmq.

The message retry interceptor can be either stateful or stateless. See the spring boot documentation for retry (here) to determine whether you need a stateless or stateful interceptor. In short, if you have transactions or database updates that needs to be reverted back if the code fails, you will need a stateful retry interceptor to avoid data inconsistencies. If you were to use a stateful retry interceptor your rabbitListenerContainerFactory and messageRetryInterceptor methods will change as follows,

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
ConnectionFactory connectionFactory,
StatefulRetryOperationsInterceptor retryInterceptor,
ObjectMapper objectMapper) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(messageConverter(objectMapper));
factory.setAdviceChain(retryInterceptor);
return factory;
}

@Bean
public StatefulRetryOperationsInterceptor messageRetryInterceptor(){
return RetryInterceptorBuilder.StatefulRetryInterceptorBuilder
.stateful()
.maxAttempts(properties.getRetryAttempts())
.backOffOptions(
properties.getBackoffInterval(),
properties.getBackoffMultiplier(),
properties.getBackoffMaxInterval()
)
.recoverer(new RejectAndDontRequeueRecoverer())
.build();
}

In the retry interceptor we specify it needs to be retried 3 times max, set of backoff options namely, initial interval, multiplier and max interval. If exhausted with all retry attempts then the recoverer of the retry interceptor will be called. To this method we need to pass an implementation of a MessageRecoverer interface.

RejectAndDontRequeueRecoverer class which implements the MessageRecoverer interface will send the message to the dead letter queue by throwing an AmpqRejectAndDontRequeueException in it’s recover method.

RepublishMessageRecoverer class which implements the MessageRecoverer interface will publish the message to a given exchange.

To provide a custom implementation having both of these features conditionally, we will implement ErrorMessageResolver which implements the MessageRecoverer interface.

Update your messageRetryInterceptor method as follows to accept instance of ErrorMessageResolver class in recoverer as follows. Here I’m using a stateless retry interceptor but can be done with a stateful retry interceptor in the same manner. I’m adding a bean definition for rabbit template in the config as it will be needed in the ErrorMessageResolver class as we are dealing with complex messages.

@Bean
public RetryOperationsInterceptor messageRetryInterceptor(
MessageRecoverer messageRecoverer){
return RetryInterceptorBuilder.StatelessRetryInterceptorBuilder
.stateless()
.maxAttempts(properties.getRetryAttempts())
.backOffOptions(
properties.getBackoffInterval(),
properties.getBackoffMultiplier(),
properties.getBackoffMaxInterval()
)
.recoverer(messageRecoverer)
.build();
}

@Bean
public MessageRecoverer messageRecoverer(RabbitTemplate template, AppProperties properties, ObjectMapper objectMapper){
return new ErrorMessageResolver(
template,
properties,
messageConverter(objectMapper)
);
}

@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory factory, ObjectMapper objectMapper){
RabbitTemplate template = new RabbitTemplate();
template.setConnectionFactory(factory);
template.setMessageConverter(messageConverter(objectMapper));
return template;
}

Our ErrorMessageResolver class will be implementing the recover method from the MessageRecoverer interface. It is important to note that every exception thrown by the listener method is wrapped by a ListenerExecutionFailedException. We have defined a custom exception as FailedProcessException. If the listener method throws an exception of type FailedProcessException it will publish the message to the response queue, and for all other types of exceptions it will publish the message to the dead letter queue.

public class ErrorMessageResolver implements MessageRecoverer {
private final RabbitTemplate template;
private final AppProperties properties;
private final Jackson2JsonMessageConverter converter;

public ErrorMessageResolver(RabbitTemplate rabbitTemplate,
AppProperties properties,
Jackson2JsonMessageConverter converter) {
this.template = rabbitTemplate;
this.properties = properties;
this.converter = converter;
}

@Override
public void recover(Message message, Throwable cause){
if(cause instanceof ListenerExecutionFailedException &&
cause.getCause() instanceof FailedProcessException){
try {
//retrieve original message
message.getMessageProperties().setInferredArgumentType(MyMessage.class);
MyMessage originalRequest = (MyMessage) converter.fromMessage(message, MyMessage.class);

FailedMessage failedMessage = new FailedMessage(
originalRequest.getMessageId(),
originalRequest.getMessage(),
cause.getCause().getMessage()
);
//send the message to response queue
this.template.convertAndSend(
properties.getDirectExchange(),
properties.getResponseQueue(),
failedMessage
);

} catch (Exception ex){
//send the message to dead letter queue
throw new AmqpRejectAndDontRequeueException("Unable to recover message", ex);
}
} else {
//send the message to dead letter queue
throw new AmqpRejectAndDontRequeueException("Unable to recover message");
}
}
}

Let’s try sending a message that throws a FailedProcessException when get executed by the listener method. This is what you will see in the management console.

Let’s empty the response queue for clarity and try sending a message that throws a exception of type Exception when get executed by the listener method. This is what you will see in the management console.

This is what we wanted to achieve by altering the retry behavior with retry interceptors and message recoverers.

Allow parallel consuming of messages

You may be able to see that if you are using the default configuration for concurrent consumers you get only 1 consumer per queue with a message prefetch count of 250. This means that 1 consumer you have for the queue will fetch up to 250 messages at once and do sequential processing.

So if we send 2 messages both messages get delivered to the same consumer and while it process the first message the second message is stored on in memory of the consumer. This is not the desired behavior in general. We would like to have multiple consumers to handle messages and ideally will not want them to accept more than 1 message at once.

We can achieve this simply by configuring the application.yml given that you are not creating your own rabbit listener factory in the code.

spring:
rabbitmq:
host: localhost
port: 5672
listener:
simple:
concurrency: 5
max-concurrency: 10

Your management console will now display 5 concurrent consumers.

If you have defined your own rabbit listener factory in the configuration, your application.yml will be fairly simple as follows,

spring:
rabbitmq:
host: localhost
port: 5672

But will require to add the related configuration inthe rabbit listener factory in RabbitMqConfig.java class.

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
ConnectionFactory connectionFactory,
RetryOperationsInterceptor retryInterceptor,
ObjectMapper objectMapper) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(messageConverter(objectMapper));
factory.setPrefetchCount(0);
factory.setConcurrentConsumers(properties.getConcurrentConsumers());
factory.setMaxConcurrentConsumers(properties.getMaxConcurrentConsumers());
return factory;
}

Setting the prefetch count to zero will make sure the consumers will not accept more than 1 message at a time. Concurrent consumers will specify the number of consumers you will initially create and max concurrent consumers will set the maximum number of parallel consumers the system can initiate for you.

Please note that if you do not set the prefetch count to zero, even though you have specified multiple consumers, 1 consumer will greedily accept up to 251 messages at once and sequentially process them one after the other.

Complete code for RabbitMQ producer can be found here.

Complete code for RabbitMQ consumer with retry and concurrency implementations can be found here

--

--

Sonali Mendis

I am a senior developer from Scott Logic. I am a fullstack developer with experience in web, mobile, backend and cloud technologies