Implement RabbitMQ in .NET 8

Mahedi Hasan Niloy
C# Programming
5 min read · May 31, 2024

RabbitMQ is a powerful, widely used open-source message broker that facilitates communication between distributed systems. It implements the Advanced Message Queuing Protocol (AMQP 0-9-1), enabling applications to send and receive messages through queues for reliable and efficient data transfer. RabbitMQ supports messaging patterns such as publish/subscribe, request/reply, and point-to-point, which makes it suitable for many use cases.

In .NET 8, integrating RabbitMQ can improve the scalability and resilience of your applications, because producers and consumers are decoupled and can be scaled or restarted independently. This article walks through configuring the RabbitMQ connection in a .NET 8 application and shows practical examples of sending and receiving messages. It assumes a RabbitMQ broker is already running on localhost with the default guest account. By the end of this tutorial, you’ll have a working understanding of how to use RabbitMQ to build scalable, distributed systems with .NET 8.

The example has two parts: a publisher that lives in a Web API project and a consumer that runs as a hosted background service.

The code in this article talks to RabbitMQ through the official RabbitMQ.Client library and serializes messages with Newtonsoft.Json, so those are the two NuGet packages you need. The examples use the synchronous IModel API of RabbitMQ.Client 6.x. Version 7 renamed IModel to IChannel and made the API asynchronous, so pin a 6.x version to follow along. MassTransit with MassTransit.RabbitMQ is a widely used higher-level service bus for .NET that builds on RabbitMQ.Client, but it is not used in the code below.

Here’s how you can add these packages to your .NET 8 project:

1. Install RabbitMQ.Client:

dotnet add package RabbitMQ.Client --version 6.8.1

2. Install Newtonsoft.Json:

dotnet add package Newtonsoft.Json

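Then add the RabbitMQ connection settings to your appsettings.json file. Note that the default guest account can only connect from localhost, so create a dedicated user for a remote broker:

"RabbitMQ": {
  "HostName": "localhost",
  "UserName": "guest",
  "Password": "guest"
}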

Now add a class that holds these RabbitMQ settings, together with a class for the queue names:

namespace SalesPulse.MessageBroker
{
    // Bound to the "RabbitMQ" section of appsettings.json
    public class RabbitMQSetting
    {
        public string? HostName { get; set; }
        public string? UserName { get; set; }
        public string? Password { get; set; }
    }

    // RabbitMQ queue names
    public static class RabbitMQQueues
    {
        public const string OrderValidationQueue = "orderValidationQueue";
        public const string AnotherQueue = "anotherQueue";
        public const string ThirdQueue = "thirdQueue";
    }
}
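
To make the link between the JSON section and the settings class concrete, here is a minimal sketch of the binding that the options system performs. It uses an in-memory configuration source instead of appsettings.json. The BindingSketch namespace and the RabbitMQSettingBinding helper are hypothetical names introduced only for illustration, and the sketch assumes the Microsoft.Extensions.Configuration and Microsoft.Extensions.Configuration.Binder packages are available (they are in an ASP.NET Core project).

```csharp
#nullable enable
using System.Collections.Generic;
using Microsoft.Extensions.Configuration;

namespace BindingSketch
{
    // Same shape as the RabbitMQSetting class above,
    // repeated so the sketch compiles on its own.
    public class RabbitMQSetting
    {
        public string? HostName { get; set; }
        public string? UserName { get; set; }
        public string? Password { get; set; }
    }

    // Hypothetical helper, for illustration only.
    public static class RabbitMQSettingBinding
    {
        // Reads the "RabbitMQ" section and maps its keys onto the
        // properties with the same names (matching is case-insensitive).
        public static RabbitMQSetting? FromSection(IConfiguration configuration) =>
            configuration.GetSection("RabbitMQ").Get<RabbitMQSetting>();

        // Builds the same key/value pairs that the JSON section produces.
        public static IConfiguration BuildSampleConfiguration() =>
            new ConfigurationBuilder()
                .AddInMemoryCollection(new Dictionary<string, string?>
                {
                    ["RabbitMQ:HostName"] = "localhost",
                    ["RabbitMQ:UserName"] = "guest",
                    ["RabbitMQ:Password"] = "guest"
                })
                .Build();
    }
}
```

If a key is missing from the section, the corresponding property simply stays null, which is why the settings class declares its properties as nullable strings.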

We need a publisher service class to publish messages. It is good practice to make this service generic so it can be reused for different message types. The message publisher service class looks like this:

using System.Text;
using System.Threading.Tasks;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
using RabbitMQ.Client;

namespace SalesPulse.MessageBroker.Services
{
    public class RabbitMQPublisher<T> : IRabbitMQPublisher<T>
    {
        private readonly RabbitMQSetting _rabbitMqSetting;

        public RabbitMQPublisher(IOptions<RabbitMQSetting> rabbitMqSetting)
        {
            _rabbitMqSetting = rabbitMqSetting.Value;
        }

        public Task PublishMessageAsync(T message, string queueName)
        {
            var factory = new ConnectionFactory
            {
                HostName = _rabbitMqSetting.HostName,
                UserName = _rabbitMqSetting.UserName,
                Password = _rabbitMqSetting.Password
            };

            // Opening a connection per message keeps the example simple but is
            // expensive; prefer a long-lived, shared connection under real load.
            using var connection = factory.CreateConnection();
            using var channel = connection.CreateModel();

            // The declaration must match the one made by the consumer.
            channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);

            // Serialize the message to JSON and encode it as UTF-8 bytes.
            var messageJson = JsonConvert.SerializeObject(message);
            var body = Encoding.UTF8.GetBytes(messageJson);

            // BasicPublish is synchronous in RabbitMQ.Client 6.x, so there is
            // nothing to await; publish via the default exchange to the queue.
            channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: body);
            return Task.CompletedTask;
        }
    }
}

Define the matching interface as well:

using System.Threading.Tasks;

namespace SalesPulse.MessageBroker.Services
{
    public interface IRabbitMQPublisher<T>
    {
        Task PublishMessageAsync(T message, string queueName);
    }
}

Our publisher service class is ready. Next we need a publisher model class, which is simply the entity that carries the data of the published message.

using System;
using System.Collections.Generic;

namespace SalesPulse.MessageBroker
{
    public class OrderValidation
    {
        public Guid? OrderEntryById { get; set; }
        public string? OrderEntryByCode { get; set; }
        public List<ProductValidation>? Products { get; set; }
        public string? SupervisorDeviceId { get; set; }
        public long OrderNumber { get; set; }
        public Guid NotificationReceiverId { get; set; }
    }

    public class ProductValidation
    {
        public Guid? ProductId { get; set; }
        public string? ProductCode { get; set; }
        public decimal? OrderAmount { get; set; }
        public string? ProductName { get; set; }
    }
}
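
To see what actually travels through the queue, the following sketch isolates the two conversion steps: the publisher turns an object into JSON and then into UTF-8 bytes, and the consumer reverses that. MessageBody and SampleOrder are hypothetical names used only for this illustration. SampleOrder is a trimmed-down stand-in for OrderValidation, and the sketch assumes the Newtonsoft.Json package is installed.

```csharp
#nullable enable
using System.Text;
using Newtonsoft.Json;

// Hypothetical helper, not part of the article's code base.
public static class MessageBody
{
    // Publisher side: object -> JSON text -> UTF-8 bytes.
    public static byte[] Encode<T>(T message) =>
        Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message));

    // Consumer side: UTF-8 bytes -> JSON text -> object.
    // Returns null when the payload is the JSON literal "null".
    public static T? Decode<T>(byte[] body) =>
        JsonConvert.DeserializeObject<T>(Encoding.UTF8.GetString(body));
}

// Trimmed-down stand-in for the OrderValidation model.
public class SampleOrder
{
    public long OrderNumber { get; set; }
    public string? SupervisorDeviceId { get; set; }
}
```

Calling MessageBody.Decode<SampleOrder>(MessageBody.Encode(order)) should give back an object with the same property values. This only works when both sides agree on the property names, which is why the consumer project needs the same model class or an explicit mapping.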

The publisher side is almost done. Now inject the publisher service into the controller's constructor and call its method to publish a message. In my case, every order has to be validated when it is entered, and based on the result a number of people receive a push notification. So I publish my message to the order validation queue:


// Prepare validation data
var orderValidation = await _chemistService.GetOrderValidationInfoByOrderEntry(order.OrderEntryBy, order, orderDetailsList);

// Publish order validation data
// (_orderValidationMqPublisher is the publisher injected through the constructor)
await _orderValidationMqPublisher.PublishMessageAsync(orderValidation, RabbitMQQueues.OrderValidationQueue);

Lastly, register these services in the Program.cs file. AddScoped gives the publisher service a scoped lifetime:

builder.Services.Configure<RabbitMQSetting>(builder.Configuration.GetSection("RabbitMQ"));
builder.Services.AddScoped(typeof(IRabbitMQPublisher<>), typeof(RabbitMQPublisher<>));

The publisher side is done. Now let's see how to configure the consumer side.

The appsettings.json section and the settings class are the same as on the producer side. Also add the same message model class (OrderValidation) to the consumer project; otherwise you have to map between two different classes to read the data.

Here I need a consumer service class, which consumes the messages from the queue.

using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
using PulseWorker.Data;
using PulseWorker.Domain;
using PulseWorker.Service;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace PulseWorker.MessageBroker
{
    public class OrderValidationMessageConsumerService : BackgroundService
    {
        private readonly IServiceProvider _serviceProvider;
        private readonly ILogger<OrderValidationMessageConsumerService> _logger;
        private readonly RabbitMQSetting _rabbitMqSetting;
        private readonly IConnection _connection;
        private readonly IModel _channel;

        public OrderValidationMessageConsumerService(IOptions<RabbitMQSetting> rabbitMqSetting, IServiceProvider serviceProvider, ILogger<OrderValidationMessageConsumerService> logger)
        {
            _rabbitMqSetting = rabbitMqSetting.Value;
            _serviceProvider = serviceProvider;
            _logger = logger;

            var factory = new ConnectionFactory
            {
                HostName = _rabbitMqSetting.HostName,
                UserName = _rabbitMqSetting.UserName,
                Password = _rabbitMqSetting.Password,
                // Required by AsyncEventingBasicConsumer in RabbitMQ.Client 6.x
                DispatchConsumersAsync = true
            };
            _connection = factory.CreateConnection();
            _channel = _connection.CreateModel();
        }

        protected override Task ExecuteAsync(CancellationToken stoppingToken)
        {
            // The consumer is push-based: register it once and return.
            StartConsuming(RabbitMQQueues.OrderValidationQueue);
            return Task.CompletedTask;
        }

        private void StartConsuming(string queueName)
        {
            // The declaration must match the one made by the publisher.
            _channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);

            // The async consumer awaits the handler; an async lambda on the
            // synchronous EventingBasicConsumer would run as "async void".
            var consumer = new AsyncEventingBasicConsumer(_channel);
            consumer.Received += async (model, ea) =>
            {
                var body = ea.Body.ToArray();
                var message = Encoding.UTF8.GetString(body);

                bool processedSuccessfully = false;
                try
                {
                    processedSuccessfully = await ProcessMessageAsync(message);
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "Exception occurred while processing message from queue {QueueName}", queueName);
                }

                if (processedSuccessfully)
                {
                    _channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                }
                else
                {
                    // requeue: true puts the message back on the queue, so a
                    // message that always fails is redelivered indefinitely.
                    _channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: true);
                }
            };

            // autoAck: false means the broker waits for BasicAck or BasicReject.
            _channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
        }

        private async Task<bool> ProcessMessageAsync(string message)
        {
            try
            {
                // Scoped services must be resolved from a new scope,
                // because a hosted service is a singleton.
                using (var scope = _serviceProvider.CreateScope())
                {
                    var notificationService = scope.ServiceProvider.GetRequiredService<INotificationService>();
                    var notificationRepository = scope.ServiceProvider.GetRequiredService<INotificationRepository>();
                    var orderValidationRepository = scope.ServiceProvider.GetRequiredService<IValidationRepository>();
                    var orderValidation = JsonConvert.DeserializeObject<OrderValidation>(message);

                    // Nothing to validate or nobody to notify: acknowledge and drop.
                    if (string.IsNullOrEmpty(orderValidation?.SupervisorDeviceId) || orderValidation.Products == null)
                    {
                        return true;
                    }

                    foreach (var item in orderValidation.Products)
                    {
                        bool isExceed = await orderValidationRepository.IsValidationAmountExceed(item.ProductId);

                        if (isExceed)
                        {
                            var notificationModel = new NotificationModel
                            {
                                DeviceId = orderValidation.SupervisorDeviceId,
                                IsAndroiodDevice = true,
                                Title = "OrderValidation",
                                Body = $"Chemist Order - {orderValidation.OrderNumber} exceeds the validation amount for product {item.ProductName}-{item.ProductCode}"
                            };

                            var notificationResponse = await notificationService.SendNotificationAsync(notificationModel);

                            if (!notificationResponse.IsSuccess)
                            {
                                return false;
                            }

                            var notification = new Notification
                            {
                                Title = notificationModel.Title,
                                Body = notificationModel.Body,
                                ReceiverId = orderValidation.NotificationReceiverId,
                                TypeIdentifyId = orderValidation.OrderNumber.ToString(),
                                Type = notificationModel.Title,
                            };

                            await notificationRepository.AddNotificationAsync(notification);
                            await notificationRepository.SaveNotificationAsync();

                            // Stops after the first product that exceeds the limit.
                            return true;
                        }
                    }

                    return true;
                }
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error processing message");
                return false;
            }
        }

        public override void Dispose()
        {
            _channel.Close();
            _connection.Close();
            base.Dispose();
        }
    }
}

Here the consumer service consumes the message and validates the order; if a validation rule is broken, it sends a push notification.
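
The consumer rejects failed messages with requeue: true, so a message that can never be processed would be redelivered again and again. One common way to avoid that is a dead-letter queue. The sketch below only builds the queue arguments RabbitMQ expects for dead-lettering. QueueArguments, WithDeadLetter, and the queue name orderValidationQueue.dead are hypothetical names, not part of the original code, so treat this as a starting point rather than a finished setup.

```csharp
using System.Collections.Generic;

// Hypothetical helper, for illustration only.
public static class QueueArguments
{
    // Builds the optional arguments that tell RabbitMQ where to route a
    // message once it is rejected with requeue: false.
    public static IDictionary<string, object> WithDeadLetter(
        string deadLetterExchange, string deadLetterRoutingKey) =>
        new Dictionary<string, object>
        {
            ["x-dead-letter-exchange"] = deadLetterExchange,
            ["x-dead-letter-routing-key"] = deadLetterRoutingKey
        };
}

// Possible usage inside the consumer (RabbitMQ.Client 6.x), shown as
// comments because it needs a live channel:
//
//   // 1. Declare the queue that collects failed messages.
//   _channel.QueueDeclare(queue: "orderValidationQueue.dead", durable: false,
//       exclusive: false, autoDelete: false, arguments: null);
//
//   // 2. Declare the work queue with dead-letter arguments. An empty
//   //    exchange name means the default exchange, which routes by queue name.
//   _channel.QueueDeclare(queue: queueName, durable: false, exclusive: false,
//       autoDelete: false,
//       arguments: QueueArguments.WithDeadLetter("", "orderValidationQueue.dead"));
//
//   // 3. Reject without requeue so the broker dead-letters the message.
//   _channel.BasicReject(deliveryTag: ea.DeliveryTag, requeue: false);
```

Two caveats apply. RabbitMQ refuses to redeclare an existing queue with different arguments, so a queue created earlier without them has to be deleted first. The publisher must also declare the queue with the same arguments, because both sides call QueueDeclare.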

Now register the service class as a hosted service in Program.cs:

// RabbitMQ Configuration
builder.Services.Configure<RabbitMQSetting>(builder.Configuration.GetSection("RabbitMQ"));
// Register the consumer service as a hosted service only
builder.Services.AddHostedService<OrderValidationMessageConsumerService>();

That's all. This is a very basic and simple setup of RabbitMQ in .NET 8.

I hope this article helps you implement RabbitMQ in your .NET 8 API.
