CODE + CONTOUR by IPSY

IPSY inspires everyone to express themselves, making beauty accessible for all. Through product innovation, machine learning technology, and a community-first mindset, we democratize beauty by delivering not only a personalized experience but a feeling of an authentic self.

Streamlining Real-Time Data Processing at IPSY

9 min read · Dec 2, 2024


In today’s fast-paced digital environment, businesses face significant challenges in managing vast amounts of real-time data efficiently and cost-effectively. Traditional data management solutions often struggle to keep up with the velocity and variety of streaming data, leading to bottlenecks in data processing and analytics. Amazon DynamoDB is a fully managed key-value and document database that delivers single-digit millisecond latency at any scale, and many of the world’s fastest-growing businesses depend on its scale and performance to support their mission-critical workloads. However, use cases that call for advanced analytics and data warehousing require integrating data from databases like DynamoDB with a robust, scalable platform like Delta Lake.

At Ipsy, data from web, mobile, and social media platforms is ingested by CRUD services into DynamoDB tables in near real time. Multiple groups across Ipsy wanted access to this data, also in near real time, in an Amazon Simple Storage Service (Amazon S3) data lake to accelerate advanced data-driven decision-making.

We, the Ipsy Data Engineering team, chose Delta Lake as the preferred storage format for our data lake for reasons such as performance gains during change data capture (CDC) and access to design patterns like Z-Ordering that can significantly speed up query performance. Existing frameworks were performant enough for batch processing of CDC data into the data lake, but the team wanted a faster, more cost-efficient alternative for ingesting streaming data from DynamoDB in Delta format. The solution we chose uses the deltalake Python package (based on the delta-rs Rust library) with AWS Lambda.

This post demonstrates how to use the AWS SDK for pandas (formerly known as AWS Data Wrangler), which supports the deltalake PyPI package for reading and writing Delta Lake tables, to write streaming data from DynamoDB in Delta format using Lambda compute. We use DynamoDB Streams with Lambda to process the change data capture (CDC) records, because this combination offers both performance and cost efficiency.

Overview of solution

The goal of this solution is to ingest CDC data from DynamoDB into Amazon S3 in Delta format, quickly and in near real time. The data could arrive in DynamoDB from various sources; in this case it comes from a front-end microservice that writes order data into the table using CRUD operations.

The following diagram illustrates the solution architecture.


The following are a couple of additional architectural patterns that could also fit in place of the solution discussed in this post.

Determining the most appropriate solution depends on requirements such as data retention and cost. DynamoDB Streams retains data for 24 hours; in cases where data must be retained for longer, it may be more suitable to send streaming data to Kinesis Data Streams, which can retain data for up to 1 year. On the other hand, DynamoDB Streams ensures no duplicate records are sent for processing, whereas duplicates need to be explicitly handled when using Kinesis Data Streams.

For the architecture discussed in this post, the only costs involved in writing streaming data from existing DynamoDB tables are the Lambda compute and the LogStore table. The alternative architectures incur additional cost for the extra resources they introduce.

In the following sections, we discuss the tasks associated with implementing the solution.

Enable DynamoDB Streams

In the Amazon DynamoDB console, select the source table from which you want to ingest data. This table needs to have streams enabled so that any updates are sent to DynamoDB Streams for further processing.

To enable DynamoDB Streams on the source table, complete the following steps:

  1. On the DynamoDB console, choose Tables in the navigation pane and open the table configurations by clicking on the table name.
  2. On the Exports and streams tab, in the DynamoDB stream details section, choose Turn on.
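
Alternatively, the stream can be enabled through the API. The following is a minimal sketch using boto3; the table name is a placeholder, and the NEW_IMAGE view type is assumed because the Lambda code later in this post reads only the NewImage of each record.

import boto3

dynamodb = boto3.client("dynamodb")

# Enable a stream on the source table so item-level changes are published
# to DynamoDB Streams. NEW_IMAGE publishes each item as it appears after
# the change, which is what the downstream Lambda function consumes.
dynamodb.update_table(
    TableName="<source table name>",
    StreamSpecification={
        "StreamEnabled": True,
        "StreamViewType": "NEW_IMAGE",
    },
)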

Create a LogStore table

Amazon S3 does not provide a mutual-exclusion mechanism for concurrent writes to the same key; applications are expected to implement their own object-level locking to make such writes consistent. For this reason, the delta-rs library implements locking by leveraging a separate DynamoDB table (referred to as the LogStore table) that maintains the commit metadata for concurrent writes, thereby providing transactional guarantees. For a more comprehensive explanation of the need for this implementation, refer to this external blog. The LogStore table requires a specific partition key and sort key, as described in the documentation: Writing to S3 with a locking provider.

Create a table with the following code:

aws dynamodb create-table \
--region <region name> \
--table-name <table name> \
--attribute-definitions AttributeName=tablePath,AttributeType=S \
AttributeName=fileName,AttributeType=S \
--key-schema AttributeName=tablePath,KeyType=HASH \
AttributeName=fileName,KeyType=RANGE \
--provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5

Create an IAM role with required permissions

In this step, you create an AWS Identity and Access Management (IAM) role with the necessary permissions.

Use the following policy for DynamoDB Streams read access:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DynamoDBReadAccess",
      "Effect": "Allow",
      "Action": [
        "dynamodb:GetShardIterator",
        "dynamodb:DescribeStream",
        "dynamodb:GetRecords",
        "dynamodb:ListStreams"
      ],
      "Resource": "<DynamoDB Stream ARN>"
    }
  ]
}

Use the following policy for DynamoDB LogStore write access:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "DynamoDBLockTableAccess",
      "Effect": "Allow",
      "Action": [
        "dynamodb:BatchGet*",
        "dynamodb:DescribeTable",
        "dynamodb:Get*",
        "dynamodb:Scan",
        "dynamodb:BatchWrite*",
        "dynamodb:PutItem"
      ],
      "Resource": "<ARN of the DynamoDB LogStore table>"
    }
  ]
}

Use the following policy for Delta Lake S3 bucket access:

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "S3Access",
      "Effect": "Allow",
      "Action": [
        "s3:PutObject",
        "s3:GetObject",
        "s3:GetObjectAttributes",
        "s3:ListBucket",
        "s3:DeleteObject"
      ],
      "Resource": [
        "arn:aws:s3:::<bucketname>",
        "arn:aws:s3:::<bucketname>/*"
      ]
    }
  ]
}
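
The role itself can be created in the IAM console or scripted. The following is a minimal sketch using boto3; the role name and the local policy file names are hypothetical, and the AWS managed policy AWSLambdaBasicExecutionRole is attached so the function can write logs to CloudWatch.

import json

import boto3

iam = boto3.client("iam")

# Trust policy that allows the Lambda service to assume the role.
trust_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {"Service": "lambda.amazonaws.com"},
            "Action": "sts:AssumeRole",
        }
    ],
}

iam.create_role(
    RoleName="ddb-stream-to-delta-role",
    AssumeRolePolicyDocument=json.dumps(trust_policy),
)

# Attach the three policies shown above, saved locally as JSON files (hypothetical names).
for name, path in [
    ("DynamoDBReadAccess", "stream_read_policy.json"),
    ("DynamoDBLockTableAccess", "logstore_write_policy.json"),
    ("S3Access", "delta_s3_policy.json"),
]:
    with open(path) as f:
        iam.put_role_policy(
            RoleName="ddb-stream-to-delta-role",
            PolicyName=name,
            PolicyDocument=f.read(),
        )

# Managed policy for basic Lambda logging to CloudWatch.
iam.attach_role_policy(
    RoleName="ddb-stream-to-delta-role",
    PolicyArn="arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole",
)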

The next step is to create the Lambda function that hosts the data processing code. The mechanism for processing streaming data with the DynamoDB Streams APIs is well documented. However, certain functionality needs to be implemented regardless of the use case before writing data to Delta Lake. The following code samples provide implementation details for these generic pieces.

Parse DynamoDB marshalled format

DynamoDB uses a marshalled format that stores each item along with the data type of every attribute value. The Boto3 SDK provides TypeDeserializer, which you can use to parse this marshalled format:

from datetime import datetime
from boto3.dynamodb.types import TypeDeserializer

def deserializer(data: dict) -> dict:
    dsz = TypeDeserializer()
    dsz_dict = {}
    for pk, pv in data.items():
        # Known timestamp attributes arrive as ISO-8601 strings; convert them
        # to datetime objects so they land as timestamps in the Delta table.
        if (pk.lower() in ('date_created', 'last_updated')) and "S" in pv:
            dsz_dict[pk] = datetime.fromisoformat(pv["S"])
        else:
            dsz_dict[pk] = dsz.deserialize(pv)
    return dsz_dict

Convert complex types to strings

It’s better to ingest complex types such as Map and List as strings in Delta Lake, to avoid code changes every time new elements are added to these types:

import json

def compressor(data: dict) -> dict:
    mydict = {}
    for k, v in data.items():
        # Serialize List and Map values to JSON strings so the Delta schema
        # does not change when new elements are added to these types.
        if isinstance(v, (list, dict)):
            mydict[k] = json.dumps(v)
        else:
            mydict[k] = v
    return mydict

Normalizing the schema for individual bulk write operations

DynamoDB is schemaless, so not every attribute needs to be present in every item. However, the target Delta table always has a schema associated with it. Each Lambda invocation attempts to write a batch of items to the Delta table, and because the attributes of items in one invocation may differ from those in another, an invocation can fail with a schema mismatch error at the Delta table level. This can be resolved with the schema_mode='merge' feature of the deltalake package, which merges the differences between the current write operation and previously completed operations into the Delta log, so schema differences between Lambda invocations are handled gracefully.

In the following example, the batch of items sent to the Lambda function by the DynamoDB Streams trigger is collected into a list of dictionaries (each dictionary corresponds to one item) and passed to the pour_into_lake function, which creates a pandas DataFrame from the list and derives the DataFrame’s schema using pyarrow. With this approach, the schema of the DataFrame is always consolidated from the elements of every dictionary, even though each dictionary may have a varying number of key/value pairs.

from deltalake.writer import write_deltalake
import os
import pandas as pd
import pyarrow as pa

s3_bucket = os.getenv('s3_bucket')
s3_prefix = os.getenv('s3_prefix')
rs_lock_table = os.getenv('rs_lock_table')
st_op = {'AWS_S3_LOCKING_PROVIDER': 'dynamodb', 'DELTA_DYNAMO_TABLE_NAME': f'{rs_lock_table}'}

def get_pandas_df(v_data: list) -> pd.DataFrame:
    return pd.DataFrame(v_data)

def check_arrow_type(field):
    return type(field.type) == pa.TimestampType

def get_arrow_schema(df: pd.DataFrame, ddb_table: str) -> pa.Schema:
    schema_in = pa.Schema.from_pandas(df)
    for field in schema_in:
        if check_arrow_type(field):
            schema_in = schema_in.set(schema_in.get_field_index(field.name), pa.field(field.name, pa.timestamp(unit='us', tz=field.type.tz)))
    return schema_in

def pour_into_lake(v_ddb_table: str, v_data: list) -> dict:
    df = get_pandas_df(v_data)
    arrow_schema = get_arrow_schema(df, v_ddb_table)
    write_deltalake(f"{s3_bucket}/{s3_prefix}/{v_ddb_table}", df, storage_options=st_op, mode='append', schema=arrow_schema, schema_mode='merge', engine='rust', partition_by=['part_col1','part_col2'])

Idempotency

When processing a batch of items from DynamoDB Streams, it’s imperative to explicitly handle partial batch failures to avoid duplicate processing and idempotency issues. Lambda supports including the enum value ReportBatchItemFailures in the FunctionResponseTypes list of the event source mapping. With this setting, the function reports which records in a batch failed, and Lambda retries only from those records onward instead of resending items that have already been processed successfully.
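
As a minimal sketch of how a handler can report partial batch failures, the snippet below returns the response shape Lambda expects from stream event sources. Here, process is a hypothetical placeholder for the deserialize/compress/write steps; the consolidated code later in this post writes the whole batch at once rather than record by record.

def lambda_handler(event, context):
    batch_item_failures = []
    for record in event["Records"]:
        try:
            process(record)  # placeholder for the deserialize/compress/write steps
        except Exception:
            # Report the failing record; Lambda retries from this record onward.
            batch_item_failures.append({"itemIdentifier": record["dynamodb"]["SequenceNumber"]})
            break  # records are ordered per shard, so stop at the first failure
    return {"batchItemFailures": batch_item_failures}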

Event source configuration

Lambda supports several options for DynamoDB event sources. For more details, see Using AWS Lambda with Amazon DynamoDB.

The following are some of the options that you need to adjust as necessary (see the sketch after this list for a scripted way to set them):

  • Starting position: TRIM_HORIZON
  • Batch size: 300
  • Batch window: 300
  • Retry attempts: -1
  • Parallelization Factor: 1
  • Maximum age of record: -1
  • Function response types: [“ReportBatchItemFailures”]
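
If you prefer to create the event source mapping programmatically, the following is a minimal sketch using boto3 with the settings above; the stream ARN and function name are placeholders.

import boto3

lambda_client = boto3.client("lambda")

# Attach the DynamoDB stream to the Lambda function with the settings listed above.
lambda_client.create_event_source_mapping(
    EventSourceArn="<DynamoDB Stream ARN>",
    FunctionName="<function name>",
    StartingPosition="TRIM_HORIZON",
    BatchSize=300,
    MaximumBatchingWindowInSeconds=300,
    ParallelizationFactor=1,
    MaximumRetryAttempts=-1,
    MaximumRecordAgeInSeconds=-1,
    FunctionResponseTypes=["ReportBatchItemFailures"],
)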

Create the Lambda function

Here is the consolidated code:

from boto3.dynamodb.types import TypeDeserializer
from deltalake.writer import write_deltalake
from datetime import date, datetime, timezone
import json
import os
import pandas as pd
import pyarrow as pa

s3_bucket = os.getenv('s3_bucket')
s3_prefix = os.getenv('s3_prefix')
st_op = {'AWS_S3_LOCKING_PROVIDER': 'dynamodb', 'DELTA_DYNAMO_TABLE_NAME': "logstore_table_name"}

def deserializer(data: dict) -> dict:
    # Convert a DynamoDB-marshalled item into plain Python types.
    dsz = TypeDeserializer()
    dsz_dict = {}
    for pk, pv in data.items():
        if (pk.lower() in ('date_created', 'last_updated')) and "S" in pv:
            dsz_dict[pk] = datetime.fromisoformat(pv["S"])
        else:
            dsz_dict[pk] = dsz.deserialize(pv)
    return dsz_dict

def compressor(data: dict) -> dict:
    # Serialize complex types (List, Map) to JSON strings.
    mydict = {}
    for k, v in data.items():
        if isinstance(v, (list, dict)):
            mydict[k] = json.dumps(v)
        else:
            mydict[k] = v
    return mydict

def check_arrow_type(field):
    return type(field.type) == pa.TimestampType

def get_pandas_df(v_data: list) -> pd.DataFrame:
    return pd.DataFrame(v_data)

def get_arrow_schema(df: pd.DataFrame, ddb_table: str) -> pa.Schema:
    # Derive the Arrow schema from the DataFrame and normalize timestamps to microseconds.
    schema_in = pa.Schema.from_pandas(df)
    for field in schema_in:
        if check_arrow_type(field):
            schema_in = schema_in.set(schema_in.get_field_index(field.name), pa.field(field.name, pa.timestamp(unit='us', tz=field.type.tz)))
    return schema_in

def pour_into_lake(v_ddb_table: str, v_data: list) -> dict:
    # Append the batch to the Delta table, merging any schema differences.
    df = get_pandas_df(v_data)
    arrow_schema = get_arrow_schema(df, v_ddb_table)
    write_deltalake(f"{s3_bucket}/{s3_prefix}/{v_ddb_table}", df, storage_options=st_op, mode='append', schema=arrow_schema, schema_mode='merge', engine='rust', partition_by=['part_col1','part_col2'])

def main(event, context):
    records = event.get("Records")
    records_list = []
    # Derive the source table name from the event source ARN of the stream.
    ddb_table_name = records[0]["eventSourceARN"].split("/")[1]
    for record in records:
        resp_dsz = deserializer(record["dynamodb"]["NewImage"])
        resp_cmp = compressor(resp_dsz)
        records_list.append(resp_cmp)
    ret_resp = pour_into_lake(ddb_table_name, records_list)
    return ret_resp

def lambda_handler(event, context):
    ret_main = main(event, context)
    return ret_main

Create the function using the AWS CLI. The values in the following command, such as the handler name and memory size, are for reference and can be customized as required.

aws lambda create-function \
--region <region name> \
--function-name <value> \
--zip-file fileb://<value>.zip \
--runtime <runtime, for example python3.12> \
--role <ARN of the IAM role created earlier> \
--handler <file name>.lambda_handler \
--memory-size 384
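
Note that the deltalake, pandas, and pyarrow dependencies are not included in the default Lambda Python runtime, so the deployment package referenced by --zip-file needs to bundle them (or they can be supplied through a Lambda layer or a container image).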

Output

Here is a side-by-side comparison of the source and target metadata after implementation.

Source: DynamoDB

Target: Delta table

The complex type (List) from DynamoDB has been ingested and transformed into a String data type in the target, while the basic data types are ingested as their corresponding data types.
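
As a hypothetical illustration of this conversion (the attribute names below are invented, not taken from the actual source table), a marshalled item with a List attribute flows through the deserializer and compressor functions like this:

# Hypothetical marshalled DynamoDB item with a List attribute.
new_image = {
    "order_id": {"S": "ORD-1001"},
    "items": {"L": [{"S": "lipstick"}, {"S": "mascara"}]},
}

row = compressor(deserializer(new_image))
# row == {'order_id': 'ORD-1001', 'items': '["lipstick", "mascara"]'}
# The List lands in the Delta table as a JSON string, while order_id remains a plain string.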

A few things to consider

As mentioned before, S3’s inherent limitation prevents writing to the same key concurrently, so concurrent write attempts by parallel Lambda invocations can add latency to the write operations. This is mitigated by the DynamoDB lock table. However, as the number of concurrent writes grows, write operations can end up waiting to acquire locks on the table. If the overall latency does not meet your requirements, the batch window and batch size parameters of the Lambda trigger can be tuned so that each invocation writes a larger batch and batches are processed more quickly. A very large volume of streaming data (such as thousands of records every second) could still add significant latency; in such cases, it is better to add an intermediary process that consolidates the concurrent Lambda requests and executes the write operations in sequence. This also makes it easier to estimate the actual write latency and make an informed decision.

Conclusion

In this post, we demonstrated how Ipsy solved the problem of ingesting streaming data from DynamoDB into Amazon S3 in Delta format, quickly and in near real time, by using DynamoDB Streams and Lambda. The solution not only makes the process more efficient but is also cost-effective: reading a DynamoDB stream from Lambda incurs no charge for the stream read requests, which adds to the overall cost-effectiveness of the solution.

Try out the solution for your own use case, and let us know your feedback and questions in the comments.
