RabbitMQ Spring Boot #03 — Exchange configuration
Overview
Hi! In this article we'll take a look at RabbitMQ exchange configuration options.
RabbitMQ setup
The easiest way to start working with RabbitMQ is to run it inside a Docker container. To do that, simply run the command below in your terminal:
docker run -d --name some-rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3.9.5-management
If you want to read more about customising the official Docker image, you can find the details in its documentation.
Exchange creation using the management interface
RabbitMQ management interface
We will start the review of the existing configuration options using the RabbitMQ management interface. Please notice that I used the Docker image with the 3.9.5-management tag. The -management suffix is important, because such images have the Management Plugin installed.
Exchange creation
1. Open the management interface at http://localhost:15672/ (username/password is guest/guest by default).
2. Open the Exchanges tab.
3. Try to add a new exchange. Only one field is required: the exchange name. I typed x.some-exchange, clicked Add exchange, and voilà! Exchange created!
Although only the name was required, there were other options to modify. What do they mean?
Exchange configuration options
Name
Name of the exchange.
Type
Type of the exchange. You can choose from the following types:
- Direct: delivers messages to queues based on the message routing key. The binding key must exactly match the routing key.
- Fanout: routes the message to all of the queues bound to the exchange. The routing key is ignored.
- Headers: uses the message headers for routing.
- Topic: performs a wildcard match between the routing key and the routing pattern specified in the binding.
If you would like to read more about the exchange types, I recently wrote an article about them (https://medium.com/@aleksanderkolata/rabbitmq-spring-boot-02-exchanges-479516e34142). You can check it out :)
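To make the difference between exact and wildcard matching more concrete, here is a minimal plain-Java sketch. It is only a toy model and not RabbitMQ's actual implementation. It assumes the usual topic conventions: words are separated by dots, * stands for exactly one word and # stands for zero or more words. The class name RoutingKeyMatching and the order.* keys are made up for illustration.

```java
import java.util.List;

/** Toy model of routing-key matching; not RabbitMQ's actual implementation. */
final class RoutingKeyMatching {

    /** Direct exchange: the binding key must equal the routing key exactly. */
    static boolean directMatches(String bindingKey, String routingKey) {
        return bindingKey.equals(routingKey);
    }

    /** Topic exchange: '*' matches exactly one word, '#' matches zero or more words. */
    static boolean topicMatches(String pattern, String routingKey) {
        return matchWords(List.of(pattern.split("\\.")), 0,
                List.of(routingKey.split("\\.")), 0);
    }

    private static boolean matchWords(List<String> pattern, int p, List<String> key, int k) {
        if (p == pattern.size()) {
            // Pattern exhausted: it is a match only if the key is exhausted too.
            return k == key.size();
        }
        String word = pattern.get(p);
        if (word.equals("#")) {
            // '#' either consumes nothing, or consumes one word and stays active.
            return matchWords(pattern, p + 1, key, k)
                    || (k < key.size() && matchWords(pattern, p, key, k + 1));
        }
        if (k == key.size()) {
            return false;
        }
        return (word.equals("*") || word.equals(key.get(k)))
                && matchWords(pattern, p + 1, key, k + 1);
    }

    public static void main(String[] args) {
        System.out.println(directMatches("order.created", "order.created")); // prints true
        System.out.println(topicMatches("order.*", "order.created"));        // prints true
        System.out.println(topicMatches("order.*", "order.created.eu"));     // prints false
        System.out.println(topicMatches("order.#", "order.created.eu"));     // prints true
    }
}
```

A fanout exchange needs no matcher at all, because every bound queue receives the message regardless of the routing key.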
Durability
- Durable: the exchange survives broker restarts and lasts until it is explicitly deleted
- Transient: the exchange exists only until RabbitMQ is shut down
Auto delete
An auto-delete exchange is removed when the last object bound to it is unbound.
Internal
Clients cannot publish directly to an internal exchange. It can only receive messages through exchange-to-exchange bindings.
Arguments
- alternate-exchange=<exchange-name>: if a message sent to this exchange cannot be routed, it will be sent to the provided alternate exchange (you will see an example later in the demo)
- … You can also provide your own arguments, whose values can be a String, Number, Boolean or a List
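The fallback behaviour of the alternate-exchange argument can be sketched with a small in-memory model. This is a simplification written for this article, not broker code: ToyExchange is a made-up class, the q.orders queue and the order.created key are hypothetical, and only direct and fanout routing are modelled.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy in-memory model of an exchange with an optional alternate exchange. */
final class ToyExchange {
    private final boolean fanout;          // true: ignore routing keys, false: exact match
    private final ToyExchange alternate;   // may be null when no alternate is configured
    private final Map<String, List<String>> queuesByBindingKey = new LinkedHashMap<>();

    ToyExchange(boolean fanout, ToyExchange alternate) {
        this.fanout = fanout;
        this.alternate = alternate;
    }

    void bind(String queue, String bindingKey) {
        queuesByBindingKey.computeIfAbsent(bindingKey, key -> new ArrayList<>()).add(queue);
    }

    /** Returns the names of the queues that would receive a message with this routing key. */
    List<String> route(String routingKey) {
        List<String> matched = new ArrayList<>();
        for (Map.Entry<String, List<String>> entry : queuesByBindingKey.entrySet()) {
            if (fanout || entry.getKey().equals(routingKey)) {
                matched.addAll(entry.getValue());
            }
        }
        if (matched.isEmpty() && alternate != null) {
            // Unroutable here, so the message is handed over to the alternate exchange.
            return alternate.route(routingKey);
        }
        return matched;
    }

    public static void main(String[] args) {
        ToyExchange alternate = new ToyExchange(true, null);
        alternate.bind("q.alternate", "");

        ToyExchange direct = new ToyExchange(false, alternate);
        direct.bind("q.orders", "order.created"); // hypothetical queue and key

        System.out.println(direct.route("order.created")); // prints [q.orders]
        System.out.println(direct.route("key-not-bound")); // prints [q.alternate]
    }
}
```

In this model, an exchange without an alternate simply returns an empty list for an unroutable message.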
Spring Boot Exchange configuration
Abstract Exchange
The class org.springframework.amqp.core.AbstractExchange in Spring AMQP has 5 implementations:
- CustomExchange
- DirectExchange
- FanoutExchange
- TopicExchange
- HeadersExchange
These classes have 3 public constructors available:
public AbstractExchange(String name) {
    this(name, true, false);
}

public AbstractExchange(String name, boolean durable, boolean autoDelete) {
    this(name, durable, autoDelete, null);
}

public AbstractExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments) {
    super(arguments);
    this.name = name;
    this.durable = durable;
    this.autoDelete = autoDelete;
}
This means that you can set the following exchange properties while creating a new …Exchange object in Spring configuration:
- name: via the constructor
- type: by using the selected AbstractExchange implementation
- durability: via the constructor
- auto delete: via the constructor
- arguments: via the constructor
If you want to configure additional options (e.g. mark an exchange as internal), you can do it by calling set...(...) on an already created object.
You can also add additional arguments:
exchange.addArgument("key", "value");
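As a side note, the same properties can also be set fluently. The sketch below assumes that the Spring AMQP version on your classpath provides org.springframework.amqp.core.ExchangeBuilder. The class name, bean name and the exchange name x.builder-example are made up for illustration.

```java
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class BuilderBasedExchangeConfig {

    @Bean
    public Exchange builderDeclaredExchange() {
        // "x.builder-example" is a hypothetical name used only for this sketch.
        return ExchangeBuilder.directExchange("x.builder-example")
                .durable(true)                                      // durability
                .autoDelete()                                       // auto delete
                .withArgument("alternate-exchange", "x.alternate")  // arguments
                .build();
    }
}
```

Whether you prefer constructors or the builder is mostly a matter of taste. The builder keeps each option named at the call site, which can be easier to read than a row of boolean parameters.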
Example configuration
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.Map;

@Configuration
public class RabbitMqConfig {

    public static final String ALTERNATE_EXCHANGE_NAME = "x.alternate";
    public static final String ALTERNATE_EXCHANGE_QUEUE = "q.alternate";

    private static final boolean DURABLE = true;
    private static final boolean TRANSIENT = false;
    private static final boolean AUTO_DELETED = true;
    private static final boolean MANUALLY_DELETED = false;
    private static final Map<String, Object> EMPTY_ARGUMENTS = Map.of();

    @Bean
    public Declarables declareExchanges() {
        return new Declarables(
            new DirectExchange("x.direct", DURABLE, AUTO_DELETED, Map.of("alternate-exchange", ALTERNATE_EXCHANGE_NAME)),
            new FanoutExchange("x.fanout", DURABLE, AUTO_DELETED, EMPTY_ARGUMENTS),
            new TopicExchange("x.topic", TRANSIENT, MANUALLY_DELETED),
            new HeadersExchange("x.headers", TRANSIENT, MANUALLY_DELETED, EMPTY_ARGUMENTS)
        );
    }

    @Bean
    public Declarables alternateExchange() {
        FanoutExchange alternateExchange = new FanoutExchange(ALTERNATE_EXCHANGE_NAME, true, false);
        Queue alternateQueue = new Queue(ALTERNATE_EXCHANGE_QUEUE);
        return new Declarables(
            alternateExchange,
            alternateQueue,
            BindingBuilder.bind(alternateQueue).to(alternateExchange)
        );
    }

    @Bean
    public Declarables internalExchange() {
        FanoutExchange exchange = new FanoutExchange("x.internal");
        exchange.setInternal(true);
        exchange.setShouldDeclare(true);
        exchange.setIgnoreDeclarationExceptions(true);
        exchange.setDelayed(false);
        exchange.addArgument("key", "value");
        Queue queue = new Queue("q.internal");
        return new Declarables(
            exchange,
            queue,
            BindingBuilder.bind(queue).to(exchange)
        );
    }
}
- Different exchanges are created. Only the exchange name is a required parameter.
- The exchange x.direct has an additional argument, alternate-exchange, configured. Messages sent to this exchange that cannot be routed should be forwarded to the exchange x.alternate.
- The alternate exchange x.alternate is created, and the queue q.alternate is bound to it.
- A more advanced exchange configuration is presented in the internalExchange() method.
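Since clients cannot publish to an internal exchange directly, messages have to reach it through another exchange. The fragment below is a minimal sketch of such an exchange-to-exchange binding. The names x.entry, x.internal-sketch and forward-key are hypothetical and were chosen so that they do not clash with the exchanges declared above.

```java
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Declarables;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ExchangeToExchangeConfig {

    @Bean
    public Declarables exchangeToExchangeBinding() {
        // Hypothetical names, used only for this sketch.
        DirectExchange entryExchange = new DirectExchange("x.entry");
        FanoutExchange internalExchange = new FanoutExchange("x.internal-sketch");
        internalExchange.setInternal(true);

        return new Declarables(
            entryExchange,
            internalExchange,
            // The internal exchange is the destination, the entry exchange is the source.
            BindingBuilder.bind(internalExchange).to(entryExchange).with("forward-key")
        );
    }
}
```

With such a setup, a client would publish to x.entry with the routing key forward-key, and the broker would pass the message on to the internal exchange.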
Alternative exchange declaration method
Alternatively, you can declare an exchange directly in the @RabbitListener annotation.
@RabbitListener(
    bindings = @QueueBinding(
        value = @Queue(value = "q.listener-declaration"),
        exchange = @Exchange(
            name = "x.listener-declaration",
            type = ExchangeTypes.DIRECT,
            durable = "false",
            autoDelete = "true",
            arguments = {},
            internal = "false",
            ignoreDeclarationExceptions = "true"),
        key = "my-key")
)
public void example(Message message) {
    log.info("Received [{}]", message);
}
Demo
In the demo we will check whether the exchanges are visible in the RabbitMQ management interface, and whether the alternate exchange works.
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import static pl.akolata.config.RabbitMqConfig.ALTERNATE_EXCHANGE_QUEUE;

@Slf4j
@SpringBootApplication
@RequiredArgsConstructor
public class Application implements CommandLineRunner {

    private final RabbitTemplate rabbitTemplate;

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @RabbitListener(queues = ALTERNATE_EXCHANGE_QUEUE)
    public void listen(Message message) {
        log.info("Queue [{}] received the message [{}]", ALTERNATE_EXCHANGE_QUEUE, message);
    }

    @RabbitListener(
        bindings = @QueueBinding(
            value = @Queue(value = "q.listener-declaration"),
            exchange = @Exchange(
                name = "x.listener-declaration",
                type = ExchangeTypes.DIRECT,
                durable = "false",
                autoDelete = "true",
                arguments = {},
                internal = "false",
                ignoreDeclarationExceptions = "true"),
            key = "my-key")
    )
    public void example(Message message) {
        log.info("Received [{}]", message);
    }

    @Override
    public void run(String... args) {
        rabbitTemplate.convertAndSend("x.direct", "key-not-bound", "MSG");
    }
}
- The application sends a message to the x.direct exchange. This exchange doesn't have any bindings, so the routing key key-not-bound won't be matched. Such a message should be routed to the exchange x.alternate, based on the argument alternate-exchange=x.alternate passed during exchange creation.
- The application listens on the q.alternate queue, which is bound to the x.alternate exchange. It should receive the message sent to the x.direct exchange.
Result:
  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v2.5.5)

2021-10-03 09:22:19.613 INFO 2193 --- [ main] pl.akolata.Application : Starting Application using Java 11.0.4 on MacBook-Pro-Aleksander.local with PID 2193 (/Users/aleksanderkolata/IdeaProjects/spring-boot-rabbitmq/03-spring-boot-rabbitmq-exchange-config/target/classes started by aleksanderkolata in /Users/aleksanderkolata/IdeaProjects/spring-boot-rabbitmq/03-spring-boot-rabbitmq-exchange-config)
2021-10-03 09:22:19.615 INFO 2193 --- [ main] pl.akolata.Application : No active profile set, falling back to default profiles: default
2021-10-03 09:22:20.456 INFO 2193 --- [ main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: [localhost:5672]
2021-10-03 09:22:20.481 INFO 2193 --- [ main] o.s.a.r.c.CachingConnectionFactory : Created new connection: rabbitConnectionFactory#57d0fc89:0/SimpleConnection@350a94ce [delegate=amqp://guest@127.0.0.1:5672/, localPort= 51104]
2021-10-03 09:22:20.483 INFO 2193 --- [ main] o.s.amqp.rabbit.core.RabbitAdmin : Auto-declaring a non-durable or auto-delete Exchange (x.listener-declaration) durable:false, auto-delete:true. It will be deleted by the broker if it shuts down, and can be redeclared by closing and reopening the connection.
2021-10-03 09:22:20.484 INFO 2193 --- [ main] o.s.amqp.rabbit.core.RabbitAdmin : Auto-declaring a non-durable or auto-delete Exchange (x.direct) durable:true, auto-delete:true. It will be deleted by the broker if it shuts down, and can be redeclared by closing and reopening the connection.
2021-10-03 09:22:20.484 INFO 2193 --- [ main] o.s.amqp.rabbit.core.RabbitAdmin : Auto-declaring a non-durable or auto-delete Exchange (x.fanout) durable:true, auto-delete:true. It will be deleted by the broker if it shuts down, and can be redeclared by closing and reopening the connection.
2021-10-03 09:22:20.484 INFO 2193 --- [ main] o.s.amqp.rabbit.core.RabbitAdmin : Auto-declaring a non-durable or auto-delete Exchange (x.topic) durable:false, auto-delete:false. It will be deleted by the broker if it shuts down, and can be redeclared by closing and reopening the connection.
2021-10-03 09:22:20.484 INFO 2193 --- [ main] o.s.amqp.rabbit.core.RabbitAdmin : Auto-declaring a non-durable or auto-delete Exchange (x.headers) durable:false, auto-delete:false. It will be deleted by the broker if it shuts down, and can be redeclared by closing and reopening the connection.
2021-10-03 09:22:20.572 INFO 2193 --- [ main] pl.akolata.Application : Started Application in 1.305 seconds (JVM running for 2.124)
2021-10-03 09:22:20.590 INFO 2193 --- [ntContainer#1-1] pl.akolata.Application : Queue [q.alternate] received the message [(Body:'MSG' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=x.direct, receivedRoutingKey=key-not-bound, deliveryTag=1, consumerTag=amq.ctag-HMcTyDCwc7AGXWCUrBR9HA, consumerQueue=q.alternate])]
- At application startup we can see logs telling us that the exchanges x.direct, x.fanout, x.topic and x.headers were declared.
- The message sent to the x.direct exchange with a non-matching routing key was indeed routed to the x.alternate exchange!
Exchanges are also visible in the management interface:
Summary
In this article I covered:
- how to create an exchange in the RabbitMQ management interface
- what the exchange configuration options mean
- how to configure a RabbitMQ exchange using Spring configuration
- how alternate-exchange works