Pub-Sub in .NET with MassTransit

Integrating Scalable, Distributed .NET Systems With RabbitMQ

Roko Kovač
8 min readOct 16, 2023

Introduction

There are many ways to implement backend communication using HTTP Request-Response, ranging from simple techniques like Short and Long Polling, to more sophisticated, push-like patterns such as Server-Sent Events or Webhooks.

However, there comes a point when you hit a ceiling with Request-Response and simply need a performant, scalable, full-blown Pub-Sub messaging system.

In this article, I will cover what problems Pub-Sub solves, and a high-level implementation in .NET using a very useful library called MassTransit.

Publish-Subscribe

Publish-Subscribe, or Pub-Sub in short, is a messaging pattern that solves many problems inherent to Request-Response.

Asynchronous communication

Request-Response is synchronous by nature, which poses an issue when solving problems that require asynchronous behavior.

It is possible to implement asynchronous communication with Request-Response using something like webhooks, but such solutions don’t scale very well and come with another important problem — tight coupling.

Decoupling Senders from Receivers

A Publish-Subscribe system decouples the publishers (senders) of the messages from the subscribers (receivers) by introducing a middleman.

Not only does this allow for multiple senders and receivers to send messages to each other, but it also makes the system more resilient to change and easier to scale.

Scalability

As the subscribers are decoupled from the publishers, scaling becomes much simpler. If you need to process messages faster, you can simply add more subscribers. Want to have multiple sources of a single event? Add more publishers.

The Key Components

There are a few key components in every message broker implementation.

Message

The source of information. It usually represents a domain event, e.g. an item being created.

Broker

This is a generic term for a messaging middleman. It receives messages and knows how and where to forward them.

Producer

A Producer (sender, publisher) produces messages to the broker.

Consumer

A Consumer (receiver, subscriber) consumes messages from the broker.

Queue

A queue is a buffer between producers and consumers. This is where the messages stay until they’re consumed.

It typically operates as any other queue — first in, first out.

Multiple consumers can consume messages from a single queue, but a single message can be consumed from a queue only once.

Topic

A topic is the central point of a message broker Pub-Sub system. It forwards messages to all queues that are subscribed to that specific topic.

Subscription

A subscription, or binding, is the connection between a queue and a topic. Each queue that is interested in a topic needs to subscribe to it.

A typical Pub-Sub System using a Message Broker

With the general terminology out of the way, we can take a closer look at RabbitMQ.

RabbitMQ

RabbitMQ is a popular and feature-rich, open-source message broker.

Because of the amount of features it supports, it can be overwhelming to work with when first starting out. However, MassTransit makes it really easy to set up efficient Pub-Sub messaging with just a few lines of code.

Exchanges

Without going into too much detail on RabbitMQ, let’s go over some basics of exchanges so that we can understand the default MassTransit message topology.

In RabbitMQ, exchanges are the middlemen between producers and queues. They are the equivalent of the Topic we defined earlier, with a few caveats.

There are 3 types of exchanges, each with a different way of routing messages. The 3 types are direct, fanout and topic. In addition, exchanges can also bind to each other, allowing for complex routing rules.

MassTransit mainly uses fanout exchanges, which is an exchange that simply routes messages to all bound queues and exchanges.

Exchanges can be durable (persisted on disk, survive restarts), or transient (persisted in memory, don’t survive restarts).

Message Topology

MassTransit follows the design principle of Smart Endpoints and Dumb Pipes, using the broker only for the basic delivery of messages, and keeping the routing rules on it to the bare minimum.

MassTransit will create a durable, fanout exchange for each message type and a durable, fanout exchange for each queue.

A default MassTransit RabbitMQ message topology

Single Queue Per Consumer Type

MassTransit creates a queue for each consumer type, which is important in a Pub-Sub system.

To understand why, let’s go over a real-life example.

In a service-oriented architecture, if you want to scale up a service to handle increased traffic, you simply spin up another replica of the service (Consumer 2 on the diagram above).

You want these two replicas (Consumer 2.1 and Consumer 2.2) to share the load of processing the ItemCreated message. This is done by subscribing them to the same queue. Since a single message in a queue can be consumed only once, the queue effectively becomes a load balancer.

On the other hand, you wouldn’t want a service (Consumer 1) to miss a message because the other service (Consumer 2) consumed it. This is why you need a separate queue for each consumer type.

Naming

MassTransit requires all contracts to be defined within a namespace, and uses that namespace when creating exchanges.

For example, if we define an ItemCreated message in a namespace called Contracts, the exchange for that message will be called Contracts:ItemCreatedMessage.

The consumer exchange and queue are named after the message type, in this case, ItemCreatedMessage.

If you don’t like these conventions, you can configure them in the setup. To keep things simple, we will use the default convention.

Creating The Project

If you want to skip creating the project yourself, the source code is on my GitHub.

We’re going to create three projects: Producer, Consumer, and Contracts.

dotnet new console -n Producer
dotnet new console -n Consumer
dotnet new classlib -n Contracts

