RabbitMQ Spring Boot #03 — Exchange configuration

Aleksander Kołata
7 min readOct 3, 2021

--

Overview

Hi ! In this article we’ll take a look into RabbitMQ exchange configuration options.

RabbitMQ setup

The easiest way to start working with RabbitMQ is to run it inside of Docker container. In order to do that, simply run bellow command in Your terminal:

docker run -d --name some-rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3.9.5-management

If You want to read more about official Docker image customisation, here You can find it’s documentation.

Exchange creation using the management interface

RabbitMQ management interface

We will start the review of existing configuration options using RabbitMQ management interface. Please notice, that I used the Docker image with 3.9.5-management tag. Suffix -management is important, because such images have Management Plugin installed.

Exchange creation

1. Open the management interface — http://localhost:15672/ , username/password is guest/guest by default.

RabbitMQ management panel

2. Open Exchanges tab

RabbitMQ exchanges tab in management interface

3. Try to add a new exchange — only one field is required — the exchange name. I typed x.some-exchange ,clicked Add exchange , and voilà ! Exchange created !

Created exchange is now visible

Although only the name was required, there were other options to modify. What do they mean?

Exchange configuration options

Name

Name of the exchange.

Type

Type of the exchange. You can choose from the following types:

  • Direct — delivers messages to queues based on a message routing key. Binding key must exactly match routing key
  • Fanout — routes the message to all of the queues bound to the exchange. Routing key is ignored
  • Headers — uses the messages headers for routing
  • Topic — performs a wildcard match between the routing key and the routing pattern, specified in the binding

If You would like to read more about the exchange types, I wrote an article about them lately — https://medium.com/@aleksanderkolata/rabbitmq-spring-boot-02-exchanges-479516e34142 . You can check it out :)

Durability

  • Durable — exchange survives restarts, and last until it will be explicitly deleted
  • Transient — exists until RabbitMQ is shut down

Auto delete

Auto-deleted exchanges are removed when the last bound object will be unbounded.

Internal

Clients cannot publish directly to internal exchange. It can be used only with exchange to exchange binding.

Arguments

  • alternate-exchange=<exchange-name> — if a message send to this exchange cannot be routed, it will be send to provided alternate exchange (You will see an example later in the demo)
  • … You can also provide Your own arguments, which value can be a String, Number, Boolean or a List

Spring Boot Exchange configuration

Abstract Exchange

Class org.springframework.amqp.core.AbstractExchange in Spring AMQP has 5 implementations:

  • CustomExchange
  • DirectExchange
  • FanoutExchange
  • TopicExchange
  • HeadersExchange

These classes have 3 public constructors available:

public AbstractExchange(String name) {
this(name, true, false);
}

public AbstractExchange(String name, boolean durable, boolean autoDelete) {
this(name, durable, autoDelete, null);
}

public AbstractExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments) {
super(arguments);
this.name = name;
this.durable = durable;
this.autoDelete = autoDelete;
}

This means, that You can set following exchange properties while creating a new …Exchange object in Spring configuration:

  • name — via constructor
  • type — by using a selected AbstractExchange implementation
  • durability — via constructor
  • auto delete —via constructor
  • arguments — via constructor

If You want to configure additional options (e.g. mark an exchange as internal), You can do it by calling set...(...) on already created object.

You can also add additional arguments:

exchange.addArgument("key", "value");

Example configuration

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.Map;

@Configuration
public class RabbitMqConfig {
public static final String ALTERNATE_EXCHANGE_NAME = "x.alternate";
public static final String ALTERNATE_EXCHANGE_QUEUE = "q.alternate";

private static final boolean DURABLE = true;
private static final boolean TRANSIENT = false;
private static final boolean AUTO_DELETED = true;
private static final boolean MANUALLY_DELETED = false;
private static final Map<String, Object> EMPTY_ARGUMENTS = Map.of();

@Bean
public Declarables declareExchanges() {
return new Declarables(
new DirectExchange("x.direct", DURABLE, AUTO_DELETED, Map.of("alternate-exchange", ALTERNATE_EXCHANGE_NAME)),
new FanoutExchange("x.fanout", DURABLE, AUTO_DELETED, EMPTY_ARGUMENTS),
new TopicExchange("x.topic", TRANSIENT, MANUALLY_DELETED),
new HeadersExchange("x.headers", TRANSIENT, MANUALLY_DELETED, EMPTY_ARGUMENTS)
);
}

@Bean
public Declarables alternateExchange() {
FanoutExchange alternateExchange = new FanoutExchange(ALTERNATE_EXCHANGE_NAME, true, false);
Queue alternateQueue = new Queue(ALTERNATE_EXCHANGE_QUEUE);
return new Declarables(
alternateExchange,
alternateQueue,
BindingBuilder.bind(alternateQueue).to(alternateExchange)
);
}

@Bean
public Declarables internalExchange() {
FanoutExchange exchange = new FanoutExchange("x.internal");
exchange.setInternal(true);
exchange.setShouldDeclare(true);
exchange.setIgnoreDeclarationExceptions(true);
exchange.setDelayed(false);
exchange.addArgument("key", "value");

Queue queue = new Queue("q.internal");
return new Declarables(
exchange,
queue,
BindingBuilder.bind(queue).to(exchange)
);
}

}
  • Different exchanges are being created. Only the exchange name is a required parameter
  • exchange x.direct has an additional argument alternate-exchange configured. Not matching messages send to this exchange should be routed to the exchange x.alternate
  • alternate exchange x.alternate was created, and queue q.alternate is bounded to this exchange
  • more advanced exchange configuration is presented in internalExchange() method

Alternative exchange declaration method

