Serverless event aggregation with AWS Lambda and DynamoDB
Event-driven architecture is great within a serverless cloud environment; it enables the decoupling of processes to make the most of the massive scalability the cloud has to offer. However, the real world eventually catches up with us all. The same scalability we benefited from moments ago now curses us as we attempt to write the results of our processing back to a legacy system that prefers to handle data in large batches. If this wasn’t problematic enough, some systems have yet to reap the benefits of idempotency, requiring additional processing upfront to prevent race conditions leading to duplicate entries being created.
Whilst some patterns exist to solve these problems, many of these include a “waiting” state where messages collect until a scheduled process runs to retrieve the built-up batch. This has the disadvantage of introducing additional latency into our process and creates a bursty workload for downstream components as many batches are created in a single run of the retriever. This pattern aims to solve these issues by beginning the processing of a batch as soon as possible.
The pattern
Whilst this looks rather normal, the brains of this aggregator are within the Lambda functions and the Event Source Mapping configurations, following a 2020 change in how Lambda supports batching and a 2023 change in how Lambda scaling can be configured.
TL;DR: We use Lambda’s built-in batching functionality and in-memory caching to aggregate our events and write them into DynamoDB, then read back the full batch with a query.
aggregator-lambda
The aggregator-lambda is configured to receive messages from our input-queue. The Event Source Mapping for this is configured with the highest possible BatchSize and MaximumBatchingWindowInSeconds; the Lambda won’t be invoked until the payload size reaches 6MB or 300 seconds have elapsed since the first message was received.
{
  "EventSourceMappings": [
    {
      "BatchSize": 1500,
      "MaximumBatchingWindowInSeconds": 300,
      "EventSourceArn": "arn:aws:sqs:eu-west-1:000000000000:input-queue",
      "FunctionArn": "arn:aws:lambda:eu-west-1:000000000000:function:aggregator-lambda",
      "FunctionResponseTypes": ["ReportBatchItemFailures"],
      "ScalingConfig": {
        "MaximumConcurrency": 2
      },
      ...
    }
  ]
}
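If you’d rather create this mapping programmatically than through your IaC tooling, a minimal sketch using the AWS SDK for JavaScript might look like the following; the function name, queue ARN, and region are just the placeholder values from the example above.

import { LambdaClient, CreateEventSourceMappingCommand } from "@aws-sdk/client-lambda";

const lambdaClient = new LambdaClient();

// Sketch only - the ARN and function name are placeholders from the example above.
await lambdaClient.send(
  new CreateEventSourceMappingCommand({
    EventSourceArn: "arn:aws:sqs:eu-west-1:000000000000:input-queue",
    FunctionName: "aggregator-lambda",
    BatchSize: 1500,
    MaximumBatchingWindowInSeconds: 300,
    FunctionResponseTypes: ["ReportBatchItemFailures"],
    ScalingConfig: { MaximumConcurrency: 2 },
  })
);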
On the surface, the processing this Lambda completes is relatively simple. It creates a new UUID to be used as the batchId, sends this UUID to our batch-queue with a delay, attempts to write the message to our lock-table, and returns a partial batch response so that only messages that were successfully written to our lock-table are deleted from the input-queue. We send the batchId first to ensure that the batch is processed even if the Lambda times out before it finishes processing, and we use a ConditionExpression when writing to DynamoDB to prevent the same message from being processed in multiple batches simultaneously.
import { DynamoDBClient } from "@aws-sdk/client-dynamodb";
import { PutCommand, DynamoDBDocumentClient } from "@aws-sdk/lib-dynamodb";

const DDB_LOCK_TABLE = process.env.DDB_LOCK_TABLE;

const dbClient = new DynamoDBClient();
const docClient = DynamoDBDocumentClient.from(dbClient);

export const handler = async (event, context) => {
  const lambdaTimeoutTime = Date.now() + context.getRemainingTimeInMillis();

  // Attempt to add each message to DynamoDB.
  const promises = event.Records.map(async (i) => {
    const batchId = await getBatchId(i.eventSourceARN, lambdaTimeoutTime);
    const ddbCommand = new PutCommand({
      TableName: DDB_LOCK_TABLE,
      Item: {
        messageId: i.messageId,
        batchId,
        message: i.body,
      },
      ConditionExpression: "attribute_not_exists(messageId)",
    });
    try {
      await docClient.send(ddbCommand);
    } catch (e) {
      return { success: false, sqsMessageId: i.messageId };
    }
    return { success: true };
  });

  const results = await Promise.all(promises);
  const failures = results.filter((i) => !i.success);

  return {
    batchItemFailures: failures.map((i) => ({
      itemIdentifier: i.sqsMessageId,
    })),
  };
};
However, due to Lambda’s 6MB invocation payload limit and the large amount of non-optional SQS metadata included in each received message, we need to make this function smarter if it is to handle batches of more than a few thousand items. We do this by caching the current batchId outside of the handler function. To encourage Lambda to handle messages within the same function containers, we specify a MaximumConcurrency on the Event Source Mapping, which prevents Lambda from scaling up with additional SQS pollers. As we now want multiple invocations of the Lambda function to complete before we begin processing the batch, we introduce a MAX_SECONDARY_BATCHING_WINDOW variable in our code to specify how long we should wait before starting to process a batch.
import { v4 } from "uuid";
import { SendMessageCommand, SQSClient } from "@aws-sdk/client-sqs";

const OUTPUT_SQS_URL = process.env.OUTPUT_SQS_URL;
const MAX_SECONDARY_BATCHING_WINDOW = process.env.MAX_SECONDARY_BATCHING_WINDOW || 300;
const MAX_BATCH_SIZE = process.env.MAX_BATCH_SIZE || 50000;

const sqsClient = new SQSClient();

const BATCH_CACHE = {};

const startNewBatch = async (sourceId, lambdaTimeoutTime) => {
  const batchId = v4();
  const expires = lambdaTimeoutTime + MAX_SECONDARY_BATCHING_WINDOW * 1000;
  BATCH_CACHE[sourceId] = { batchId, expires, items: 1 };
  const sqsCommand = new SendMessageCommand({
    QueueUrl: OUTPUT_SQS_URL,
    DelaySeconds: Math.ceil((expires - Date.now()) / 1000),
    MessageBody: JSON.stringify({ batchId }),
  });
  await sqsClient.send(sqsCommand);
  return batchId;
};

const getBatchId = async (sourceId, lambdaTimeoutTime) => {
  const currentBatchForSource = BATCH_CACHE[sourceId];
  if (!currentBatchForSource) {
    // No existing batch
    return startNewBatch(sourceId, lambdaTimeoutTime);
  } else if (lambdaTimeoutTime >= currentBatchForSource.expires) {
    // Current batch starts processing before Lambda timeout
    return startNewBatch(sourceId, lambdaTimeoutTime);
  } else if (currentBatchForSource.items >= MAX_BATCH_SIZE) {
    // Max batch size has been reached
    return startNewBatch(sourceId, lambdaTimeoutTime);
  } else {
    currentBatchForSource.items += 1;
    return currentBatchForSource.batchId;
  }
};
lock-table
Our DynamoDB lock-table is relatively simple. The partition key is set as the messageId attribute. A Global Secondary Index (GSI) has been added using the batchId attribute.
+-----------+---------+---------+
| messageId | batchId | message |
+-----------+---------+---------+
| | | |
+-----------+---------+---------+
| | | |
+-----------+---------+---------+
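For illustration, the table could be created with the AWS SDK along the lines of the sketch below; the index name batchId-index and the on-demand billing mode are assumptions rather than requirements of the pattern.

import { DynamoDBClient, CreateTableCommand } from "@aws-sdk/client-dynamodb";

const dbClient = new DynamoDBClient();

// Sketch only - the index name and billing mode are assumptions.
await dbClient.send(
  new CreateTableCommand({
    TableName: "lock-table",
    AttributeDefinitions: [
      { AttributeName: "messageId", AttributeType: "S" },
      { AttributeName: "batchId", AttributeType: "S" },
    ],
    KeySchema: [{ AttributeName: "messageId", KeyType: "HASH" }],
    GlobalSecondaryIndexes: [
      {
        IndexName: "batchId-index",
        KeySchema: [{ AttributeName: "batchId", KeyType: "HASH" }],
        Projection: { ProjectionType: "ALL" },
      },
    ],
    BillingMode: "PAY_PER_REQUEST",
  })
);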
batch-processor-lambda
The final piece of our aggregator is our batch-processor-lambda. This Lambda is triggered by our batch-queue but only has a BatchSize of 1 configured; it’ll receive 1 message per batch.
This function’s primary job is to query our lock-table using the batchId GSI to retrieve all the messages within the batch and process them as a single entity. The processing of the batch could be completed within this function or in a downstream component.
All we need to remember is that once we’ve finished processing the batch, we need to delete all the items from our lock-table to allow duplicate messages to be processed, now without a race condition.
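As a minimal sketch of what this could look like, assuming the GSI is named batchId-index and a hypothetical processBatch function stands in for your downstream work (retries of any unprocessed deletes are omitted for brevity):

import { DynamoDBClient } from "@aws-sdk/client-dynamodb";
import { QueryCommand, BatchWriteCommand, DynamoDBDocumentClient } from "@aws-sdk/lib-dynamodb";

const DDB_LOCK_TABLE = process.env.DDB_LOCK_TABLE;

const dbClient = new DynamoDBClient();
const docClient = DynamoDBDocumentClient.from(dbClient);

// Placeholder for whatever downstream processing the batch needs.
const processBatch = async (items) => {};

export const handler = async (event) => {
  // BatchSize is 1, so we expect a single record containing the batchId.
  const { batchId } = JSON.parse(event.Records[0].body);

  // Page through the batchId GSI to collect every item in the batch.
  const items = [];
  let lastEvaluatedKey;
  do {
    const page = await docClient.send(
      new QueryCommand({
        TableName: DDB_LOCK_TABLE,
        IndexName: "batchId-index",
        KeyConditionExpression: "batchId = :batchId",
        ExpressionAttributeValues: { ":batchId": batchId },
        ExclusiveStartKey: lastEvaluatedKey,
      })
    );
    items.push(...page.Items);
    lastEvaluatedKey = page.LastEvaluatedKey;
  } while (lastEvaluatedKey);

  // Process the aggregated batch as a single entity.
  await processBatch(items);

  // Delete the locks in chunks of 25, the BatchWriteItem limit.
  for (let i = 0; i < items.length; i += 25) {
    const chunk = items.slice(i, i + 25);
    await docClient.send(
      new BatchWriteCommand({
        RequestItems: {
          [DDB_LOCK_TABLE]: chunk.map((item) => ({
            DeleteRequest: { Key: { messageId: item.messageId } },
          })),
        },
      })
    );
  }
};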
Does it work?
Sample code for this pattern can be found on GitHub - https://github.com/kurtismash/aws-serverless-unique-event-aggregator.
In reality, it’s rare to see a duplicate message from SQS, but we must account for them due to SQS’s at-least-once delivery guarantee. The repository includes a Python script to send messages to the input-queue with a parametrized duplicate rate. For this reason, the messageId value is within the message body, but there’s no reason you couldn’t use the messageId from the SQS metadata instead.
Testing a dormant system with a burst of 1.2 million messages resulted in 26 batches being processed; 22 of these were made up of the MAX_BATCH_SIZE of 50,000 items, and the last 4 batches had 49,460, 43,280, 1,945, and 5,315 items respectively. All batches had been aggregated and deleted from the lock-table within 21 minutes.
Caveats
- As this pattern relies heavily on DynamoDB, it is prone to write throttling. It is recommended to prewarm the lock-table when first created.
- Similarly, the Global Secondary Index on batchId can become hot for both reads and writes. As GSIs are updated asynchronously, using an eventually consistent model, the batch-processor could read a batch before the GSI is fully populated. It is recommended to clear the lock-table synchronously after processing, then query again for remaining items and reprocess these as necessary.
- Depending on your expected throughput, this model could incur high costs due to the required Write Request Units on DynamoDB. This pattern has been designed for use in a system with a bursty workload, expected to sit dormant for long periods.
- As Lambda Event Source Mappings have a minimum MaximumConcurrency of 2, it is expected that bursts smaller than the configured MAX_BATCH_SIZE will be processed in at least 2 batches.