How to implement Reactor RabbitMQ in your reactive Spring Boot application — Part 1

Practical examples using the functional Java API Reactor RabbitMQ

Jay Morelli
10 min read · May 5, 2022
RabbitMQ Message Broker Diagram

It is not easy to learn a new technology, and even less so through documentation alone. It requires will and patience. Often we, as developers, prefer a more engaging source of learning, like a YouTube video, a Medium article, or a book. But what happens when the documentation is the only source of knowledge? The task becomes much harder.

Recently, I decided to work on a new personal project to learn more about the reactive-stack web framework, Spring WebFlux. It is a new functional web framework, built using reactive principles. When I got to the part of implementing RabbitMQ as the message broker for my application, I could not find any resources (besides the documentation).

I decided to share two practical examples. I will go through these examples slowly, step by step. In this first part, you will learn how to implement Reactor RabbitMQ in its most basic form, which is the example that was shared in the Reactor RabbitMQ repo. In Part 2, I will be bumping up the complexity, so you can finally see a real-world example in action.

In the following sections, I will briefly explain the main concepts of Spring WebFlux and RabbitMQ. If you have no prior knowledge of either one, I strongly suggest reading more about them. For any Spring content, the Baeldung website is for me the best resource on the internet. Check this Spring WebFlux article to learn more about it.

Spring WebFlux

Spring WebFlux is a functional web framework, built using reactive principles.

Baeldung

Reactive programming describes a paradigm that relies on asynchronous programming. It creates software that responds to events.

Java 8 introduced the Streams API and Lambda Expressions, which led us to Spring WebFlux. It is now possible to create functional endpoints and have a fully non-blocking web application.

The term, “reactive,” refers to programming models that are built around reacting to change — network components reacting to I/O events, UI controllers reacting to mouse events, and others. In that sense, non-blocking is reactive, because, instead of being blocked, we are now in the mode of reacting to notifications as operations complete or data becomes available.
Spring documentation

I am not here to explain the Spring WebFlux framework; that is out of scope. But before we move to RabbitMQ, there are two concepts that you need to understand: the Mono and Flux APIs.

Mono

A Reactive Streams Publisher with basic rx operators that emits at most one item via the onNext signal then terminates with an onComplete signal (successful Mono, with or without value), or only emits a single onError signal (failed Mono).
Project Reactor, Mono

It will emit at most one item. It can be a primitive type, an object, a server response, it could be anything. But it will always hold at most one value, never more.

Let’s suppose you are building an application and you have an entity of a User.

When you call your repository to find a user by id, you expect to receive that particular user. But because we are using a fully asynchronous framework, you won’t receive the object immediately.

Instead, the Mono publisher will emit its value whenever it is ready.

Flux

A Reactive Streams Publisher with rx operators that emits 0 to N elements, and then completes (successfully or with an error).
Project Reactor, Flux

Similar to a Mono, a Flux is also a Reactive Streams publisher, but the difference is that it emits 0 to N elements.

Using the same scenario as before, let’s use an example to retrieve a list of users from a reactive repository.

Key takeaways

  1. Mono and Flux are both reactive streams. They differ in how many items they emit: at most one for Mono, 0 to N for Flux.
  2. Mono and Flux are lazy: nothing is executed until something subscribes. That is why a pipeline ends with a call to .subscribe().
  3. Both are fully non-blocking.

RabbitMQ

RabbitMQ is a message broker: it accepts and forwards messages. You can think about it as a post office: when you put the mail that you want posting in a post box, you can be sure that the letter carrier will eventually deliver the mail to your recipient. In this analogy, RabbitMQ is a post box, a post office, and a letter carrier.

RabbitMQ

There are five important concepts to understand in this section: message broker, producer, consumer, exchange, and queue.

Producer

The producer, sometimes called the publisher in the publish/subscribe model, is the component that sends messages. Any program that sends a message is a producer.

Consumer

The consumer, or subscriber, is the program that mostly waits to receive messages.

Message Broker

It is an intermediary computer program that applications and services use to exchange information with each other. It can be used to store, deliver and route messages. The key word is intermediary: the sender does not need to know whether the receiving services are online or offline, or how many recipients there are. The broker still makes sure that the recipients receive the messages.

Queue

The messages received from the producers will be kept in a message queue, and as soon as the consumers are ready to receive them, they will consume the messages. That is how the message broker ensures that the recipient receives a given message.

Exchange

As mentioned earlier, the producer sends the messages to the message broker. The entry point of that intermediary computer program is the Exchange. It is a message routing agent, which is responsible for routing the messages to different queues. The connection between an exchange and a queue is called binding.
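To make the relationship between producer, exchange, binding, queue and consumer more concrete, here is a minimal in-memory sketch in plain Java. It is only a toy model written for this article, not RabbitMQ code: the ToyBroker class and the names "shop", "order.created" and "orders" are hypothetical, and the routing simply matches the routing key exactly, which is roughly how a direct exchange behaves.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

/** Toy in-memory model of exchange -> binding -> queue routing. Not RabbitMQ code. */
final class ToyBroker {
    // queue name -> messages waiting to be consumed
    private final Map<String, Queue<String>> queues = new HashMap<>();
    // exchange name -> routing key -> names of the bound queues
    private final Map<String, Map<String, List<String>>> bindings = new HashMap<>();

    void declareQueue(String queue) {
        queues.putIfAbsent(queue, new ArrayDeque<>());
    }

    /** A binding connects an exchange to a queue for a given routing key. */
    void bind(String exchange, String routingKey, String queue) {
        if (!queues.containsKey(queue)) {
            throw new IllegalArgumentException("Unknown queue: " + queue);
        }
        bindings.computeIfAbsent(exchange, e -> new HashMap<>())
                .computeIfAbsent(routingKey, k -> new ArrayList<>())
                .add(queue);
    }

    /** Producer side: routes the message and returns how many queues received it. */
    int publish(String exchange, String routingKey, String message) {
        List<String> targets = bindings
                .getOrDefault(exchange, Map.of())
                .getOrDefault(routingKey, List.of());
        for (String queue : targets) {
            queues.get(queue).add(message);
        }
        return targets.size();
    }

    /** Consumer side: takes the oldest message, or null when the queue is empty. */
    String consume(String queue) {
        Queue<String> messages = queues.get(queue);
        return messages == null ? null : messages.poll();
    }

    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker();
        broker.declareQueue("orders");                       // hypothetical queue name
        broker.bind("shop", "order.created", "orders");      // hypothetical exchange and key
        broker.publish("shop", "order.created", "order #1");
        System.out.println(broker.consume("orders"));
    }
}
```

Notice that the producer only ever names an exchange and a routing key, never a consumer. The message waits in the queue until a consumer asks for it, which is the decoupling described above.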

Note that there are different types of exchange, but because of the scope of this article, I won’t be going into detail about each one of those. I will leave this link for you to check out.

Reactive API for RabbitMQ

With the fundamentals of RabbitMQ and Spring WebFlux covered, the last piece of the puzzle is the Reactive API for RabbitMQ. If you are building a reactive application, you want non-blocking back-pressure, but the RabbitMQ Java client on its own does not provide such an API. Reactor RabbitMQ is a reactive API for RabbitMQ, based on Project Reactor and the RabbitMQ Java Client.

Reactor RabbitMQ API enables messages to be published to RabbitMQ and consumed from RabbitMQ using functional APIs with non-blocking back-pressure and very low overheads.

Reactor RabbitMQ is not intended to replace any of the existing Java libraries. Instead, it is aimed at providing an alternative API for reactive event-driven applications.
Reactor RabbitMQ
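If the term back-pressure is new to you, the idea is that the subscriber tells the publisher how many items it is ready to receive. The sketch below shows this with the JDK's own java.util.concurrent.Flow interfaces, which mirror the Reactive Streams interfaces that Reactor implements. It deliberately does not use Reactor or RabbitMQ, so treat it as an illustration of the concept only; the BackPressureSketch and OneAtATimeSubscriber names are made up for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

final class BackPressureSketch {

    /** Subscriber that requests one item at a time; that demand signal is the back-pressure. */
    static final class OneAtATimeSubscriber implements Flow.Subscriber<String> {
        final List<String> received = new ArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // ask for the first item only
        }

        @Override
        public void onNext(String item) {
            received.add(item);
            subscription.request(1); // ready for the next one
        }

        @Override
        public void onError(Throwable error) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    /** Publishes messageCount items and returns what the subscriber received. */
    static List<String> publishAndCollect(int messageCount) {
        OneAtATimeSubscriber subscriber = new OneAtATimeSubscriber();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 1; i <= messageCount; i++) {
                publisher.submit("Message - " + i);
            }
        } // closing the publisher signals onComplete once the buffered items are delivered
        try {
            subscriber.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(publishAndCollect(3));
    }
}
```

The publisher never pushes more than the subscriber has requested, and the calling thread is only blocked here because the example waits for the result at the end.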

Reactor RabbitMQ — Sample 1

Let’s start with the basics and then we will work our way through the more complex and useful case. In this first sample, we will learn how we can send a simple message to the queue and how the consumer can consume those values using the functional API.

Requirements

You need to know how to create a Spring Boot application; I will not cover the prerequisites here.

The simplest way to run RabbitMQ is with a Docker container. If you do not have Docker installed, please install it following the official documentation here.

Check that Docker is installed with the following command. It prints the client version; running docker info additionally tells you whether the Docker daemon is up.

docker --version

If everything is working as expected, the output shows the Docker version you have installed.

Following the official RabbitMQ documentation, run the image that includes the management plugin with this command:

docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.9-management

This means that RabbitMQ will be listening on port 5672 and the management portal will be available on port 15672.

RabbitMQ

Once the container is up, go to http://localhost:15672. You will be prompted to log in. The username and password are both ‘guest’.

RabbitMQ log in

In this portal, it is possible to view the connections, exchanges, queues and much more. It will be useful for watching messages being sent to and consumed from the queue. Now it is time to learn how to connect your Spring Boot application to RabbitMQ.

Spring Boot Application — Sender

All the code provided here is available in my GitHub repo.

For the first sample, I have created two different Spring Boot applications: one for the sender and the other for the receiver.

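Create the project with the tool of your choice. The dependencies needed in your pom.xml file are the following:

  • spring-boot-starter-webflux
  • amqp-client
  • spring-amqp

The RabbitFlux, Sender and Receiver classes used below come from the reactor-rabbitmq artifact, so it needs to be on the classpath as well.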

Here is my pom.xml file for the Sender application.

Create a ‘config’ folder and a ‘RabbitConfig.java’ class for the RabbitMQ configurations. The folder structure should be the following:

-> src
|  -> main
|     -> java/groupid/artifactid
|        -> config
|        |  RabbitConfig.java
|        (ApplicationName)Application.java

Configuration — Sender

The RabbitConfig class should have the @Configuration annotation, so that Spring can process the class and generate the beans to be used in the application. In our case, it requires 3 different beans.

To create the Sender instance, you need to pass it a SenderOptions object. When creating a new SenderOptions, you need to set the connection (a Mono) and the resource management scheduler.

The RabbitConfig class should contain these 3 beans: the connection, the sender options, and finally the Sender itself.

Note that everything is wired together in a functional style.

Application — Sender

The simplest way to run and test this first sample is to create a Runner class that implements CommandLineRunner.

Before we create the Runner class, there are a few fields and a method that need to be declared.

First, we auto-wire the connection bean so that we can close it safely before the program finishes. We also declare the name of our queue and a logger that we will use later on.

Let’s now create an inner class, that implements CommandLineRunner.

// Runner class
@Component
static class Runner implements CommandLineRunner {
    ...
}

Here we will create our Sender and the Outbound Message, so we can then declare the queue and send the messages.

Because the whole process of sending each message is asynchronous, I have decided to use a synchronization aid, the CountDownLatch. We initialize it with a count of 10 (the number of messages that will be sent) and decrement the count each time the broker confirms a message. At the end, we call latch.await(3L, TimeUnit.SECONDS); which makes the current thread wait until the latch has been counted down to 0 or three seconds have passed, whichever comes first.
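If you have not used CountDownLatch before, here is a small standalone sketch using only the JDK, with no RabbitMQ involved. The LatchSketch class and its awaitConfirmations method are hypothetical names for this illustration; the thread pool simply stands in for the asynchronous confirmations coming back from the broker.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class LatchSketch {

    /**
     * Expects `expected` confirmations but only `arriving` of them actually happen.
     * Returns true if the latch reached zero before the timeout, false otherwise.
     */
    static boolean awaitConfirmations(int expected, int arriving, long timeoutMillis) {
        CountDownLatch latch = new CountDownLatch(expected);
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            for (int i = 0; i < arriving; i++) {
                // Each task plays the role of one asynchronous confirmation.
                pool.execute(latch::countDown);
            }
            // Blocks until the count is zero or the timeout elapses.
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(awaitConfirmations(10, 10, 3000)); // all confirmations arrive
        System.out.println(awaitConfirmations(10, 9, 200));   // one is missing, so we time out
    }
}
```

Note that await returns a boolean. The sample in this article ignores it, but in real code it is worth checking, because false means that not every message was confirmed in time.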

Let’s use constructor injection to wire in the Sender.

// Runner class
@Component
static class Runner implements CommandLineRunner {

    final Sender sender;

    Runner(Sender sender) {
        this.sender = sender;
    }
...

Inside the run method, we declare the message count so that we can initialize the CountDownLatch.

@Override
public void run(String... args) throws Exception {
    // Number of messages that will be sent to the queue
    int messageCount = 10;
    // CountDownLatch to keep track of the messages confirmed so far
    CountDownLatch latch = new CountDownLatch(messageCount);

We will be sending the following messages to the queue: “Message - i”, where i is the number of the current message.

The OutboundMessage takes the exchange (here the default exchange, identified by the empty string), the routing key (with the default exchange, this is the name of the queue) and the body of the message as a byte array.

new OutboundMessage( "", QUEUE, ("Message - " + i).getBytes())
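One detail worth knowing: getBytes() without an argument uses the JVM's default charset, and so does new String(bytes) on the receiving side. If the sender and the receiver run on machines with different defaults, non-ASCII text can get garbled. A small sketch of an explicit round trip, assuming UTF-8 is the encoding you want (the MessageBodySketch class is a hypothetical helper, not part of the sample):

```java
import java.nio.charset.StandardCharsets;

final class MessageBodySketch {

    /** Sender side: turn the text into the byte array that goes into the message body. */
    static byte[] encode(int messageNumber) {
        return ("Message - " + messageNumber).getBytes(StandardCharsets.UTF_8);
    }

    /** Receiver side: turn the received body back into text with the same charset. */
    static String decode(byte[] body) {
        return new String(body, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] body = encode(7);
        System.out.println(body.length + " bytes: " + decode(body));
    }
}
```

For the plain ASCII messages in this sample it makes no practical difference, so the snippets in this article keep the shorter getBytes() call.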

We need to create a flux of the outbound message so that the sender can publish it.

Flux<OutboundMessage> outboundFlux = Flux.range(1, messageCount)
    .map(i -> new OutboundMessage("", QUEUE, ("Message - " + i).getBytes()));

We need to declare the queue with the Sender and send the flux of outbound messages. Every time the message has been acknowledged by the broker we will log a message and also decrease the latch.

sender.declareQueue(QueueSpecification.queue(QUEUE))
    .thenMany(sender.sendWithPublishConfirms(outboundFlux))
    .doOnError(e -> LOGGER.error("Send failed", e))
    .subscribe(m -> {
        if (m.isAck()) {
            LOGGER.info("Message [" + latch.getCount() + "] sent");
            latch.countDown();
        }
    });

Finally, we will call the await method on the latch to give some time to send the messages before the program is finished.

latch.await(3L, TimeUnit.SECONDS);

After running the Sender Spring Boot Application the output is the following:

Sender output

Going back to the RabbitMQ portal, under the Queues section, you will notice that a new queue named demo-queue was created. If you open that queue, you can see that there are now 10 queued messages ready to be consumed by the receiver.

Queued messages at localhost:15672

Spring Boot Application — Receiver

To create a new Spring Boot Application, just follow the same steps as before. The dependencies are exactly the same.

Configuration —Receiver

The configuration is exactly the same as before but instead of creating a bean for the Sender, you need to create one for the Receiver.

@Bean
Receiver receiver(ReceiverOptions receiverOptions) {
    return RabbitFlux.createReceiver(receiverOptions);
}

Application —Receiver

Just like we did with the Sender, we will create a Runner class.

We will also use the synchronization aid, CountDownLatch.

int messageCount = 10;
CountDownLatch latch = new CountDownLatch(messageCount);

The main difference is that instead of sending the message we will be consuming a Flux of messages from the queue.

Disposable disposable = receiver.consumeNoAck("demo-queue")
    .subscribe(m -> {
        LOGGER.info("Received message {}", new String(m.getBody()));
        latch.countDown();
    });

After running the Receiver Application, the console shows the received messages.

Receiver output

If we take a look at the queued messages in the RabbitMQ portal we can notice that the messages have been successfully consumed.

Queued messages — 2 at localhost:15672

What is next?

As I mentioned earlier, this first sample is very simple but not very useful.

In the next sample, we will take what we learned from this lesson and then we will create a use case where the client will be publishing new orders for the backend to consume from the queue whenever it is ready to do so.

I really hope you learned something valuable from this article. Please feel free to check my GitHub for the complete code and ask any questions that you may have.

See you soon,

Jay Morelli.
