RabbitMQ .Net Demo, Publisher/Subscriber, Fanout Exchange, Microservices

Bob Code
9 min readNov 3, 2023

--

GitHub repo

Introduction

RabbitMQ is a message-queueing software also known as a message broker or queue manager.

Why do we need RabbitMQ?

Monolithic architecture is intricate and rigid, housing all functions within a single system, leading to maintenance challenges. Even minor modifications can disrupt the entire system.

In contrast, microservice architecture separates functions into standalone components, simplifying updates, additions, or removals without impacting the entire structure.

However for each component to communicate we need a broker that manages all messages sent to all components

RabbitMQ Terms

  • Connection

A network connection to the RabbitMQ server.

  • Message

The data unit sent between producers and consumers.

  • Producer

Creates and sends or publishes messages to RabbitMQ exchanges.

  • Exchange

Routes messages from producers to queues based on rules

  • Channel

A virtual communication pathway within a connection

  • Queue

Stores and holds messages until they are consumed

  • Binding

Links exchanges to queues, defining routing rules

  • Consumer

Receives and processes messages from queues

Why do we want pub/ sub?

Publisher/Subscriber allows us to have one publisher and many subscribers. Other models (such a producer/consumer) only have one subscriber

producer/consumer
publisher/subscribers

What are we going to Build?

We are going to build a publish/ subscribe model using a fanout exchange.

What is a publish/ subscribe model?

The Publish-Subscribe model is a messaging pattern where messages are broadcasted from a sender (publisher) to multiple receivers (subscribers).

Basically one message is created by one publisher and then consumed by multiple subscribers.

This is particularly great for microservices architectures where one event updates multiple components in your application.

What is a fanout exchange?

There are different types of exchanges (component that connects the message published to the queue(s)) and the fanout type sends one message to all queues that are bound to it.

Which is the perfect use case for pub/sub models

Read more about all exchanges via this link:

https://www.rabbitmq.com/tutorials/amqp-concepts.html#:~:text=A%20fanout%20exchange%20routes%20messages,the%20broadcast%20routing%20of%20messages.

Coding Plan

Here are the four projects in our solution

  • Controller that receives Request
  • Application that handles the request
  • Domain that manages the RabbitMQ connection
  • Subscriber that consumes the Rabbit MQ message

1# Controller

Create an API project, then modify the controller

[ApiController]
[Route("[controller]")]
public class BankingController : ControllerBase
{

private readonly ILogger<BankingController> _logger;
private readonly IMediator _mediator;

public BankingController(ILogger<BankingController> logger, IMediator mediator)
{
_logger = logger;
_mediator = mediator;
}

[HttpPost(Name = "TransferMoney")]
public async Task Post([FromBody] MoneyTransferCommandDto moneyTransfer)
{
_logger.LogInformation($"Request received from {moneyTransfer.SenderName}");

await _mediator.Send(moneyTransfer);
}
}

Test the controller with this request

{
"transferAmount": 100000,
"recipientName": "Frank",
"senderName": "Bob"
}

We are using MediatR, AutoMapper and our RabbitMQ interface so add the following to the Program.cs

builder.Services.AddAutoMapper(typeof(MappingProfile));
builder.Services.AddMediatR(cfg => cfg.RegisterServicesFromAssemblyContaining(typeof(MoneyTransferHandler)));
builder.Services.AddScoped<IRabbitMqBus, RabbitMqBus>();

Here is our request DTO

public class MoneyTransferCommandDto : IRequest
{
public int TransferAmount { get; set; }
public string RecipientName { get; set; }
public string SenderName { get; set; }
}

2# Application

Create a class project and add the command and handler.

Our application will handle the logic of our application following clean architecture principles

