Building a DynamoDB Data Migration Pipeline

Alexander Smirnoff
5 min readJun 24, 2023

--

Introduction

Last week, we encountered a critical situation where we accidentally destroyed a production DynamoDB table. As we urgently needed to recover the lost data, we turned to AWS’s point-in-time recovery feature. However, we soon realized that the restored backups could only be applied to a new table, posing a challenge in migrating the restored data back to the production table. Determined to overcome this hurdle, we quickly devised a solution — a serverless data migration pipeline using AWS Lambda and DynamoDB. In just one hour, we developed and executed a script that successfully migrated the restored data to the production table, saving us from a potential disaster.

In this article, I will share our experience and guide you through the implementation of this serverless data migration pipeline. We will leverage the power of AWS Lambda and DynamoDB to seamlessly transfer data between tables, ensuring minimal downtime and preserving data integrity. This pipeline not only helped us in our critical situation but also provided us with a reliable and scalable solution for future data migrations.

Prerequisites

Before we dive into the implementation, make sure you have the following set up:

  • An AWS account with appropriate permissions to create Lambda functions and access DynamoDB.
  • AWS CDK installed locally.
  • Basic knowledge of AWS Lambda, DynamoDB, and JavaScript.

Implementation Overview

We’ll begin by outlining the high-level structure of our solution. We’ll use TypeScript for development and organize the code into two main components: the Lambda handler and the DynamoDBModule class. The Lambda handler will be responsible for handling the Lambda invocation and configuring the DynamoDBModule. The DynamoDBModule will encapsulate the logic for data migration and table cleanup, utilizing the AWS SDK for DynamoDB.

Lambda Handler

The Lambda handler serves as the entry point for our function. It initializes the necessary resources and orchestrates the data migration process. Let’s take a closer look at the code:

import { Logger } from "@aws-lambda-powertools/logger";
import { DynamoDBClient } from '@aws-sdk/client-dynamodb';
import { LambdaEvent } from "./dtos/lambdaEvent";
import { DynamoDBModule } from "./modules/dynamodb";

const region = process.env.AWS_REGION ?? process.env.AWS_DEFAULT_REGION ?? "eu-central-1";

const logger = new Logger({ serviceName: process.env.AWS_LAMBDA_FUNCTION_NAME });
const client = new DynamoDBClient({ region });
const ddbModule = new DynamoDBModule(client, logger);

export const handler = async (event: LambdaEvent) => {
try {
await ddbModule.cleanupDestinationTable(event);
await ddbModule.migrate(event);
} catch (error) {
logger.error("Error while migrating", { error });
}
};

In the Lambda handler, we create an instance of the DynamoDBModule class, passing the DynamoDB client and logger. We then invoke the cleanupDestinationTable and migrate methods of the DynamoDBModule class, handling any errors that may occur during execution.

DynamoDBModule Class

The DynamoDBModule class encapsulates the logic for data migration and table cleanup. It provides methods for interacting with DynamoDB, including scanning the source table, writing to the destination table, and handling retries. Here’s an overview of the class:

import { Logger } from "@aws-lambda-powertools/logger";
import {
AttributeValue,
BatchWriteItemCommand,
BatchWriteItemCommandOutput,
BatchWriteItemInput,
BatchWriteItemOutput,
DescribeTableCommand,
DescribeTableCommandInput,
DescribeTableCommandOutput,
DynamoDBClient,
ScanCommand,
ScanCommandInput,
ScanCommandOutput
} from "@aws-sdk/client-dynamodb";
import { Config } from "../dtos/config";
import { LambdaEvent } from "../dtos/lambdaEvent";

