Lightweight .NET in-memory pub/sub implementation using Channels

MediumP
8 min readSep 7, 2023

--

Hi, in this article I’ll present to you how you can easily kick-off implementing performant in-memory pub/sub pattern using Channel<T> for asynchronous processing within your .NET server.

In this article, we will:

  • See what are Channels and how we can use them
  • Create interfaces as building blocks of our library supporting pub/sub
  • Implement these interfaces for in-memory event bus based on Channel<T>
  • Provide a user with a fluent registration
  • Provide a user with an interface to resolve scoped event metadata on the fly
  • Reflect on ways we can now leverage our interfaces and implement other libraries for distributed environment and event sourcing

Motivation:

Let’s assume you have a monolith server that needs to process some data asynchronously.

We can achieve that by delegating the work away from the main UI thread and processing it in its own context asynchronously. We could also utilize hosted services as singleton background workers that would process our work.

It’s not a hard thing to achieve in a dotnet environment.

However, most of the time for smaller projects or pet projects to turn bigger there’s a prevalent need to prototype fast and change often before having a true need for fully distributed and scalable solutions.

Therefore, we might consider taking the middle ground and going about asynchronous processing in an easy way through readily available native .NET resources via appropriate abstractions that allow us to scale in the future.

We will reflect on what Channels are, and how we can use them to achieve our goal by implementing slim interfaces that wrap channel functionality and provide us with an easy-to-use pub/sub pattern with a parallel consumer model.

On Channel<T>

Channels are .net standard implementation of pub/sub pattern and are available in the System.Threading.Channels NuGet package.

Channels come in two flavors:

  • Bounded — has the capacity as a limit on how many items the channel can hold and if it’s full, it will block the producer in an async way until it gets more space
  • Unbounded — has no such restriction, can be used by any number of readers and writers concurrently

Both flavors come with individual configuration options to override default behaviors.

In the context of bounded channels, we can also provide a behavior on how the channel acts when it’s full:

  • Wait, Drop newest, Drop oldest, Drop write

Basic usage of a channel:

Following code fire and forgets task where while loop produces random numbers while the code subscribes to event bus changes and reads from async iterator continuously.

var rnd = new Random();
var exampleChannel = Channel.CreateBounded<int>(10);

Task.Run(() =>
{
while (true)
{
exampleChannel.Writer.WriteAsync(rnd.Next(0, 10));
Thread.Sleep(500);
}
});

await foreach (var output in exampleChannel.Reader.ReadAllAsync())
{
Console.WriteLine(output);
}

More documentation on channels can be found at the following link
https://learn.microsoft.com/en-us/dotnet/core/extensions/channels

Building our wrapper library

The contracts:

We will start off by introducing 4 service contracts and 2 entity models supporting our library.

/// <summary>
/// Publishes our custom event into event broker
/// </summary>
public interface IProducer<T> : IAsyncDisposable
{
ValueTask Publish(Event<T> @event, CancellationToken token = default);
}
/// <summary>
/// Starts processing our event bus
/// We can manipulate Start and Stop methods to
/// turn processing on or off
/// </summary>
public interface IConsumer : IAsyncDisposable
{
ValueTask Start(CancellationToken token = default);
ValueTask Stop(CancellationToken token = default);
}

public interface IConsumer<T> : IConsumer
{
}
/// <summary>
/// Handles incoming event
/// </summary>
public interface IEventHandler<in T>
{
ValueTask Handle(T? time, CancellationToken token = default);
}
/// <summary>
/// Event metadata accessor in current async context
/// </summary>
public interface IEventContextAccessor<T>
{
public Event<T>? Event { get; }
void Set(Event<T> @event);
}
// Entity models

public record EventMetadata(string CorrelationId);
public record Event<T>(T? Data, EventMetadata? Metadata = default);

The general idea is to provide the user with an easy way of bootstrapping this event bus and defining their own typed handlers.

So we will also provide extensions on service collection to allow the developer a nice experience.

Next up we have implementations of consumer, producer, and their service registration.

Our implementation of the producer is slim and relies on dotnet Channels<T>

The producer wraps the provided channel writer and implements the interface by providing events into an internal channel.

internal sealed class InMemoryEventBusProducer<T> : IProducer<T>
{
private readonly ChannelWriter<Event<T>> _bus;

public async ValueTask Publish(Event<T> @event, CancellationToken token = default)
{
await _bus.WriteAsync(@event, token).ConfigureAwait(false);
}

public ValueTask DisposeAsync()
{
_bus.TryComplete();
return ValueTask.CompletedTask;
}
}

Next up we will implement consumer:

The consumer is a singleton that subscribes to certain channels he needs to listen and once he receives messages he will proxy them to registered handlers within their own scope.

We get handlers via the DI container which the user will register them in by calling our fluent DI registration extensions (I will mention that soon in the article).

We start by creating our consumer and implementing the barebone interface:

