Simple Publisher and Consumer Libraries Using RabbitMQ for .NET 6 Web API
This article is inspired by my personal microservice project, in which two or more services need to communicate through a background process. It shows a simple way to build a message queue mechanism with RabbitMQ. We define a connection provider class (channels, i.e. virtual connections, are created from its connection), a publisher class, and a consumer class. I describe each piece of the code along the way.
First, we define the file structure of the project.
Please refer to the GitHub repository for the full source code.
1. IConnectionProvider.cs Interface
public interface IConnectionProvider : IDisposable
{
IConnection GetConnection();
}
According to the Microsoft documentation, the main purpose of the IDisposable interface is to release unmanaged resources such as database connections, sockets, and window handles. A RabbitMQ connection holds a network socket, so it belongs in the same category.
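To make the role of IDisposable concrete, here is a minimal sketch that does not touch RabbitMQ at all. FakeConnection is a hypothetical stand-in for a real connection; the only point is that a using block calls Dispose automatically when the scope ends.

```csharp
using System;

// Hypothetical stand-in for a resource such as a broker connection.
public sealed class FakeConnection : IDisposable
{
    public bool IsOpen { get; private set; } = true;

    public void Dispose()
    {
        // Safe to call more than once: closing twice changes nothing.
        IsOpen = false;
    }
}

public static class DisposeDemo
{
    // Returns the IsOpen flag as observed after the using block has ended.
    public static bool IsOpenAfterUsing()
    {
        FakeConnection captured;
        using (var connection = new FakeConnection())
        {
            captured = connection;
            // ... work with the connection here ...
        }
        // Dispose has already run at this point.
        return captured.IsOpen;
    }
}
```

The same thing happens to a ConnectionProvider when whoever owns it disposes it, whether that is a using block or the application shutting down.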
2. ConnectionProvider.cs Class
public class ConnectionProvider : IConnectionProvider
{
private readonly ConnectionFactory _factory;
private readonly IConnection _connection;
private bool _disposed;
public ConnectionProvider(string url)
{
_factory = new ConnectionFactory
{
Uri = new Uri(url)
};
_connection = _factory.CreateConnection();
}
public IConnection GetConnection()
{
return _connection;
}
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
protected virtual void Dispose(bool disposing)
{
if (_disposed)
return;
if (disposing)
_connection?.Close();
_disposed = true;
}
}
- Don’t forget to implement the interface.
- ConnectionFactory: an object that encapsulates the set of connection configuration parameters defined by an administrator (username, password, hostname, port, etc.). It constructs the connection instance.
- IConnection: the connection itself, which we use to create sessions / channels (a channel is like a virtual connection).
- For more on the dispose mechanism, you can read this link.
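The class above configures the factory from a single URI. As a rough sketch, the same parameters can also be set one by one. The values below are the usual defaults of a local RabbitMQ install and are assumptions; ConnectionFactoryExample is a hypothetical name, and AutomaticRecoveryEnabled is an optional extra that the article's class does not set.

```csharp
using RabbitMQ.Client;

public static class ConnectionFactoryExample
{
    // Builds a factory from individual settings instead of one amqp:// URI.
    public static ConnectionFactory Create()
    {
        return new ConnectionFactory
        {
            HostName = "localhost",   // assumed local broker
            Port = 5672,              // default AMQP port
            UserName = "guest",
            Password = "guest",
            VirtualHost = "/",
            // Optional: let the client try to re-establish a dropped connection.
            AutomaticRecoveryEnabled = true
        };
    }
}
```

Calling Create().CreateConnection() then opens the connection, exactly as the constructor of ConnectionProvider does.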
3. IPublisher.cs Interface
public interface IPublisher : IDisposable
{
void Publish(
string message,
string routingKey,
IDictionary<string, object> headers,
string timeToLive = null);
}
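- A message can carry data or a command. For instance, the publisher / producer can send a data object to the subscriber / consumer so that the consumer can use the data directly. Alternatively, the publisher can send only a command (such as "insert data from a particular table"); once the consumer receives the command, it calls the relevant service / resource to fetch the data and insert it into its own storage.
- The routing key (address) is a message attribute set by the publisher / producer. The exchange uses it to decide which queues receive the message.
- Headers are optional key-value pairs attached to the message. A headers exchange can route on them; with other exchange types they simply travel with the message.
- Time to live (TTL) determines how long a message may stay in a queue. Once that period is exceeded, the expired message is discarded. As a queue argument it uses "x-message-ttl" as the key and an integer (milliseconds) as the value; per message it is set through the Expiration property as a string of milliseconds, which is what the timeToLive parameter feeds. Note that calls made through the IPublisher interface pick up the interface's default (null, meaning no per-message expiry), not the "30000" default declared on the Publisher class.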
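The two ways of setting a TTL mentioned above can be sketched side by side. This is only an illustration under the assumption of RabbitMQ.Client 6.x (the IModel API used throughout this article); TtlExample and its method names are hypothetical.

```csharp
using System.Collections.Generic;
using System.Globalization;
using System.Text;
using RabbitMQ.Client;

public static class TtlExample
{
    // Queue-level TTL: every message that lands in this queue expires after ttlMs.
    public static void DeclareQueueWithTtl(IModel channel, string queue, int ttlMs)
    {
        var arguments = new Dictionary<string, object>
        {
            { "x-message-ttl", ttlMs }
        };
        channel.QueueDeclare(queue, durable: true, exclusive: false,
            autoDelete: false, arguments: arguments);
    }

    // Per-message TTL: only this message expires; the value is milliseconds as a string.
    public static void PublishWithTtl(IModel channel, string exchange,
        string routingKey, string json, int ttlMs)
    {
        var properties = channel.CreateBasicProperties();
        properties.Expiration = ttlMs.ToString(CultureInfo.InvariantCulture);
        channel.BasicPublish(exchange, routingKey, properties, Encoding.UTF8.GetBytes(json));
    }
}
```

Keep in mind that redeclaring an existing queue with different arguments is rejected by the broker, so a queue-level TTL is best decided before the queue is first created.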
4. Publisher.cs Class
public class Publisher : IPublisher
{
private readonly IConnectionProvider _connectionProvider;
private readonly string _exchange;
private readonly IModel _model;
private bool _disposed;
public Publisher(IConnectionProvider connectionProvider, string exchange, string exchangeType, int timeToLive = 30000)
{
_connectionProvider = connectionProvider;
_exchange = exchange;
_model = _connectionProvider.GetConnection().CreateModel();
var ttl = new Dictionary<string, object>
{
{"x-message-ttl", timeToLive }
};
_model.ExchangeDeclare(_exchange, exchangeType, arguments: ttl);
}
public void Publish(string message, string routingKey, IDictionary<string, object> headers, string timeToLive = "30000")
{
var body = Encoding.UTF8.GetBytes(message);
var properties = _model.CreateBasicProperties();
properties.Persistent = true;
properties.Headers = headers;
properties.Expiration = timeToLive;
_model.BasicPublish(_exchange, routingKey, properties, body);
}
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
// Protected implementation of Dispose pattern.
protected virtual void Dispose(bool disposing)
{
if (_disposed)
return;
if (disposing)
_model?.Close();
_disposed = true;
}
}
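- _model.ExchangeDeclare: creates the exchange according to the given type. Note that RabbitMQ documents "x-message-ttl" as a queue argument, so passing it to an exchange declaration is not expected to expire anything; in this class it is the Expiration property set in Publish that gives each message its TTL.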
public void Publish(string message, string routingKey, IDictionary<string, object> headers, string timeToLive = "30000")
{
var body = Encoding.UTF8.GetBytes(message);
var properties = _model.CreateBasicProperties();
properties.Persistent = true;
properties.Headers = headers;
properties.Expiration = timeToLive;
_model.BasicPublish(_exchange, routingKey, properties, body);
}
- Encoding.UTF8.GetBytes(message): converts the JSON text to a byte array.
- Persistent = true: asks RabbitMQ to store the message on disk so that it is not lost when the broker restarts (the queue must be durable as well). This works for simple queue tasks, but for more complex ones it does not give a 100% guarantee that a message won't be lost.
- Headers: optional key-value pairs.
- _model.BasicPublish: the step where the publisher sends the message to the RabbitMQ exchange with the given routing key.
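Since a persistent message alone is not a full guarantee, one common complement is publisher confirms, where the broker tells the publisher that it has taken responsibility for the message. The sketch below assumes RabbitMQ.Client 6.x; ConfirmedPublishExample is a hypothetical helper, not part of the library in this article, and the 5 second timeout is an arbitrary choice.

```csharp
using System;
using System.Text;
using RabbitMQ.Client;

public static class ConfirmedPublishExample
{
    public static void PublishAndWait(IModel channel, string exchange,
        string routingKey, string json)
    {
        // Put the channel into confirm mode (normally done once, right after creating it).
        channel.ConfirmSelect();

        var properties = channel.CreateBasicProperties();
        properties.Persistent = true;

        channel.BasicPublish(exchange, routingKey, properties, Encoding.UTF8.GetBytes(json));

        // Blocks until the broker confirms the message; throws if it is
        // nacked or if no confirmation arrives within the timeout.
        channel.WaitForConfirmsOrDie(TimeSpan.FromSeconds(5));
    }
}
```

Waiting after every single publish is the simplest form and also the slowest; it trades throughput for certainty.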
5. ISubscriber.cs Interface
public interface ISubscriber : IDisposable
{
void Subscribe(Func<string, IDictionary<string, object>, bool> callback);
void SubscribeAsync(Func<string, IDictionary<string, object>, Task<bool>> callback);
}
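We provide a synchronous and an asynchronous method: the callback returns bool in the first and Task<bool> in the second. One caveat, to my knowledge: in RabbitMQ.Client 5.x and 6.x the asynchronous consumer only works on a connection whose factory sets DispatchConsumersAsync = true, and such a connection no longer accepts the synchronous consumer, so a given connection should serve one style or the other.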
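For the asynchronous variant, a factory could be configured roughly like this. It is a sketch that assumes RabbitMQ.Client 5.x or 6.x, where the DispatchConsumersAsync property exists; AsyncFactoryExample is a hypothetical name.

```csharp
using System;
using RabbitMQ.Client;

public static class AsyncFactoryExample
{
    // Creates a factory whose connections dispatch deliveries to async consumers.
    public static ConnectionFactory Create(string url)
    {
        return new ConnectionFactory
        {
            Uri = new Uri(url),
            // Needed for AsyncEventingBasicConsumer in client versions 5.x / 6.x.
            DispatchConsumersAsync = true
        };
    }
}
```

An application that needs both Subscribe and SubscribeAsync would then keep two connection providers, one per dispatch mode.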
6. Subscriber.cs Class
public class Subscriber : ISubscriber
{
private readonly IConnectionProvider _connectionProvider;
private readonly string _exchange;
private readonly string _queue;
private readonly IModel _model;
private bool _disposed;
public Subscriber(
IConnectionProvider connectionProvider,
string exchange,
string queue,
string routingKey,
string exchangeType,
int timeToLive = 30000,
ushort prefetchSize = 10)
{
_connectionProvider = connectionProvider;
_exchange = exchange;
_queue = queue;
_model = _connectionProvider.GetConnection().CreateModel();
var ttl = new Dictionary<string, object>
{
{"x-message-ttl", timeToLive }
};
_model.ExchangeDeclare(_exchange, exchangeType, arguments: ttl);
_model.QueueDeclare(_queue,
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
_model.QueueBind(_queue, _exchange, routingKey);
_model.BasicQos(0, prefetchSize, false);
}
public void Subscribe(Func<string, IDictionary<string, object>, bool> callback)
{
var consumer = new EventingBasicConsumer(_model);
consumer.Received += (sender, e) =>
{
var body = e.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
bool success = callback.Invoke(message, e.BasicProperties.Headers);
if (success)
{
// multiple: false acknowledges only this delivery, not earlier unacknowledged ones
_model.BasicAck(e.DeliveryTag, false);
}
};
_model.BasicConsume(_queue, false, consumer);
}
public void SubscribeAsync(Func<string, IDictionary<string, object>, Task<bool>> callback)
{
var consumer = new AsyncEventingBasicConsumer(_model);
consumer.Received += async (sender, e) =>
{
var body = e.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
bool success = await callback.Invoke(message, e.BasicProperties.Headers);
if (success)
{
// multiple: false acknowledges only this delivery, not earlier unacknowledged ones
_model.BasicAck(e.DeliveryTag, false);
}
};
_model.BasicConsume(_queue, false, consumer);
}
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
// Protected implementation of Dispose pattern.
protected virtual void Dispose(bool disposing)
{
if (_disposed)
return;
if (disposing)
_model?.Close();
_disposed = true;
}
}
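Notes for the constructor:
- _model.ExchangeDeclare: we also declare the exchange in the consumer / subscriber, because the subscriber application may start earlier than the publisher application. When the publisher starts later and declares the same exchange, both sides use that one exchange (the declarations must match, otherwise RabbitMQ rejects the second one).
- _model.QueueDeclare: here we set durable to true, so the queue survives even though the server dies / restarts. The messages in it survive too, provided they were published as persistent.
- _model.QueueBind: binds (maps) the queue to the exchange using the routing key. Without this binding, the queue never receives messages from the exchange.
- _model.BasicQos: the first argument (prefetch size, in bytes) is set to 0, which means no size limit. The second argument is the prefetch count; despite its name, the constructor's prefetchSize parameter is passed here, so by default at most 10 unacknowledged messages are delivered to the consumer at a time. A prefetch count of 0 would be treated as infinite, allowing any number of unacknowledged messages. The third argument (false) applies the limit per consumer rather than to the whole channel. Please refer to this article to get a better understanding of BasicQos.
Notes for the Subscribe method: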
var consumer = new EventingBasicConsumer(_model);
consumer.Received += (sender, e) =>
{
.....
}
The code above creates the event listener.
var body = e.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
The code above reads the message body delivered from the queue and decodes it into a string.
bool success = callback.Invoke(message, e.BasicProperties.Headers);
The code above is the callback. Through it we call our own method, which may perform some data processing (an insert or update in a database / storage, or anything else). Once the message has been handled successfully by the consumer application, the callback returns true.
if (success)
{
// multiple: false acknowledges only this delivery, not earlier unacknowledged ones
_model.BasicAck(e.DeliveryTag, false);
}
The code above acknowledges that the message has been received and processed by the consumer, after which RabbitMQ removes it from the queue. If we never acknowledge the message, it stays unacknowledged, and RabbitMQ delivers it again once the channel or connection is closed.
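When the callback returns false or throws, the message could also be rejected explicitly instead of being left unacknowledged. The following is a minimal sketch of that idea, assuming RabbitMQ.Client 6.x; AckOrNackExample is a hypothetical helper and not the Subscriber class from this article.

```csharp
using System;
using System.Collections.Generic;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

public static class AckOrNackExample
{
    public static void Consume(IModel channel, string queue,
        Func<string, IDictionary<string, object>, bool> callback)
    {
        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (sender, e) =>
        {
            var message = Encoding.UTF8.GetString(e.Body.ToArray());
            bool success;
            try
            {
                success = callback(message, e.BasicProperties.Headers);
            }
            catch (Exception)
            {
                // Treat an exception in the handler like a failed message.
                success = false;
            }

            if (success)
                channel.BasicAck(e.DeliveryTag, multiple: false);
            else
                // requeue: true puts the message back so it is delivered again.
                channel.BasicNack(e.DeliveryTag, multiple: false, requeue: true);
        };
        channel.BasicConsume(queue, autoAck: false, consumer: consumer);
    }
}
```

With requeue: true, a message that can never be processed will be redelivered over and over; passing requeue: false (ideally with a dead-letter exchange configured on the queue) avoids that loop.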
How to Use the Publisher and Subscriber Helpers
1. Program.cs in Publisher Application
// Create connection
// amqp://guest:guest@localhost:5672 => amqp://{username}:{password}@{host}:{rabbitmq port}
builder.Services.AddSingleton<IConnectionProvider>(new ConnectionProvider("amqp://guest:guest@localhost:5672"));
// Register IPublisher in DI container
// Define the exchange name as well as the exchange type
builder.Services.AddSingleton<IPublisher>(x => new Publisher(x.GetService<IConnectionProvider>(),
"order-exchange",
ExchangeType.Topic));
2. Controller in Publisher Application
Before that, declare a readonly IPublisher field and assign it from the constructor parameter.
// Publish the message (it sends the serialized data object, the routing key, and the optional headers)
_publisher.Publish(JsonConvert.SerializeObject(food), "order.food", null);
3. Program.cs in Subscriber Application
// Create connection
// amqp://guest:guest@localhost:5672 => amqp://{username}:{password}@{host}:{rabbitmq port}
builder.Services.AddSingleton<IConnectionProvider>(new ConnectionProvider("amqp://guest:guest@localhost:5672"));
// Register ISubscriber in DI container
// We set the exchange name, queue name, routing key pattern, and exchange type
builder.Services.AddSingleton<ISubscriber>(x => new Subscriber(x.GetService<IConnectionProvider>(),
"order-exchange",
"order-food-queue",
"order.*",
ExchangeType.Topic));
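The binding pattern "order.*" works together with the routing key "order.food" used by the publisher. To illustrate how topic patterns behave, here is a small self-contained matcher. It is my own sketch of the documented rules (* stands for exactly one word, # for zero or more words), not the broker's actual implementation, and TopicMatcher is a hypothetical name.

```csharp
using System;

public static class TopicMatcher
{
    // Returns true when routingKey would match the topic binding pattern.
    public static bool Matches(string pattern, string routingKey)
    {
        return Match(pattern.Split('.'), 0, routingKey.Split('.'), 0);
    }

    private static bool Match(string[] p, int pi, string[] k, int ki)
    {
        // Pattern exhausted: it is a match only if the key is exhausted too.
        if (pi == p.Length) return ki == k.Length;

        if (p[pi] == "#")
        {
            // '#' may consume zero or more words of the key.
            for (int next = ki; next <= k.Length; next++)
                if (Match(p, pi + 1, k, next)) return true;
            return false;
        }

        // Any other pattern word needs a key word to compare with.
        if (ki == k.Length) return false;

        // '*' matches exactly one word; otherwise the words must be equal.
        if (p[pi] == "*" || p[pi] == k[ki]) return Match(p, pi + 1, k, ki + 1);
        return false;
    }
}
```

So the queue bound with "order.*" receives messages published with "order.food" or "order.drink", but not "order.food.pizza"; a binding of "order.#" would accept all three.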
4. Event Handler
In a real service application we would create a dedicated event handler. For simplicity, I only show a simple callback for the consumer.
subscriber.Subscribe((message, header) => {
Console.WriteLine(message);
return true;
});
//or
subscriber.SubscribeAsync(async (message, header) => {
Console.WriteLine(message);
return await Task.FromResult(true);
});
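In a Web API project, one possible home for this call is a hosted background service, so the subscription starts together with the application. The sketch below assumes the ISubscriber interface from this article and the Microsoft.Extensions.Hosting package; OrderSubscriberService is a hypothetical name.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;

public class OrderSubscriberService : BackgroundService
{
    private readonly ISubscriber _subscriber;

    public OrderSubscriberService(ISubscriber subscriber)
    {
        _subscriber = subscriber;
    }

    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // Registers the consumer; deliveries then arrive on the client's own threads.
        _subscriber.Subscribe(HandleMessage);
        return Task.CompletedTask;
    }

    private static bool HandleMessage(string message, IDictionary<string, object> headers)
    {
        Console.WriteLine(message);
        return true; // true tells the subscriber to acknowledge the message
    }
}
```

It would be registered in Program.cs with builder.Services.AddHostedService<OrderSubscriberService>(); after the ISubscriber registration shown earlier.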