Serverless event aggregation with AWS Lambda and DynamoDB

Kurtis Mash
6 min read · Jun 9, 2024


Event-driven architecture is great within a serverless cloud environment; it enables the decoupling of processes to make the most of the massive scalability the cloud has to offer. However, the real world eventually catches up with us all. The same scalability we benefited from moments ago now curses us as we attempt to write the results of our processing back to a legacy system that prefers to handle data in large batches. If this wasn’t problematic enough, some systems have yet to reap the benefits of idempotency, requiring additional processing upfront to prevent race conditions leading to duplicate entries being created.

Whilst some patterns exist to solve these problems, many of these include a “waiting” state where messages collect until a scheduled process runs to retrieve the built-up batch. This has the disadvantage of introducing additional latency into our process and creates a bursty workload for downstream components as many batches are created in a single run of the retriever. This pattern aims to solve these issues by beginning the processing of a batch as soon as possible.

The pattern

Architecture diagram: messages flow from an SQS queue named "input-queue" into the "aggregator-lambda", which writes to a DynamoDB table, "lock-table", and sends to a second SQS queue, "batch-queue". Both the table and the batch-queue feed the final element, the "batch-processor-lambda".

Whilst this looks rather normal, the brains of this aggregator are within the Lambda functions and the Event Source Mapping configurations, following a 2020 change in how Lambda supports batching and a 2023 change in how Lambda scaling can be configured.

TL;DR: We use Lambda's built-in batching and in-memory caching to aggregate our events, write them into DynamoDB, then read the full batch back with a single query.

aggregator-lambda

The aggregator-lambda is configured to receive messages from our input-queue. The Event Source Mapping for this is configured with a large BatchSize and the maximum MaximumBatchingWindowInSeconds; the Lambda won't be invoked until the batch size is reached, the payload approaches the 6MB invocation limit, or 300 seconds have elapsed since the first message was received.

{
  "EventSourceMappings": [
    {
      "BatchSize": 1500,
      "MaximumBatchingWindowInSeconds": 300,
      "EventSourceArn": "arn:aws:sqs:eu-west-1:000000000000:input-queue",
      "FunctionArn": "arn:aws:lambda:eu-west-1:000000000000:function:aggregator-lambda",
      "FunctionResponseTypes": ["ReportBatchItemFailures"],
      "ScalingConfig": {
        "MaximumConcurrency": 2
      },
      ...
    }
  ]
}
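
For completeness, creating an equivalent Event Source Mapping with the AWS SDK for JavaScript might look something like the sketch below. The ARNs are the placeholders from the configuration above; this isn't taken from the sample repository.

import {
  LambdaClient,
  CreateEventSourceMappingCommand,
} from "@aws-sdk/client-lambda";

const lambdaClient = new LambdaClient();

// Sketch only: wires input-queue to aggregator-lambda with the batching,
// partial-batch-response, and concurrency settings shown above.
await lambdaClient.send(
  new CreateEventSourceMappingCommand({
    EventSourceArn: "arn:aws:sqs:eu-west-1:000000000000:input-queue",
    FunctionName: "aggregator-lambda",
    BatchSize: 1500,
    MaximumBatchingWindowInSeconds: 300,
    FunctionResponseTypes: ["ReportBatchItemFailures"],
    ScalingConfig: { MaximumConcurrency: 2 },
  }),
);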

On the surface, the processing this Lambda completes is relatively simple. It creates a new UUID to be used as the batchId, sends this UUID to our batch-queue with a delay, attempts to write each message to our lock-table, and returns a partial batch response so that only messages that were successfully written to our lock-table are deleted from the input-queue. We send the batchId first to ensure that the batch is processed even if the Lambda times out before it finishes processing, and we use a ConditionExpression when writing to DynamoDB to prevent the same message from being processed in multiple batches simultaneously.

import { DynamoDBClient } from "@aws-sdk/client-dynamodb";
import { PutCommand, DynamoDBDocumentClient } from "@aws-sdk/lib-dynamodb";

const DDB_LOCK_TABLE = process.env.DDB_LOCK_TABLE;
const dbClient = new DynamoDBClient();
const docClient = DynamoDBDocumentClient.from(dbClient);

export const handler = async (event, context) => {
  const lambdaTimeoutTime = Date.now() + context.getRemainingTimeInMillis();

  // Attempt to add each message to DynamoDB.
  const promises = event.Records.map(async (i) => {
    // getBatchId is defined alongside the batching logic below.
    const batchId = await getBatchId(i.eventSourceARN, lambdaTimeoutTime);
    const ddbCommand = new PutCommand({
      TableName: DDB_LOCK_TABLE,
      Item: {
        messageId: i.messageId,
        batchId,
        message: i.body,
      },
      // Fail the write if this messageId has already been locked into a batch.
      ConditionExpression: "attribute_not_exists(messageId)",
    });
    try {
      await docClient.send(ddbCommand);
    } catch (e) {
      return { success: false, sqsMessageId: i.messageId };
    }
    return { success: true };
  });

  const results = await Promise.all(promises);
  const failures = results.filter((i) => !i.success);
  // Report failed writes so only successfully locked messages are deleted from the queue.
  return {
    batchItemFailures: failures.map((i) => ({
      itemIdentifier: i.sqsMessageId,
    })),
  };
};

However, due to Lambda's 6MB invocation payload limit and the large amount of non-optional SQS metadata included with each received message, we need to make this function smarter if it is to handle batches of more than a few thousand items. We do this by caching the current batchId outside of the handler function. To encourage Lambda to route messages to the same function containers, we specify a MaximumConcurrency on the Event Source Mapping, which prevents Lambda from scaling up with additional SQS pollers. As we now want multiple invocations of the Lambda function to complete before we begin processing the batch, we introduce a MAX_SECONDARY_BATCHING_WINDOW variable in our code to specify how long we should wait before starting to process a batch.

import { v4 } from "uuid";
import { SendMessageCommand, SQSClient } from "@aws-sdk/client-sqs";

const OUTPUT_SQS_URL = process.env.OUTPUT_SQS_URL;
const MAX_SECONDARY_BATCHING_WINDOW = Number(process.env.MAX_SECONDARY_BATCHING_WINDOW) || 300;
const MAX_BATCH_SIZE = Number(process.env.MAX_BATCH_SIZE) || 50000;
const sqsClient = new SQSClient();
// Cached outside the handler so it survives across invocations of a warm container.
const BATCH_CACHE = {};

const startNewBatch = async (sourceId, lambdaTimeoutTime) => {
  const batchId = v4();
  // The batch expires once this invocation could have timed out plus the secondary window.
  const expires = lambdaTimeoutTime + MAX_SECONDARY_BATCHING_WINDOW * 1000;
  BATCH_CACHE[sourceId] = { batchId, expires, items: 1 };
  // Announce the batch up front, delayed until it is due to be processed.
  const sqsCommand = new SendMessageCommand({
    QueueUrl: OUTPUT_SQS_URL,
    DelaySeconds: Math.ceil((expires - Date.now()) / 1000),
    MessageBody: JSON.stringify({ batchId }),
  });
  await sqsClient.send(sqsCommand);
  return batchId;
};

const getBatchId = async (sourceId, lambdaTimeoutTime) => {
  const currentBatchForSource = BATCH_CACHE[sourceId];

  if (!currentBatchForSource) {
    // No existing batch
    return startNewBatch(sourceId, lambdaTimeoutTime);
  } else if (lambdaTimeoutTime >= currentBatchForSource.expires) {
    // Current batch starts processing before this invocation's timeout
    return startNewBatch(sourceId, lambdaTimeoutTime);
  } else if (currentBatchForSource.items >= MAX_BATCH_SIZE) {
    // Max batch size has been reached
    return startNewBatch(sourceId, lambdaTimeoutTime);
  } else {
    currentBatchForSource.items += 1;
    return currentBatchForSource.batchId;
  }
};

lock-table

Our DynamoDB lock-table is relatively simple. The partition key is set as the messageId attribute. A Global Secondary Index (GSI) has been added using the batchId attribute.

+-----------+---------+---------+
| messageId | batchId | message |
+-----------+---------+---------+
| | | |
+-----------+---------+---------+
| | | |
+-----------+---------+---------+
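
As a rough sketch, the table and its GSI could be created with the AWS SDK along the following lines. The table and attribute names match the article; the index name (batchId-index) and on-demand billing mode are assumptions for illustration rather than details from the sample repository.

import { DynamoDBClient, CreateTableCommand } from "@aws-sdk/client-dynamodb";

const client = new DynamoDBClient();

// Sketch only: lock-table keyed on messageId, with a GSI on batchId.
await client.send(
  new CreateTableCommand({
    TableName: "lock-table",
    AttributeDefinitions: [
      { AttributeName: "messageId", AttributeType: "S" },
      { AttributeName: "batchId", AttributeType: "S" },
    ],
    KeySchema: [{ AttributeName: "messageId", KeyType: "HASH" }],
    GlobalSecondaryIndexes: [
      {
        IndexName: "batchId-index", // assumed name
        KeySchema: [{ AttributeName: "batchId", KeyType: "HASH" }],
        Projection: { ProjectionType: "ALL" },
      },
    ],
    BillingMode: "PAY_PER_REQUEST", // assumed; see the write-throttling caveat below
  }),
);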

batch-processor-lambda

The final piece of our aggregator is our batch-processor-lambda. This Lambda is triggered by our batch-queue but is configured with a BatchSize of 1, so it receives one message per batch.

This function’s primary job is to query our lock-table using the batchId GSI to retrieve all the messages within the batch and process them as a single entity. The processing of the batch could be completed within this function or in a downstream component.

All we need to remember is that once we’ve finished processing the batch we need to delete all the items from our lock-table to allow duplicate messages to be processed, now without a race condition.
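
To make this concrete, a minimal sketch of the batch-processor-lambda handler might look like the following. It assumes the GSI is named batchId-index, uses a processBatch placeholder in place of your real downstream processing, and skips production concerns such as retrying unprocessed deletes; none of these names are taken from the sample repository.

import { DynamoDBClient } from "@aws-sdk/client-dynamodb";
import {
  DynamoDBDocumentClient,
  QueryCommand,
  BatchWriteCommand,
} from "@aws-sdk/lib-dynamodb";

const DDB_LOCK_TABLE = process.env.DDB_LOCK_TABLE;
const docClient = DynamoDBDocumentClient.from(new DynamoDBClient());

// Placeholder for whatever downstream processing the batch needs.
const processBatch = async (messages) => {
  console.log(`Processing batch of ${messages.length} messages`);
};

export const handler = async (event) => {
  // BatchSize is 1, so there is exactly one record containing the batchId.
  const { batchId } = JSON.parse(event.Records[0].body);

  // Page through the GSI to collect every item locked into this batch.
  // Note: the GSI is eventually consistent; see the caveats below.
  const items = [];
  let lastKey;
  do {
    const page = await docClient.send(
      new QueryCommand({
        TableName: DDB_LOCK_TABLE,
        IndexName: "batchId-index", // assumed GSI name
        KeyConditionExpression: "batchId = :b",
        ExpressionAttributeValues: { ":b": batchId },
        ExclusiveStartKey: lastKey,
      }),
    );
    items.push(...(page.Items ?? []));
    lastKey = page.LastEvaluatedKey;
  } while (lastKey);

  // Process the whole batch as a single entity.
  await processBatch(items.map((i) => i.message));

  // Delete the locks in chunks of 25 so duplicates can be handled later.
  for (let i = 0; i < items.length; i += 25) {
    await docClient.send(
      new BatchWriteCommand({
        RequestItems: {
          [DDB_LOCK_TABLE]: items.slice(i, i + 25).map((item) => ({
            DeleteRequest: { Key: { messageId: item.messageId } },
          })),
        },
      }),
    );
  }
};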

Does it work?

Sample code for this pattern can be found on GitHub - https://github.com/kurtismash/aws-serverless-unique-event-aggregator.

In reality, it's rare to see a duplicate message from SQS, but we must account for them due to SQS's at-least-once delivery guarantee. The repository includes a Python script to send messages to the input-queue with a parametrized duplicate rate. Because of this, the sample code takes the messageId value from the message body, but there's no reason you couldn't use the messageId from the SQS metadata instead.
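
If you want to stay in Node.js rather than use the repository's Python script, a rough equivalent of the test sender might look like the sketch below; the INPUT_QUEUE_URL variable, the message body shape, and the duplicate rate are illustrative assumptions, not details from the repository.

import { randomUUID } from "node:crypto";
import { SQSClient, SendMessageBatchCommand } from "@aws-sdk/client-sqs";

const QUEUE_URL = process.env.INPUT_QUEUE_URL; // assumed environment variable
const DUPLICATE_RATE = 0.01; // roughly 1% of messages reuse an earlier messageId

const sqsClient = new SQSClient();
const sentIds = [];

// Build one batch of 10 messages, occasionally reusing an earlier messageId
// in the body to simulate SQS's at-least-once delivery.
const entries = Array.from({ length: 10 }, (_, i) => {
  const reuse = sentIds.length > 0 && Math.random() < DUPLICATE_RATE;
  const messageId = reuse
    ? sentIds[Math.floor(Math.random() * sentIds.length)]
    : randomUUID();
  sentIds.push(messageId);
  return {
    Id: String(i),
    MessageBody: JSON.stringify({ messageId, payload: "example" }),
  };
});

await sqsClient.send(
  new SendMessageBatchCommand({ QueueUrl: QUEUE_URL, Entries: entries }),
);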

Testing a dormant system with a burst of 1.2 million messages resulted in 26 batches being processed; 22 of these were made up of the MAX_BATCH_SIZE of 50,000 items, and the last 4 batches had 49,460, 43,280, 1,945, and 5,315 items respectively. All batches had been aggregated and deleted from the lock-table within 21 minutes.

Caveats

  • As this pattern relies heavily on DynamoDB, it is prone to write throttling. It is recommended to prewarm the lock-table when it is first created.
  • Similarly, the Global Secondary Index on batchId can become hot for both reads and writes. As GSIs are updated asynchronously, using an eventually consistent model, the batch-processor could read a batch before the GSI is fully populated. It is recommended to clear the lock-table synchronously after processing, then query again for remaining items and reprocess these as necessary.
  • Depending on your expected throughput, this model could incur high costs due to the required Write Request Units on DynamoDB. This pattern has been designed for use in a system with a bursty workload that is expected to sit dormant for long periods.
  • As Lambda Event Source Mappings have a minimum MaximumConcurrency of 2, it is expected that bursts smaller than the configured MAX_BATCH_SIZE will be processed in at least 2 batches.