internal sealed class InMemoryEventBusConsumer<T> : IConsumer<T>
{
private readonly ChannelReader<Event<T>> _bus;
private readonly IServiceScopeFactory _scopeFactory;
private readonly ILogger<InMemoryEventBusConsumer<T>> _logger;

public async ValueTask Start(CancellationToken token = default)
{
// Implement
}

public async ValueTask Stop(CancellationToken _ = default)
{
await DisposeAsync().ConfigureAwait(false);
}

public async ValueTask DisposeAsync()
{
_stoppingToken?.Cancel();
}
}

As mentioned our Consumer is meant to be a singleton and read events from the inner channel reader that he wraps.

Thus we also supply a cancellation token source so that we are able to gracefully stop the processing.

Let’s implement the Startmethod of consumer:

EnsureStoppingTokenIsCreated();

// factory new scope so we can use it as execution context
await using var scope = _scopeFactory.CreateAsyncScope();

// retrieve scoped dependencies
var handlers = scope.ServiceProvider.GetServices<IEventHandler<T>>().ToList();
var metadataAccessor = scope.ServiceProvider.GetRequiredService<IEventContextAccessor<T>>();

if (handlers.FirstOrDefault() is null)
{
_logger.LogDebug("No handlers defined for event of {type}", typeof(T).Name);
return;
}

// run the processing
Task.Run(
async () => await StartProcessing(handlers, metadataAccessor).ConfigureAwait(false),
_stoppingToken!.Token
).ConfigureAwait(false);

Next up we will implement internal StartProcessing the method our focus will be on loading the async iterator from the channel reader and processing the read events.

For each event we will, as previously mentioned, set the provided metadata so it’s available in the scope and create a new logging scope.

Once that is done we can just kick off processing in parallel fashion.

/// <summary>
/// Subscribes to channel changes and triggers event handling
/// </summary>
internal async ValueTask StartProcessing(List<IEventHandler<T>> handlers, IEventContextAccessor<T> contextAccessor)
{
var continuousChannelIterator = _bus.ReadAllAsync(_stoppingToken!.Token)
.WithCancellation(_stoppingToken.Token)
.ConfigureAwait(false);

await foreach (var task in continuousChannelIterator)
{
if (_stoppingToken.IsCancellationRequested)
break;

// invoke handlers in parallel
await Parallel.ForEachAsync(handlers, _stoppingToken.Token,
async (handler, scopedToken) => await ExecuteHandler(handler, task, contextAccessor, scopedToken)
.ConfigureAwait(false)
).ConfigureAwait(false);
}
}


/// <summary>
/// Executes the handler in async scope
/// </summary>
internal ValueTask ExecuteHandler(IEventHandler<T> handler, Event<T> task, IEventContextAccessor<T> ctx, CancellationToken token)
{
ctx.Set(task); // set metadata and begin scope
using var logScope = _logger.BeginScope(task.Metadata ?? new EventMetadata(Guid.NewGuid().ToString()));

Task.Run(
async () => await handler.Handle(task.Data, token), token
).ConfigureAwait(false);

return ValueTask.CompletedTask;
}

Here we can improve by introducing the error handling method on the handler, retry policies around processing, and the DLQ concept.

We can achieve this by encapsulating logic within IConsumer or decorating our handlers by default.

However, this won’t be implemented in the scope of this article.

Now that we have finished the main implementations let’s quickly look at what EventMetadataAccessor implementation would look like.

This accessor implements a pattern like HttpContextin the request scope of Web API.

It leverages AsyncLocal to access data in async scope.

More on that in the following link

internal sealed class EventContextAccessor<T> : IEventContextAccessor<T>
{
private static readonly AsyncLocal<EventMetadataWrapper<T>> Holder = new();

public Event<T>? Event => Holder.Value?.Event;

public void Set(Event<T> @event)
{
var holder = Holder.Value;
if (holder != null)
{
holder.Event = null;
}

Holder.Value = new EventMetadataWrapper<T> { Event = @event };
}
}

internal sealed class EventMetadataWrapper<T>
{
public Event<T>? Event { get; set; }
}

This will allow you to inject IEventMetadataAccessor<T> in any place down the async scope and resolve event metadata in a nice and flexible way.

Registration

Next up we provide users with a nice fluent way of registering all of this:

public static IServiceCollection AddInMemoryEvent<T, THandler>(this IServiceCollection services)
where THandler : class, IEventHandler<T>
{
// Note: Expose configuration options and allow user to customize
var bus = Channel.CreateUnbounded<Event<T>>(
new UnboundedChannelOptions
{
AllowSynchronousContinuations = false,
}
);

// typed event handler
services.AddScoped<IEventHandler<T>, THandler>();

// typed event producer
services.AddSingleton(typeof(IProducer<T>), _ => new InMemoryEventBusProducer<T>(bus.Writer));

// typed event consumer
var consumerFactory = (IServiceProvider provider) => new InMemoryEventBusConsumer<T>(
bus.Reader,
provider.GetRequiredService<IServiceScopeFactory>(),
provider.GetRequiredService<ILoggerFactory>().CreateLogger<InMemoryEventBusConsumer<T>>()
);

services.AddSingleton(typeof(IConsumer), consumerFactory.Invoke);
services.AddSingleton(typeof(IConsumer<T>), consumerFactory.Invoke);

// typed event context accessor
services.TryAddSingleton(typeof(IEventContextAccessor<>), typeof(EventContextAccessor<>));

return services;
}