public class MoneyTransferHandler : IRequestHandler<MoneyTransferCommandDto>
{
private readonly IMapper _mapper;
private readonly IRabbitMqBus _rabbitMqBus;

public MoneyTransferHandler(IMapper mapper, IRabbitMqBus rabbitMqBus)
{
_mapper = mapper;
_rabbitMqBus = rabbitMqBus;
}

public async Task Handle(MoneyTransferCommandDto request, CancellationToken cancellationToken)
{
var moneyTransferCommand = _mapper.Map<MoneyTransferCommand>(request);

await _rabbitMqBus.Publish(moneyTransferCommand);
}

}
public class MoneyTransferCommand
{
public int TransferAmount { get; set; }
public string RecipientName { get; set; }
public string SenderName { get; set; }
public int TransactionId { get; set; }
}
public class MappingProfile : Profile
{
public MappingProfile()
{
CreateMap<MoneyTransferCommand, MoneyTransferCommandDto>();
CreateMap<MoneyTransferCommandDto, MoneyTransferCommand>();
}
}

3# RabbitMQ Application

Our Data project where we handle the RabbitMQ logic. This will be in a class project.

- Add connection

- Create channel

- Create a fanout exchange

- Create queues

- Create message

- Publish message

- Close channel and connection (use using)

public class RabbitMqBus: IRabbitMqBus
{
private readonly string _queueName = "BobSQueue";
private readonly string _exchangeName = "bob-fanout-exchange";

// channel publish
public async Task Publish<T>(T command)
{
// Create Connection
using(var connection = SetUpConnection())
{
// Create Channel
using (var channel = SetUpChannel(connection))
{
// Create a fanout exchange
channel.ExchangeDeclare(
_exchangeName, //name of exchange
ExchangeType.Fanout, //type of exchange
false, // durable
false, // durable
null // arguments
);

// channel QueueDeclare = create 1st queue
channel.QueueDeclare(_queueName + 1,
false, //durable: messages sent using this method persist only in the memory and not survive a server restart.
false, //exclusive
false, // auto-delete
null); // arguments



// channel QueueDeclare = create 2nd queue
channel.QueueDeclare(_queueName + 2,
false, //durable: messages sent using this method persist only in the memory and not survive a server restart.
false, //exclusive
false, // auto-delete
null); // arguments

// Create the message
var message = CreateBody(command);

// publish the message
channel.BasicPublish
(
_exchangeName, //exchangeName
string.Empty, //routingKey > not needed for fanout exchange
false, //mandatory
null, //basicProperties
message //body
);
}
}
}

// CreateConnection
private IConnection SetUpConnection()
{
var connectionFactory = new ConnectionFactory();

connectionFactory.HostName = "localhost"; // connect to a RabbitMQ node on local machine
connectionFactory.VirtualHost = "/";
connectionFactory.Port = 5672;
connectionFactory.UserName = "guest";
connectionFactory.Password = "guest";

return connectionFactory.CreateConnection(); // abstract socket connection
}

// Create Channel
private IModel SetUpChannel(IConnection connection)
{
return connection.CreateModel(); // channel = where apis reside

// A channel (a virtual connection) is created in the TCP connection.
// Publishing /consuming messages and subscribing to a queue are all done over a channel.
}

// Encode message body
private byte[] CreateBody<T> (T command)
{
var message = JsonConvert.SerializeObject(command);

return Encoding.UTF8.GetBytes(message);
}
}

Interface

public interface IRabbitMqBus
{
Task Publish<T> (T command);
}

4# RabbitMQ Subscriber

Create a console app project

- Add connection

- Create channel

- Bind queues to the exchange

- Create consumer

- Receive message

- Subscribe to the queue

- Close channel and connection (use using)

