Leveraging AWS S3 for EventSourcing.Backbone: Persistent and Resilient Data Storage

Bnaya Eshet
8 min read · Jun 12, 2023


Welcome back to my technical blog! In the previous post of this series, we introduced EventSourcing.Backbone and discussed its benefits. In this post, we’ll explore how you can replace its key-value database with AWS S3, taking advantage of its persistent and resilient storage capabilities.

It is recommended to review the previous post before reading this one. You can check out the entire series here.

Understanding the Role of Key-Value Databases

Key-value databases have played a vital role in storing auxiliary data associated with events. By separating the concerns of the stream sequence (along with metadata) and event data, they contribute to lean and efficient streams. This separation proves to be highly advantageous in various scenarios, including those involving GDPR compliance, which we’ll explore in upcoming posts.

Introducing AWS S3 for Data Storage

AWS S3 (Simple Storage Service) provides a scalable, durable, and highly available object storage solution. By leveraging S3 as a storage backend for EventSourcing.Backbone, you can eliminate the need for a separate key-value database and take advantage of the benefits offered by AWS S3.

Benefits of AWS S3 for EventSourcing.Backbone

1. Persistent and Resilient Storage

AWS S3 is designed to provide persistent and durable storage for your data. It replicates objects across multiple availability zones, ensuring high availability and data durability. By using S3 for storing event data in EventSourcing.Backbone, you significantly reduce the chances of data loss and improve the overall reliability of your application.

2. Scalability and Performance

AWS S3 and Redis Hash have different strengths when it comes to scalability and performance. While Redis Hash might offer superior throughput for certain workloads, AWS S3 excels in scalability and resilience.

AWS S3 is built to handle massive amounts of data and support high request rates. It automatically scales to accommodate your storage needs, allowing you to seamlessly handle growing event volumes in EventSourcing.Backbone. With its global infrastructure, S3 provides low-latency access to your event data from anywhere in the world, ensuring optimal performance for your applications.

On the other hand, Redis Hash can provide higher throughput for read and write operations in certain scenarios. However, it might not scale as seamlessly as AWS S3 and its resilience can be limited to the capacity of a single Redis instance.

By leveraging the strengths of both AWS S3 and Redis Hash, you can achieve a balanced approach. You can use AWS S3 as the primary storage backend for its scalability and resilience while employing Redis Hash as a caching layer to improve read and write performance when needed.

In the upcoming blog post, we’ll explore the combination of AWS S3 and Redis Hash in EventSourcing.Backbone, diving deeper into how this hybrid approach can optimize scalability, performance, and resilience in your event sourcing architecture.

3. Cost-Effective Solution

AWS S3 offers a cost-effective storage solution. You only pay for the storage capacity you consume, without any upfront costs or long-term commitments. This makes it an attractive option for businesses of all sizes, allowing you to optimize your storage costs while ensuring the reliability and durability of your event data.

Code sample

The code sample for this post is available on GitHub (S3 branch).

Prerequisite:

You should create an S3 bucket and configure the following permissions (a minimal policy sketch is shown below).
- s3:PutObject
- s3:GetObject

Learn more about defining S3 permissions & .NET credentials in this post.
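For reference, a minimal identity-policy sketch granting those two actions (the bucket name is the sample one used later in this post; adjust it to your own):

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["s3:PutObject", "s3:GetObject"],
      "Resource": "arn:aws:s3:::hello-s3-event-bucket/*"
    }
  ]
}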


Let’s Code

We’ll continue from where we left off in the previous post.

You can use the HelloWorld branch as a starting point and clone it to follow along.

NuGet packages

Add the following NuGet packages (a `dotnet add package` example follows the lists):

Producer:
- EventSourcing.Backbone.Channels.RedisProducerProvider
- EventSourcing.Backbone.Channels.S3StoreProducerProvider

Consumer:
- EventSourcing.Backbone.Channels.RedisConsumerProvider
- EventSourcing.Backbone.Channels.S3StoreConsumerProvider
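For example, from the producer and consumer project directories respectively:

# producer project
dotnet add package EventSourcing.Backbone.Channels.RedisProducerProvider
dotnet add package EventSourcing.Backbone.Channels.S3StoreProducerProvider

# consumer project
dotnet add package EventSourcing.Backbone.Channels.RedisConsumerProvider
dotnet add package EventSourcing.Backbone.Channels.S3StoreConsumerProvider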

Authorization

You can set up authorization and credentials in a few different ways; learn how in this article.

Code Changes

The code is almost identical to the previous post, with one additional setting for the S3 storage provider (a single line in the builder).

Assuming you’re using environment variables for the access key, secret, and region, along with the default bucket (by default it will look for the following environment variable entries: `S3_EVENT_SOURCE_REGION`, `S3_EVENT_SOURCE_ACCESS_KEY` and `S3_EVENT_SOURCE_SECRET`).

  • Producer
IShipmentTrackingProducer producer = RedisProducerBuilder.Create()
    .AddS3Storage()
    .Uri("hello.event-sourcing")
    .BuildShipmentTrackingProducer();

With this setup, the stream metadata is stored in Redis (under the `hello.event-sourcing` key) and the message payload goes to S3 under the `hello-s3-event-bucket` bucket (make sure to create the bucket beforehand).

  • Consumer
IConsumerLifetime subscription = RedisConsumerBuilder.Create()
    .AddS3Storage()
    .Uri("hello.event-sourcing")
    .SubscribeShipmentTrackingConsumer(Subscription.Instance);
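After producing a few events, you can sanity-check the split. A quick look (assuming the stream key matches the URI and the default object layout; actual keys and paths may carry environment or namespace prefixes):

# metadata: peek at the Redis stream
redis-cli XRANGE hello.event-sourcing - +

# payload: list the objects written to the bucket
aws s3 ls s3://hello-s3-event-bucket/ --recursive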

If you want to be more explicit about the bucket, you can define it in the options:

var options = new S3Options { Bucket = "hello-s3-event-bucket" };

// producer
IShipmentTrackingProducer producer = RedisProducerBuilder.Create()
    .AddS3Storage(options)
    .Uri("hello.event-sourcing")
    .BuildShipmentTrackingProducer();

// consumer (same options, so it reads the payload from the same bucket)
IConsumerLifetime subscription = RedisConsumerBuilder.Create()
    .AddS3Storage(options)
    .Uri("hello.event-sourcing")
    .SubscribeShipmentTrackingConsumer(Subscription.Instance);

You can even pass an IAmazonS3 client directly, which is useful when you define credentials via a profile.
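A minimal sketch, assuming `AddS3Storage` exposes an overload that accepts an existing `IAmazonS3` instance (check the library for the exact signature):

// Sketch only: the overload taking IAmazonS3 is an assumption, verify it against the library.
// AmazonS3Client with no arguments picks up region/credentials from the default AWS chain (profile, environment, etc.).
IAmazonS3 s3Client = new AmazonS3Client();

IShipmentTrackingProducer producer = RedisProducerBuilder.Create()
    .AddS3Storage(s3Client, options)   // assumed overload
    .Uri("hello.event-sourcing")
    .BuildShipmentTrackingProducer();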

Now let’s use an ASP.NET project that produces events via a Web API and consumes them via a background job (an IHostedService).

The long way is to create an ASP.NET Web API project and make the following modifications.

  1. Add NuGets:
    - EventSourcing.Backbone.SrcGen
    - EventSourcing.Backbone.Channels.RedisProducerProvider
    - EventSourcing.Backbone.Channels.S3StoreProducerProvider
    - EventSourcing.Backbone.Channels.RedisConsumerProvider
    - EventSourcing.Backbone.Channels.S3StoreConsumerProvider
    - AWSSDK.Extensions.NETCore.Setup
    - AWSSDK.S3
  2. Set AWS S3 credentials (using the profile approach)
    appsettings.json might look something like this:
{
  "Logging": {
    "LogLevel": {
      "Default": "Information",
      "Microsoft.AspNetCore": "Warning"
    }
  },
  "AllowedHosts": "*",
  "AWS": {
    "Region": "us-east-1",
    "Profile": "playground"
  }
}
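The `playground` profile itself lives in the shared AWS credentials file (`~/.aws/credentials` on Linux/macOS, `%USERPROFILE%\.aws\credentials` on Windows); the values below are placeholders:

[playground]
aws_access_key_id = YOUR_ACCESS_KEY_ID
aws_secret_access_key = YOUR_SECRET_ACCESS_KEY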

3. Define an interface for the events schema, plus entity classes if needed:

[EventsContract(EventsContractType.Producer)]
[EventsContract(EventsContractType.Consumer)]
[Obsolete("Either use the Producer or Consumer version of this interface", true)]
public interface IShipmentTracking
{
    // the following method becomes ValueTask<EventKeys> OrderPlacedAsync(...) on the generated producer interface and ValueTask OrderPlacedAsync(...) on the generated consumer interface
    void OrderPlaced(User user, Product product, DateTimeOffset time);
    ValueTask PackingAsync(string email, int productId, DateTimeOffset time);
    ValueTask OnDeliveryAsync(string email, int productId, DateTimeOffset time);
    ValueTask OnReceivedAsync(string email, int productId, DateTimeOffset time);
}

public record User(int id, string email, string name);

public record Product(int id, string name, double price);

4. Define a producer controller:

[ApiController]
[Route("[controller]")]
public class ProducerController : ControllerBase
{
    private readonly ILogger<ProducerController> _logger;
    private readonly IShipmentTrackingProducer _producer;

    public ProducerController(
        ILogger<ProducerController> logger,
        IShipmentTrackingProducer producer)
    {
        _logger = logger;
        _producer = producer;
    }

    [HttpPost("order-placed")]
    [ProducesResponseType(StatusCodes.Status201Created)]
    public async Task<string> PostOrderPlacedAsync([FromBody] OrderPayload payload)
    {
        var (user, product) = payload;
        _logger.LogDebug("Sending order-placed event");
        EventKey id = await _producer.OrderPlacedAsync(user, product, DateTimeOffset.Now);
        return id;
    }

    [HttpPost("packing")]
    [ProducesResponseType(StatusCodes.Status201Created)]
    public async Task<string> PostPackingAsync([FromBody] StatePayload payload)
    {
        var (email, productId) = payload;
        _logger.LogDebug("Sending packing event");
        EventKey id = await _producer.PackingAsync(email, productId, DateTimeOffset.Now);
        return id;
    }

    [HttpPost("on-delivery")]
    [ProducesResponseType(StatusCodes.Status201Created)]
    public async Task<string> PostOnDeliveryAsync([FromBody] StatePayload payload)
    {
        var (email, productId) = payload;
        _logger.LogDebug("Sending on-delivery event");
        EventKey id = await _producer.OnDeliveryAsync(email, productId, DateTimeOffset.Now);
        return id;
    }

    [HttpPost("on-received")]
    [ProducesResponseType(StatusCodes.Status201Created)]
    public async Task<string> PostOnReceivedAsync([FromBody] StatePayload payload)
    {
        var (email, productId) = payload;
        _logger.LogDebug("Sending on-received event");
        EventKey id = await _producer.OnReceivedAsync(email, productId, DateTimeOffset.Now);
        return id;
    }
}
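The `OrderPayload` and `StatePayload` request records are plain Web API input models rather than part of the events contract, and they aren't shown in this post. A plausible definition, shaped to match how the controller deconstructs them (hypothetical; adjust to your own model):

// Hypothetical request models, not taken from the original sample
public record OrderPayload(User user, Product product);

public record StatePayload(string email, int productId);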

5. Define a Consumer Job:

public sealed class ConsumerJob : IHostedService
{
    private readonly IConsumerSubscribeBuilder _builder;
    private CancellationTokenSource? _cancellationTokenSource;
    private readonly IShipmentTrackingConsumer _subscriber;
    private IConsumerLifetime? _subscription;

    public ConsumerJob(
        ILogger<ConsumerJob> logger,
        IConsumerReadyBuilder consumerBuilder)
    {
        _builder = consumerBuilder.WithLogger(logger);
        _subscriber = new Subscriber(logger);
    }

    Task IHostedService.StartAsync(CancellationToken cancellationToken)
    {
        _cancellationTokenSource = new CancellationTokenSource();
        var cancellation = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancellationTokenSource.Token);
        _subscription = _builder
            .WithCancellation(cancellation.Token)
            // this extension method is generated (if you change the interface, use the newly generated extension method)
            .SubscribeShipmentTrackingConsumer(_subscriber);

        return Task.CompletedTask;
    }

    async Task IHostedService.StopAsync(CancellationToken cancellationToken)
    {
        _cancellationTokenSource?.Cancel();
        await (_subscription?.Completion ?? Task.CompletedTask);
    }

    private sealed class Subscriber : IShipmentTrackingConsumer
    {
        private readonly ILogger _logger;

        public Subscriber(ILogger logger)
        {
            _logger = logger;
        }

        ValueTask IShipmentTrackingConsumer.OrderPlacedAsync(
            ConsumerContext ctx,
            User user,
            Product product,
            DateTimeOffset time)
        {
            Metadata meta = ctx.Metadata;
            _logger.LogInformation("handling OrderPlaced [{message-id}]: email: {email}, product: {productId}, produced at {time}", meta.MessageId, user.email, product.id, time);
            return ValueTask.CompletedTask;
        }

        ValueTask IShipmentTrackingConsumer.PackingAsync(
            ConsumerContext ctx,
            string email,
            int productId,
            DateTimeOffset time)
        {
            Metadata meta = ctx.Metadata;
            _logger.LogInformation("handling Packing [{message-id}]: email: {email}, product: {productId}, produced at {time}", meta.MessageId, email, productId, time);
            return ValueTask.CompletedTask;
        }

        ValueTask IShipmentTrackingConsumer.OnDeliveryAsync(
            ConsumerContext ctx,
            string email,
            int productId,
            DateTimeOffset time)
        {
            Metadata meta = ctx.Metadata;
            _logger.LogInformation("handling OnDelivery [{message-id}]: email: {email}, product: {productId}, produced at {time}", meta.MessageId, email, productId, time);
            return ValueTask.CompletedTask;
        }

        ValueTask IShipmentTrackingConsumer.OnReceivedAsync(
            ConsumerContext ctx,
            string email,
            int productId,
            DateTimeOffset time)
        {
            Metadata meta = ctx.Metadata;
            _logger.LogInformation("handling OnReceived [{message-id}]: email: {email}, product: {productId}, produced at {time}", meta.MessageId, email, productId, time);
            return ValueTask.CompletedTask;
        }
    }
}

6. Configuration (Program.cs)

using System.Collections.Immutable;
using System.Diagnostics;

using Amazon.S3;

using WebSample;
using WebSample.Extensions;
using WebSample.Extensions.ShipmentTracking;


var builder = WebApplication.CreateBuilder(args);

// ############## START EVENT SOURCING CONFIGURATION ##################
// Set up the S3 client with a profile taken from `appsettings.json`
// (delete the next two lines to use credentials from environment variables instead)
builder.Services.AddDefaultAWSOptions(builder.Configuration.GetAWSOptions());
builder.Services.AddAWSService<IAmazonS3>();

// Register Redis connection for event sourcing (using environment variables [REDIS_EVENT_SOURCE_ENDPOINT, REDIS_EVENT_SOURCE_PASS])
builder.Services.AddEventSourceRedisConnection();

// EventSourcing.Backbone can be environment sensitive
IWebHostEnvironment environment = builder.Environment;
string env = environment.EnvironmentName;

string uri = "shipment-tracking";
// make sure to create the bucket on AWS S3 with both the 'dev.' and 'prod.' prefixes, plus any other environment you're using (staging, etc.)
string s3Bucket = "shipment-tracking-sample";

var s3Options = new S3Options { Bucket = s3Bucket };
builder.Services.AddSingleton(ioc =>
{
    ILogger logger = ioc.GetService<ILogger<Program>>() ?? throw new EventSourcingException("Logger is missing");
    IShipmentTrackingProducer producer = ioc.ResolveRedisProducerChannel()
        .ResolveS3Storage(s3Options)
        .Environment(env)
        .Uri(uri)
        .WithLogger(logger)
        .BuildShipmentTrackingProducer();
    return producer;
});

builder.Services.AddSingleton(ioc =>
{
    IConsumerReadyBuilder consumer =
        ioc.ResolveRedisConsumerChannel()
            .ResolveS3Storage(s3Options)
            .Environment(env)
            .Uri(uri);
    return consumer;
});

builder.Services.AddHostedService<ConsumerJob>();
// ############## END OF EVENT SOURCING CONFIGURATION ##################

builder.Services.AddControllers();
// Learn more about configuring Swagger/OpenAPI at https://aka.ms/aspnetcore/swashbuckle
builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();

var app = builder.Build();
// Configure the HTTP request pipeline.
if (app.Environment.IsDevelopment())
{
    app.UseSwagger();
    app.UseSwaggerUI();
}
app.UseHttpsRedirection();
app.UseAuthorization();
app.MapControllers();
app.Run();

7. Environment variables

Set the environment variables for Redis and S3 (an example export follows the list):

  • REDIS_EVENT_SOURCE_ENDPOINT
  • REDIS_EVENT_SOURCE_PASS
  • S3_EVENT_SOURCE_ACCESS_KEY
  • S3_EVENT_SOURCE_SECRET
  • S3_EVENT_SOURCE_REGION
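For local development you might export them in the shell (placeholder values; on Windows use setx or launchSettings.json instead):

export REDIS_EVENT_SOURCE_ENDPOINT=localhost:6379
export REDIS_EVENT_SOURCE_PASS=
export S3_EVENT_SOURCE_ACCESS_KEY=YOUR_ACCESS_KEY_ID
export S3_EVENT_SOURCE_SECRET=YOUR_SECRET_ACCESS_KEY
export S3_EVENT_SOURCE_REGION=us-east-1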

Set Up the Easy Way

In the previous post, we hinted at a simpler approach for building your EventSourcing.Backbone solution. Brace yourself for an effortless setup! Here’s how:

Step 1: Install the Template

To get started, install the Event-Sourcing.Backbone.Templates by running the following command:

dotnet new install Event-Sourcing.Backbone.Templates

Step 2: Utilize the Template

Now, leverage the power of the template and configure your solution for S3 storage and authentication with a few switches. Execute the following command, tailoring it to your specific needs:

// using `appsettings.json` profile credentials
dotnet new evtsrc -uri event-demo -s3 --aws-profile AWS_PROFILE --aws-profile-region us-east-1 --s3-bucket event-sourcing-demo --consumer-group main-consumer -n MyCompany.Events -e MyEvent

// using credentials from environment variables [`S3_EVENT_SOURCE_REGION`, `S3_EVENT_SOURCE_ACCESS_KEY` and `S3_EVENT_SOURCE_SECRET`]
dotnet new evtsrc -uri event-demo-s3 -s3 -s3-cred environment-variable --s3-bucket event-sourcing-demo --consumer-group main-consumer -n MyCompany.Events.S3Storage -e MyEvent

Step 3: Set the environment for testing

docker run -p 6379:6379 -it --rm --name redis-event-source-sample redislabs/rejson:latest

Environment Variables

Before you run it, set the environment variables for the Redis endpoint and password (the password is optional):

  • REDIS_EVENT_SOURCE_ENDPOINT
  • REDIS_EVENT_SOURCE_PASS

Conclusion

In this post, we’ve explored the benefits of replacing the key-value database in EventSourcing.Backbone with AWS S3 for data storage. By leveraging S3’s persistent and resilient storage capabilities, you can enhance the reliability, scalability, and cost-effectiveness of your event-sourcing architecture.

AWS S3 offers a robust storage solution, providing the infrastructure to handle large volumes of event data while ensuring high availability and durability. By integrating S3 with EventSourcing.Backbone and combining it with Redis Hash for caching, you can create a powerful and efficient event-sourcing solution.

Stay tuned for the next blog post, where we’ll cover the combination of AWS S3 and Redis Hash in EventSourcing.Backbone.

Resources

Configuring AWS Credentials for .NET Applications — Detailed Guide

EventSourcing.Backbone at GitHub

The code sample for this post is available on GitHub (S3 branch).

Finally

If you found something useful in this article, you can leave a star on the GitHub repository (linked in the resources above).

If you have any questions, please feel free to contact me:

https://www.linkedin.com/in/bnaya-eshet/

Next

This post is part of a series about event sourcing and an exciting framework called EventSourcing.Backbone. Check out the series to learn more.
