Data Processing

Batch Update DynamoDB with Provisioned Capacity

Save costs using Step Functions to stay under write limits

John Elliott
Geek Culture

DynamoDB can be a very cost-effective NoSQL database platform on AWS if you can stick to its provisioned capacity feature. If usage patterns are consistent, provisioned capacity is estimated to be around seven times cheaper than on-demand capacity¹. Another bonus is that the DynamoDB free tier applies only to provisioned capacity. However, provisioned capacity presents one major challenge in managing your data: how do you perform a batch update of thousands or millions of DynamoDB items without exceeding the provisioned capacity?

One option to solve this problem is to use AWS Step Functions. This is described by AWS as a ‘low-code visual workflow service used to orchestrate AWS services’ and is ideal to process a series of source files from S3 and update DynamoDB in a reliable and controlled manner.

Step Functions allows you to process a large volume of source S3 data and throttle the rate of write to DynamoDB, which means that you can use provisioned capacity and save money.

As a side note, Lambda functions alone won’t work here: the job needs a long-running execution, and a single Lambda invocation is limited to 15 minutes of execution time.

Understanding Write Capacity Units

When using provisioned capacity, a set limit of Write Capacity Units (WCU) is defined on each DynamoDB table. Each WCU represents one write per second, for an item up to 1 KB in size. Writing larger items to DynamoDB will consume more than one WCU.
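
The rounding rule above can be made concrete with a small sketch. The helper name wcuPerWrite is hypothetical, and the sketch assumes standard (non-transactional) writes with 1 KB taken as 1,024 bytes.

```javascript
// Minimal sketch: WCU consumed by one standard write of an item.
// Assumption: 1 KB = 1024 bytes, and item size is rounded up to the next 1 KB.
const BYTES_PER_WCU = 1024;

function wcuPerWrite(itemSizeBytes) {
  if (itemSizeBytes <= 0) {
    throw new RangeError('itemSizeBytes must be positive');
  }
  return Math.ceil(itemSizeBytes / BYTES_PER_WCU);
}

console.log(wcuPerWrite(512));  // 1
console.log(wcuPerWrite(1024)); // 1
console.log(wcuPerWrite(1536)); // 2
```

Note that this covers only the base table. Each global secondary index has its own write capacity, as discussed in the results further down.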

These limits on write throughput mean that batch updates need to be throttled to a lower rate of writes than the configured WCU allows. Exceeding the provisioned WCU on the table causes DynamoDB to throttle the writes, which risks job failure and data loss.

Rather than have DynamoDB throttle a batch write for you, it is much better to implement logic that throttles the rate of item writes in your own code.

The AWS Step Functions state machine

The crux of the matter is: how do we slow down the rate of writes enough to avoid exceeding the provisioned DynamoDB write capacity? To solve this, I built a state machine in AWS Step Functions to orchestrate the logic for reading source files and writing to DynamoDB.

The state machine iterates over multiple CSV files in a source S3 location, then updates DynamoDB based on the values in each row of each CSV. The CSV files were the partitioned outputs produced by a Spark ETL job, which is why there are multiple files. The state machine calls a Lambda function for each file, with a pause within the function after each CSV row processed in order to throttle the DynamoDB write rate. The overall design of the state machine is shown below, and its iterative pattern is clear. I used Workflow Studio as it provides a slightly easier UI for configuring a state machine.

Step Functions state machine for batch updating provisioned capacity DynamoDB table (image by author)
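
For readers who cannot see the diagram, the loop can be sketched as an Amazon States Language definition, written here as a JavaScript object so that it can be serialised with JSON.stringify. The state names and the ConfigureRun values come from this article. The Lambda ARNs, the use of the lambda:invoke service integration and the TimeoutSeconds value are assumptions for illustration, not necessarily the exact definition that was deployed.

```javascript
// Sketch of the iterative state machine. ARNs and the timeout are placeholders.
const lambdaTask = (functionArn, next) => ({
  Type: 'Task',
  Resource: 'arn:aws:states:::lambda:invoke',
  // Pass the whole state input through, so the function can read event.Payload.*
  Parameters: { FunctionName: functionArn, 'Payload.$': '$' },
  Next: next
});

const definition = {
  StartAt: 'ConfigureRun',
  TimeoutSeconds: 6 * 60 * 60, // assumed guard against an infinite loop
  States: {
    ConfigureRun: {
      Type: 'Pass',
      Result: {
        s3Bucket: 'bucket-name-here',
        s3KeyBase: 'source-key-location/',
        s3KeyBaseProcessed: 'processed-output-file-location/',
        functionTimeLimitSeconds: '870'
      },
      ResultPath: '$.Payload',
      Next: 'CheckIfFilesRemain'
    },
    CheckIfFilesRemain: lambdaTask(
      'arn:aws:lambda:REGION:ACCOUNT_ID:function:CheckIfFilesRemain', // placeholder
      'IsProcessingDone'
    ),
    IsProcessingDone: {
      Type: 'Choice',
      Choices: [
        { Variable: '$.Payload.noFilesFound', BooleanEquals: false, Next: 'UpdateValuesInDB' }
      ],
      Default: 'Pass'
    },
    UpdateValuesInDB: lambdaTask(
      'arn:aws:lambda:REGION:ACCOUNT_ID:function:UpdateValuesInDB', // placeholder
      'CheckIfFilesRemain' // loop back to look for the next file
    ),
    Pass: { Type: 'Pass', End: true }
  }
};

console.log(JSON.stringify(definition, null, 2));
```

Each pass around the loop handles one file and uses three state transitions: CheckIfFilesRemain, IsProcessingDone and UpdateValuesInDB.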

ConfigureRun Pass state

This is a Pass state whose purpose is to set the initial variables on the state machine. An Amazon States Language snippet for it is shown below. The key-value pairs under Result are passed into the Lambda function in the next step of the state machine.

"ConfigureRun": {
  "Type": "Pass",
  "Result": {
    "s3Bucket": "bucket-name-here",
    "s3KeyBase": "source-key-location/",
    "s3KeyBaseProcessed": "processed-output-file-location/",
    "functionTimeLimitSeconds": "870"
  },
  "ResultPath": "$.Payload",
  "Next": "CheckIfFilesRemain"
}

CheckIfFilesRemain Lambda function

This Lambda function runs after ConfigureRun on the first iteration, and after UpdateValuesInDB on every later iteration. It takes in the parameters defined in ConfigureRun and determines whether there are any objects in the defined S3 location. The response variable noFilesFound is true if the S3 location is empty and false if there are objects. The function also returns the parameters it was given, so that they are easy to use in the next step of the state machine.

In essence, the purpose of this function is to determine whether the state machine should continue iterating and processing the next remaining file.

The Node.js code for the Lambda function is below.

const aws = require('aws-sdk');
const s3 = new aws.S3({ apiVersion: '2006-03-01' });

// The handler is async, so it returns its response rather than using the callback
exports.handler = async function iterator(event) {
  // Get values defined in ConfigureRun pass state
  const { s3Bucket, s3KeyBase, s3KeyBaseProcessed, functionTimeLimitSeconds } = event.Payload;

  // Build the date-partitioned prefix for the current UTC date
  const d = new Date();
  const s3UriSuffix = 'year=' + d.getUTCFullYear().toString() + '/month=' + (d.getUTCMonth() + 1).toString() + '/day=' + d.getUTCDate().toString() + '/';
  const s3UriFull = 's3://' + s3Bucket + '/' + s3KeyBase + s3UriSuffix;
  const s3params = {
    Bucket: s3Bucket,
    Delimiter: '/',
    Prefix: s3KeyBase + s3UriSuffix
  };
  const s3ListResponse = await getS3ListObjects(s3params);

  // Contents may be undefined or empty when nothing matches the prefix
  const fileCount = s3ListResponse.Contents ? s3ListResponse.Contents.length : 0;
  if (fileCount === 0) {
    // this may mean the key prefix is invalid, i.e. there is no data for the date on which this has run
    console.warn('No data for S3 URL ' + s3UriFull);
  }

  // Return the input parameters as well, so the next state can use them
  return {
    s3Bucket,
    s3KeyBase,
    s3KeyBaseProcessed,
    s3UriFull,
    noFilesFound: fileCount === 0,
    fileCount,
    functionTimeLimitSeconds
  };
};

async function getS3ListObjects(s3params) {
  const resp = await s3.listObjects(s3params).promise();
  return resp;
}
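
The function above builds its S3 prefix from the current UTC date, and the same expression appears again in UpdateValuesInDB. As a sketch, that logic could be pulled into a small helper. The name partitionPrefix is hypothetical and not part of the original code. Taking the date as a parameter also makes it possible to pass in a fixed date when testing.

```javascript
// Sketch: build the 'year=YYYY/month=M/day=D/' prefix used to locate the day's files.
// Month and day are not zero-padded, matching the Lambda code in this article.
function partitionPrefix(date) {
  return 'year=' + date.getUTCFullYear() +
    '/month=' + (date.getUTCMonth() + 1) +
    '/day=' + date.getUTCDate() + '/';
}

console.log(partitionPrefix(new Date(Date.UTC(2022, 0, 5)))); // year=2022/month=1/day=5/
```

Because the prefix is derived from the time of each invocation, an execution that is still running at midnight UTC would start looking under the next day's prefix. Passing a fixed run date through the state machine input would be one way to avoid that.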

IsProcessingDone Choice state

If the noFilesFound value provided by the previous CheckIfFilesRemain state is false, Rule #1 matches and the state machine transitions to the UpdateValuesInDB state. In other words, if there are objects in the S3 location, we move on to process the first object.

If noFilesFound is true, then the default rule will fire and execution of the state machine will end. The Amazon States Language snippet for this state is shown below.

"IsProcessingDone": {
  "Type": "Choice",
  "Choices": [
    {
      "Variable": "$.Payload.noFilesFound",
      "BooleanEquals": false,
      "Next": "UpdateValuesInDB"
    }
  ],
  "Default": "Pass"
}

UpdateValuesInDB Lambda function

This is the most complex and important part of the overall solution. This is the component that processes a single source CSV object from S3 and updates DynamoDB with the values contained within it. It also contains the ‘pause’ to slow down writes to DynamoDB. There are some key considerations to bear in mind:

  1. Due to the WCU limits provisioned on the destination DynamoDB table, the rate of calls to update and put on the DocumentClient API must be throttled. The rate must be lower than the rate supported by the provisioned capacity.
  2. The rate of writes should be calculated by estimating the size of the item that you are writing to determine the amount of WCU used per write. For example, if each item is an average of 1.5 KB, then 2 WCU will be used per write (due to rounding up). If the table has a provisioned capacity of 10 WCU then you will be able to write a maximum of 5 items per second (or a pause of 0.2 seconds after each write). I used the code await new Promise(r => setTimeout(r, pauseTimeMS)) to implement the pause in Node.js.
  3. The Lambda function will still be running despite the pauses, so you’ll be paying for that. But Lambdas are cheap, so this shouldn’t be a big deal.
  4. Lambda functions can only execute for a maximum of 15 minutes, so the function must write any unprocessed lines from the CSV back to the source S3 location before the 15-minute limit. The functionTimeLimitSeconds value set on ConfigureRun must therefore be less than 900 seconds. Remember to configure the Lambda timeout to 15 minutes.
  5. You will need to delete processed files from the original S3 location (or risk an infinite loop). You should set a timeout on the Step Functions state machine to protect against an infinite loop occurring.
  6. Again in this Lambda, I write out all of the variables passed into this function. AWS Step Functions contains complex logic for filtering and merging parameters. However, I found that this method of proxying values through the Lambda functions is much simpler than wrestling with Step Functions ResultSelector, ResultPath and OutputPath logic.
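
The arithmetic in point 2 can be written as a short sketch. The function name pauseTimeMs and its writesPerRow and safetyFactor parameters are hypothetical additions for illustration. The sketch assumes standard writes and 1 KB taken as 1,024 bytes.

```javascript
// Sketch: derive the pause between rows from provisioned WCU and average item size.
// Assumptions: standard writes, 1 KB = 1024 bytes, `writesPerRow` writes per CSV row,
// and a `safetyFactor` of 1 or less to leave headroom below the provisioned limit.
function pauseTimeMs(provisionedWcu, avgItemBytes, writesPerRow = 1, safetyFactor = 1) {
  const wcuPerRow = Math.ceil(avgItemBytes / 1024) * writesPerRow;
  const maxRowsPerSecond = (provisionedWcu * safetyFactor) / wcuPerRow;
  return Math.ceil(1000 / maxRowsPerSecond);
}

console.log(pauseTimeMs(10, 1536));         // 200, the example from point 2
console.log(pauseTimeMs(10, 1536, 1, 0.8)); // 250, leaving 20% headroom
```

The pause is added on top of the time each write takes, so the real write rate ends up somewhat below the calculated maximum, which errs on the safe side.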

The Node.js code for the Lambda is below.

const aws = require('aws-sdk');
const s3 = new aws.S3({ apiVersion: '2006-03-01' });
const os = require("os");
const documentClient = new aws.DynamoDB.DocumentClient();
const tableName = 'DynamoDB-table-name-here';

/**
 * Summary: updates DynamoDB with a pause between rows processed
 *
 * 1. Gets a list of all objects in S3 's3KeyBase' location
 * 2. If no objects, stop running
 * 3. If there are objects, get the first object
 * 4. Read the CSV text from the object and split into lines:
 * 5. For each CSV line, split by comma character
 * 6. Use CSV row to add a value to the table
 * 7. If 'functionTimeLimitSeconds' lapses, stop processing
 * 8. Unprocessed rows write to new file in original location
 * 9. Processed rows write to 's3KeyBaseProcessed' location
 * 10. Source object in S3 's3KeyBase' location will be deleted
 */
exports.handler = async (event, context) => {

  let s3Bucket = event.Payload.s3Bucket;
  let s3KeyBase = event.Payload.s3KeyBase;
  let s3KeyBaseProcessed = event.Payload.s3KeyBaseProcessed;
  let functionTimeLimitSeconds = event.Payload.functionTimeLimitSeconds;

  let pauseTimeMS = 200; // use WCU to calculate pause time
  let d = new Date();
  let s3UriSuffix = 'year=' + d.getUTCFullYear().toString() + '/month=' + (d.getUTCMonth()+1).toString() + '/day=' + d.getUTCDate().toString() + '/';
  let s3UriFull = 's3://' + s3Bucket + '/' + s3KeyBase + s3UriSuffix;
  let s3UriFullProcessed = 's3://' + s3Bucket + '/' + s3KeyBaseProcessed + s3UriSuffix;

  const s3params = {
    Bucket: s3Bucket,
    Delimiter: '/',
    Prefix: s3KeyBase + s3UriSuffix
  };

  /*
   * Get all objects in bucket with key prefix (i.e. in folder)
   */
  const s3ListResponse = await getS3ListObjects(s3params);

  if (s3ListResponse.Contents == undefined || s3ListResponse.Contents.length == 0) { // no files in location
    console.log("No files found in location " + s3UriFull);
    let response = {
      statusCode: 200,
      s3Bucket,
      s3KeyBase,
      s3KeyBaseProcessed,
      s3UriFull,
      s3UriFullProcessed,
      noFilesFound: true,
      fileCount: 0,
      functionTimeLimitSeconds
    };
    return response;
  }

  /*
   * Get the contents of the first file then process each line
   */

  // Grab the first file
  let fileKey = s3ListResponse.Contents[0].Key;
  let fileKeyFilename = fileKey.substring(fileKey.lastIndexOf('/') + 1);

  const s3paramsObject = {
    Bucket: s3Bucket,
    Key: fileKey,
  };

  // Read the CSV text from S3
  let fileText = await getS3ObjectBodyText(s3paramsObject);

  let timeExpired = false;
  let processedLines = 0;
  let unprocessedLines = 0;
  // Split the CSV by lines
  let textLineArray = fileText.split("\n");
  // Start both output files with the first line of the source file
  let fileTextUnprocessed = appendTextLine("", textLineArray[0]);
  let fileTextProcessed = appendTextLine("", textLineArray[0]);
  // Loop around each remaining line
  for (let i = 1; i < textLineArray.length; i++) {
    // Seconds elapsed since the function started (d)
    let dateLatest = new Date();
    const diffTime = Math.abs(dateLatest - d);
    const diffSecs = Math.ceil(diffTime / 1000);
    if (diffSecs < functionTimeLimitSeconds) {
      let line = textLineArray[i];
      let lineValues = line.split(","); // split into columns

      let date = new Date(); // for object timestamp

      /*
       * Create the item for DynamoDB
       */
      let partitionKey = lineValues[0]; // first CSV column

      let dbItem = {};
      dbItem.partitionKey = partitionKey;
      // set dbItem values here

      await singleInsertItem(dbItem);
      // This is the pause to throttle writes
      await new Promise(r => setTimeout(r, pauseTimeMS));

      fileTextProcessed = appendTextLine(fileTextProcessed, textLineArray[i]);
      processedLines++;
    } else { // time has run out
      fileTextUnprocessed = appendTextLine(fileTextUnprocessed, textLineArray[i]);
      timeExpired = true;
      unprocessedLines++;
    }
  }

  /*
   * Write fileTextProcessed to processed folder
   */
  await writeTextToS3File(s3Bucket, s3KeyBaseProcessed + s3UriSuffix, context, fileTextProcessed, d, fileKeyFilename);

  if (timeExpired) { // time limit expired
    /*
     * Write fileTextUnprocessed back to the original (source) folder
     */
    console.log("Ran out of time. Creating file for unprocessed rows in original location in S3");
    if (fileKeyFilename.includes("_unprocessed")) {
      fileKeyFilename = fileKeyFilename.split('_')[0]; // remove suffix previously generated for the file, if applicable
    }
    await writeTextToS3File(s3Bucket, s3KeyBase + s3UriSuffix, context, fileTextUnprocessed, d, fileKeyFilename + "_unprocessed");
  }

  /*
   * Delete original object from S3
   */
  await deleteProcessedFile(s3Bucket, fileKey);
  let response = {
    statusCode: 200,
    s3Bucket,
    s3KeyBase,
    s3KeyBaseProcessed,
    s3UriFull,
    s3UriFullProcessed,
    noFilesFound: false,
    fileCount: s3ListResponse.Contents.length,
    processedLines,
    unprocessedLines,
    functionTimeLimitSeconds
  };
  return response;
};

/**
 * Appends a new line to text
 */
function appendTextLine(text, newLine) {
  if (newLine == null) return text;
  if (newLine.length == 0) return text;
  text = text + newLine + os.EOL;
  return text;
}
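
/**
 * Inserts a single item into DynamoDB
 */
async function singleInsertItem(dbItem) {
  const dataItem = {
    TableName: tableName,
    Item: dbItem
  };
  try {
    // await here so that a rejected write is actually caught by this try/catch
    return await documentClient.put(dataItem).promise();
  } catch (err) {
    // log and rethrow so that a failed write is not counted as a processed row
    console.error('DynamoDB put failed', err);
    throw err;
  }
}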

async function getS3ListObjects(s3params) {
  const resp = await s3.listObjects(s3params).promise();
  return resp;
}

async function getS3ObjectBodyText(s3params) {
  const { Body } = await s3.getObject(s3params).promise();
  let s3ObjectString = Body.toString('utf-8');
  return s3ObjectString;
}

async function deleteProcessedFile(s3Bucket, s3Key) {
  let s3params = {
    Bucket: s3Bucket,
    Key: s3Key
  };
  const resp = await s3.deleteObject(s3params).promise();
  return resp;
}

async function writeTextToS3File(s3Bucket, s3Key, context, textContent, partitionDate, filename) {
  let d = partitionDate;
  let fileTimestamp = new Date();
  fileTimestamp = fileTimestamp.toISOString();

  // Append a timestamp so that repeated writes do not overwrite each other
  let key = s3Key + filename + '-' + fileTimestamp;
  let buff = Buffer.from(textContent, 'utf-8');
  const s3params = {
    Bucket: s3Bucket,
    Key: key,
    Body: buff,
    //ContentEncoding: 'base64',
    ContentType: 'text/csv',
  };
  const s3Resp = await s3.putObject(s3params).promise();

  return s3Resp;
}
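
The code above stops processing rows once functionTimeLimitSeconds has elapsed. As an alternative sketch, and not what the code above does, the remaining time could be read from the Lambda context object, which provides a getRemainingTimeInMillis() method. The helper name hasTimeLeft and the 30 second reserve are assumptions. The reserve is there to leave time for writing the output files to S3.

```javascript
// Sketch: decide whether to keep processing rows, based on the Lambda context.
// `reserveMs` is an assumed safety margin for writing the output files to S3.
function hasTimeLeft(context, reserveMs = 30000) {
  return context.getRemainingTimeInMillis() > reserveMs;
}

// Usage with a stand-in context object (in Lambda, the real context is passed to the handler).
const fakeContext = { getRemainingTimeInMillis: () => 45000 };
console.log(hasTimeLeft(fakeContext));        // true
console.log(hasTimeLeft(fakeContext, 60000)); // false
```

With this approach the time limit follows the timeout configured on the function, rather than a separate value that has to be kept below it.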

Results

By using AWS Step Functions, you can batch insert or update a massive volume of items in DynamoDB while staying under the WCU provisioned capacity limits.

I tested this approach using 32 CSV files written by a Spark ETL job in S3, with 700 lines per file — meaning a total of 22,400 items needed to be updated in DynamoDB. Each line required a put and an update and the table had three global secondary indexes (GSIs). Each GSI has its own WCU provisioned.

The combination of the put, update and three GSIs meant that effectively there were five DynamoDB writes needed per CSV row.

The table was configured with a WCU of 6. GSI1 and GSI2 had a WCU of 4. GSI3, which had its sort key value updated during the job, had a WCU of 7.

Provisioned vs consumed write capacity units for DynamoDB table (image by author)

Above is the CloudWatch metrics graph for the table during the execution of the state machine. Note that I reduced the provisioned capacity just before starting the job. We can see from the increase in consumed capacity from 2am to 5am that the job took around 3 hours to update 22,400 items in DynamoDB. The consumed capacity ran close to the provisioned capacity, but the job generally executed smoothly. The graph on the right shows two occurrences of write requests being throttled by DynamoDB shortly after 2am, which indicates that I should either have increased the provisioned capacity by one or slightly increased the pause time.
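
As a rough check on those figures, the effective processing rate can be worked out from the row count and the duration. The 3 hour duration is read off the graph, so the results are approximate.

```javascript
// Worked arithmetic from the test run: 32 files x 700 rows in roughly 3 hours.
const rows = 32 * 700;               // 22,400 rows
const durationSeconds = 3 * 60 * 60; // ~3 hours, read off the graph
const rowsPerSecond = rows / durationSeconds;
const msPerRow = 1000 / rowsPerSecond;

console.log(rows);                     // 22400
console.log(rowsPerSecond.toFixed(2)); // 2.07
console.log(Math.round(msPerRow));     // 482
```

So each CSV row took a little under half a second on average, which covers both the DynamoDB writes and the pause after each row.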

Provisioned vs consumed write capacity units for DynamoDB GSI2 (image by author)

The CloudWatch metrics for GSI2 are shown above. I increased the WCU on GSI2 to 4 shortly before running the job. DynamoDB didn’t throttle any writes to GSI2, so this looks to be well configured.

Provisioned vs consumed write capacity units for DynamoDB GSI3 (image by author)

With a WCU of 7, GSI3 has more write capacity than the table and the other GSIs. However, as the sort key value for this GSI is being updated during the job, more GSI capacity is consumed on each write. We can see here that the consumed capacity is very close to the provisioned capacity throughout the 3 hour job. Despite this, only one instance of DynamoDB throttling writes occurred during the job. It would, however, be safer to increase provisioned capacity further on GSI3 or slightly increase the pause time in the UpdateValuesInDB Lambda function.

By using AWS Step Functions, we have a robust, reliable and scalable serverless method of batch updating DynamoDB. Error handling is built into its workflow and includes features such as exponential back-off when retrying failed states.
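
As a sketch of what that retry behaviour looks like, a Task state can carry a Retry block with the Amazon States Language fields ErrorEquals, IntervalSeconds, MaxAttempts and BackoffRate. The values below are illustrative assumptions, not the configuration used for this job, and retryDelays is a hypothetical helper that shows the wait times they imply.

```javascript
// Sketch of a Retry block for a Task state, plus the wait times it implies.
// The values are illustrative assumptions, not this article's configuration.
const retry = [{
  ErrorEquals: ['States.TaskFailed'],
  IntervalSeconds: 2,
  MaxAttempts: 3,
  BackoffRate: 2.0
}];

// The wait before retry n (1-based) is IntervalSeconds * BackoffRate^(n - 1).
function retryDelays({ IntervalSeconds, MaxAttempts, BackoffRate }) {
  return Array.from(
    { length: MaxAttempts },
    (_, i) => IntervalSeconds * Math.pow(BackoffRate, i)
  );
}

console.log(retryDelays(retry[0])); // [ 2, 4, 8 ]
```

Note that a retried UpdateValuesInDB invocation would start the same source file again, because the file is only deleted at the end of the function, so the writes need to be safe to repeat.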

We use three state transitions per source file in this job. At a price of $0.025 per 1,000 state transitions, the Step Functions costs here are negligible. Likewise, the Lambda costs are only 2–3 cents for 3 hours of a 128 MB function. Overall, this approach is very cost effective and provides a robust method of making bulk updates to provisioned DynamoDB tables.
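
The cost figures can be reproduced with a few lines of arithmetic. The Step Functions price is the one quoted above. The Lambda price of $0.0000166667 per GB-second is an assumption that should be checked against current pricing for your region and architecture. Request charges and the handful of extra transitions at the start and end of the run are ignored.

```javascript
// Worked cost estimate for the test run. Prices are assumptions to verify:
// $0.025 per 1,000 state transitions and $0.0000166667 per GB-second of Lambda.
const files = 32;
const transitions = files * 3;     // three state transitions per source file
const stepFunctionsCost = (transitions / 1000) * 0.025;

const lambdaSeconds = 3 * 60 * 60; // ~3 hours of execution in total
const lambdaGb = 128 / 1024;       // 128 MB function
const lambdaCost = lambdaSeconds * lambdaGb * 0.0000166667;

console.log(stepFunctionsCost.toFixed(4)); // 0.0024
console.log(lambdaCost.toFixed(4));        // 0.0225
```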

Additional Reading / References

  1. DynamoDB on-demand vs. provisioned capacity: Which is better? ‘While on-demand delivers the best fit for scalability, the cost is approximately seven times higher than provisioned capacity’. https://searchcloudcomputing.techtarget.com/answer/DynamoDB-on-demand-vs-provisioned-capacity-Which-is-better
  2. An explanation of Write Capacity Units for provisioned capacity on DynamoDB. https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/ProvisionedThroughput.html
  3. DocumentClient API for writing to DynamoDB. https://docs.aws.amazon.com/AWSJavaScriptSDK/latest/AWS/DynamoDB/DocumentClient.html

John Elliott
Geek Culture

Enterprise cloud, analytics and ML. Computer Science and MBA educated. Triple AWS certified. Scottish but living in Australia.