Meeting the Channels in .NET

Hanifi Yildirimdagi
8 min read · May 6, 2023

Photo by Modestas Urbonas on Unsplash

Once upon a time, I ran into an unexpected bottleneck in a project I was working on. Four different events were performing asynchronous I/O operations on the same file, and the events touching that file were throwing an "already in use" exception. These events had run without problems before, simply because there had always been enough time between them.

As a solution, I centralized the I/O operations so I could manage the threads in one place. However, this increased the response time of the events: the average, previously 2 seconds, climbed to 4 seconds or more. The performance loss bothered me greatly, because fixing one problem had hurt other important parts of the project.

Then I remembered the Channels structure, which I had heard of before but never had the chance to use, and it occurred to me that it could solve the bottleneck. First, I created a channel and registered it as a singleton in Dependency Injection, so that incoming models could be pushed into the channel immediately instead of blocking the request. Since I had already centralized the I/O operation, I decided to listen to the channel and process the incoming models in a Hosted Service. This way, none of the events had to wait for the I/O operation to complete; all of these operations were processed one by one in a background queue. In the final performance tests, the response time had dropped to an average of 80 milliseconds. This was an incredible success, and my concerns about performance loss disappeared completely.

Now I would like to walk through an example similar to that problem and its solution, along with what you need to know about the System.Threading.Channels structure. Enjoy reading!

What Is It?

"Channels" is one of the important structures that .NET offers us. It enables us to asynchronously transfer data from a producer to a consumer, following the producer/consumer pattern. I'm sure that when we talk about producers and consumers, messaging systems such as RabbitMQ, Kafka, etc. come to mind. Channels provide the basic functions these systems offer, and they do so within the System.Threading.Channels namespace in .NET. This is fantastic, especially since it ships in the shared framework from .NET Core 3.0 onward and is available for earlier targets (including .NET Core 2.1 and .NET Framework) through the System.Threading.Channels NuGet package. This is particularly useful, since these types of problems can arise in many legacy projects.

The simplest view of a message being queued from a producer to a consumer

Getting Started

Channels can be defined in two different ways: Unbounded and Bounded.

Unbounded Channels
This type of channel can be used by multiple producers and consumers simultaneously. Unbounded Channels can hold an unlimited number of data items (within software and hardware limits, of course). You can define an unbounded channel like this:

var channel = Channel.CreateUnbounded<T>();
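
To see the producer/consumer flow end to end, here is a minimal sketch (a standalone snippet, not taken from the original project) in which one task writes messages into an unbounded channel and another reads them until the writer signals completion:

using System;
using System.Threading.Channels;
using System.Threading.Tasks;

var channel = Channel.CreateUnbounded<string>();

// Producer: writes a few messages and then signals that no more items will come.
var producer = Task.Run(async () =>
{
    for (var i = 1; i <= 3; i++)
    {
        await channel.Writer.WriteAsync($"message {i}");
    }
    channel.Writer.Complete();
});

// Consumer: asynchronously reads until the channel is marked complete.
var consumer = Task.Run(async () =>
{
    await foreach (var message in channel.Reader.ReadAllAsync())
    {
        Console.WriteLine(message);
    }
});

await Task.WhenAll(producer, consumer);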

Bounded Channels
This type of channel holds at most the number of items specified during its creation; a capacity limit must be given. If a producer hits the channel's maximum capacity, we can manage the situation by choosing one of the BoundedChannelFullMode values. These are:

- BoundedChannelFullMode.Wait: In this mode, when a producer writes to a full channel, the write operation asynchronously waits until space becomes available.
- BoundedChannelFullMode.DropOldest: In this mode, when a producer writes to a full channel, the oldest item (the first item in the channel) is dropped to make room for the new item.
- BoundedChannelFullMode.DropNewest: In this mode, when a producer writes to a full channel, the newest item already in the channel is dropped to make room for the incoming item.
- BoundedChannelFullMode.DropWrite: In this mode, when a producer writes to a full channel, the item being written is dropped and never enters the channel.

We can define a bounded channel as follows:

var channel = Channel.CreateBounded<T>(new BoundedChannelOptions(200)
{
    FullMode = BoundedChannelFullMode.DropOldest
});
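
As a quick illustration of the DropOldest behavior described above, here is a minimal sketch (a standalone console snippet, not part of the article's project) that writes past the channel's capacity and shows which items survive:

using System;
using System.Threading.Channels;
using System.Threading.Tasks;

// A bounded channel with room for 2 items; the oldest item is discarded when it is full.
var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(2)
{
    FullMode = BoundedChannelFullMode.DropOldest
});

// Write three items; the third write evicts item 1 instead of blocking.
await channel.Writer.WriteAsync(1);
await channel.Writer.WriteAsync(2);
await channel.Writer.WriteAsync(3);
channel.Writer.Complete();

// Prints 2 and 3; item 1 was dropped as the oldest.
await foreach (var item in channel.Reader.ReadAllAsync())
{
    Console.WriteLine(item);
}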

You can check out Microsoft documentation for more information: https://learn.microsoft.com/en-us/dotnet/core/extensions/channels

Scenario

To illustrate the problem I experienced, let's consider a similar example scenario. We want to log the operations performed by users, along with the parameters used, to a JSON file. We have two endpoints:

  • SendNotification: An endpoint that sends a notification to a user.
  • ScheduleNotification: An endpoint that schedules a notification to be sent to a user.

[ApiController]
[Route("[controller]")]
public class ApiController : ControllerBase
{
    [HttpPost("SendNotification")]
    public async Task<IActionResult> SendNotification([FromBody] SendNotificationRequest request)
    {
        return Ok();
    }

    [HttpPost("ScheduleNotification")]
    public async Task<IActionResult> ScheduleNotification([FromBody] ScheduleNotificationRequest request)
    {
        return Ok();
    }
}

The endpoints expect the following models in the request body.

public class SendNotificationRequest
{
    public string Email { get; set; }
    public string Message { get; set; }
}

public class ScheduleNotificationRequest : SendNotificationRequest
{
    public string DueDate { get; set; }
}

Now let's create an imaginary sender. To keep the focus on the topic, we will not actually send a notification here; the implementation will simply wait 500 ms.

public interface INotificator
{
    Task Send(SendNotificationRequest request);
    Task Schedule(ScheduleNotificationRequest request);
}
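
The article does not show the implementation itself; a minimal sketch that matches the description (just a 500 ms delay), with an assumed class name and an assumed DI registration, could look like this:

public class Notificator : INotificator
{
    // Simulates sending a notification; no real delivery happens here.
    public Task Send(SendNotificationRequest request) => Task.Delay(500);

    // Simulates scheduling a notification for later delivery.
    public Task Schedule(ScheduleNotificationRequest request) => Task.Delay(500);
}

// Assumed registration in Program.cs:
// builder.Services.AddScoped<INotificator, Notificator>();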

With the implementation in place, our controller class is as follows.

[ApiController]
[Route("[controller]")]
public class ApiController : ControllerBase
{
    private readonly INotificator _notificator;

    public ApiController(INotificator notificator)
    {
        _notificator = notificator;
    }

    [HttpPost("SendNotification")]
    public async Task<IActionResult> SendNotification([FromBody] SendNotificationRequest request)
    {
        await _notificator.Send(request);
        return Ok();
    }

    [HttpPost("ScheduleNotification")]
    public async Task<IActionResult> ScheduleNotification([FromBody] ScheduleNotificationRequest request)
    {
        await _notificator.Schedule(request);
        return Ok();
    }
}

Everything works so far. The next step is to log the incoming requests to a JSON file after the notification is sent.

public interface IJournalRegistrar
{
    Task Register(JournalItem model);
}

public class JournalRegistrar : IJournalRegistrar
{
    private readonly List<JournalItem> _journal;
    private readonly List<JournalItem> _pendingItems = new();
    private string AbsolutePath => Directory.GetCurrentDirectory() + "/journal.json";

    public JournalRegistrar()
    {
        var content = File.ReadAllText(AbsolutePath);
        _journal = Newtonsoft.Json.JsonConvert.DeserializeObject<List<JournalItem>>(content) ?? new();
    }

    public async Task Register(JournalItem model)
    {
        _pendingItems.Add(model);
        await Write();
    }

    private async Task Write()
    {
        if (_pendingItems.Count <= 5) return;

        _journal.AddRange(_pendingItems);
        var content = Newtonsoft.Json.JsonConvert.SerializeObject(_journal);
        await File.WriteAllTextAsync(AbsolutePath, content);
        _pendingItems.Clear();
    }
}

The code block above shows an implementation of the IJournalRegistrar interface which is responsible for registering JournalItem models. The JournalRegistrar class implements this interface and maintains a list of journal items in memory as well as a list of pending items waiting to be written to the disk.

The constructor of JournalRegistrar reads the list of journal items from a JSON file stored in the current directory. When Register is called, the new journal item is added to the list of pending items and if there are more than 5 pending items, they are written to the disk.

The Write method appends the pending items to the existing list of journal items and serializes the entire list to JSON format. Then it writes the serialized data to the same file that was read in the constructor. Finally, the pending items are cleared.

Overall, this implementation allows for efficient disk writes by batching multiple journal items together and writing them at once.
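
The article never shows the JournalItem model itself. Here is a minimal sketch, with the constructor signature assumed from how the controller uses it later and the property names purely illustrative:

public class JournalItem
{
    // Parameterless constructor so Newtonsoft.Json can materialize items read back from journal.json.
    public JournalItem() { }

    public JournalItem(SendNotificationRequest request, string operation)
    {
        Email = request.Email;
        Message = request.Message;
        Operation = operation;
        CreatedAt = DateTime.UtcNow;
    }

    public string Email { get; set; }
    public string Message { get; set; }
    public string Operation { get; set; }
    public DateTime CreatedAt { get; set; }
}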

builder.Services.AddSingleton<IJournalRegistrar, JournalRegistrar>();

The reason why IJournalRegistrar and JournalRegistrar should be registered as singleton instances in Dependency Injection (DI) is related to their functionality as a journal registrar. The JournalRegistrar class is designed to store and manage journal entries, which are typically time-stamped records of important events or transactions within a system. When using DI to inject instances of JournalRegistrar, it’s important that the same instance be shared across the entire application.

This is because any modifications made to the journal by one part of the application must be reflected throughout the rest of the application. If multiple instances of JournalRegistrar were used, there would be multiple copies of the journal in memory, which could lead to inconsistencies and synchronization issues. By registering IJournalRegistrar and JournalRegistrar as singletons in DI, we ensure that the same instance is shared across the application, and that modifications made to the journal are immediately visible to all other parts of the system.

[ApiController]
[Route("[controller]")]
public class ApiController : ControllerBase
{
    private readonly INotificator _notificator;
    private readonly IJournalRegistrar _journal;

    public ApiController(INotificator notificator, IJournalRegistrar journal)
    {
        _notificator = notificator;
        _journal = journal;
    }

    [HttpPost("SendNotification")]
    public async Task<IActionResult> SendNotification([FromBody] SendNotificationRequest request)
    {
        await _notificator.Send(request);
        await _journal.Register(new JournalItem(request, "SendNotification"));
        return Ok();
    }

    [HttpPost("ScheduleNotification")]
    public async Task<IActionResult> ScheduleNotification([FromBody] ScheduleNotificationRequest request)
    {
        await _notificator.Schedule(request);
        await _journal.Register(new JournalItem(request, "ScheduleNotification"));
        return Ok();
    }
}

With this in place, our controller class looks like the code above. Once requests arrive and the pending threshold is exceeded, our JSON file starts to fill with data.

The file contents

We have come this far, and you may be thinking that we have not seen a Channel in use yet. Channels are next.

As seen in the code, after each endpoint operation an event is recorded through the JournalRegistrar. We have two critical operations: registering the event and persisting it to the log file. The Register method responds quickly while items are only being buffered, but on the call that crosses the threshold, the entire journal is rewritten to disk and the response becomes noticeably slower. This can lead to complaints about the system's performance, and as the number of requests grows, those complaints become more prominent.

Moreover, it might seem that buffering a handful of pending objects before writing protects us. However, if the number of calls increases, for example if ten requests arrive at time T, the threshold is exceeded repeatedly, and at certain points this results in parallel writes to the same file.

Let’s create a queue interface named `IJournalQueue` that will serve as a queue for processing Journal items.

public interface IJournalQueue
{
    Task Produce(JournalItem message);
    ValueTask<JournalItem> Consume(CancellationToken cancellationToken);
}

We have defined two methods: Produce, through which messages will be pushed, and Consume, through which messages will be read.

public class JournalQueue : IJournalQueue
{
    private readonly Channel<JournalItem> _channel;

    public JournalQueue()
    {
        _channel = Channel.CreateBounded<JournalItem>(new BoundedChannelOptions(100)
        {
            FullMode = BoundedChannelFullMode.Wait,
            SingleReader = true
        });
    }

    public async Task Produce(JournalItem message)
    {
        await _channel.Writer.WriteAsync(message);
    }

    public async ValueTask<JournalItem> Consume(CancellationToken cancellationToken)
    {
        return await _channel.Reader.ReadAsync(cancellationToken);
    }
}

The implementation uses an asynchronous communication channel, `Channel<T>`, which stores and transfers messages of type `JournalItem`. The channel is created with a limited capacity using the `CreateBounded` method, which sets the maximum number of elements the channel can hold. Because the full mode is `Wait`, a producer writing to a full channel asynchronously waits until space becomes available; no messages are dropped. `SingleReader = true` tells the channel there will only ever be one consumer, which allows it to use a faster internal implementation.

However, nothing is reading and processing these messages yet. Creating a BackgroundService to handle this work in the background is the best option.

public sealed class JournalConsumer : BackgroundService
{
    private readonly IJournalQueue _journalQueue;
    private readonly IJournalRegistrar _registrar;

    public JournalConsumer(IJournalQueue journalQueue, IJournalRegistrar registrar)
    {
        _journalQueue = journalQueue;
        _registrar = registrar;
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            var message = await _journalQueue.Consume(stoppingToken);
            await _registrar.Register(message);
        }
    }
}

The background service above keeps receiving messages through the Consume method of the JournalQueue and processing them, for as long as the service is not stopped.

builder.Services.AddSingleton<IJournalQueue, JournalQueue>();  
builder.Services.AddHostedService<JournalConsumer>();

Finally, adding the queue services to DI is sufficient. By replacing IJournalRegistrar with IJournalQueue in the endpoints, the operation can now be handed off to the background service for processing without waiting for the write to complete. This helps make the application faster and more efficient.
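
The article does not show the updated controller, but with that change an endpoint would look roughly like this (a sketch, assuming an injected IJournalQueue field named _journalQueue and the same JournalItem constructor as before):

[HttpPost("SendNotification")]
public async Task<IActionResult> SendNotification([FromBody] SendNotificationRequest request)
{
    await _notificator.Send(request);

    // Hand the journal item off to the channel; the hosted service writes it to disk later.
    await _journalQueue.Produce(new JournalItem(request, "SendNotification"));

    return Ok();
}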
