C# .NET 8 — MediatR Pipelines
Introduction
MediatR, available as a NuGet package for .NET, embodies the mediator design pattern, a strategy aimed at decoupling communication between objects.
For a comprehensive understanding of this pattern, you can refer to the following resource:
A well-established implementation of this pattern for .NET is MediatR, whose official GitHub project can be found here:
Fundamentals of MediatR
In essence, MediatR operates across three primary modes:
- Request: Involving a single receiver with a service response.
- Notification: Engaging multiple receivers without a service response.
- StreamRequest: Utilizing a single receiver for stream operations with a service response.
For the scope of this article, our focus is primarily on the Request behavior, particularly on exploring MediatR Pipelines.
MediatR Pipelines
Within the mediator Request flow, there exists a clear distinction between a publisher ( Send Operation ) and a subscriber ( Handler ).
By leveraging MediatR pipelines, we can effectively intercept this flow and introduce customized logic into the process.
To implement a pipeline, one needs to inherit from the interface IPipelineBehavior<TRequest, TResponse>
as demonstrated below:
public sealed class MyPipelineBehavior<TRequest, TResponse>
: IPipelineBehavior<TRequest, TResponse>
{
public async Task<TResponse> Handle(TRequest request, RequestHandlerDelegate<TResponse> next, CancellationToken cancellationToken)
{
// Pre-processing logic
var response = await next();
// Post-processing logic
return response;
}
}
As illustrated in the provided code snippet, this approach enables the insertion of logic both before and after invoking the subsequent step in the mediator pipeline.
Furthermore, the creation of multiple pipeline behaviors, registered in sequence, facilitates the establishment of a cohesive chain of behaviors.
If you need to control the execution order of a mediator pipeline you just register the behaviors in the order you would like them to be called.
services.AddTransient(typeof(IPipelineBehavior<,>), typeof(MyPipelineBehavior1<,>));
services.AddTransient(typeof(IPipelineBehavior<,>), typeof(MyPipelineBehavior2<,>));
services.AddTransient(typeof(IPipelineBehavior<,>), typeof(MyPipelineBehavior3<,>));
services.AddTransient(typeof(IPipelineBehavior<,>), typeof(MyPipelineBehavior4<,>));
Another noteworthy technique employed in this project involves the customization of the default interface of MediatR’s IRequest
.
By inheriting the default IRequest
interface and crafting our custom Interface, such as ICommand
, we gain the ability to explicitly filter pipelines for specific interfaces.
Sample implementation of a custom IRequest
:
public interface ICommand<out TResponse> : IRequest<TResponse>
{
}
Sample instantiation of a pipeline tailored exclusively for ICommand
:
public sealed class MyPipelineBehavior<TRequest, TResponse>
: IPipelineBehavior<TRequest, TResponse>
where TRequest : ICommand<TResponse>
MediatR Pipeline Use Case
The use case of mediatR pipelines are many, and it cloud be:
- Logging: use pipeline to log relevant information about requests, responses, or execution time.
- Caching:Implementing a caching pipeline alongside requests can minimize database, in case you are using the MediatR requests to retrieve information.
- Request Validation: Integrating logic for validating input data before request execution ensures that only valid data are processed.
- Unit of work pattern: utilizing a pipeline to manage the unit of work facilitates transaction handling and rollback operations in a centralized manner.
Sample Implementation
Here’s a link to a project I’ve prepared :
Starting the application, the Swagger page will appear:
You can find 4 different endpoints:
- SampleCommand: Use this endpoint to test the command. This command passes through all the MediatR pipelines: Logging -> Validation -> Authorization.
- SampleRequest: Use this endpoint to skip all the pipelines. This works because it uses the plain
IRequest
instead of the customICommand
. - SampleEntity: This endpoint is useful to test the result of adding a Sample entity endpoint.
- AddSampleEntity: This endpoint uses
ITransactionCommand
interface and is a sample of Unit of Work pipeline behavior. In this project, it also implements a sample of the IRepository pattern using EF with an InMemory Database.
On this article we will focus only on the first 2 endpoints: SampleCommand and SampleRequest.
API Flow — Code Explaination
Below is a detailed explanation of the Command and Request endpoint flow.
Command
Behind the SampleCommand
endpoint, we have a mediator interface which sends a Sample command.
app.MapPost("/SampleCommand", ([FromBody] SampleBody sampleBody, IMediator mediator) =>
{
return mediator.Send(new SampleCommand() { Id = Guid.NewGuid(), Description = sampleBody.Description, EventTime = DateTime.UtcNow });
})
.WithName("SampleCommand")
.WithOpenApi();
Here is the definition of the SampleCommand
class, along with its return class:
public class SampleCommand : ICommand<SampleCommandComplete>
{
public Guid Id { get; set; }
public DateTime EventTime { get; set; }
public string Description { get; set; }
}
public record SampleCommandComplete
{
public Guid Id { get; set; }
}
As mentioned before, the ICommand
interface inherits the standard IRequest
interface of MediatR.
public interface ICommand<out TResponse> : IRequest<TResponse>
{
}
Here’s the handler implementation; in this case, it simply logs some info and creates the response:
public class SampleCommandHandler : IRequestHandler<SampleCommand, SampleCommandComplete>
{
private readonly ILogger<SampleCommandHandler> _logger;
public SampleCommandHandler(ILogger<SampleCommandHandler> logger)
{
_logger = logger;
}
public async Task<SampleCommandComplete> Handle(SampleCommand request, CancellationToken cancellationToken)
{
_logger.LogInformation("Command Executed Id:{Id};Description:{Description};EventTime:{EventTime}", request.Id, request.Description, request.EventTime);
return new SampleCommandComplete() { Id = request.Id };
}
}
Request
The request flow is more or less the same as the command flow. Behind the API, it simply creates a SampleRequest
and sends it using the IMediator interface.
app.MapPost("/SampleRequest", ([FromBody] SampleBody sampleBody, IMediator mediator) =>
{
return mediator.Send(new SampleRequest() { Id = Guid.NewGuid(), Description = sampleBody.Description, EventTime = DateTime.UtcNow });
})
.WithName("SampleRequest")
.WithOpenApi();
In this case, the SampleRequest
just inherits directly from the standard IRequest
.
public class SampleRequest : IRequest<SampleRequestComplete>
{
public Guid Id { get; set; }
public DateTime EventTime { get; set; }
public string Description { get; set; }
}
public record SampleRequestComplete
{
public Guid Id { get; set; }
}
Here’s the handler implementation; in this case, it just logs some info and creates the response:
public class SampleRequestHandler : IRequestHandler<SampleRequest, SampleRequestComplete>
{
private readonly ILogger<SampleRequestHandler> _logger;
public SampleRequestHandler(ILogger<SampleRequestHandler> logger)
{
_logger = logger;
}
public async Task<SampleRequestComplete> Handle(SampleRequest request, CancellationToken cancellationToken)
{
_logger.LogInformation("Request Executed Id:{Id};Description:{Description};EventTime:{EventTime}", request.Id, request.Description, request.EventTime);
return new SampleRequestComplete() { Id = request.Id };
}
}
Pipeline — Code Explaination
Now let’s delve into a practical use case for MediatR Pipelines.
In this project, all the pipelines refer to the ICommand
interface. So when you try the two endpoints explained before, you can see that just the SampleCommand
is interested in the pipeline flow.
Logging Behavior
The first use case is to log information before and after the execution of a request. In this use case, the pipeline logs the execution time of the request.
public sealed class LoggingBehavior<TRequest, TResponse>
: IPipelineBehavior<TRequest, TResponse>
where TRequest : ICommand<TResponse>
{
private readonly ILogger<LoggingBehavior<TRequest, TResponse>> _logger;
public LoggingBehavior(ILogger<LoggingBehavior<TRequest, TResponse>> logger)
{
_logger = logger;
}
public async Task<TResponse> Handle(TRequest request, RequestHandlerDelegate<TResponse> next, CancellationToken cancellationToken)
{
Stopwatch stopwatch = new();
_logger.LogInformation($"Handling {typeof(TRequest).Name}");
stopwatch.Start();
var response = await next();
stopwatch.Stop();
_logger.LogInformation($"Handled {typeof(TResponse).Name} in {stopwatch.ElapsedMilliseconds} ms");
stopwatch.Reset();
return response;
}
}
Validation Behavior
Another use case is to validate the input message request before executing the request.
public sealed class ValidationBehavior<TRequest, TResponse>
: IPipelineBehavior<TRequest, TResponse>
where TRequest : ICommand<TResponse>
{
private readonly IEnumerable<IValidator<TRequest>> _validators;
public ValidationBehavior(IEnumerable<IValidator<TRequest>> validators)
{
_validators = validators;
}
public async Task<TResponse> Handle(
TRequest request,
RequestHandlerDelegate<TResponse> next,
CancellationToken cancellationToken)
{
var context = new ValidationContext<TRequest>(request);
var validationFailures = await Task.WhenAll(
_validators.Select(validator => validator.ValidateAsync(context)));
var errors = _validators
.Select(x => x.Validate(context))
.SelectMany(x => x.Errors)
.Where(x => x != null);
if (errors.Any())
{
throw new ValidationException(errors);
}
var response = await next();
return response;
}
}
This validation pipeline is generic; to customize it for each message, I use a library called FluentValidation.
You can find more information about it here:
So, to define a validation, simply create a class that inherits from AbstractValidator
as shown in the following code
public sealed class SampleCommandValidator : AbstractValidator<SampleCommand>
{
public SampleCommandValidator()
{
RuleFor(command => command.Id)
.NotEmpty()
.NotEqual(Guid.Empty);
RuleFor(command => command.Description)
.NotEmpty();
}
}
Custom Authorization Behavior
Similar to the validation pipeline, we can implement a custom authorization pipeline.
To do it, we inject a custom IAuthService
and use it in the pre-processing section.
public class CommandAuthorizationBehavior<TRequest, TResponse>
: IPipelineBehavior<TRequest, TResponse>
where TRequest : ICommand<TResponse>
{
private readonly IAuthService _authService;
public CommandAuthorizationBehavior(IAuthService authService)
{
_authService = authService;
}
public async Task<TResponse> Handle(TRequest request, RequestHandlerDelegate<TResponse> next, CancellationToken cancellationToken)
{
var response = _authService.OperationAllowed();
if (!response.IsSuccess)
throw response.Exception ?? new Exception();
return await next();
}
}
If you try the service, it might return an error because the return of the method is random. This randomness is done using Bogus, a useful .NET NuGet package to create fake data.
You can find more information about it here:
Registration
In the domain project, you will find an extension method that registers all the dependencies.
public static IServiceCollection AddMediatorSample(this IServiceCollection services)
{
services.AddMediatR(cfg => cfg.RegisterServicesFromAssembly(typeof(ServicesExtensions).Assembly));
//Just register the behaviors in the order you would like them to be called.
services.AddTransient(typeof(IPipelineBehavior<,>), typeof(LoggingBehavior<,>));
services.AddTransient(typeof(IPipelineBehavior<,>), typeof(ValidationBehavior<,>));
services.AddTransient(typeof(IPipelineBehavior<,>), typeof(CommandAuthorizationBehavior<,>));
services.AddValidatorsFromAssembly(typeof(ServicesExtensions).Assembly);
services.AddTransient<IAuthService, AuthService>();
return services;
}
Other Article about MediatR
For further insights into MediatR, feel free to explore additional articles I have authored on this subject.
By sharing these insights, I hope to create a valuable resource that will help to become more proficient in .NET development.
If you enjoyed the content or found it useful, please give a clap to show your support. Your feedback and suggestions will be greatly appreciated as they will help shape the content and ensure it meets the needs of the community.
Thank you for reading, and happy coding!