Real-Time Aggregation with DynamoDB Streams

At Signiant we help our customers move their data quickly. Some of our customers transfer a lot of data. Terabytes upon terabytes, every month.

We also strive to give our customers insight into how they are using our product, and feedback on how much data they are moving.

Building a system to meet these two requirements leads to a typical problem in data-intensive applications:

How do you collect and write a ton of data, but also provide an optimal way to read that same data? Can you build this system to be scalable? Resilient to errors? Is it easy to implement and operate?

A typical solution to this problem would be to write a batch process for combining this mass of data into aggregated rows. This approach has a few inherent problems:

  • You need to operate and monitor a fleet of servers to perform the batch operations. Often this comes in the form of a Hadoop cluster.
  • You need to schedule the batch process to occur at some future time. But what happens if you want to query the data before that time?
  • What happens when something goes wrong with the batch process? Do you know how to resume from the failure point? Have you lost any data? What does it mean for your application if the previous batch didn’t succeed?

Is there a better way? Can you produce aggregated data in real-time, in a scalable way, without having to manage servers?

Using DynamoDB Streams

At Signiant we use AWS’s DynamoDB extensively for storing our data. We like it because it provides scalability and performance while being almost completely hands-off from an operational perspective.

DynamoDB Streams is a feature of DynamoDB that can send a series of database events to a downstream consumer. This consumer can be an application you write and manage yourself, or an AWS Lambda function you write and allow AWS to manage and trigger. Depending on the operation that was performed on your source table, your application will receive a corresponding INSERT, MODIFY, or REMOVE event. The event will also include a snapshot of the data contained in the database row before and after it was changed.

For example, if a new row gets written to your source table, the downstream application will receive an INSERT event that will look something like this:

{
  "Records": [
    {
      "eventID": "7de3041dd709b024af6f29e4fa13d34c",
      "eventName": "INSERT",
      "eventVersion": "1.1",
      "eventSource": "aws:dynamodb",
      "awsRegion": "us-east-1",
      "dynamodb": {
        "ApproximateCreationDateTime": 1479499740,
        "Keys": {
          "Timestamp": {
            "S": "2016-11-18:12:09:36"
          },
          "Username": {
            "S": "John Doe"
          }
        },
        "NewImage": {
          "Timestamp": {
            "S": "2016-11-18:12:09:36"
          },
          "Message": {
            "S": "This is a bark from the Woofer social network"
          },
          "Username": {
            "S": "John Doe"
          }
        },
        "SequenceNumber": "13021600000000001596893679",
        "SizeBytes": 112,
        "StreamViewType": "NEW_IMAGE"
      },
      "eventSourceARN": "arn:aws:dynamodb:us-east-1:123456789012:table/BarkTable/stream/2016-11-16T20:42:48.104"
    }
  ]
}

Aggregating the data using AWS Lambda

What if we use the data coming from these streams to produce aggregated data on the fly, leveraging the power of AWS Lambda to scale up seamlessly?


Presume we are writing records to a source DynamoDB table of the following schema:

Source Table Schema

If we want to produce a daily sum of all bytes transferred by a customer on a given day, our daily rollup table schema might look something like:

Daily Aggregation Table Schema

Given these two schemas, we want our system to take a set of rows from the source table that looks like this:

Source Table Example

And produce entries in the aggregated table that looks like this:

Daily Aggregation Table Example

In the real world we write tens of thousands of rows into the source table per customer per day. It quickly becomes apparent that simply querying all the data from the source table and combining it on demand is not going to be efficient. However, querying a customer’s data from the daily aggregation table will remain efficient for many years’ worth of data.
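As a rough illustration (the attribute names here are taken from the Lambda function below; the values and exact schemas are made up for this example), a source row and the daily aggregate row it feeds into might look something like this:

// A hypothetical source table item: one row per transfer
const sourceItem = {
  CustomerId: 'cust-0001',              // partition key
  TransferTime: '2018-02-01T14:31:22Z', // when the transfer completed
  Bytes: 52428800,                      // bytes moved by this transfer
};

// The corresponding daily aggregate item maintained by the Lambda function
const aggregateItem = {
  CustomerId: 'cust-0001',             // partition key
  Day: '2018-02-01T00:00:00.000Z',     // start of the UTC day, as written by the Lambda
  TotalBytes: 1073741824,              // running sum of Bytes for that customer and day
  CombinedRecords: 42,                 // number of source rows rolled into the sum
};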

The Lambda Function (Node 8.10)

Note that the following assumes you have created the tables, enabled the DynamoDB stream with a Lambda trigger, and configured all the IAM policies correctly. You can refer to this tutorial for a quick overview of how to do all of this.
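For reference, a minimal sketch of that wiring using the Node SDK might look like the following. The table name, function name, and batch size are assumptions; in practice you would more likely set this up through the console, CloudFormation, or a deployment framework.

const AWS = require('aws-sdk');
const dynamodb = new AWS.DynamoDB();
const lambda = new AWS.Lambda();

async function wireUpStream() {
  // Enable a stream of NEW_IMAGE records on the (already created) source table
  const { TableDescription } = await dynamodb
    .updateTable({
      TableName: 'SourceTable', // assumed table name
      StreamSpecification: { StreamEnabled: true, StreamViewType: 'NEW_IMAGE' },
    })
    .promise();

  // Point the stream at the aggregation Lambda function
  await lambda
    .createEventSourceMapping({
      EventSourceArn: TableDescription.LatestStreamArn,
      FunctionName: 'dailyAggregator', // assumed function name
      StartingPosition: 'LATEST',
      BatchSize: 100, // see the discussion of BatchSize below
    })
    .promise();
}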

const AWS = require('aws-sdk');
const documentClient = new AWS.DynamoDB.DocumentClient({});

async function handler(event, context, callback) {
  try {
    // Trim down to just "INSERT" events
    const insertRecords = event.Records.filter(record => record.eventName === 'INSERT');

    // Unmarshall the records into plain JSON objects
    const unmarshalledRecords = insertRecords.map(record =>
      AWS.DynamoDB.Converter.unmarshall(record.dynamodb.NewImage)
    );

    for (let record of unmarshalledRecords) {
      // This could be changed to any time window (a month, a year) and,
      // in theory, the algorithm below would still work
      const MS_IN_DAY = 1000 * 60 * 60 * 24;
      const recordEndTime = new Date(record.TransferTime);
      const aggregatedDay = recordEndTime.getTime() - (recordEndTime.getTime() % MS_IN_DAY);

      const params = {
        TableName: 'DailyAggregateTable',
        Key: {
          CustomerId: record.CustomerId,
          Day: new Date(aggregatedDay).toISOString(),
        },
        UpdateExpression: 'ADD #TotalBytes :Bytes, #CombinedRecords :CombinedRecords',
        ExpressionAttributeNames: {
          '#TotalBytes': 'TotalBytes',
          '#CombinedRecords': 'CombinedRecords',
        },
        ExpressionAttributeValues: {
          ':Bytes': record.Bytes,
          ':CombinedRecords': 1,
        },
      };

      try {
        // Write updates to the daily rollup table
        await documentClient.update(params).promise();
      } catch (err) {
        // Swallow any errors
        console.error(
          `Internal Error: Error updating dynamoDB record with keys [${JSON.stringify(
            params.Key
          )}] and Attributes [${JSON.stringify(params.ExpressionAttributeValues)}]`
        );
        console.error(err);
      }
    }

    callback(null, `Successfully processed ${event.Records.length} records.`);
  } catch (err) {
    console.error(`Error processing records. Event was [${JSON.stringify(event)}]`);
    console.error(err);
    // Note we don't actually fail the Lambda function here by calling back with the error, e.g. callback(err)
    callback(null, `Swallowed the error ${JSON.stringify(err)}`);
  }
}

exports.handler = handler;
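If you want to sanity-check the handler, you can invoke it with a hand-built event shaped like the stream example above (a sketch: the values are made up, the handler is assumed to live in index.js, and the update call will still hit the real DailyAggregateTable, so point your credentials at a test environment or DynamoDB Local first).

const { handler } = require('./index'); // assumes the handler above is exported from index.js

const fakeEvent = {
  Records: [
    {
      eventName: 'INSERT',
      dynamodb: {
        NewImage: {
          CustomerId: { S: 'cust-0001' },
          TransferTime: { S: '2018-02-01T14:31:22Z' },
          Bytes: { N: '52428800' },
        },
      },
    },
  ],
};

handler(fakeEvent, {}, (err, result) => console.log(err || result));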

A few snippets of interest

const insertRecords = event.Records.filter(record => record.eventName === 'INSERT');

Here we are filtering the records down to just INSERT events. In theory, you could just as easily handle DELETE events by removing data from your aggregated table, or MODIFY events by calculating the difference between the old and new images and updating the aggregate table accordingly.
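For example, a MODIFY handler could compute the difference between the two images and apply only that delta. The fragment below is a sketch that would sit inside the same async handler (it reuses documentClient and assumes the stream is configured with the NEW_AND_OLD_IMAGES view type so both snapshots are present; dayOf is a hypothetical helper mirroring the day truncation from the main function).

// Hypothetical helper: truncate a timestamp to the start of its UTC day,
// mirroring the aggregatedDay calculation in the main handler
const MS_IN_DAY = 1000 * 60 * 60 * 24;
const dayOf = timestamp => {
  const ms = new Date(timestamp).getTime();
  return new Date(ms - (ms % MS_IN_DAY)).toISOString();
};

const modifyRecords = event.Records.filter(record => record.eventName === 'MODIFY');

for (let record of modifyRecords) {
  const oldImage = AWS.DynamoDB.Converter.unmarshall(record.dynamodb.OldImage);
  const newImage = AWS.DynamoDB.Converter.unmarshall(record.dynamodb.NewImage);

  // Apply only the change in bytes; CombinedRecords is unchanged because
  // no source row was added or removed
  const byteDelta = newImage.Bytes - oldImage.Bytes;
  if (byteDelta !== 0) {
    await documentClient.update({
      TableName: 'DailyAggregateTable',
      Key: { CustomerId: newImage.CustomerId, Day: dayOf(newImage.TransferTime) },
      UpdateExpression: 'ADD #TotalBytes :delta',
      ExpressionAttributeNames: { '#TotalBytes': 'TotalBytes' },
      ExpressionAttributeValues: { ':delta': byteDelta },
    }).promise();
  }
}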


UpdateExpression: 'ADD #TotalBytes :Bytes, #CombinedRecords :CombinedRecords',
ExpressionAttributeNames: {
  '#TotalBytes': 'TotalBytes',
  '#CombinedRecords': 'CombinedRecords',
},
ExpressionAttributeValues: {
  ':Bytes': record.Bytes,
  ':CombinedRecords': 1,
},

DynamoDB doesn’t support transactions, so how do you ensure that when two Lambda functions write to the same aggregate row at the same time, they don’t overwrite each other’s value instead of adding to it? One answer is to use update expressions, which DynamoDB applies atomically. Here we are using an update expression to atomically add to the pre-existing TotalBytes value. Two near-simultaneous updates will each be applied to the aggregated value without either needing to know the previous value.

There are a few different ways to use update expressions. For example, if you wanted to add a createdOn date that was written on the first update, but then not subsequently updated, you could add something like this to your expression:

SET #createdOn = if_not_exists(#createdOn, :now)
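Applied to the Lambda function above, that would mean extending the params along these lines (a sketch; ADD and SET clauses can be combined in a single update expression, and :now is simply the time the record is processed):

const params = {
  TableName: 'DailyAggregateTable',
  Key: {
    CustomerId: record.CustomerId,
    Day: new Date(aggregatedDay).toISOString(),
  },
  // Keep the atomic counters, and stamp createdOn only on the first update
  UpdateExpression:
    'ADD #TotalBytes :Bytes, #CombinedRecords :CombinedRecords ' +
    'SET #createdOn = if_not_exists(#createdOn, :now)',
  ExpressionAttributeNames: {
    '#TotalBytes': 'TotalBytes',
    '#CombinedRecords': 'CombinedRecords',
    '#createdOn': 'createdOn',
  },
  ExpressionAttributeValues: {
    ':Bytes': record.Bytes,
    ':CombinedRecords': 1,
    ':now': new Date().toISOString(),
  },
};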

try {
  // Write updates to the daily rollup table
  await documentClient.update(params).promise();
} catch (err) {
  // Swallow any errors
  console.error(err);
}
...
// Note we don't actually fail the Lambda function here by calling back with the error, e.g. callback(err)
callback(null, `Swallowed the error ${JSON.stringify(err)}`);

Here we are swallowing any errors that occur in our function and not triggering the callback with an error, i.e. not calling callback(err). This is because your Lambda function is triggered with a batch of events in a single invocation (the batch size can be changed by setting the BatchSize property of the Lambda DynamoDB Stream event source), and you generally don’t want to fail the entire batch. This is discussed more below.

Gotchas and Lessons Learned

Kinesis, Batch Size, Error Handling, and Partial Failure

Under the hood, DynamoDB uses Kinesis to stream the database events to your consumer. By its nature, Kinesis just stores a log of events and doesn’t track how its consumers are reading those events. It simply provides an interface to fetch a number of events from a given point in time. It’s up to the consumer to track which events it has received and processed, and then request the next batch of events from where it left off (luckily AWS hides this complexity from you when you choose to connect the event stream to a Lambda function). This is a different paradigm than SQS, for example, which ensures that only one consumer can process a given message, or set of messages, at a given time. In SQS you can then delete a single message from the queue so it does not get processed again. In Kinesis there is no concept of deleting an event from the log.

The inability to control the set of events coming from the stream introduces some challenges when dealing with errors in the Lambda function. There is no concept of partial success, i.e. you can’t report back to the stream: “I processed these 50 events successfully, and these 50 failed, so please retry only the 50 that failed.” If you fail the entire Lambda invocation, the DynamoDB stream will resend the entire batch of data again in the future. This is problematic if you have already written part of your data to the aggregate table: how do you prevent duplicate updates from being applied? And how do you handle incoming events that will never succeed, such as invalid data that causes your business logic to fail? There is no silver-bullet solution for this case, but here are some ideas:

  • Do some data sanitization of the source events. If you can identify bad records and throw them away before processing them, you can avoid failures down the line. Log the failures and consider setting up CloudWatch Alarms to notify you of these unexpected cases. We used CloudWatch Metric Filters to translate our log errors into metrics we could observe and alarm on.
  • Perform retries and backoffs when you encounter network or throughput exceptions writing to the aggregate table. This provides you more opportunity to succeed when you are approaching your throughput limits. If you are using an AWS SDK you get this for free.
  • If all else fails, write the event you are currently processing to some secondary storage. You cannot throw away this data if you want your destination table to be an accurate aggregate of the source table. Writing the event to an SQS queue, or S3, or even another table, gives you a second chance to process the event at a later time, ideally after you have adjusted your throughput or during a period of lighter usage. We implemented an SQS queue for this purpose (a sketch follows this list).
  • Set your BatchSize to 1. I wouldn’t generally recommend this, as the ability to process and aggregate a number of events at once is a huge performance benefit, but it would work to ensure you aren’t losing data on failure. Simply trigger the Lambda callback with an error, and the failed event will be sent again on the next invocation. With this approach you have to ensure that you can handle events quickly enough that you don’t fall too far behind in processing the stream. A DynamoDB stream will only persist events for 24 hours and then you will start to lose data. You can monitor the IteratorAge metrics of your Lambda function to determine how far behind you might be.
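As an illustration of the SQS fallback mentioned above, the catch block around the update call could park the failed record on a queue instead of only logging it. This is a sketch: the queue URL environment variable and the message shape are assumptions.

const sqs = new AWS.SQS();

// Called from the catch block of the update: park the record for later replay
async function sendToFallbackQueue(record, err) {
  await sqs
    .sendMessage({
      QueueUrl: process.env.FALLBACK_QUEUE_URL, // assumed environment variable
      MessageBody: JSON.stringify({
        record,             // the unmarshalled source record we failed to aggregate
        error: err.message, // why it failed, for later triage
        failedAt: new Date().toISOString(),
      }),
    })
    .promise();
}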

DynamoDB Throughput, Concurrency, Partitions, and Batch Writes

Although DynamoDB is mostly hands-off operationally, one thing you do have to manage is your read and write throughput limits. Setting these to the correct values is an inexact science. Set them too low and you start getting throughput exceptions when trying to read or write to the table. Set them too high and you will be paying for throughput you aren’t using. Auto-scaling can help, but won’t work well if you tend to read or write in bursts, and there’s still no guarantee you will never exceed your throughput limit.

In our scenario we specifically care about the write throughput on our aggregate table. We want to allow our Lambda function to successfully write to the aggregate rows without encountering a throughput exception. The logical answer would be to set the write throughput on the aggregate table to the same values as on the source table. After all, a single write to the source table should equate to a single update on the aggregate table, right? Unfortunately, the answer is a little more complicated than that.

First, you have to consider the number of Lambda functions that could be running in parallel. For example, if you are running two Lambda instances in parallel, you will need double the throughput that you would need for a single instance. The potential number of Lambdas that could be triggered in parallel for a given source table is based on the number of database partitions for that table, as there is roughly one stream shard per partition. Unfortunately, there is no concrete way of knowing the exact number of partitions into which your table will be split. It is a function of the total provisioned throughput on the table and the amount of data stored in the table, which roughly works out to something like:

Total partitions = 
MAX(Total partitions for desired performance,
Total partitions for desired capacity)
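As a rough worked example, using the numbers the DynamoDB documentation gave at the time (a partition supports about 3,000 read capacity units or 1,000 write capacity units, and holds about 10 GB of data), a table provisioned with 6,000 RCUs and 2,000 WCUs holding 25 GB would come out to roughly MAX(6000/3000 + 2000/1000, 25/10) = MAX(4, 2.5), or about 4 partitions, and therefore up to about four Lambda invocations running in parallel.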

See this article for a deeper dive into DynamoDB partitions. You can get a rough idea of how many Lambda functions are running in parallel by looking at the number of separate CloudWatch logs your function is generating at any given time. There should be about one per partition assuming you are writing enough data to trigger the streams across all partitions.

You can also manually control the maximum concurrency of your Lambda function. For example, if you tend to write a lot of data in bursts, you could set the maximum concurrency to a lower value to ensure a more predictable write throughput on your aggregate table. Again, you have to be careful that you aren’t falling too far behind in processing the stream, otherwise you will start to lose data.
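A minimal sketch of capping concurrency with the Node SDK (the function name and limit are assumptions; the same setting is available in the Lambda console):

const AWS = require('aws-sdk');
const lambda = new AWS.Lambda();

// Reserve (and thereby cap) the concurrent executions for the aggregation
// function, trading processing speed for a more predictable write load
lambda
  .putFunctionConcurrency({
    FunctionName: 'dailyAggregator',   // assumed function name
    ReservedConcurrentExecutions: 2,   // assumed cap
  })
  .promise()
  .catch(err => console.error('Failed to set concurrency', err));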

Secondly, if you are writing to the source table in batches using the batch write functionality, you have to consider how this will affect the number of updates to your aggregate table. For example, a single batch write call can put up to 25 records into the source table. Those writes will translate into 25 separate INSERT events on your stream, and, since updates using update expressions cannot be batched, 25 separate update calls against the destination table. The destination table therefore needs enough write throughput to absorb these bursts of individual updates, even though they originated from a single request against the source table.

There is opportunity for optimization, such as combining the batch of events in memory in the Lambda function, where possible, before writing to the aggregate table. In practice, we found that setting the write throughput on the aggregate table to twice that of the source table comfortably ensured we would not exceed our limits, but I would encourage you to monitor your usage patterns to find the number that works for your case.
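A sketch of that in-memory optimization: group the current batch’s records by customer and day, then issue one update per group. This fragment would replace the per-record loop in the handler above and reuses unmarshalledRecords and documentClient from there.

// Collapse this invocation's records into one (CustomerId, Day) bucket each,
// so 25 inserts for the same customer and day become a single update
const MS_IN_DAY = 1000 * 60 * 60 * 24;
const buckets = new Map();
for (let record of unmarshalledRecords) {
  const ms = new Date(record.TransferTime).getTime();
  const day = new Date(ms - (ms % MS_IN_DAY)).toISOString();
  const key = `${record.CustomerId}|${day}`;
  const bucket = buckets.get(key) || { CustomerId: record.CustomerId, Day: day, Bytes: 0, Count: 0 };
  bucket.Bytes += record.Bytes;
  bucket.Count += 1;
  buckets.set(key, bucket);
}

for (let bucket of buckets.values()) {
  await documentClient.update({
    TableName: 'DailyAggregateTable',
    Key: { CustomerId: bucket.CustomerId, Day: bucket.Day },
    UpdateExpression: 'ADD #TotalBytes :Bytes, #CombinedRecords :CombinedRecords',
    ExpressionAttributeNames: { '#TotalBytes': 'TotalBytes', '#CombinedRecords': 'CombinedRecords' },
    ExpressionAttributeValues: { ':Bytes': bucket.Bytes, ':CombinedRecords': bucket.Count },
  }).promise();
}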

In conclusion

Using the power of DynamoDB Streams and Lambda functions provides an easy-to-implement, scalable solution for generating real-time data aggregations. As a bonus, there is little to no operational overhead. The pattern can easily be adapted to perform aggregations over different bucket sizes (monthly or yearly aggregations), with different properties, or with your own conditional logic. You could even configure a separate stream on the daily aggregation table and chain together multiple event streams that start from a single source.

There are a few things to be careful about when using Lambda to consume the event stream, especially when handling errors. Understanding the underlying technology behind DynamoDB and Kinesis will help you to make the right decisions and ensure you have a fault-tolerant system that provides you with accurate results.

Happy streaming!