Alternatively You can declare an exchange directly in the @RabbitListener annotation.

@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value = "q.listener-declaration"),
exchange = @Exchange(
name = "x.listener-declaration",
type = ExchangeTypes.DIRECT,
durable = "false",
autoDelete = "true",
arguments = {},
internal = "false",
ignoreDeclarationExceptions = "true"),
key = "my-key")
)
public void example(Message message) {
log.info("Received [{}]", message);
}

Demo

In the demo we will see, whether the exchanges will be visible in RabbitMQ management interface, and if alternate exchange works.

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import static pl.akolata.config.RabbitMqConfig.ALTERNATE_EXCHANGE_QUEUE;

@Slf4j
@SpringBootApplication
@RequiredArgsConstructor
public class Application implements CommandLineRunner {
private final RabbitTemplate rabbitTemplate;

public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}

@RabbitListener(queues = ALTERNATE_EXCHANGE_QUEUE)
public void listen(Message message) {
log.info("Queue [{}] received the message [{}]", ALTERNATE_EXCHANGE_QUEUE, message);
}
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value = "q.listener-declaration"),
exchange = @Exchange(
name = "x.listener-declaration",
type = ExchangeTypes.DIRECT,
durable = "false",
autoDelete = "true",
arguments = {},
internal = "false",
ignoreDeclarationExceptions = "true"),
key = "my-key")
)
public void example(Message message) {
log.info("Received [{}]", message);
}
@Override
public void run(String... args) {
rabbitTemplate.convertAndSend("x.direct", "key-not-bound", "MSG");
}
}
  • application sends a message to the x.direct exchange. This exchange doesn’t have any bindings, so the routing key key-not-bound won’t be matched. Such message should be routed to the exchange x.alternate , based on argument alternate-exchange=x.alternate passed during exchange creation
  • application listens on q.alternate queue, which is bound to the x.alternate exchange. It should receive the message send to x.direct exchange

Result:

.   ____          _            __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v2.5.5)
2021-10-03 09:22:19.613 INFO 2193 --- [ main] pl.akolata.Application : Starting Application using Java 11.0.4 on MacBook-Pro-Aleksander.local with PID 2193 (/Users/aleksanderkolata/IdeaProjects/spring-boot-rabbitmq/03-spring-boot-rabbitmq-exchange-config/target/classes started by aleksanderkolata in /Users/aleksanderkolata/IdeaProjects/spring-boot-rabbitmq/03-spring-boot-rabbitmq-exchange-config)
2021-10-03 09:22:19.615 INFO 2193 --- [ main] pl.akolata.Application : No active profile set, falling back to default profiles: default
2021-10-03 09:22:20.456 INFO 2193 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [localhost:5672]
2021-10-03 09:22:20.481 INFO 2193 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#57d0fc89:0/SimpleConnection@350a94ce [delegate=amqp://guest@127.0.0.1:5672/, localPort= 51104]
2021-10-03 09:22:20.483 INFO 2193 --- [ main] o.s.amqp.rabbit.core.RabbitAdmin : Auto-declaring a non-durable or auto-delete Exchange (x.listener-declaration) durable:false, auto-delete:true. It will be deleted by the broker if it shuts down, and can be redeclared by closing and reopening the connection.
2021-10-03 09:22:20.484 INFO 2193 --- [ main] o.s.amqp.rabbit.core.RabbitAdmin : Auto-declaring a non-durable or auto-delete Exchange (x.direct) durable:true, auto-delete:true. It will be deleted by the broker if it shuts down, and can be redeclared by closing and reopening the connection.
2021-10-03 09:22:20.484 INFO 2193 --- [ main] o.s.amqp.rabbit.core.RabbitAdmin : Auto-declaring a non-durable or auto-delete Exchange (x.fanout) durable:true, auto-delete:true. It will be deleted by the broker if it shuts down, and can be redeclared by closing and reopening the connection.
2021-10-03 09:22:20.484 INFO 2193 --- [ main] o.s.amqp.rabbit.core.RabbitAdmin : Auto-declaring a non-durable or auto-delete Exchange (x.topic) durable:false, auto-delete:false. It will be deleted by the broker if it shuts down, and can be redeclared by closing and reopening the connection.
2021-10-03 09:22:20.484 INFO 2193 --- [ main] o.s.amqp.rabbit.core.RabbitAdmin : Auto-declaring a non-durable or auto-delete Exchange (x.headers) durable:false, auto-delete:false. It will be deleted by the broker if it shuts down, and can be redeclared by closing and reopening the connection.
2021-10-03 09:22:20.572 INFO 2193 --- [ main] pl.akolata.Application : Started Application in 1.305 seconds (JVM running for 2.124)
2021-10-03 09:22:20.590 INFO 2193 --- [ntContainer#1-1] pl.akolata.Application : Queue [q.alternate] received the message [(Body:'MSG' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=x.direct, receivedRoutingKey=key-not-bound, deliveryTag=1, consumerTag=amq.ctag-HMcTyDCwc7AGXWCUrBR9HA, consumerQueue=q.alternate])]
  • at the application startup we can see logs telling us, that exchanges x.direct, x.fanout, x.topic and x.headers were declared
  • the message send to the x.direct exchange with a not matching routing key was indeed routed to x.alternate exchange !

Exchanges are also visible in the management interface:

Created exchanges are visible in the management interface

Summary

In this article I covered:

  • how to create an exchange in the RabbitMQ Management interface
  • what do exchange configuration options mean
  • how to configure RabbitMQ exchange using Spring configuration
  • how alternate-exchange works

Materials:

--

--

Aleksander Kołata

Senior Full Stack Developer — Java (Spring) and TypeScript (Angular). DDD and software architecture enthusiast.