For the Producer and Consumer, we’re going to need some packages.

dotnet add package MassTransit.RabbitMQ
dotnet add package Microsoft.Extensions.DependencyInjection

If you don’t have a RabbitMQ instance running, you can run it on your local machine easily using Docker.

docker run -d --name rabbitmq -p 15672:15672 -p 5672:5672 rabbitmq:management

Defining The Contract

In the Contracts project, we will define the ItemCreatedMessage. We’re keeping it simple with just a Name and a Price.

Keep in mind, contracts are required to be within a namespace.

namespace Contracts;

public record ItemCreatedMessage(string Name, double Price);

Creating a RabbitMQ Producer

Let’s create a RabbitMQ producer and set it up to publish the ItemCreatedMessage on keypress.

We will connect to the local RabbitMQ instance and use the default endpoint configuration.

It is possible to create an IBusControl without using the service collection, but we will use it to keep the code consistent with an ASP.NET Core app.

using MassTransit;
using Contracts;
using Microsoft.Extensions.DependencyInjection;

var services = new ServiceCollection();
services.AddMassTransit(x =>
{
x.UsingRabbitMq((context, cfg) =>
{
cfg.Host("localhost", "/", h =>
{
h.Username("guest");
h.Password("guest");
});
cfg.ConfigureEndpoints(context);
});
});

var serviceProvider = services.BuildServiceProvider();
var bus = serviceProvider.GetRequiredService<IBusControl>();
await bus.StartAsync();

Console.WriteLine("Press any key to send a message...");
while (true)
{
Console.ReadKey(true);
await bus.Publish(new ItemCreatedMessage(Name: "Bucket", Price: 12.55));
Console.WriteLine("Message sent.");
}

Creating a Consumer

In the Consumer project, we will define the consumer class.

The consumer interface is very simple, it requires a single Consume method that accepts the context with the message type.

In the Consume method, we will simply print out the created item info.

using MassTransit;
using Contracts;

namespace Consumer;

public record ItemCreatedMessageConsumer : IConsumer<ItemCreatedMessage>
{
public Task Consume(ConsumeContext<ItemCreatedMessage> context)
{
Console.WriteLine(
$"Item '{context.Message.Name}' " +
$"with price '{context.Message.Price}' created.");

return Task.CompletedTask;
}
}

In the Program.cs, we will use a similar setup as with the producer.

We connect to the local RabbitMQ instance and use the default convention-based endpoint configuration. This will create our exchange and queue (if they don’t exist), open a connection, and bind our consumer to the queue.

using MassTransit;
using MessageBroker.Consumer;
using Microsoft.Extensions.DependencyInjection;

var services = new ServiceCollection();

services.AddMassTransit(x =>
{
x.AddConsumer<ItemCreatedMessageConsumer>();

x.UsingRabbitMq((context, cfg) =>
{
cfg.Host("localhost", "/", h =>
{
h.Username("guest");
h.Password("guest");
});

cfg.ConfigureEndpoints(context);
});
});

var serviceProvider = services.BuildServiceProvider();
var bus = serviceProvider.GetRequiredService<IBusControl>();
await bus.StartAsync();

Console.WriteLine("Waiting for messages...");
Console.ReadLine();

Let’s try to run this and see if it works. We will run a single Producer and a single Consumer.

One Producer, One Consumer

Looks good. Let’s try with multiple consumers.

One Producer, Multiple Consumers

As you can see, we sent two messages but they each got a single message.

This is because we used the default endpoint configuration in our Consumer setup, which means MassTransit bound both of our consumers to the same queue: ItemCreatedMessage.

If you’re developing a microservice, this is probably the behavior you would want. You want to balance the load equally between all replicas of the same service. In the context of the diagram we looked at earlier, these would be the Consumer 2.1 and Consumer 2.2.

However, just to test that our Pub/Sub setup is working, let’s add a queue parameter

Console.Write("Enter queue name: ");
var queue = Console.ReadLine();

…and configure the endpoint manually.

// cfg.ConfigureEndpoints(context);
cfg.ReceiveEndpoint(queue, e =>
{
e.Consumer<ItemCreatedMessageConsumer>();
});

Let’s try running them now. We will recreate the setup from the diagram we looked at earlier.

One Producer, Multiple Consumer Types, Multiple Consumers

When sending two messages, Consumer 1, which is bound to queue 1, received both of them.

On the other hand, Consumer 2.1 and Consumer 2.2 received one message each, since they’re bound to the same queue — queue 2.

Conclusion

I have shown you how to quickly set up a RabbitMQ Producer and Consumer using MassTransit.

To keep the article short and simple, I omitted many advanced features that a messaging system would use.

Having said that, we have covered everything you need to introduce messaging into your system and evolve it over time.

If you don’t want to use RabbitMQ, MassTransit supports many other brokers, and the setup to use them is equally as simple.

If you‘re interested in .NET topics, consider subscribing to get notified when i publish more 😉

Any thoughts? Leave a comment.

--

--