public class RabbitMqSubscriber : IRabbitMqSubscriber
{

private readonly string _queueName = "BobSQueue";
private readonly string _exchangeName = "bob-fanout-exchange";

public void Subscribe()
{
using(var connection = SetUpConnection())
{
using(var channel = SetUpChannel(connection))
{
// Bind queue 1
channel.QueueBind(
_queueName + 1, // queue name
_exchangeName, // exchange name
String.Empty, // routing key
null // arguments
);

// Bind queue 2
channel.QueueBind(
_queueName + 2, // queue name
_exchangeName, // exchange name
String.Empty, // routing key
null // arguments
);

// Create consumer
var consumer = new EventingBasicConsumer(channel);

// Receive Message
consumer.Received += (sender, e) =>
{
var message = Encoding.UTF8.GetString(e.Body.ToArray());
Console.WriteLine(message);
};

// Subscribe to the queue
var result1 = channel.BasicConsume(_queueName + 1, true, consumer);
var result2 = channel.BasicConsume(_queueName + 2, true, consumer);

Console.WriteLine(result1);
Console.WriteLine(result2);
}
}
}

// CreateConnection
private IConnection SetUpConnection()
{
var connectionFactory = new ConnectionFactory();

connectionFactory.HostName = "localhost"; // connect to a RabbitMQ node on local machine
connectionFactory.VirtualHost = "/";
connectionFactory.Port = 5672;
connectionFactory.UserName = "guest";
connectionFactory.Password = "guest";

return connectionFactory.CreateConnection(); // abstract socket connection
}

// Create Channel
private IModel SetUpChannel(IConnection connection)
{
return connection.CreateModel();
}
}

Interface

public interface IRabbitMqSubscriber
{
void Subscribe();
}

Program.cs

private static void Main(string[] args)
{
var subscriber = new RabbitMqSubscriber();

subscriber.Subscribe();

Console.WriteLine("Subscriber executed");
}

Testing the code

Well this was a lot of code, so let’s go through it step by step and see what happens live.

First of all, you will need to connect to the RabbitMQ management UI

There will you will land on the overview

Go around and see the connections (empty), channels (empty), exchanges (list of default ones starting with amq.[typeofexchange]), queues (empty)

Now Go in debug mode to your RabbitMQ Application project and see what happens in the UI

Start the Controller project and send the API request using Postman or Swagger.

Create Connection

using (var channel = SetUpChannel(connection))

You will see the connection created

using (var channel = SetUpChannel(connection))

In Channels, the channel has been set up (guest)

Create exchange(fanout)

// Create a fanout exchange
channel.ExchangeDeclare(
_exchangeName, //name of exchange
ExchangeType.Fanout, //type of exchange
false, // durable
false, // durable
null // arguments
);

You can see the newly created exchange of type fanout (here bob-fanout-exchange)

// channel QueueDeclare = create 1st queue
channel.QueueDeclare(_queueName + 1,
false, //durable: messages sent using this method persist only in the memory and not survive a server restart.
false, //exclusive
false, // auto-delete
null); // arguments



// channel QueueDeclare = create 2nd queue
channel.QueueDeclare(_queueName + 2,
false, //durable: messages sent using this method persist only in the memory and not survive a server restart.
false, //exclusive
false, // auto-delete
null); // arguments

Two queues are going to be created, with ready 0 (meaning 0 messages ready to be sent)

// Create the message
var message = CreateBody(command);

// publish the message
channel.BasicPublish
(
_exchangeName, //exchangeName
string.Empty, //routingKey > not needed for fanout exchange
false, //mandatory
null, //basicProperties
message //body
);
}

Now we have sent one message to each queue, you can see that the ready is at 1

The two messages are ready to be consumed

Now you can Run your Subscriber project, set a breaking point in your Program.cs where you instantiate the Rabbit subscriber (otherwise you won’t see the app consuming the message) and observe the command line.

After hitting the subscriber.Subscribe(); you will receive the following message in the command line

Conclusion

This was a very basic demo of RabbitMQ, please comment if you have any remarks or better ways of doing it. Thank you!

Next step

Integrate RabbitMQ with your cloud instances

Documentation

This article has been made possible by reading all the below doc, so thanks to all the authors.

--

--

Bob Code

All things related to Memes, .Net/C#, Azure, DevOps and Microservices