Paul Bourdel
Dec 17, 2018

Overview

This article explores one possible way to implement the event sourcing pattern on AWS using managed AWS services. Event sourcing is a pattern in which every state change to your application is saved as an event, allowing you to rebuild the application state from scratch by replaying those events.

When to Use

Event sourcing adds extra complexity to your application and is overkill for many use cases, but it can be invaluable in the right circumstances. The following are some situations in which you would want to implement the event sourcing pattern.

  • When you need an audit log of every event in the system or microservice, as opposed to only the current state of the system.
  • When you need to replay events after a code bug fix in order to correct data.
  • When you need to be able to reverse events.
  • When you want to use the Event Log for debugging.
  • When it’s valuable to expose the Event Log to support personnel so they can fix account-specific issues by seeing exactly how the account got into a bad state.
  • When you want to create test environments from the event log. The event log allows you to create environments based on the state of the system at different points in time by replaying only a subset of events.

Event Sourcing Deep Dive

Event sourcing stores every state change to the application in an event object. These event objects are stored in the sequence they were applied for the lifetime of the application.

A core principle of event sourcing is that all changes to domain objects are done as a result of an event. This principle allows us to do a number of useful operations:

  • Complete Rebuild: We can start from a clean slate and completely rebuild the application state from events in the event log.
  • Event Replay: We can replay events with updated application logic to correct earlier, incorrect processing. This is useful for fixing a bug in code and then replaying events to repair the data.
  • Event Reversal: If events are stored as diffs or carry reversal information, it is possible to reverse specific events without replaying from a clean application state.

Application State Storage

To calculate the current application state with event sourcing, we start from a blank slate and apply all the events in order. Since applications most commonly request the current state, it is also common to store it separately for fast retrieval. If lost or corrupted, the current application state can always be re-derived from the event log.
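
To make this concrete, below is a minimal sketch of an event-sourced aggregate in Java. The Account and MoneyEvent names are purely illustrative and are not part of the AWS design later in this article; the point is that the current state is simply a fold over the event log.

import java.util.List;

// Hypothetical domain sketch: an account balance derived purely from its event log.
public class Account {

    // Each event records a deposit or withdrawal amount in cents.
    public static class MoneyEvent {
        final long amountCents; // positive = deposit, negative = withdrawal

        MoneyEvent(long amountCents) {
            this.amountCents = amountCents;
        }
    }

    private long balanceCents = 0;

    // Apply a single event to mutate the aggregate's state.
    public void apply(MoneyEvent event) {
        balanceCents += event.amountCents;
    }

    // Complete rebuild: start from a clean slate and replay every event in order.
    public static Account replay(List<MoneyEvent> eventLog) {
        Account account = new Account();
        eventLog.forEach(account::apply);
        return account;
    }

    public long getBalanceCents() {
        return balanceCents;
    }
}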

Event Reversal

If events are captured as diffs then reversing an event is as simple as undoing the diff. In most cases, however, events will carry just the final value, not the diff (e.g., Account Balance = $100 rather than Account Balance += $20). In that case, reversing the event using only the information it contains is not possible, since we do not have sufficient information.

If the event only contains the final value, then it would also need to store information on how to reverse the event.

If storing the reversal information is not viable, reversing can always be done by replaying the event log up to the event just before the one in question, effectively reversing it.
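
One hedged sketch of such a self-reversing event, again with illustrative names rather than anything from the AWS design below: store the prior value alongside the new value so the event carries its own reversal information.

// Sketch of an event that stores enough information to be reversed on its own.
public class BalanceChangedEvent {
    final long previousBalanceCents; // reversal information
    final long newBalanceCents;      // final value

    public BalanceChangedEvent(long previousBalanceCents, long newBalanceCents) {
        this.previousBalanceCents = previousBalanceCents;
        this.newBalanceCents = newBalanceCents;
    }

    // Reversing is possible without a full replay because the prior value was captured.
    public long reversedBalanceCents() {
        return previousBalanceCents;
    }
}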

Interacting with External Systems

External Updates

External systems that are not designed for event sourcing can behave in unintended ways when receiving duplicate update messages during replays. This can be handled by wrapping interactions with external systems in a gateway. The gateway would incorporate logic about the external system and not forward events during replays.
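
A gateway along these lines might look like the following sketch. The PaymentNotifier interface and method names are placeholders invented for illustration; the essential idea is a replay flag that suppresses outbound calls.

// Sketch of a gateway that wraps an external system and suppresses calls during replays.
public class ExternalUpdateGateway {

    // Placeholder for whatever client talks to the real external system.
    public interface PaymentNotifier {
        void notifyPayment(String accountId, long amountCents);
    }

    private final PaymentNotifier notifier;
    private volatile boolean replayMode = false;

    public ExternalUpdateGateway(PaymentNotifier notifier) {
        this.notifier = notifier;
    }

    // Flip this on before starting an event replay, off when it finishes.
    public void setReplayMode(boolean replayMode) {
        this.replayMode = replayMode;
    }

    public void sendPaymentNotification(String accountId, long amountCents) {
        if (replayMode) {
            return; // during a replay the external system has already seen this update
        }
        notifier.notifyPayment(accountId, amountCents);
    }
}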

External Queries

The main challenge with external queries arises when doing Event Replays. If processing an event requires querying an external system that does not support time-based queries, then the external data will be wrong during a replay: you will get the current state of the external data, not the state at the time the original event was processed.

One option is to log the response of every external query. During event replays, the gateway to the external system can then use the logged value, accurately reproducing the original interaction.
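
A sketch of that idea, with hypothetical names: cache each response keyed by the event that triggered it, and serve the cached value when in replay mode.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Sketch of a query gateway that logs responses so replays see the original data.
public class ExternalQueryGateway {

    private final Function<String, String> externalQuery;          // live call to the external system
    private final Map<String, String> loggedResponses = new ConcurrentHashMap<>(); // keyed by event id
    private volatile boolean replayMode = false;

    public ExternalQueryGateway(Function<String, String> externalQuery) {
        this.externalQuery = externalQuery;
    }

    public void setReplayMode(boolean replayMode) {
        this.replayMode = replayMode;
    }

    public String query(String eventId, String request) {
        if (replayMode) {
            // Return exactly what the external system said when the event was first processed.
            return loggedResponses.get(eventId);
        }
        String response = externalQuery.apply(request);
        loggedResponses.put(eventId, response); // in practice this would be persisted, not kept in memory
        return response;
    }
}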

Handling Code Changes

There are two main types of code changes that can affect reprocessing of events:

  • New features.
  • Bug fixes.

New Features

When doing event replays with new features you will want to disable external gateways. This prevents external systems from being updated for features that did not exist at the time the event was generated.

Bug Fixes

In the case of bug fixes we will deploy the code fix and reprocess the events. For events that don’t have any external interactions, this is straightforward and one of the main benefits of event sourcing. After the events are reprocessed the data will be corrected.

With external systems, the gateway needs to know whether it can simply send the event processed by the fixed code, or whether a diff needs to be computed before the external system is called. The diff is necessary to reverse the original buggy event that previously went out to the external system.
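
For example (with purely illustrative figures), if the buggy code told the external system the balance was $120 and the fixed code computes $100, the gateway should send only the $20 downward adjustment rather than the full corrected value.

// Sketch: compute the correction that must be sent to an external system after a bug fix,
// given the value the buggy code originally sent and the value the fixed code produces.
public class ExternalCorrection {

    public static long correctionCents(long originallySentCents, long correctedCents) {
        return correctedCents - originallySentCents;
    }

    public static void main(String[] args) {
        long buggy = 12_000;   // $120 sent by the buggy code
        long fixed = 10_000;   // $100 computed after the fix
        // Prints -2000: the gateway should send a $20 downward adjustment, not the full value again.
        System.out.println(correctionCents(buggy, fixed));
    }
}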

Any time-sensitive logic, such as doing one thing before a certain date and a different thing after it, will need to be included in the domain model of the event. Time-sensitive logic can get messy and is best avoided where possible.

AWS Event Sourcing Design

The following design leverages a number of AWS services:

API Gateway

API Gateway is leveraged to receive incoming requests on a web-facing URL. The API Gateway Method is then configured to forward those requests to an AWS Lambda that acts as a loader.

AWS Lambda

Lambdas are leveraged for the following purposes:

Load Lambda: used to load incoming requests from the API Gateway into the Kinesis Event Stream.

Pump Lambda: receives events from the incoming stream and puts them in the Event Store DB.

Write Lambda: receives events from the stream and stores only the latest state in the Data Store DB. Any business logic can be applied here before writing to the DB just like in a regular application.

Playback Lambda: can be triggered manually to read all or a subset of events from the Events Table and send them to the Kinesis Playback Stream. The reason this does not write directly to the Data Store is so you can later attach additional subscribers to the Kinesis Playback Stream as your application's needs evolve. (A rough sketch of this Lambda appears after this list.)

Microservice Lambda: this Lambda will contain your application’s business logic and process the incoming event.
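
The article does not include code for the Playback Lambda, but a rough sketch might look like the following. The table name (EventSourcingEvents) and stream name (event-playback-stream) are assumptions for illustration, error handling is omitted, and a real implementation would replay events in their original sequence rather than relying on an unordered scan.

import java.nio.ByteBuffer;
import java.util.UUID;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder;
import com.amazonaws.services.dynamodbv2.document.DynamoDB;
import com.amazonaws.services.dynamodbv2.document.Item;
import com.amazonaws.services.dynamodbv2.document.Table;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import com.amazonaws.services.kinesis.model.PutRecordRequest;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;

// Hypothetical Playback Lambda: reads every event from the Events Table and
// re-emits it onto a dedicated Kinesis playback stream.
public class PlaybackHandler implements RequestHandler<Void, Void> {

    public Void handleRequest(Void input, Context context) {
        AmazonDynamoDB client = AmazonDynamoDBClientBuilder.standard().build();
        DynamoDB dynamoDB = new DynamoDB(client);
        Table eventsTable = dynamoDB.getTable("EventSourcingEvents"); // assumed table name
        AmazonKinesis kinesis = AmazonKinesisClientBuilder.standard().build();

        // A full scan replays every event; a query could replay only a subset.
        // Note: scans are unordered, so a real implementation would preserve the original event order.
        for (Item item : eventsTable.scan()) {
            PutRecordRequest request = new PutRecordRequest();
            request.setStreamName("event-playback-stream"); // assumed stream name
            request.setData(ByteBuffer.wrap(item.toJSON().getBytes()));
            request.setPartitionKey(UUID.randomUUID().toString());
            kinesis.putRecord(request);
        }
        return null;
    }
}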

DynamoDB

DynamoDB is used to store the events. We have two tables per service.

Events Table: stores every single event that is submitted to the system. This is the immutable system of record for the application.

Data Store: stores the latest state for quick access by the application. This is not the system of record and can be wiped and rebuilt from the Events Table when necessary.

Sample Code

Terraform for Creating a Lambda

#############################
# Lambda
#############################
resource "aws_lambda_function" "terraform_kinesis_streamer_func" {
  function_name = "EventSourcing"
  s3_bucket     = "terraform-event-sourcing"
  s3_key        = "v1.0.0/lambda-code.zip"
  handler       = "com.slalom.lambda.handler.ProxyWithStream::handleRequest"
  runtime       = "java8"
  role          = "${aws_iam_role.iam_for_terraform_lambda.arn}"
  timeout       = 300
  memory_size   = 512
}

resource "aws_lambda_permission" "apigw" {
  statement_id  = "AllowAPIGatewayInvoke"
  action        = "lambda:InvokeFunction"
  function_name = "${aws_lambda_function.terraform_kinesis_streamer_func.arn}"
  principal     = "apigateway.amazonaws.com"

  # The /*/* portion grants access from any method on any resource
  # within the API Gateway "REST API".
  source_arn = "${aws_api_gateway_deployment.event_sourcing.execution_arn}/*/*"
}

Terraform for Configuring API Gateway to Invoke the Lambda

#############################
# API Gateway Event Sourcing
#############################
resource "aws_api_gateway_resource" "event_sourcing" {
  rest_api_id = "${aws_api_gateway_rest_api.event_sourcing.id}"
  parent_id   = "${aws_api_gateway_rest_api.event_sourcing.root_resource_id}"
  path_part   = "event-sourcing"
}

resource "aws_api_gateway_method" "event_sourcing" {
  rest_api_id   = "${aws_api_gateway_rest_api.event_sourcing.id}"
  resource_id   = "${aws_api_gateway_resource.event_sourcing.id}"
  http_method   = "POST"
  authorization = "NONE"
}

# Integration
resource "aws_api_gateway_integration" "event_sourcing_lambda" {
  rest_api_id             = "${aws_api_gateway_rest_api.event_sourcing.id}"
  resource_id             = "${aws_api_gateway_method.event_sourcing.resource_id}"
  http_method             = "${aws_api_gateway_method.event_sourcing.http_method}"
  integration_http_method = "POST"
  type                    = "AWS_PROXY"
  uri                     = "${aws_lambda_function.terraform_kinesis_streamer_func.invoke_arn}"
}

# Deployment
resource "aws_api_gateway_deployment" "event_sourcing" {
  depends_on = [
    "aws_api_gateway_integration.event_sourcing_lambda",
  ]

  rest_api_id = "${aws_api_gateway_rest_api.event_sourcing.id}"
  stage_name  = "dev"
}

output "base_url" {
  value = "${aws_api_gateway_deployment.event_sourcing.invoke_url}"
}

Load Lambda Code (API Gateway to Kinesis Event Stream)

The below example is written in Java. The Lambda:

  • Receives requests from the AWS API Gateway.
  • Parses the Request.
  • Uses the data from the request to send an event to the Kinesis Event Stream.
  • Responds to the API Gateway, which in turn responds to the client.
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.nio.ByteBuffer;
import java.util.UUID;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import com.amazonaws.services.kinesis.model.PutRecordRequest;
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.LambdaLogger;
import com.amazonaws.services.lambda.runtime.RequestStreamHandler;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;

public class ProxyWithStream implements RequestStreamHandler {

    JSONParser parser = new JSONParser();

    public void handleRequest(InputStream inputStream, OutputStream outputStream, Context context) throws IOException {
        LambdaLogger logger = context.getLogger();
        logger.log("Loading Java Lambda handler of ProxyWithStream");

        BufferedReader reader = new BufferedReader(new InputStreamReader(inputStream));
        JSONObject responseJson = new JSONObject();
        String responseCode = "200";

        try {
            // Parse the proxy request forwarded by the API Gateway.
            JSONObject event = (JSONObject) parser.parse(reader);
            if (event.get("queryStringParameters") != null) {
                JSONObject qps = (JSONObject) event.get("queryStringParameters");
            }
            if (event.get("pathParameters") != null) {
                JSONObject pps = (JSONObject) event.get("pathParameters");
            }
            if (event.get("headers") != null) {
                JSONObject hps = (JSONObject) event.get("headers");
            }

            responseJson.put("isBase64Encoded", false);
            responseJson.put("statusCode", responseCode);

            JSONObject headerJson = new JSONObject();
            responseJson.put("headers", headerJson);

            JSONObject responseBody = new JSONObject();
            responseBody.put("message", event.toJSONString());

            // Put the raw request onto the Kinesis Event Stream.
            AmazonKinesis kinesis = AmazonKinesisClientBuilder.standard().build();
            PutRecordRequest putRecordRequest = new PutRecordRequest();
            putRecordRequest.setStreamName("event-stream");
            putRecordRequest.setData(ByteBuffer.wrap(event.toJSONString().getBytes()));
            putRecordRequest.setPartitionKey(UUID.randomUUID().toString());
            kinesis.putRecord(putRecordRequest);

            responseJson.put("body", responseBody.toString());
        } catch (ParseException pex) {
            responseJson.put("statusCode", "400");
            responseJson.put("exception", pex);
        }

        logger.log(responseJson.toJSONString());

        // Write the proxy response back to the API Gateway.
        OutputStreamWriter writer = new OutputStreamWriter(outputStream, "UTF-8");
        writer.write(responseJson.toJSONString());
        writer.close();
    }
}

Write Lambda Code (Kinesis Event Stream Client to Data Store)

The below example is written in Java. The Lambda:

  • Receives events from the Kinesis Event Stream.
  • Parses the Event and executes any transformation or business logic.
  • Writes the data from the event into the application Data Store to be later read by the application.
public class KinesisLambdaReceiver implements RequestHandler<KinesisEvent, Void> {

    public Void handleRequest(KinesisEvent event, Context context) {
        LambdaLogger logger = context.getLogger();
        logger.log("Received " + event.getRecords().size() + " raw Event Records.");

        // De-aggregate the Kinesis records and store each event's payload in the Data Store table.
        RecordDeaggregator.stream(event.getRecords().stream(), dataRecord -> {
            logger.log(new String(dataRecord.getData().array()));

            AmazonDynamoDB client = AmazonDynamoDBClientBuilder.standard().build();
            DynamoDB dynamoDB = new DynamoDB(client);
            Table table = dynamoDB.getTable("EventSourcingDataStore");

            final Map<String, Object> dataMap = new HashMap<>();
            dataMap.put("data", new String(dataRecord.getData().array()));

            PutItemOutcome outcome = table
                .putItem(new Item().withPrimaryKey("id", dataRecord.getExplicitHashKey()).withMap("data", dataMap));
        });

        return null;
    }
}

Deploying the Solution

Deploying the solution requires two main parts:

  • Compiling the Java Lambda code.
  • Applying the Terraform configs to deploy the Lambda and other infrastructure.

Compiling the Java Lambda Code

The project I used to develop the Lambda code is a standard Java Gradle project. Whichever way you prefer to structure your project, ensure that your build packages it into a self-contained zip file. This zip file will later be referenced by your Terraform config to deploy the Lambda.

build.gradle

apply plugin: 'java'

repositories {
    mavenCentral()
}

dependencies {
    compile (
        'com.amazonaws:aws-lambda-java-core:1.1.0',
        'com.amazonaws:aws-lambda-java-events:1.1.0',
        'com.amazonaws:aws-lambda-java-log4j:1.0.0',
        'com.amazonaws:aws-java-sdk-kinesis:1.11.373',
        'com.amazonaws:amazon-kinesis-client:1.8.8',
        'com.amazonaws:amazon-kinesis-deaggregator:1.0.3'
    )
}

task buildZip(type: Zip) {
    from compileJava
    from processResources
    into('lib') {
        from configurations.compileClasspath
    }
}

build.dependsOn buildZip

Deploying the Lambda

To deploy the Lambda you can use the Terraform examples shown earlier in this article. To ensure your code changes are deployed, check that the filename or s3_bucket/s3_key Terraform arguments point to the zip containing your latest Lambda code.

  • filename - (Optional) The path to the function's deployment package within the local filesystem. If defined, the s3_-prefixed options cannot be used.
  • s3_bucket - (Optional) The S3 bucket location containing the function's deployment package. Conflicts with filename. This bucket must reside in the same AWS region where you are creating the Lambda function.
  • s3_key - (Optional) The S3 key of an object containing the function's deployment package. Conflicts with filename.

After ensuring the above, apply your Terraform config to deploy your lambda.

➜ terraform apply

+ aws_lambda_function.terraform_kinesis_streamer_func
      id:                <computed>
      arn:               <computed>
      function_name:     "EventSourcing"
      handler:           "com.slalom.lambda.handler.ProxyWithStream::handleRequest"
      invoke_arn:        <computed>
      last_modified:     <computed>
      memory_size:       "512"
      publish:           "false"
      qualified_arn:     <computed>
      role:              "${aws_iam_role.iam_for_terraform_lambda.arn}"
      runtime:           "java8"
      s3_bucket:         "terraform-event-sourcing"
      s3_key:            "v1.0.0/lambda-code.zip"
      source_code_hash:  <computed>
      source_code_size:  <computed>
      timeout:           "300"
      tracing_config.#:  <computed>
      version:           <computed>

Do you want to perform these actions?
  Terraform will perform the actions described above.
  Only 'yes' will be accepted to approve.

  Enter a value: yes

Apply complete! Resources: 1 added, 0 changed, 0 destroyed.

After the terraform command completes, your Lambda and the rest of your Terraform-defined infrastructure will be deployed to your AWS account.

Summary

Hopefully this post shows that event sourcing is not as intimidating as it may sound, and that implementing it with AWS services such as Kinesis, API Gateway, and DynamoDB can save significant development time. The Terraform and Java code examples in this post should give you a head start in creating your own implementation!