export class DynamoDBModule {
private readonly scanInput: Pick<ScanCommandInput, "Select" | "ReturnConsumedCapacity" | "Limit"> = {
Select: "ALL_ATTRIBUTES",
ReturnConsumedCapacity: "TOTAL",
};

constructor(
private readonly client: DynamoDBClient,
private readonly logger: Logger,
private readonly config: Config = { delayInMs: 100, retryLimit: 10, scanLimit: 1000 },
) {
this.scanInput.Limit = this.config.scanLimit;
}

public async cleanupDestinationTable(event: LambdaEvent): Promise<void> {
if (!event.cleanupDestinationTable) {
this.logger.debug("Skipping cleanup of destination table");
return;
}

const tableDescription: DescribeTableCommandOutput = await this.describeTable(event.destinationTableName);

if (!tableDescription.Table) {
throw new Error(`Table ${event.destinationTableName} not found`);
}

if (!tableDescription.Table.KeySchema) {
throw new Error(`Table ${event.destinationTableName} has no key schema`);
}

const keySchema: string[] = tableDescription.Table.KeySchema.map(attribute => attribute.AttributeName!);

const input: ScanCommandInput = {
...this.scanInput,
TableName: event.destinationTableName,
Select: "SPECIFIC_ATTRIBUTES",
AttributesToGet: keySchema,
};

await this.exec(input, event.destinationTableName);
}

public async migrate(event: LambdaEvent): Promise<void> {
if (!event.migrate) {
this.logger.debug("Skipping migration of data");
return;
}

const input: ScanCommandInput = { ...this.scanInput, TableName: event.sourceTableName, };

await this.exec(input, event.destinationTableName);
}

private async exec(input: ScanCommandInput, tableName: string) {
let nextToken: Record<string, AttributeValue> | undefined = { is: { S: "undefined" } };

while (nextToken) {
if (nextToken.is) { nextToken = undefined; }

input.ExclusiveStartKey = nextToken;

const scanResponse: ScanCommandOutput = await this.client.send(new ScanCommand(input));

nextToken = scanResponse.LastEvaluatedKey;

const chunks: Record<string, AttributeValue>[][] = this.chunk(scanResponse.Items ?? [], 25);

for (const chunk of chunks) {
const requestItems: BatchWriteItemInput["RequestItems"] = {
[tableName]: chunk.map(item => tableName !== input.TableName ? { PutRequest: { Item: item } } : { DeleteRequest: { Key: item } })
};
await this.batchWriteWithRetry({ RequestItems: requestItems });
};
}
}

protected async delay(ms: number, attempt: number): Promise<void> {
const exponentialDelay = ms * (2 * attempt);

const randomDelay = Math.floor(Math.random() * (exponentialDelay - ms)) + ms;

return new Promise(resolve => setTimeout(resolve, randomDelay));
}

private async describeTable(tableName: string): Promise<DescribeTableCommandOutput> {
const input: DescribeTableCommandInput = { TableName: tableName };

const command = new DescribeTableCommand(input);

return await this.client.send(command);
}

private async batchWriteWithRetry(batchWriteItemInput: BatchWriteItemInput, retryCount: number = 0,): Promise<BatchWriteItemOutput> {
if (retryCount > 0) {
await this.delay(this.config.delayInMs, retryCount);
}

const command: BatchWriteItemCommand = new BatchWriteItemCommand(batchWriteItemInput);

const batchResponse: BatchWriteItemCommandOutput = await this.client.send(command);

const unprocessedItems: string[] = Object.keys(batchResponse.UnprocessedItems ?? {});

if (unprocessedItems.length !== 0 && retryCount < this.config.retryLimit) {
this.logger.warn(`UnprocessedItems.length: ${unprocessedItems.length}.`);

retryCount++;

const response: BatchWriteItemOutput = await this.batchWriteWithRetry({ RequestItems: batchResponse.UnprocessedItems }, retryCount);

return response;
}

if (unprocessedItems.length !== 0 && retryCount === this.config.retryLimit) {
this.logger.error(`Retry limit reached. UnprocessedItems.length: ${unprocessedItems.length}.`);
}

return batchResponse;
}

private chunk<T>(arr: T[], size: number): T[][] {
return [...Array(Math.ceil(arr.length / size))].map((_, i) => arr.slice(size * i, size + size * i));
}
}

The DynamoDBModule class contains the necessary methods for table cleanup (cleanupDestinationTable) and data migration (migrate). It leverages the AWS SDK for DynamoDB to perform scanning, batch writing, and retries when necessary. Additionally, the class includes helper methods for chunking items and delaying retry attempts.

Conclusion

In this article, we explored how to build a serverless data migration pipeline using AWS Lambda and DynamoDB. We implemented a Lambda function that handles data migration from a source table to a destination table, providing options for table cleanup and retry mechanisms. By leveraging the power of serverless architecture and AWS services, we created a scalable and cost-effective solution for handling data migrations.

By customizing the code according to your specific use case, you can adapt this solution to fit your needs. You can further enhance the functionality by adding error handling, logging, and performance optimizations. With AWS Lambda and DynamoDB, you have the building blocks to create robust data migration pipelines that seamlessly scale to handle large volumes of data.

GitHub: https://github.com/dobeerman/aws-dynamodb-migration

Happy coding!

--

--