Event Sourcing Caching and GDPR Compliance (Data Splitting)

Explore the concept of storage specialization within EventSourcing.Backbone, which encompasses both caching and data splitting.

Bnaya Eshet
Cloud Native Daily
7 min read · Jul 13, 2023


Introduction

In this post, we explore the concept of storage specialization within EventSourcing.Backbone, which encompasses both caching and data splitting. Storage specialization allows for customized behavior, whether it involves delivering cache functionality or efficiently splitting data across different storage options. By leveraging this capability, we can optimize performance, achieve compliance, and enhance overall system efficiency. Join us as we delve into the benefits and strategies of storage specialization in event sourcing and event-driven systems.

Don’t forget to check out the other posts in our EventSourcing.Backbone Series for more insights into event-driven architectures and observability.

Specialized Storage for Caching and Data Splitting

Storage specialization empowers you to customize storage behavior to meet specific requirements.

For instance, you can leverage Redis for dedicated caching, ensuring swift data retrieval. By storing recent event data in Redis, you optimize performance and enhance responsiveness, leveraging the assumption that recent event data is accessed more frequently than older data.

On the other hand, utilizing S3 for event data splitting provides durability and long-term storage capabilities. This allows you to store less frequently accessed or historical data in S3, optimizing costs while ensuring data resilience. With this specialized approach, EventSourcing.Backbone combines the speed of Redis and the durability of S3, striking a balance between performance and cost efficiency.

Caching Setup

All you have to do is install or update the template:

dotnet new install Event-Sourcing.Backbone.Templates

Execute the CLI (or use the “Event Sourcing Backbone” template from Visual Studio):

// using `appsettings.json` profile credentials
dotnet new evtsrc -uri event-demo -s3 --aws-profile AWS_PROFILE --aws-profile-region us-east-1 --s3-bucket event-sourcing-demo --consumer-group main-consumer -n MyCompany.Events -e JobOffer

// using credentials from environment variables [`S3_EVENT_SOURCE_REGION`, `S3_EVENT_SOURCE_ACCESS_KEY` and `S3_EVENT_SOURCE_SECRET`]
dotnet new evtsrc -uri event-demo-s3 -s3 -s3-cred environment-variable --s3-bucket event-sourcing-demo --consumer-group main-consumer -n MyCompany.Events.S3Storage -e JobOffer

Make sure to set up the S3 bucket and credentials.
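For example, a hedged sketch using the AWS CLI (the bucket name and region match the commands above; adjust them to your setup):

aws s3 mb s3://event-sourcing-demo --region us-east-1

# when using the environment-variable credential mode:
export S3_EVENT_SOURCE_REGION=us-east-1
export S3_EVENT_SOURCE_ACCESS_KEY=<your-access-key>
export S3_EVENT_SOURCE_SECRET=<your-secret-key>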

Then add the two storage types to the event sourcing builder pipeline.

Producer

// JobOfferProducerExtensions.cs
IJobOfferProducer producer = ioc.ResolveRedisProducerChannel()
    // Redis storage
    .ResolveRedisHashStorage(timeToLive: TimeSpan.FromHours(5))
    // S3 storage
    .ResolveS3Storage(s3Options)
    .Environment(env)
    .Uri(uri)
    .WithLogger(logger)
    .BuildJobOfferProducer();

Please be mindful of the timeToLive parameter. Only use it when the storage is explicitly intended for caching (like the Redis registration above); applying it to a durable store may lead to data loss.

To learn how to set up S3 credentials, refer to this article. It provides detailed instructions on configuring S3 credentials for your specific use case.

Ensure that you set the AWS region to the same region where the S3 resource is located. This is important to establish the correct connection and ensure proper access to the S3 services.

Consumer

// ConsumerExtensions.cs
return ioc.ResolveRedisConsumerChannel()
    // Redis storage (should come first in a caching scenario)
    .ResolveRedisHashStorage()
    // S3 storage
    .ResolveS3Storage(s3Options)
    .Environment(env)
    .Uri(uri);

By enabling telemetry for both Redis and S3 and analyzing the traces after a run, you will observe that no S3 GET requests are made.

The S3 Storage provider will not make a request if the data has already been retrieved (in this case, by the Redis Storage provider).
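Conceptually, the retrieval behaves like the following illustrative sketch. This is not the library's actual code; it only shows the fall-through idea:

using System.Collections.Generic;
using System.Threading.Tasks;

public delegate Task<byte[]?> TryFetch(string eventKey);

public static class StorageChainSketch
{
    // Storage providers are queried in registration order. Once an earlier
    // provider (Redis) returns the payload, later ones (S3) are never called,
    // which is why no S3 GET requests show up in the traces.
    public static async Task<byte[]?> FetchAsync(
        string eventKey, IEnumerable<TryFetch> providers)
    {
        foreach (TryFetch provider in providers)
        {
            byte[]? payload = await provider(eventKey);
            if (payload is not null)
                return payload; // hit: the remaining providers are skipped
        }
        return null; // not found in any storage
    }
}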

Data Splitting and Compliance

On the other hand, storage specialization can facilitate efficient data splitting across multiple storage options. By segmenting data based on compliance requirements, domain relevance, or other factors, we ensure appropriate data isolation and better manageability. For example, sensitive PII (Personally Identifiable Information) data can be stored separately for compliance purposes, while domain-specific data can reside in dedicated storage for improved organization.

Storage Splitting in Action

Follow these instructions to get a working sample of storage splitting:

  1. Create an Event Sourcing solution:
// Install the template
dotnet new install Event-Sourcing.Backbone.Templates

// Create a new solution
dotnet new evtsrc -uri storage-splitting -s3 --aws-profile PROFILE_NAME --aws-profile-region us-east-1 --s3-bucket event-sourcing-splitting -n Samples.EventSourcing.StorageSplitting -e Vacation

2. Prepare the scenario

Modify `Samples.EventSourcing.StorageSplitting.Abstractions`:

  • Add:
public record Address(string country, string? state, string city, string street, int houseNumber);

public record User(int id, string name, Address address);

public record Hotel(string name, Address address);

public record Flight(string number, string airline, DateTimeOffset date);
  • Remove `NextStage.cs`
  • Change `IVacation.cs` to:
[EventsContract(EventsContractType.Producer)]
[EventsContract(EventsContractType.Consumer)]
[Obsolete("Choose either the Producer or Consumer version of this interface.", true)]
public interface IVacation
{
    ValueTask BookVacationAsync(User user, Hotel hotel, Flight flight);
}

Modify `Samples.EventSourcing.StorageSplitting.Service`:

Change `VacationConsumerJob.cs` to:

public sealed class ConsumerJob : IHostedService, IVacationConsumer
{
    private readonly IConsumerSubscribeBuilder _builder;
    private CancellationTokenSource? _cancellationTokenSource;
    private IConsumerLifetime? _subscription;

    private readonly ILogger _logger;

    public ConsumerJob(
        ILogger<ConsumerJob> logger,
        IKeyed<IConsumerReadyBuilder> consumerBuilderKeyed)
    {
        if (!consumerBuilderKeyed.TryGet(VacationConstants.URI, out var consumerBuilder))
            throw new EventSourcingException($"Consumer's registration was not found under the [{VacationConstants.URI}] key.");
        _builder = consumerBuilder.WithLogger(logger);
        _logger = logger;
    }

    Task IHostedService.StartAsync(CancellationToken cancellationToken)
    {
        _cancellationTokenSource = new CancellationTokenSource();
        var cancellation = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _cancellationTokenSource.Token);
        _subscription = _builder
            .Group(VacationConstants.CONSUMER_GROUP)
            .WithCancellation(cancellation.Token)
            // this extension is generated (if you change the interface, use the newly generated extension method)
            .SubscribeVacationConsumer(this);

        return Task.CompletedTask;
    }

    async Task IHostedService.StopAsync(CancellationToken cancellationToken)
    {
        _cancellationTokenSource?.CancelSafe();
        await (_subscription?.Completion ?? Task.CompletedTask);
    }

    async ValueTask IVacationConsumer.BookVacationAsync(ConsumerMetadata consumerMetadata, User user, Hotel hotel, Flight flight)
    {
        var meta = consumerMetadata.Metadata;
        LogLevel level = meta.Environment == "prod" ? LogLevel.Debug : LogLevel.Information;
        _logger.Log(level,
            """
            handling {event} [{uri}]
            """,
            meta.Operation, meta.UriDash);

        await consumerMetadata.AckAsync(); // not required with default settings, or when the consumer is configured to Ack on success
    }
}
  • Change `VacationProducerController.cs` to:
[ApiController]
[Route("[controller]")]
public class VacationProducerController : ControllerBase
{
    private readonly ILogger<VacationProducerController> _logger;
    private readonly IVacationProducer _producer;

    public VacationProducerController(
        ILogger<VacationProducerController> logger,
        IKeyed<IVacationProducer> producerKeyed)
    {
        _logger = logger;
        if (!producerKeyed.TryGet(VacationConstants.URI, out var producer))
            throw new EventSourcingException($"Producer's registration was not found under the [{VacationConstants.URI}] key.");
        _producer = producer;
    }

    /// <summary>
    /// Produce a vacation-booking event.
    /// </summary>
    /// <param name="name">The user's name (used to build the sample payload).</param>
    /// <returns>The event key.</returns>
    [HttpPost]
    [ProducesResponseType(StatusCodes.Status201Created)]
    //[AllowAnonymous]
    public async Task<string> SendEventAsync(string name)
    {
        _logger.LogDebug("Sending event");
        var key = await _producer.BookVacationAsync(
            new User(name.GetHashCode(), name, new Address("USA", "CA", "best-city", "wonderful street", 43)),
            new Hotel("sunny nights", new Address("Italy", null, "Rome", "pantheon", 43)),
            new Flight("2882", "Italian-airline", DateTimeOffset.UtcNow.AddDays(7)));
        return key;
    }
}
  • Fix or delete the Tests projects and make sure the solution compiles.

3. Make sure you’ve configured the S3 credentials correctly and created the S3 buckets “dev.event-sourcing-splitting” and “dev.event-sourcing-splitting-gdpr” for the development environment, and “prod.event-sourcing-splitting” and “prod.event-sourcing-splitting-gdpr” for production.

If you can't create a bucket with a specific name (S3 bucket names are globally unique), modify the bucket name in the VacationConstants.cs file under the VacationConstants class.

Ensure that the S3 bucket is in the same AWS region as specified in appsettings.json.
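For example, a hedged sketch that creates all four buckets with the AWS CLI (use the region from your appsettings.json):

for b in dev.event-sourcing-splitting dev.event-sourcing-splitting-gdpr \
         prod.event-sourcing-splitting prod.event-sourcing-splitting-gdpr; do
  aws s3 mb "s3://$b" --region us-east-1
done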

4. Configure the storage on the event sourcing builder. The `(meta, key)` predicate receives the key of each payload property; here the `user` payload (the PII) is routed to the dedicated GDPR bucket, while everything else goes to the main bucket.

// VacationProducerExtensions.cs
var gdpr = s3Options with { Bucket = $"{s3Options.Bucket}-gdpr" };
ILogger logger = ioc.GetService<ILogger<Program>>() ?? throw new EventSourcingException("Logger is missing");
IVacationProducer producer = ioc.ResolveRedisProducerChannel()
    // set up the storage splitting:
    // the `user` payload (PII) goes to the dedicated GDPR bucket,
    // everything else goes to the main bucket
    .ResolveS3Storage((meta, key) => key != "user", s3Options)
    .ResolveS3Storage((meta, key) => key == "user", gdpr)
    .Environment(env)
    .Uri(uri)
    .WithLogger(logger)
    .BuildVacationProducer();

// ConsumerExtensions.cs
var gdpr = s3Options with { Bucket = $"{s3Options.Bucket}-gdpr" };
return ioc.ResolveRedisConsumerChannel()
    // set up the storage splitting (the consumer reads from both buckets)
    .ResolveS3Storage(s3Options)
    .ResolveS3Storage(gdpr)
    .Environment(env)
    .Uri(uri);

5. Bring the dev environment up, then run the solution (F5):

cd ./dockers/compose
docker compose up -d

6. Take a look at the S3 data and make sure it was split correctly.
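To have data to look at, first produce an event through the controller from step 2. The port below is an assumption; check the generated launchSettings.json for the actual one:

curl -X POST "http://localhost:5000/VacationProducer?name=John"

After the request goes through, the user payload should appear only in the “-gdpr” bucket, while the hotel and flight payloads land in the main bucket, matching the predicates from step 4.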

Enabling Compliance and Simplified Data Management

EventSourcing.Backbone has an edge over using stream providers like Kafka or Redis Streams directly, particularly in terms of compliance and data patterns. It offers a streamlined approach to managing compliance data, such as GDPR, by storing the event payload in familiar storage types. This enables easy access to the payload with domain-specific APIs directly from the storage, while also honoring the “right to be forgotten” principle mandated by GDPR. With EventSourcing.Backbone, compliance becomes more manageable, ensuring data privacy and providing a robust solution for compliance-oriented event-driven systems.
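Because the PII payloads live in a dedicated bucket, a “right to be forgotten” request can be served at the storage level. Below is a minimal hedged sketch using the AWS SDK for .NET (the AWSSDK.S3 package); the key prefix is a hypothetical placeholder, so inspect the bucket from step 6 to see how the payloads are actually keyed:

using Amazon.S3;
using Amazon.S3.Model;

var s3 = new AmazonS3Client(); // uses the ambient AWS credentials
string userKeyPrefix = "<event-key-of-the-user>"; // hypothetical placeholder

// list the user's objects in the GDPR bucket (up to 1,000 per page)
var listed = await s3.ListObjectsV2Async(new ListObjectsV2Request
{
    BucketName = "dev.event-sourcing-splitting-gdpr",
    Prefix = userKeyPrefix,
});

// delete them, without touching the non-PII payloads in the main bucket
foreach (var obj in listed.S3Objects)
    await s3.DeleteObjectAsync(obj.BucketName, obj.Key);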

Conclusion

Storage specialization in EventSourcing.Backbone offers a powerful approach to streamline caching and data splitting within event-driven systems. By tailoring storage behavior to specific requirements, we can optimize performance, achieve compliance, and enhance overall system efficiency. Whether it involves delivering cache functionality or segmenting data across specialized storage options, storage specialization empowers developers to build robust and scalable event-sourcing solutions. Stay tuned for more insights into our EventSourcing.Backbone Series, where we continue to explore the fascinating world of event-sourcing architecture.