public static async Task<IServiceProvider> StartConsumers(this IServiceProvider services)
{
var consumers = services.GetServices<IConsumer>();
foreach (var consumer in consumers)
{
await consumer.Start().ConfigureAwait(false);
}
return services;
}

public static async Task<IServiceProvider> StopConsumers(this IServiceProvider services)
{
var consumers = services.GetServices<IConsumer>();
foreach (var consumer in consumers)
{
await consumer.Stop().ConfigureAwait(false);
}
return services;
}

Great! We have done it.

Now let’s head for the demo

We will write three basic demo handlers which we will register and run some events through the producer.

These handlers cover a few features:

  • Primitives as events
  • Complex objects as events
  • Multiple handlers for the same event
  • Resolving other dependencies within a handler
  • Consuming event context data
public sealed class OrderPlacedEventHandler : IEventHandler<OrderEvent>
{
private readonly ILogger<OrderPlacedEventHandler> _logger;

public OrderPlacedEventHandler(ILogger<OrderPlacedEventHandler> logger)
{
_logger = logger;
}

public ValueTask Handle(OrderEvent time, CancellationToken cancellationToken = default)
{
_logger.LogInformation("Order {number} has been placed", time.OrderNumber);
return ValueTask.CompletedTask;
}
}

public sealed class TrackUserOrderItemsEventHandler : IEventHandler<OrderEvent>
{
private readonly ILogger<TrackUserOrderItemsEventHandler> _logger;

public TrackUserOrderItemsEventHandler(ILogger<TrackUserOrderItemsEventHandler> logger)
{
_logger = logger;
}

public ValueTask Handle(OrderEvent? time, CancellationToken cancellationToken = default)
{
_logger.LogInformation("User {user} has ordered {items} items", time.UserId, time.ItemCount);
return ValueTask.CompletedTask;
}
}

public sealed class OrderNumberEventHandler : IEventHandler<int>
{
private readonly IEventContextAccessor<int> _ctx;
private readonly ILogger<OrderNumberEventHandler> _logger;

public OrderNumberEventHandler(IEventContextAccessor<int> ctx)
{
_ctx = ctx;
}

public ValueTask Handle(int orderNumber, CancellationToken cancellationToken = default)
{
var correlationId = _ctx.Event.Metadata!.CorrelationId;
_logger.LogInformation("Order number {number} invoked with correlation id {id}", orderNumber, correlationId);
return ValueTask.CompletedTask;
}
}

Now we run the demo in our console application:

// Register event bus services
services.AddInMemoryEvent<int, OrderNumberEventHandler>();
services.AddInMemoryEvent<OrderEvent, OrderPlacedEventHandler>();
services.AddInMemoryEvent<OrderEvent, TrackUserOrderItemsEventHandler>();

var provider = services.BuildServiceProvider();

// Retrieve producers and define demo factory function
var orderNumberProducer = provider.GetRequiredService<IProducer<int>>();
var orderProducer = provider.GetRequiredService<IProducer<OrderEvent>>();
var publishEventsFn = async (int i) =>
{
var orderNumberTask = new Event<int>(i);
var counterTask = new Event<OrderEvent>(new OrderEvent(i, i, i));

await orderNumberProducer.Publish(orderNumberTask).ConfigureAwait(false);
await orderProducer.Publish(counterTask).ConfigureAwait(false);
};

// Start listening to events
await provider.StartConsumers();

for (var i = 0; i < 3; i++)
{
await publishEventsFn.Invoke(i);
}

// allow events to process and stop consumers
await Task.Delay(TimeSpan.FromSeconds(3));
await provider.StopConsumers();
logger.LogInformation("\nConsumers stopped\n");

// publish some more events while the consumers are stopped
for (var i = 3; i < 8; i++)
{
await publishEventsFn.Invoke(i);
}

// start the consumers
logger.LogInformation("\nConsumers started\n");
await provider.StartConsumers();
await Task.Delay(TimeSpan.FromSeconds(3));

You can observe these logs as a result:

The code can be found in the following GitHub repository.

Next steps:

This is obviously a very basic implementation of a event bus. However, the interfaces we defined can give us more leeway when the time comes to go into resources like Azure service bus, Rabbit MQ, etc...

We can further extend our solution and swap it under the hood to unlock its full potential and scale in a distributed environment.

An example implementation of such functionality can be viewed in the showcase project WebShopOnContainers.

RabbitMQ implementation
Azure Service Bus implementation

Hope that gets your mind going with opportunities.

Happy coding :)!

--

--