RabbitMQ Spring Boot #02 — Exchanges

Aleksander Kołata
14 min read · Sep 27, 2021

Hello! In this article I’m going to explain:

  • basic concepts of AMQP: what AMQP is and what the AMQP model looks like
  • RabbitMQ exchange types, and when to use each of them
  • Spring Boot RabbitMQ configuration

First of all, before we start talking about RabbitMQ queues, exchanges and other configuration, we should understand where the terms exchange and queue come from, and what RabbitMQ actually is.

AMQP

Definition

AMQP stands for Advanced Message Queuing Protocol. It is an open standard protocol that allows messaging between systems. Different servers/systems can communicate with each other, regardless of the technology they are built with. AMQP enables message passing through broker services over TCP/IP connections. It defines both the wire-level protocol and the high-level architecture for message brokers.

AMQP model

The basic AMQP model is shown in the picture below:

https://www.cloudamqp.com/img/blog/amq-model.png

Typical message flow:

  1. The producer publishes a message to an exchange. The message carries a routing key
  2. The exchange receives the message and is responsible for routing it. It copies the message to one or more queues, depending on the exchange type, the message’s routing key and the message’s headers
  3. A binding links an exchange with a queue. It contains the rules that the exchange uses to route messages to that queue
  4. The message stays in the queue until a consumer takes it. The queue acts as a buffer for messages which will be consumed later
  5. The consumer handles the message

An important thing to remember is that messages are never published directly to a queue; they are always sent to an exchange.
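To make the flow above more concrete, here is a minimal in-memory sketch in plain Java. It is only a toy model, not how RabbitMQ is implemented: the class AmqpFlowSketch, its methods, and the names x.reports and q.reports.pdf are all made up for illustration, and the only routing rule it knows is exact equality between the binding key and the routing key.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Toy in-memory model of the AMQP message flow (illustration only).
final class AmqpFlowSketch {

    // One binding: messages sent to `exchange` with `bindingKey` go to `queue`.
    private static final class Binding {
        final String exchange;
        final String bindingKey;
        final String queue;

        Binding(String exchange, String bindingKey, String queue) {
            this.exchange = exchange;
            this.bindingKey = bindingKey;
            this.queue = queue;
        }
    }

    private final Map<String, Queue<String>> queues = new HashMap<>();
    private final List<Binding> bindings = new ArrayList<>();

    void declareQueue(String name) {
        queues.putIfAbsent(name, new ArrayDeque<>());
    }

    void bind(String exchange, String bindingKey, String queue) {
        declareQueue(queue); // make sure the target queue exists
        bindings.add(new Binding(exchange, bindingKey, queue));
    }

    // Steps 1-3: the producer only names an exchange; the bindings pick the queues.
    // Returns how many queues received a copy of the message.
    int publish(String exchange, String routingKey, String body) {
        int copies = 0;
        for (Binding b : bindings) {
            if (b.exchange.equals(exchange) && b.bindingKey.equals(routingKey)) {
                queues.get(b.queue).add(body);
                copies++;
            }
        }
        return copies;
    }

    // Steps 4-5: the message waits in the queue until a consumer takes it.
    // Returns null when the queue is empty or unknown.
    String consume(String queue) {
        Queue<String> q = queues.get(queue);
        return q == null ? null : q.poll();
    }

    public static void main(String[] args) {
        AmqpFlowSketch broker = new AmqpFlowSketch();
        broker.bind("x.reports", "pdf", "q.reports.pdf");

        System.out.println(broker.publish("x.reports", "pdf", "report-1"));  // 1
        System.out.println(broker.consume("q.reports.pdf"));                 // report-1
        System.out.println(broker.publish("x.reports", "xlsx", "report-2")); // 0, no binding matches
    }
}
```

Note that publish never takes a queue name. The producer only knows the exchange and the routing key, which is exactly the point made above.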

If You would like to learn more about the AMQP model, see the Materials section at the bottom of this article. There You will find links to more articles which cover AMQP in more detail.

RabbitMQ and AMQP

RabbitMQ is a lightweight, scalable message broker which uses AMQP as its core protocol: version 0.9.1 is supported natively, and version 1.0 via a plugin. If You want, You can use RabbitMQ with other protocols, e.g. STOMP.

Exchanges

Default exchange

The default exchange is a pre-declared direct exchange whose name is an empty string "". When a message is sent to the default exchange, it is routed to the queue whose name equals the message’s routing key. Every queue is automatically bound to the default exchange, with the queue name as the routing key.

Direct exchange

A direct exchange uses the message’s routing key to route the message to the queue (or queues) bound with exactly that key. The routing key is set by the producer when it publishes the message; it is part of the publish call, not one of the message headers. The routing key in the message must exactly match the routing key specified in the binding. It works like an address, telling the exchange where the message should go.

Fanout exchange

A fanout exchange copies the received message to all queues bound to it. The provided routing key is simply ignored.
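The difference between the two exchange types described so far comes down to one check. Below is a small sketch of that routing decision in plain Java. It is not RabbitMQ code: the class DirectVsFanoutSketch and the queue names q.pdf, q.docx and q.archive are hypothetical, and each queue is assumed to have a single binding key to keep the model short.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustration only: which queues get a copy of the message?
final class DirectVsFanoutSketch {

    // Direct: a queue gets the message only if its binding key equals the routing key.
    static List<String> routeDirect(Map<String, String> bindingKeyByQueue, String routingKey) {
        List<String> targets = new ArrayList<>();
        for (Map.Entry<String, String> e : bindingKeyByQueue.entrySet()) {
            if (e.getValue().equals(routingKey)) {
                targets.add(e.getKey());
            }
        }
        return targets;
    }

    // Fanout: every bound queue gets the message; the routing key is never inspected.
    static List<String> routeFanout(Map<String, String> bindingKeyByQueue, String routingKey) {
        return new ArrayList<>(bindingKeyByQueue.keySet());
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps insertion order, so the printed lists are stable.
        Map<String, String> bindings = new LinkedHashMap<>();
        bindings.put("q.pdf", "pdf");
        bindings.put("q.docx", "docx");
        bindings.put("q.archive", "pdf"); // a second queue bound with the same key

        System.out.println(routeDirect(bindings, "pdf"));          // [q.pdf, q.archive]
        System.out.println(routeDirect(bindings, "not-matching")); // []
        System.out.println(routeFanout(bindings, "not-matching")); // [q.pdf, q.docx, q.archive]
    }
}
```

The first call also shows that a direct exchange is not limited to one queue: if two queues are bound with the same key, both receive a copy.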

Headers exchange

A headers exchange routes the message based on… its headers! The routing key is ignored, and if the message headers match the binding configuration, the message is sent to the bound queue. The order of the headers does not matter.

An argument x-match is added to the binding between the exchange and the queue, and it can have two values:

  • all — all headers must match in order to route the message to the queue. This is the default value.
  • any — at least one header must match, to route the message to the queue.

It’s worth mentioning that in this case a header value does not have to be a string, so it’s a little more flexible.
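The all/any rule can be sketched in a few lines of plain Java. This is a simplified model and not the broker's actual implementation: the class HeadersMatchSketch and the header names format and priority are hypothetical, values are compared with plain equals, and binding arguments starting with x- (such as x-match itself) are assumed to be skipped during the comparison.

```java
import java.util.Map;
import java.util.Objects;

// Illustration only: does a message satisfy a headers-exchange binding?
final class HeadersMatchSketch {

    // `bindingArgs` holds the expected headers plus an optional "x-match" entry.
    static boolean matches(Map<String, Object> bindingArgs, Map<String, Object> headers) {
        // Anything other than "any" is treated as "all", which is the default.
        boolean any = "any".equals(bindingArgs.get("x-match"));
        for (Map.Entry<String, Object> arg : bindingArgs.entrySet()) {
            if (arg.getKey().startsWith("x-")) {
                continue; // not a header to compare
            }
            boolean equal = headers.containsKey(arg.getKey())
                    && Objects.equals(arg.getValue(), headers.get(arg.getKey()));
            if (any && equal) {
                return true;  // "any": the first matching header is enough
            }
            if (!any && !equal) {
                return false; // "all": the first mismatch rejects the message
            }
        }
        return !any; // "all": nothing failed; "any": nothing matched
    }

    public static void main(String[] args) {
        Map<String, Object> all = Map.of("x-match", "all", "format", "pdf", "priority", 1);
        Map<String, Object> any = Map.of("x-match", "any", "format", "pdf", "priority", 1);
        Map<String, Object> headers = Map.of("format", "docx", "priority", 1);

        System.out.println(matches(all, headers)); // false, format differs
        System.out.println(matches(any, headers)); // true, priority matches
    }
}
```

The priority header is an Integer here, which illustrates the point about non-string header values.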

Topic exchange

A topic exchange routes the message by matching its routing key against binding patterns that may contain wildcards. A message can be passed to one or many queues. Although wildcards are supported, they are not mandatory: You can still bind with an exact routing key, without any wildcards.

A routing key is a list of words separated by the . character, e.g. taxi.eco.small and taxi.eco.large.

You can use the * symbol to match exactly one word at a specific position, like taxi.*.large. The # symbol matches zero or more words, like taxi.eco.#, which will match all routing keys starting with taxi.eco.
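The wildcard rules are easy to get slightly wrong, so here is a minimal sketch of the matching in plain Java. It is my own illustration of the rules described above, not the algorithm RabbitMQ uses internally, and the class name TopicMatchSketch is made up.

```java
// Illustration only: match a routing key against a topic binding pattern.
final class TopicMatchSketch {

    static boolean matches(String bindingPattern, String routingKey) {
        String[] pattern = bindingPattern.split("\\.");
        // An empty routing key has zero words, not one empty word.
        String[] key = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.");
        return matches(pattern, 0, key, 0);
    }

    private static boolean matches(String[] p, int pi, String[] k, int ki) {
        if (pi == p.length) {
            return ki == k.length; // pattern used up: the key must be used up too
        }
        if (p[pi].equals("#")) {
            // '#' either matches zero words (skip it) or swallows one word and stays in place
            return matches(p, pi + 1, k, ki)
                    || (ki < k.length && matches(p, pi, k, ki + 1));
        }
        if (ki == k.length) {
            return false; // key used up, but the pattern still expects a word
        }
        boolean wordMatches = p[pi].equals("*") || p[pi].equals(k[ki]);
        return wordMatches && matches(p, pi + 1, k, ki + 1);
    }

    public static void main(String[] args) {
        System.out.println(matches("taxi.*.large", "taxi.eco.large"));          // true
        System.out.println(matches("taxi.*.large", "taxi.large"));              // false, * needs one word
        System.out.println(matches("taxi.eco.#", "taxi.eco"));                  // true, # matches zero words
        System.out.println(matches("taxi.eco.#", "taxi.eco.additional.words")); // true
        System.out.println(matches("taxi.eco.#", "taxi.normal.small"));         // false
    }
}
```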

Exchanges use cases

Default exchange

Personally, I don’t use it directly at all, and I always provide an exchange name in the message producer. Because the default exchange is a direct exchange, let’s go to the next point.

Direct exchange

A direct exchange is useful when You want a single exchange and need to specify the target “address” of each message through its routing key.

One example is a reporting system which generates reports in a requested format, asynchronously. A user clicks a Generate report button and selects the target format: a PDF or DOCX file. The system schedules report generation by sending a message to the x.generate-report exchange, and specifies the target destination as the routing key: pdf or docx. In this case the producer provides the address to which the message should be routed.

Fanout exchange

A fanout exchange is useful when You want to send the message to one or more queues, and it’s ideal for broadcasting. It is similar to the publish/subscribe model: as a consumer You subscribe to something, and the publisher sends the message to all subscribers. The message can be processed by different consumers in different ways. The routing key is simply ignored.

Example: Your system publishes a message to the x.order-created fanout exchange. The exchange copies the message to all bound subscribers. One subscriber might be a service which generates an invoice for the customer. A second subscriber might be a service which starts a new process in the warehouse: collecting all the order items and assembling them into a single shipment. These subscribers might be the queues q.order.generate-invoice and q.order.assemble-shipment, which both receive a copy of the original message.

Topic exchange

A topic exchange combines multiple consumers with filtering, based on the message routing key and wildcard matching. It’s useful when You want to send a message to potentially more than one bound queue, but not necessarily to all of them.

Let’s say that You produce a message with information about a newly hired employee, and want to:

  • notify the company’s software systems for the HR and Payroll departments, which should register the employee data in the core systems
  • notify the single office where the employee is going to work, which should prepare a computer, a badge and some office space
  • not notify other offices

In such case, You can route the message to queues:

  • q.employees.newly-hired based on employee.newly-hired.office.warsaw routing key and employee.newly-hired.# matching
  • q.employees.offices.warsaw based on employee.newly-hired.office.warsaw routing key and employee.*.office.warsaw matching

Headers exchange

A headers exchange might be useful for routing based on a set of arguments which are easier to express as message headers than as a routing key, because a header value does not have to be a string. With this exchange type, the routing key is ignored.

It can act like a direct exchange, but with more conditions than routing-key equality.

Spring Boot Java configuration and examples

I decided to write examples based on YouTube CloudAMQP RabbitMQ Explained — Exchanges tutorial.

The messaging architecture will represent a taxi corporation which receives messages representing taxi orders and routes them to queues representing the requested taxi types.

Common configuration

Queues:

package pl.akolata.config;

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class QueuesConfiguration {
    public static final String QUEUE_TAXI_DEFAULT = "q.taxi.default";
    public static final String QUEUE_TAXI_NORMAL_SMALL = "q.taxi.normal.small";
    public static final String QUEUE_TAXI_ECO_SMALL = "q.taxi.eco.small";
    public static final String QUEUE_TAXI_NORMAL_LARGE = "q.taxi.normal.large";
    public static final String QUEUE_TAXI_ECO_LARGE = "q.taxi.eco.large";

    @Bean
    public Queue queueTaxiDefault() {
        return new Queue(QUEUE_TAXI_DEFAULT);
    }

    @Bean
    public Queue queueTaxiNormalSmall() {
        return new Queue(QUEUE_TAXI_NORMAL_SMALL);
    }

    @Bean
    public Queue queueTaxiEcoSmall() {
        return new Queue(QUEUE_TAXI_ECO_SMALL);
    }

    @Bean
    public Queue queueTaxiNormalLarge() {
        return new Queue(QUEUE_TAXI_NORMAL_LARGE);
    }

    @Bean
    public Queue queueTaxiEcoLarge() {
        return new Queue(QUEUE_TAXI_ECO_LARGE);
    }

}

  • q.taxi.default → this queue will be used with default exchange example
  • q.taxi.normal.small → this queue will be used for messages to taxi drivers, for cars with engine type normal and car size small
  • q.taxi.eco.small → this queue will be used for messages to taxi drivers, for cars with engine type eco and car size small
  • q.taxi.normal.large → this queue will be used for messages to taxi drivers, for cars with engine type normal and car size large
  • q.taxi.eco.large → this queue will be used for messages to taxi drivers, for cars with engine type eco and car size large

Model:

package pl.akolata.model;

import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;
import java.util.UUID;

@Data
@NoArgsConstructor
public class OrderTaxiMessage implements Serializable {
    private String id = UUID.randomUUID().toString();
}

  • id → it will be used to check whether a message routed to the queue is the same message that was sent to the exchange

Messages logging:

package pl.akolata.util;

import lombok.experimental.UtilityClass;
import lombok.extern.slf4j.Slf4j;
import pl.akolata.model.OrderTaxiMessage;

import java.util.Map;

@Slf4j
@UtilityClass
public class MessagingLoggingUtil {
    public static void logReceivedMessage(String queue, OrderTaxiMessage message) {
        log.info("Listener on queue [{}] received message [{}]", queue, message);
    }

    public static void logSendMessage(String exchange, String routingKey, OrderTaxiMessage message) {
        log.info("Message [{}] send to exchange [{}] with routing key [{}]", message.getId(), exchange, routingKey);
    }

    public static void logSendMessage(String exchange, String routingKey, OrderTaxiMessage message, Map<String, Object> headers) {
        log.info("Message [{}] send to exchange [{}] with routing key [{}] and headers [{}]", message.getId(), exchange, routingKey, headers);
    }
}

  • utility methods used for logging

Queues listeners

package pl.akolata.demo;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import pl.akolata.model.OrderTaxiMessage;

import static pl.akolata.config.QueuesConfiguration.*;
import static pl.akolata.util.MessagingLoggingUtil.logReceivedMessage;

@Slf4j
@Service
public class TaxiQueuesListener {

    @RabbitListener(queues = {QUEUE_TAXI_DEFAULT})
    public void listenOnQueueDefault(OrderTaxiMessage message) {
        logReceivedMessage(QUEUE_TAXI_DEFAULT, message);
    }

    @RabbitListener(queues = {QUEUE_TAXI_NORMAL_SMALL})
    public void listenOnQueueTaxiNormalSmall(OrderTaxiMessage message) {
        logReceivedMessage(QUEUE_TAXI_NORMAL_SMALL, message);
    }

    @RabbitListener(queues = {QUEUE_TAXI_ECO_SMALL})
    public void listenOnQueueTaxiEcoSmall(OrderTaxiMessage message) {
        logReceivedMessage(QUEUE_TAXI_ECO_SMALL, message);
    }

    @RabbitListener(queues = {QUEUE_TAXI_ECO_LARGE})
    public void listenOnQueueTaxiEcoLarge(OrderTaxiMessage message) {
        logReceivedMessage(QUEUE_TAXI_ECO_LARGE, message);
    }

    @RabbitListener(queues = {QUEUE_TAXI_NORMAL_LARGE})
    public void listenOnQueueTaxiNormalLarge(OrderTaxiMessage message) {
        logReceivedMessage(QUEUE_TAXI_NORMAL_LARGE, message);
    }

}

  • I declared a dedicated listener for every queue

Default exchange

Configuration

No additional configuration is needed to show how the default exchange works. The only required bean is the queue q.taxi.default, declared before.

...
    @Bean
    public Queue queueTaxiDefault() {
        return new Queue(QUEUE_TAXI_DEFAULT);
    }
...

Producer

package pl.akolata.demo;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
import pl.akolata.model.OrderTaxiMessage;
import pl.akolata.util.MessagingLoggingUtil;

import static pl.akolata.config.QueuesConfiguration.QUEUE_TAXI_DEFAULT;


@Slf4j
@Service
@RequiredArgsConstructor
public class DefaultExchangeProducer {
    private final RabbitTemplate rabbitTemplate;

    public void sendMessage() {
        OrderTaxiMessage msg = new OrderTaxiMessage();
        // No exchange name is passed, so the template's default exchange ("" unless reconfigured) is used
        rabbitTemplate.convertAndSend(QUEUE_TAXI_DEFAULT, msg);
        MessagingLoggingUtil.logSendMessage("", QUEUE_TAXI_DEFAULT, msg);
    }

}

  • I passed the queue name q.taxi.default as the routing key argument of the convertAndSend method, without providing an exchange name

Result

After running the application, You should see logs similar to these:

Message [095a6b2c-f1e4-4716-997c-fad2ab8781e7] send to exchange [] with routing key [q.taxi.default]
Listener on queue [q.taxi.default] received message [OrderTaxiMessage(id=095a6b2c-f1e4-4716-997c-fad2ab8781e7)]

  • Message with id 095a6b2c-f1e4-4716-997c-fad2ab8781e7 was sent to the default exchange using the q.taxi.default routing key
  • Listener on queue q.taxi.default received the message with the same id, 095a6b2c-f1e4-4716-997c-fad2ab8781e7

Direct exchange

Configuration

package pl.akolata.config;

import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Declarables;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMqDirectExchangeConfig {
    public static final String EXCHANGE_TAXI_DIRECT = "x.taxi.direct";
    public static final String ROUTING_KEY_TAXI_NORMAL_SMALL = "taxi.normal.small";
    public static final String ROUTING_KEY_TAXI_ECO_SMALL = "taxi.eco.small";
    public static final String ROUTING_KEY_TAXI_NORMAL_LARGE = "taxi.normal.large";
    public static final String ROUTING_KEY_TAXI_ECO_LARGE = "taxi.eco.large";

    @Bean
    public DirectExchange exchangeTaxiDirect() {
        return new DirectExchange(EXCHANGE_TAXI_DIRECT);
    }

    @Bean
    public Declarables directExchangeBindings(
            DirectExchange exchangeTaxiDirect,
            Queue queueTaxiNormalSmall,
            Queue queueTaxiEcoSmall,
            Queue queueTaxiNormalLarge,
            Queue queueTaxiEcoLarge) {
        return new Declarables(
                BindingBuilder.bind(queueTaxiNormalSmall).to(exchangeTaxiDirect).with(ROUTING_KEY_TAXI_NORMAL_SMALL),
                BindingBuilder.bind(queueTaxiEcoSmall).to(exchangeTaxiDirect).with(ROUTING_KEY_TAXI_ECO_SMALL),
                BindingBuilder.bind(queueTaxiNormalLarge).to(exchangeTaxiDirect).with(ROUTING_KEY_TAXI_NORMAL_LARGE),
                BindingBuilder.bind(queueTaxiEcoLarge).to(exchangeTaxiDirect).with(ROUTING_KEY_TAXI_ECO_LARGE)
        );
    }

}

  • I created a direct exchange x.taxi.direct
  • I created a binding between the queue q.taxi.normal.small and the exchange x.taxi.direct using routing key taxi.normal.small
  • I created a binding between the queue q.taxi.eco.small and the exchange x.taxi.direct using routing key taxi.eco.small
  • I created a binding between the queue q.taxi.normal.large and the exchange x.taxi.direct using routing key taxi.normal.large
  • I created a binding between the queue q.taxi.eco.large and the exchange x.taxi.direct using routing key taxi.eco.large

Producer

package pl.akolata.demo;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
import pl.akolata.model.OrderTaxiMessage;

import static pl.akolata.config.RabbitMqDirectExchangeConfig.*;
import static pl.akolata.util.MessagingLoggingUtil.logSendMessage;


@Slf4j
@Service
@RequiredArgsConstructor
public class DirectExchangeProducer {
    private final RabbitTemplate rabbitTemplate;

    public void sendMessage() {
        sendMsg(ROUTING_KEY_TAXI_NORMAL_SMALL);
        sendMsg(ROUTING_KEY_TAXI_ECO_LARGE);
        sendMsg("not-matching"); // no binding uses this key
    }

    private void sendMsg(String routingKey) {
        OrderTaxiMessage message = new OrderTaxiMessage();
        rabbitTemplate.convertAndSend(EXCHANGE_TAXI_DIRECT, routingKey, message);
        logSendMessage(EXCHANGE_TAXI_DIRECT, routingKey, message);
    }

}

  • Three different messages are sent to the same exchange x.taxi.direct, each with a different routing key

Result

Message [2b4ab162-3136-4697-8a43-db46ffde23a4] send to exchange [x.taxi.direct] with routing key [taxi.normal.small]
Message [a5cc9fa5-4977-468c-8167-77db3ada399c] send to exchange [x.taxi.direct] with routing key [taxi.eco.large]
Message [af6539ee-f9a7-4d3c-a2b7-08da4af1ff06] send to exchange [x.taxi.direct] with routing key [not-matching]
Listener on queue [q.taxi.normal.small] received message [OrderTaxiMessage(id=2b4ab162-3136-4697-8a43-db46ffde23a4)]
Listener on queue [q.taxi.eco.large] received message [OrderTaxiMessage(id=a5cc9fa5-4977-468c-8167-77db3ada399c)]

  • message with id 2b4ab162-3136-4697-8a43-db46ffde23a4 was sent to the exchange x.taxi.direct with the routing key taxi.normal.small, and was received by the listener on queue q.taxi.normal.small
  • message with id a5cc9fa5-4977-468c-8167-77db3ada399c was sent to the exchange x.taxi.direct with the routing key taxi.eco.large, and was received by the listener on queue q.taxi.eco.large
  • message with id af6539ee-f9a7-4d3c-a2b7-08da4af1ff06 was sent to the exchange x.taxi.direct with the routing key not-matching, and… it hasn’t been passed to any queue, because no binding uses that key. By default the broker simply drops such an unroutable message

Fanout Exchange

Configuration

package pl.akolata.config;

import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Declarables;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMqFanoutExchangeConfig {
    public static final String EXCHANGE_TAXI_FANOUT = "x.taxi.fanout";

    @Bean
    public FanoutExchange exchangeTaxiFanout() {
        return new FanoutExchange(EXCHANGE_TAXI_FANOUT);
    }

    @Bean
    public Declarables fanoutExchangeBindings(
            FanoutExchange exchangeTaxiFanout,
            Queue queueTaxiNormalSmall,
            Queue queueTaxiEcoSmall,
            Queue queueTaxiNormalLarge,
            Queue queueTaxiEcoLarge) {
        return new Declarables(
                BindingBuilder.bind(queueTaxiNormalSmall).to(exchangeTaxiFanout),
                BindingBuilder.bind(queueTaxiEcoSmall).to(exchangeTaxiFanout),
                BindingBuilder.bind(queueTaxiNormalLarge).to(exchangeTaxiFanout),
                BindingBuilder.bind(queueTaxiEcoLarge).to(exchangeTaxiFanout)
        );
    }

}

  • I created a fanout exchange x.taxi.fanout
  • I created a binding between the queue q.taxi.normal.small and the exchange x.taxi.fanout without providing a routing key
  • I created a binding between the queue q.taxi.eco.small and the exchange x.taxi.fanout without providing a routing key
  • I created a binding between the queue q.taxi.normal.large and the exchange x.taxi.fanout without providing a routing key
  • I created a binding between the queue q.taxi.eco.large and the exchange x.taxi.fanout without providing a routing key

Producer

package pl.akolata.demo;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
import pl.akolata.model.OrderTaxiMessage;

import static pl.akolata.config.RabbitMqFanoutExchangeConfig.EXCHANGE_TAXI_FANOUT;
import static pl.akolata.util.MessagingLoggingUtil.logSendMessage;


@Slf4j
@Service
@RequiredArgsConstructor
public class FanoutExchangeProducer {
    private final RabbitTemplate rabbitTemplate;

    public void sendMessage() {
        OrderTaxiMessage message = new OrderTaxiMessage();
        // A fanout exchange ignores the routing key, so an empty string is enough
        rabbitTemplate.convertAndSend(EXCHANGE_TAXI_FANOUT, "", message);
        logSendMessage(EXCHANGE_TAXI_FANOUT, "", message);
    }

}

  • One message is sent to the x.taxi.fanout exchange, without providing a routing key (I passed "" as the routing key)

Result

Message [5354f33c-72cd-4c58-9515-47326497b54c] send to exchange [x.taxi.fanout] with routing key []
Listener on queue [q.taxi.eco.large] received message [OrderTaxiMessage(id=5354f33c-72cd-4c58-9515-47326497b54c)]
Listener on queue [q.taxi.normal.large] received message [OrderTaxiMessage(id=5354f33c-72cd-4c58-9515-47326497b54c)]
Listener on queue [q.taxi.eco.small] received message [OrderTaxiMessage(id=5354f33c-72cd-4c58-9515-47326497b54c)]
Listener on queue [q.taxi.normal.small] received message [OrderTaxiMessage(id=5354f33c-72cd-4c58-9515-47326497b54c)]

  • message with id 5354f33c-72cd-4c58-9515-47326497b54c was sent to the exchange x.taxi.fanout
  • the message with the same id was received by the listeners on all queues bound to the x.taxi.fanout exchange

Topic Exchange

Configuration

package pl.akolata.config;

import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Declarables;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import static pl.akolata.config.RabbitMqDirectExchangeConfig.*;

@Configuration
public class RabbitMqTopicExchangeConfig {
    public static final String EXCHANGE_TAXI_TOPIC = "x.taxi.topic";

    @Bean
    public TopicExchange exchangeTaxiTopic() {
        return new TopicExchange(EXCHANGE_TAXI_TOPIC);
    }

    @Bean
    public Declarables topicExchangeBindings(
            TopicExchange exchangeTaxiTopic,
            Queue queueTaxiNormalSmall,
            Queue queueTaxiEcoSmall,
            Queue queueTaxiNormalLarge,
            Queue queueTaxiEcoLarge) {
        return new Declarables(
                // exact routing keys, no wildcards
                BindingBuilder.bind(queueTaxiNormalSmall).to(exchangeTaxiTopic).with(ROUTING_KEY_TAXI_NORMAL_SMALL),
                BindingBuilder.bind(queueTaxiEcoSmall).to(exchangeTaxiTopic).with(ROUTING_KEY_TAXI_ECO_SMALL),
                BindingBuilder.bind(queueTaxiNormalLarge).to(exchangeTaxiTopic).with(ROUTING_KEY_TAXI_NORMAL_LARGE),
                BindingBuilder.bind(queueTaxiEcoLarge).to(exchangeTaxiTopic).with(ROUTING_KEY_TAXI_ECO_LARGE),
                // any three-word key ending with "large"
                BindingBuilder.bind(queueTaxiNormalLarge).to(exchangeTaxiTopic).with("*.*.large"),
                BindingBuilder.bind(queueTaxiEcoLarge).to(exchangeTaxiTopic).with("*.*.large"),
                // any key starting with "taxi.eco"
                BindingBuilder.bind(queueTaxiEcoSmall).to(exchangeTaxiTopic).with("taxi.eco.#"),
                BindingBuilder.bind(queueTaxiEcoLarge).to(exchangeTaxiTopic).with("taxi.eco.#")
        );
    }

}

  • I created a topic exchange x.taxi.topic
  • I declared exact matching between all 4 taxi queues and the previously used routing keys: queue q.taxi.normal.small with routing key taxi.normal.small, etc.
  • I declared a wildcard binding *.*.large for both large-taxi queues, which matches any three-word routing key ending with large; the first two words do not matter
  • I declared a wildcard binding taxi.eco.# for both eco queues, which matches all routing keys starting with taxi.eco

Producer

package pl.akolata.demo;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
import pl.akolata.model.OrderTaxiMessage;

import static pl.akolata.config.RabbitMqTopicExchangeConfig.EXCHANGE_TAXI_TOPIC;
import static pl.akolata.util.MessagingLoggingUtil.logSendMessage;


@Slf4j
@Service
@RequiredArgsConstructor
public class TopicExchangeProducer {
    private final RabbitTemplate rabbitTemplate;

    public void sendMessage() {
        sendMsg("taxi.normal.small");
        sendMsg("whatever.any.large");
        sendMsg("taxi.eco.additional.words");
    }

    private void sendMsg(String routingKey) {
        OrderTaxiMessage message = new OrderTaxiMessage();
        rabbitTemplate.convertAndSend(EXCHANGE_TAXI_TOPIC, routingKey, message);
        logSendMessage(EXCHANGE_TAXI_TOPIC, routingKey, message);
    }

}

Result

Message [00c23257-33b7-462d-844a-24cbd16e83b9] send to exchange [x.taxi.topic] with routing key [taxi.normal.small]
Message [0959fce7-49c6-4231-a4dd-2f4a32a3af3e] send to exchange [x.taxi.topic] with routing key [whatever.any.large]
Message [9d13fdf6-beba-4075-b222-36eeaade947f] send to exchange [x.taxi.topic] with routing key [taxi.eco.additional.words]
Listener on queue [q.taxi.normal.large] received message [OrderTaxiMessage(id=0959fce7-49c6-4231-a4dd-2f4a32a3af3e)]
Listener on queue [q.taxi.normal.small] received message [OrderTaxiMessage(id=00c23257-33b7-462d-844a-24cbd16e83b9)]
Listener on queue [q.taxi.eco.small] received message [OrderTaxiMessage(id=9d13fdf6-beba-4075-b222-36eeaade947f)]
Listener on queue [q.taxi.eco.large] received message [OrderTaxiMessage(id=0959fce7-49c6-4231-a4dd-2f4a32a3af3e)]
Listener on queue [q.taxi.eco.large] received message [OrderTaxiMessage(id=9d13fdf6-beba-4075-b222-36eeaade947f)]

  • Message 00c23257-33b7-462d-844a-24cbd16e83b9 was sent with the taxi.normal.small routing key, and was routed to only one queue, q.taxi.normal.small, because of the previously defined exact routing key binding (the first binding in the topicExchangeBindings method)
  • Message 0959fce7-49c6-4231-a4dd-2f4a32a3af3e was sent with the whatever.any.large routing key, and was routed to the queues q.taxi.normal.large and q.taxi.eco.large, based on the *.*.large wildcard binding: the first two words in the routing key could be anything
  • Message 9d13fdf6-beba-4075-b222-36eeaade947f was sent with the taxi.eco.additional.words routing key, and was routed to the queues q.taxi.eco.small and q.taxi.eco.large, based on the taxi.eco.# wildcard binding, because the routing key starts with taxi.eco

Headers Exchange

Configuration

package pl.akolata.config;

import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Declarables;
import org.springframework.amqp.core.HeadersExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.Map;

@Configuration
public class RabbitMqHeadersExchangeConfig {
    public static final String EXCHANGE_TAXI_HEADERS = "x.taxi.headers";

    @Bean
    public HeadersExchange exchangeTaxiHeaders() {
        return new HeadersExchange(EXCHANGE_TAXI_HEADERS);
    }

    @Bean
    public Declarables headersExchangeBindings(
            HeadersExchange exchangeTaxiHeaders,
            Queue queueTaxiNormalSmall,
            Queue queueTaxiEcoLarge) {
        return new Declarables(
                // x-match=any: one matching header is enough
                BindingBuilder.bind(queueTaxiNormalSmall).to(exchangeTaxiHeaders).whereAny(Map.of("from", "eco.small", "pricingModel", 1)).match(),
                // x-match=all: every listed header must match
                BindingBuilder.bind(queueTaxiEcoLarge).to(exchangeTaxiHeaders).whereAll(Map.of("from", "eco.large", "pricingModel", 2)).match()
        );
    }

}

  • I created a headers exchange x.taxi.headers
  • I created a binding between the queue q.taxi.normal.small and the exchange x.taxi.headers which matches if any of the headers from=eco.small and pricingModel=1 is present
  • I created a binding between the queue q.taxi.eco.large and the exchange x.taxi.headers which matches only if all of the headers from=eco.large and pricingModel=2 are present

Producer

package pl.akolata.demo;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.stereotype.Service;
import pl.akolata.model.OrderTaxiMessage;

import static pl.akolata.config.RabbitMqHeadersExchangeConfig.EXCHANGE_TAXI_HEADERS;
import static pl.akolata.util.MessagingLoggingUtil.logSendMessage;


@Slf4j
@Service
@RequiredArgsConstructor
public class HeadersExchangeProducer {
    private final RabbitTemplate rabbitTemplate;

    public void sendMessage() {
        sendMsg("eco.small", 1);
        sendMsg("eco.large", 1);
        sendMsg("eco.large", 2);
        sendMsg("eco.large", 3);
    }

    private void sendMsg(Object headerFrom, Object headerPricingModel) {
        OrderTaxiMessage orderTaxiMessage = new OrderTaxiMessage();

        // Headers used by the headers exchange for routing
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setHeader("from", headerFrom);
        messageProperties.setHeader("pricingModel", headerPricingModel);

        // Build the AMQP message by hand, so that the custom properties are attached to it
        MessageConverter messageConverter = new SimpleMessageConverter();
        Message message = messageConverter.toMessage(orderTaxiMessage, messageProperties);

        rabbitTemplate.convertAndSend(EXCHANGE_TAXI_HEADERS, "", message);
        logSendMessage(EXCHANGE_TAXI_HEADERS, "", orderTaxiMessage, messageProperties.getHeaders());
    }

}

  • headers are set on a MessageProperties object, and the payload is converted to an org.springframework.amqp.core.Message object
  • an empty string "" is passed as the routing key

Result

Message [f3606eaa-1c41-4684-b777-0af5050c3c5f] send to exchange [x.taxi.headers] with routing key [] and headers [{pricingModel=1, from=eco.small}]
Message [898a5a35-c02e-4f58-b6d5-982ef4db1ec6] send to exchange [x.taxi.headers] with routing key [] and headers [{pricingModel=1, from=eco.large}]
Message [b44239ba-339d-4c8a-9b6f-d2bfb99ffeaa] send to exchange [x.taxi.headers] with routing key [] and headers [{pricingModel=2, from=eco.large}]
Message [3d09aa0e-947e-4c85-bbf1-aca1ae8c3208] send to exchange [x.taxi.headers] with routing key [] and headers [{pricingModel=3, from=eco.large}]
Listener on queue [q.taxi.eco.large] received message [OrderTaxiMessage(id=b44239ba-339d-4c8a-9b6f-d2bfb99ffeaa)]
Listener on queue [q.taxi.normal.small] received message [OrderTaxiMessage(id=f3606eaa-1c41-4684-b777-0af5050c3c5f)]
Listener on queue [q.taxi.normal.small] received message [OrderTaxiMessage(id=898a5a35-c02e-4f58-b6d5-982ef4db1ec6)]

  • message f3606eaa-1c41-4684-b777-0af5050c3c5f was sent with headers from=eco.small, pricingModel=1, and was consumed from the queue q.taxi.normal.small based on the binding any of from=eco.small, pricingModel=1 (both headers match)
  • message 898a5a35-c02e-4f58-b6d5-982ef4db1ec6 was sent with headers from=eco.large, pricingModel=1, and was consumed from the queue q.taxi.normal.small based on the same any binding, because pricingModel=1 matches even though from does not
  • message b44239ba-339d-4c8a-9b6f-d2bfb99ffeaa was sent with headers from=eco.large, pricingModel=2, and was consumed from the queue q.taxi.eco.large based on the binding all of from=eco.large, pricingModel=2
  • message 3d09aa0e-947e-4c85-bbf1-aca1ae8c3208 was sent with headers from=eco.large, pricingModel=3, but the headers didn’t match any binding, so it was not routed to any queue

Summary

After reading this article, You should more or less understand:

  • what AMQP and RabbitMQ are
  • what the AMQP model looks like
  • which RabbitMQ exchange types are available, and when to use which
  • how to set up a RabbitMQ exchanges/queues model using Spring Boot

Materials:
