C# .NET 8 — MediatR Pipelines

Gabriele Tronchin
May 5, 2024

Introduction

MediatR, available as a NuGet package for .NET, implements the mediator design pattern, which decouples objects by routing their communication through a mediator instead of direct references.

For a comprehensive understanding of this pattern, you can refer to the following resource:

A well-established implementation of this pattern for .NET is MediatR, whose official GitHub project can be found here:

Fundamentals of MediatR

In essence, MediatR operates across three primary modes:

  • Request: Involving a single receiver with a service response.
  • Notification: Engaging multiple receivers without a service response.
  • StreamRequest: Utilizing a single receiver for stream operations with a service response.
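
To make the three modes concrete, here is a minimal sketch of one message and one handler per mode. The type names (Ping, OrderPlaced, CountTo) are hypothetical and are not part of the sample project; the interfaces are the ones MediatR exposes for each mode.

```csharp
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using MediatR;

// Request: exactly one handler, which returns a response.
public record Ping(string Message) : IRequest<string>;

public class PingHandler : IRequestHandler<Ping, string>
{
    public Task<string> Handle(Ping request, CancellationToken cancellationToken)
        => Task.FromResult($"Pong: {request.Message}");
}

// Notification: any number of handlers, no response.
public record OrderPlaced(int OrderId) : INotification;

public class OrderPlacedHandler : INotificationHandler<OrderPlaced>
{
    public Task Handle(OrderPlaced notification, CancellationToken cancellationToken)
        => Task.CompletedTask; // e.g. send an email, update a read model
}

// StreamRequest: exactly one handler, which returns an asynchronous stream.
public record CountTo(int Max) : IStreamRequest<int>;

public class CountToHandler : IStreamRequestHandler<CountTo, int>
{
    public async IAsyncEnumerable<int> Handle(
        CountTo request,
        [EnumeratorCancellation] CancellationToken cancellationToken)
    {
        for (var i = 1; i <= request.Max; i++)
        {
            cancellationToken.ThrowIfCancellationRequested();
            await Task.Yield();
            yield return i;
        }
    }
}

// Usage, given an injected IMediator named mediator:
//   string reply = await mediator.Send(new Ping("hello"));
//   await mediator.Publish(new OrderPlaced(42));
//   await foreach (var n in mediator.CreateStream(new CountTo(3))) { /* consume n */ }
```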

For the scope of this article, our focus is primarily on the Request behavior, particularly on exploring MediatR Pipelines.

MediatR Pipelines

Within the mediator Request flow, there exists a clear distinction between a publisher (Send operation) and a subscriber (Handler).

By leveraging MediatR pipelines, we can effectively intercept this flow and introduce customized logic into the process.

To implement a pipeline, a class needs to implement the interface IPipelineBehavior<TRequest, TResponse>, as demonstrated below:

public sealed class MyPipelineBehavior<TRequest, TResponse>
    : IPipelineBehavior<TRequest, TResponse>
{
    public async Task<TResponse> Handle(TRequest request, RequestHandlerDelegate<TResponse> next, CancellationToken cancellationToken)
    {
        // Pre-processing logic

        var response = await next();

        // Post-processing logic

        return response;
    }
}

As illustrated in the provided code snippet, this approach enables the insertion of logic both before and after invoking the subsequent step in the mediator pipeline.

Furthermore, multiple pipeline behaviors can be registered in sequence to form a chain, where each behavior wraps the next one and the last behavior wraps the handler.

If you need to control the execution order of a mediator pipeline, register the behaviors in the order you would like them to be called.

services.AddTransient(typeof(IPipelineBehavior<,>), typeof(MyPipelineBehavior1<,>));
services.AddTransient(typeof(IPipelineBehavior<,>), typeof(MyPipelineBehavior2<,>));
services.AddTransient(typeof(IPipelineBehavior<,>), typeof(MyPipelineBehavior3<,>));
services.AddTransient(typeof(IPipelineBehavior<,>), typeof(MyPipelineBehavior4<,>));
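
The effect of registration order can be illustrated without MediatR at all. The sketch below is a simplified model, not MediatR's actual implementation: the Next delegate is a hypothetical stand-in for RequestHandlerDelegate<TResponse>, and each "behavior" only records when it runs. It shows that the first registered behavior ends up outermost, so its pre-processing runs first and its post-processing runs last.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical stand-in for MediatR's RequestHandlerDelegate<TResponse>.
public delegate Task<string> Next();

public static class PipelineOrderDemo
{
    // Builds a "behavior" that records a trace entry before and after calling next().
    private static Func<Next, Next> Behavior(string name, List<string> trace) =>
        next => async () =>
        {
            trace.Add($"{name}: before");
            var response = await next();
            trace.Add($"{name}: after");
            return response;
        };

    public static async Task<List<string>> RunAsync()
    {
        var trace = new List<string>();

        // Registration order: Behavior1 first, Behavior2 second.
        var behaviors = new[]
        {
            Behavior("Behavior1", trace),
            Behavior("Behavior2", trace),
        };

        Next handler = () =>
        {
            trace.Add("Handler");
            return Task.FromResult("done");
        };

        // Wrap from the last behavior to the first, so the first one is outermost.
        Next pipeline = Enumerable.Reverse(behaviors)
            .Aggregate(handler, (next, behavior) => behavior(next));

        await pipeline();
        return trace;
    }

    public static async Task Main()
    {
        // Prints, in order:
        //   Behavior1: before, Behavior2: before, Handler, Behavior2: after, Behavior1: after
        foreach (var line in await RunAsync())
        {
            Console.WriteLine(line);
        }
    }
}
```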

Another noteworthy technique employed in this project is extending MediatR’s default IRequest interface.

By deriving a custom interface, such as ICommand, from the default IRequest interface, we gain the ability to restrict a pipeline to requests that implement that specific interface.

Sample implementation of a custom IRequest:

public interface ICommand<out TResponse> : IRequest<TResponse>
{
}

Sample instantiation of a pipeline tailored exclusively for ICommand:

public sealed class MyPipelineBehavior<TRequest, TResponse>
    : IPipelineBehavior<TRequest, TResponse>
    where TRequest : ICommand<TResponse>
{
    // Same Handle method as in the previous MyPipelineBehavior sample
}

MediatR Pipeline Use Case

MediatR pipelines have many use cases, including:

  • Logging: using a pipeline to log relevant information about requests, responses, or execution time.
  • Caching: implementing a caching pipeline alongside requests can minimize database calls when you use MediatR requests to retrieve information.
  • Request Validation: integrating logic for validating input data before request execution ensures that only valid data is processed.
  • Unit of work pattern: using a pipeline to manage the unit of work handles transactions and rollback operations in a centralized manner.
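
As an illustration of the caching use case, here is a minimal sketch of a caching behavior. It is not taken from the sample project: the ICacheableQuery interface, its CacheKey property, and the five-minute expiration are all assumptions made for this example, and IMemoryCache comes from the Microsoft.Extensions.Caching.Memory package.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using MediatR;
using Microsoft.Extensions.Caching.Memory;

// Hypothetical marker interface: a request opts in to caching by exposing a cache key.
public interface ICacheableQuery<out TResponse> : IRequest<TResponse>
{
    string CacheKey { get; }
}

public sealed class CachingBehavior<TRequest, TResponse>
    : IPipelineBehavior<TRequest, TResponse>
    where TRequest : ICacheableQuery<TResponse>
{
    private readonly IMemoryCache _cache;

    public CachingBehavior(IMemoryCache cache)
    {
        _cache = cache;
    }

    public async Task<TResponse> Handle(TRequest request, RequestHandlerDelegate<TResponse> next, CancellationToken cancellationToken)
    {
        // Cache hit: return early without calling next(), so the handler never runs.
        if (_cache.TryGetValue(request.CacheKey, out TResponse? cached) && cached is not null)
        {
            return cached;
        }

        // Cache miss: run the rest of the pipeline and store the result.
        var response = await next();
        _cache.Set(request.CacheKey, response, TimeSpan.FromMinutes(5)); // assumed expiration

        return response;
    }
}
```

Note that this behavior can return without invoking next() at all, which is how a pipeline short-circuits a request.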

Sample Implementation

Here’s a link to a project I’ve prepared:

Starting the application, the Swagger page will appear:

You can find 4 different endpoints:

  • SampleCommand: Use this endpoint to test the command. This command passes through all the MediatR pipelines: Logging -> Validation -> Authorization.
  • SampleRequest: Use this endpoint to skip all the pipelines. This works because it uses the plain IRequest instead of the custom ICommand.
  • SampleEntity: This endpoint is useful for checking the result of the AddSampleEntity endpoint.
  • AddSampleEntity: This endpoint uses the ITransactionCommand interface and is a sample of the Unit of Work pipeline behavior. In this project, it also implements a sample of the IRepository pattern using EF with an InMemory database.
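
For readers curious about the Unit of Work idea behind AddSampleEntity, a behavior of that kind might look roughly like the sketch below. This is not the project's code: the IUnitOfWork abstraction and its three methods are hypothetical, and the shape of ITransactionCommand (a marker interface derived from ICommand) is an assumption.

```csharp
using System.Threading;
using System.Threading.Tasks;
using MediatR;

public interface ICommand<out TResponse> : IRequest<TResponse> { }

// Assumed shape: a marker interface for commands that must run inside a transaction.
public interface ITransactionCommand<out TResponse> : ICommand<TResponse> { }

// Hypothetical abstraction over the underlying persistence technology.
public interface IUnitOfWork
{
    Task BeginTransactionAsync(CancellationToken cancellationToken);
    Task CommitAsync(CancellationToken cancellationToken);
    Task RollbackAsync(CancellationToken cancellationToken);
}

public sealed class UnitOfWorkBehavior<TRequest, TResponse>
    : IPipelineBehavior<TRequest, TResponse>
    where TRequest : ITransactionCommand<TResponse>
{
    private readonly IUnitOfWork _unitOfWork;

    public UnitOfWorkBehavior(IUnitOfWork unitOfWork)
    {
        _unitOfWork = unitOfWork;
    }

    public async Task<TResponse> Handle(TRequest request, RequestHandlerDelegate<TResponse> next, CancellationToken cancellationToken)
    {
        await _unitOfWork.BeginTransactionAsync(cancellationToken);

        try
        {
            // The handler does its work inside the transaction.
            var response = await next();
            await _unitOfWork.CommitAsync(cancellationToken);
            return response;
        }
        catch
        {
            // Roll back even if the request was cancelled, then rethrow.
            await _unitOfWork.RollbackAsync(CancellationToken.None);
            throw;
        }
    }
}
```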

In this article we will focus only on the first two endpoints: SampleCommand and SampleRequest.

API Flow — Code Explanation

Below is a detailed explanation of the Command and Request endpoint flow.

Command

Behind the SampleCommand endpoint, we have a mediator interface which sends a Sample command.

app.MapPost("/SampleCommand", ([FromBody] SampleBody sampleBody, IMediator mediator) =>
{
    return mediator.Send(new SampleCommand() { Id = Guid.NewGuid(), Description = sampleBody.Description, EventTime = DateTime.UtcNow });
})
.WithName("SampleCommand")
.WithOpenApi();

Here is the definition of the SampleCommand class, along with its return class:

public class SampleCommand : ICommand<SampleCommandComplete>
{
    public Guid Id { get; set; }

    public DateTime EventTime { get; set; }

    public string Description { get; set; }
}

public record SampleCommandComplete
{
    public Guid Id { get; set; }
}

As mentioned before, the ICommand interface inherits the standard IRequest interface of MediatR.

public interface ICommand<out TResponse> : IRequest<TResponse>
{
}

Here’s the handler implementation; in this case, it simply logs some info and creates the response:

public class SampleCommandHandler : IRequestHandler<SampleCommand, SampleCommandComplete>
{
    private readonly ILogger<SampleCommandHandler> _logger;

    public SampleCommandHandler(ILogger<SampleCommandHandler> logger)
    {
        _logger = logger;
    }

    // No await is needed here, so the method returns a completed task instead of being marked async.
    public Task<SampleCommandComplete> Handle(SampleCommand request, CancellationToken cancellationToken)
    {
        _logger.LogInformation("Command Executed Id:{Id};Description:{Description};EventTime:{EventTime}", request.Id, request.Description, request.EventTime);
        return Task.FromResult(new SampleCommandComplete() { Id = request.Id });
    }
}

Request

The request flow is more or less the same as the command flow. Behind the API, it simply creates a SampleRequest and sends it using the IMediator interface.

app.MapPost("/SampleRequest", ([FromBody] SampleBody sampleBody, IMediator mediator) =>
{
    return mediator.Send(new SampleRequest() { Id = Guid.NewGuid(), Description = sampleBody.Description, EventTime = DateTime.UtcNow });
})
.WithName("SampleRequest")
.WithOpenApi();

In this case, SampleRequest implements the standard IRequest interface directly.

public class SampleRequest : IRequest<SampleRequestComplete>
{
    public Guid Id { get; set; }

    public DateTime EventTime { get; set; }

    public string Description { get; set; }
}

public record SampleRequestComplete
{
    public Guid Id { get; set; }
}

Here’s the handler implementation; in this case, it just logs some info and creates the response:

public class SampleRequestHandler : IRequestHandler<SampleRequest, SampleRequestComplete>
{
    private readonly ILogger<SampleRequestHandler> _logger;

    public SampleRequestHandler(ILogger<SampleRequestHandler> logger)
    {
        _logger = logger;
    }

    // No await is needed here, so the method returns a completed task instead of being marked async.
    public Task<SampleRequestComplete> Handle(SampleRequest request, CancellationToken cancellationToken)
    {
        _logger.LogInformation("Request Executed Id:{Id};Description:{Description};EventTime:{EventTime}", request.Id, request.Description, request.EventTime);
        return Task.FromResult(new SampleRequestComplete() { Id = request.Id });
    }
}

Pipeline — Code Explanation

Now let’s delve into a practical use case for MediatR Pipelines.

In this project, all the pipelines are constrained to the ICommand interface. So when you try the two endpoints explained before, you can see that only SampleCommand goes through the pipeline flow.

Logging Behavior

The first use case is to log information before and after the execution of a request. In this use case, the pipeline logs the execution time of the request.

public sealed class LoggingBehavior<TRequest, TResponse>
    : IPipelineBehavior<TRequest, TResponse>
    where TRequest : ICommand<TResponse>
{
    private readonly ILogger<LoggingBehavior<TRequest, TResponse>> _logger;

    public LoggingBehavior(ILogger<LoggingBehavior<TRequest, TResponse>> logger)
    {
        _logger = logger;
    }

    public async Task<TResponse> Handle(TRequest request, RequestHandlerDelegate<TResponse> next, CancellationToken cancellationToken)
    {
        // Message templates (rather than string interpolation) keep the log entries structured.
        _logger.LogInformation("Handling {RequestName}", typeof(TRequest).Name);

        // Stopwatch lives in System.Diagnostics.
        var stopwatch = Stopwatch.StartNew();

        var response = await next();

        stopwatch.Stop();

        _logger.LogInformation("Handled {ResponseName} in {ElapsedMilliseconds} ms", typeof(TResponse).Name, stopwatch.ElapsedMilliseconds);

        return response;
    }
}

Validation Behavior

Another use case is to validate the input message request before executing the request.

public sealed class ValidationBehavior<TRequest, TResponse>
    : IPipelineBehavior<TRequest, TResponse>
    where TRequest : ICommand<TResponse>
{
    private readonly IEnumerable<IValidator<TRequest>> _validators;

    public ValidationBehavior(IEnumerable<IValidator<TRequest>> validators)
    {
        _validators = validators;
    }

    public async Task<TResponse> Handle(
        TRequest request,
        RequestHandlerDelegate<TResponse> next,
        CancellationToken cancellationToken)
    {
        var context = new ValidationContext<TRequest>(request);

        // Run every registered validator once, asynchronously, and reuse the results.
        var validationResults = await Task.WhenAll(
            _validators.Select(validator => validator.ValidateAsync(context, cancellationToken)));

        var errors = validationResults
            .SelectMany(result => result.Errors)
            .Where(failure => failure != null)
            .ToList();

        if (errors.Count > 0)
        {
            throw new ValidationException(errors);
        }

        return await next();
    }
}

This validation pipeline is generic; to customize it for each message, I use a library called FluentValidation.

You can find more information about it here:

So, to define a validation, simply create a class that inherits from AbstractValidator, as shown in the following code:

public sealed class SampleCommandValidator : AbstractValidator<SampleCommand>
{
    public SampleCommandValidator()
    {
        RuleFor(command => command.Id)
            .NotEmpty()
            .NotEqual(Guid.Empty);

        RuleFor(command => command.Description)
            .NotEmpty();
    }
}
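
A validator can also be exercised on its own, outside the pipeline, which is handy for checking rules in isolation. The snippet below is a small usage sketch that assumes the SampleCommand and SampleCommandValidator classes shown above are in scope; it is written as top-level statements for brevity.

```csharp
using System;
using FluentValidation;

// Assumes SampleCommand and SampleCommandValidator from the article are available.
var validator = new SampleCommandValidator();

// An empty Id and an empty Description both violate the rules defined above.
var result = validator.Validate(new SampleCommand
{
    Id = Guid.Empty,
    Description = string.Empty,
    EventTime = DateTime.UtcNow
});

Console.WriteLine(result.IsValid); // False

foreach (var failure in result.Errors)
{
    // Each failure carries the offending property and a human-readable message.
    Console.WriteLine($"{failure.PropertyName}: {failure.ErrorMessage}");
}
```

Inside the pipeline, these same failures are what the ValidationBehavior collects and wraps in a ValidationException.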

Custom Authorization Behavior

Similar to the validation pipeline, we can implement a custom authorization pipeline.

To do it, we inject a custom IAuthService and use it in the pre-processing section.

public class CommandAuthorizationBehavior<TRequest, TResponse>
    : IPipelineBehavior<TRequest, TResponse>
    where TRequest : ICommand<TResponse>
{
    private readonly IAuthService _authService;

    public CommandAuthorizationBehavior(IAuthService authService)
    {
        _authService = authService;
    }

    public async Task<TResponse> Handle(TRequest request, RequestHandlerDelegate<TResponse> next, CancellationToken cancellationToken)
    {
        // Pre-processing: stop the pipeline before the handler runs if the operation is not allowed.
        var authResult = _authService.OperationAllowed();

        if (!authResult.IsSuccess)
        {
            throw authResult.Exception ?? new UnauthorizedAccessException("Operation not allowed.");
        }

        return await next();
    }
}

If you try the service, the call may fail with an error, because the sample IAuthService returns a random result. The randomness comes from Bogus, a useful .NET NuGet package for generating fake data.
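
To give an idea of what such a service could look like, here is a minimal sketch. It is an assumption, not the project's actual code: the AuthResult type and the exception message are hypothetical, and only the IsSuccess and Exception members are implied by the behavior above. The random outcome uses the Bool() method of Bogus's Randomizer.

```csharp
using System;
using Bogus;

// Hypothetical result type; the behavior only relies on IsSuccess and Exception.
public sealed record AuthResult(bool IsSuccess, Exception? Exception = null);

public interface IAuthService
{
    AuthResult OperationAllowed();
}

public sealed class AuthService : IAuthService
{
    private readonly Faker _faker = new();

    // Randomly allows or denies the operation, purely for demonstration purposes.
    public AuthResult OperationAllowed() =>
        _faker.Random.Bool()
            ? new AuthResult(true)
            : new AuthResult(false, new UnauthorizedAccessException("Operation not allowed (random outcome)."));
}
```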

You can find more information about it here:

Registration

In the domain project, you will find an extension method that registers all the dependencies.

public static IServiceCollection AddMediatorSample(this IServiceCollection services)
{
    services.AddMediatR(cfg => cfg.RegisterServicesFromAssembly(typeof(ServicesExtensions).Assembly));

    // Just register the behaviors in the order you would like them to be called.
    services.AddTransient(typeof(IPipelineBehavior<,>), typeof(LoggingBehavior<,>));
    services.AddTransient(typeof(IPipelineBehavior<,>), typeof(ValidationBehavior<,>));
    services.AddTransient(typeof(IPipelineBehavior<,>), typeof(CommandAuthorizationBehavior<,>));

    services.AddValidatorsFromAssembly(typeof(ServicesExtensions).Assembly);

    services.AddTransient<IAuthService, AuthService>();

    return services;
}
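
As an alternative, and assuming MediatR 12 or later, the same behaviors can be registered inside the AddMediatR configuration with AddOpenBehavior, which keeps the ordering next to the rest of the MediatR setup. The method name AddMediatorSampleWithOpenBehaviors below is hypothetical; the behaviors, IAuthService, and ServicesExtensions are the ones from the article.

```csharp
using FluentValidation;
using MediatR;
using Microsoft.Extensions.DependencyInjection;

public static class ServicesExtensionsAlternative
{
    public static IServiceCollection AddMediatorSampleWithOpenBehaviors(this IServiceCollection services)
    {
        services.AddMediatR(cfg =>
        {
            cfg.RegisterServicesFromAssembly(typeof(ServicesExtensions).Assembly);

            // Behaviors still run in the order they are added.
            cfg.AddOpenBehavior(typeof(LoggingBehavior<,>));
            cfg.AddOpenBehavior(typeof(ValidationBehavior<,>));
            cfg.AddOpenBehavior(typeof(CommandAuthorizationBehavior<,>));
        });

        services.AddValidatorsFromAssembly(typeof(ServicesExtensions).Assembly);
        services.AddTransient<IAuthService, AuthService>();

        return services;
    }
}

// In Program.cs, either extension method is then called once at startup:
//   builder.Services.AddMediatorSample();
```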

Other Articles about MediatR

For further insights into MediatR, feel free to explore additional articles I have authored on this subject.

By sharing these insights, I hope to create a valuable resource that will help you become more proficient in .NET development.

If you enjoyed the content or found it useful, please give a clap to show your support. Your feedback and suggestions will be greatly appreciated as they will help shape the content and ensure it meets the needs of the community.

Thank you for reading, and happy coding!

Gabriele Tronchin

I'm a software engineer, specializing in backend development and distributed systems.