Streaming data from DynamoDB to your Data Lake (Part- I)

Abhisek Roy
Credit Saison (India)
19 min read · Apr 21, 2022
Streaming data from DynamoDB to your Data Lake and querying it through Athena in real-time

DynamoDB is a popular managed NoSQL database offered by AWS that is used by many across the globe. Since you have reached this post, you are probably aware of it and might be using it in your cloud infrastructure. However, it’s probably not common knowledge that you can capture the changes made to a DynamoDB table (known as CDC, or Change Data Capture) and stream them to an S3 bucket.

There can be multiple reasons behind this requirement-

  • You might need a second copy of the data for your data-crunching team to work on.
  • The changes in the data might be required for deeper analysis.

Something to note here, in case you are unfamiliar with CDC: when you perform CDC, you get a steady stream of every change made to a particular row in the database, and not just its final form.

So let’s say you have the details for user “john_doe” in your database. Your database would have just one row with the latest information for user “john_doe”, whereas your S3 bucket (where you are saving the CDC) would have n+1 rows for the same user, where n is the number of times the row was changed. Why n+1? Well, the 1 stands for the first time the row was inserted into the table.
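As a rough illustration (the field names and city values here are invented for the example; the actual columns we store are covered later in this article), this is the difference between what the table holds and what the Data Lake accumulates:

# The DynamoDB table holds only the latest state of the row.
dynamodb_row = {"username": "john_doe", "city": "Kolkata"}

# The Data Lake holds the initial insert plus every change (n + 1 rows for n changes).
cdc_rows_in_s3 = [
    {"op": "I", "username": "john_doe", "city": "Bengaluru"},  # initial insert
    {"op": "U", "username": "john_doe", "city": "Mumbai"},     # first update
    {"op": "U", "username": "john_doe", "city": "Kolkata"},    # second update
]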

The AWS resources that we used:

I wanted to share the resources that we used for this particular project beforehand so that you could get familiar with them, or check the ones that you might not have used before.

  • DynamoDB (AWS::DynamoDB::Table) — This one is a no-brainer. You wouldn’t be doing this unless you had a DynamoDB table, to begin with.
  • Kinesis Stream (AWS::Kinesis::Stream) — The real-time streaming service that is used to capture the data changes from DynamoDB. While you get two options for streaming the data (as shown in the image below), we chose this one specifically due to our use case (more on that later).
[Image: You get two options for streaming from a DynamoDB table]
  • Kinesis Firehose (AWS::KinesisFirehose::DeliveryStream) — This is the ETL service that comes in right after the Kinesis Stream. Data flows from the Kinesis Stream into Kinesis Firehose, where it can be formatted or transformed (even using a Lambda function if you want) and then stored in an S3 bucket (in the format of your choice).
  • Lambda (AWS::Serverless::Function) — We use an AWS Lambda within the Kinesis Firehose to make certain changes to the data flowing in from the DynamoDB table. This may not always be necessary, but it is important in case you want to capture data such as the timestamp or change the formatting of the data. I will be sharing some of the code that we used for our formatting (all the code shared in this article is in Python).
  • Glue Table (AWS::Glue::Table) — The Glue Table is where we define the format of the data that will be pushed to the S3 bucket. This is not always required, but in case you are saving the data in Parquet format (which is highly recommended, as it can cut storage space as well as query time by as much as 99%), this is a must.
  • Glue Database (AWS::Glue::Database) — Each Glue Table needs to be tagged to a Glue Database, which is why you will need to create one (or use an existing one).
  • S3 Bucket (AWS::S3::Bucket) — The bucket where you will be dumping the data (basically your Data Lake).
  • Athena (for querying the data) — While all the other resources on this list will need to be created by you (unless they are already present), Athena is more of an interface: it provides you with a terminal on AWS to run queries on the data that you store in your Data Lake.

Some of you might be wondering about those cryptic strings in brackets beside each resource. Those are the CloudFormation types for each resource. AWS CloudFormation is an IaC (Infrastructure as Code) solution that lets you define your infrastructure requirements in JSON or YAML templates and deploy them across environments.
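For context, here is one way such a template can be deployed programmatically with boto3 (this is only a rough sketch; the stack name, file name, and parameter value are placeholders, not necessarily how we deploy ours):

import boto3

cloudformation = boto3.client("cloudformation")

# Read the YAML template from disk (placeholder file name).
with open("dynamodb_table.yaml") as template_file:
    template_body = template_file.read()

# Create the stack; use update_stack later to roll out changes to the same resources.
cloudformation.create_stack(
    StackName="dynamo-cdc-table",  # placeholder stack name
    TemplateBody=template_body,
    Parameters=[{"ParameterKey": "TableName", "ParameterValue": "my_table"}],
)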

Why do we need to back up our data from DynamoDB to the Data Lake?

So, let’s address the elephant in the room first. Running analytical queries directly on your transactional tables, whether to generate reports or deep dive into user behaviour, is usually not a great idea. The reason is that data analysis queries often scan data over a large period, such as a month, a year, or longer.

Such queries would have a large impact on the normal running of your database, and in turn, users interacting with your website or app would be affected by delayed responses or even timeout errors.

Another reason is that your transactional tables will just contain the latest state of every id, whereas your Data Lake will contain the changes. In many instances, the changes themselves are more important and might require a deep dive by your data team or the business team to enable data-driven decision-making.

Why did we use the Amazon Kinesis Data Stream?

Kinesis Stream, combined with Firehose, gives you a managed solution with which you can set up your data streaming infrastructure in a matter of minutes. Things like partitioning or the output data format can be changed through simple config changes in the resources. When you push data to the S3 bucket, a backup folder is automatically created where all the data you push to the Data Lake is saved in its raw format. This can be used in case your data flow breaks for any reason or if there are errors in the processing stage of Firehose.

A separate errors folder is also created where errors are saved. Errors can be of two kinds: data processing errors and format conversion errors. These inbuilt features, along with the benefits of being a managed service, are a major reason why we chose Kinesis Data Streams.
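If you ever need to check what has landed in those folders, a quick sketch like the one below works (the bucket name is a placeholder, and the prefix should match the ErrorOutputPrefix we configure in the Firehose template later in this article):

import boto3

s3 = boto3.client("s3")

# List a few objects under the error prefix to inspect processing or format-conversion failures.
response = s3.list_objects_v2(
    Bucket="my-data-lake-bucket",                    # placeholder bucket name
    Prefix="dynamo-cdc/database/error/table_name/",  # matches the ErrorOutputPrefix used later
    MaxKeys=20,
)
for obj in response.get("Contents", []):
    print(obj["Key"], obj["Size"])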

Had we opted for DynamoDB Streams, we would have needed to build out a lot of the features we just spoke of ourselves: features that you then have to build, manage, and update from time to time. The main benefit of DynamoDB Streams is that it guarantees no duplicate records will be sent, whereas there is a small chance of duplicate records with Kinesis Data Streams, but that can be handled easily (more on that towards the end of this article).

The DynamoDB Table

While I expect that most of you have already set up the DynamoDB table and have come to this article to understand how to stream its data in real time to a Data Lake, a few may want to try this out as an experiment before creating the entire setup in their production environment. For those readers, I am sharing the CloudFormation template for the DynamoDB table as well.

---
AWSTemplateFormatVersion: 2010-09-09

Parameters:
  TableName:
    Type: String
    Description: Table Name

Resources:
  Table:
    Type: "AWS::DynamoDB::Table"
    Properties:
      KinesisStreamSpecification:
        # KinesisStream is the Kinesis Stream resource described in the next section;
        # it needs to be defined in (or imported into) the same stack for !GetAtt to resolve.
        StreamArn: !GetAtt KinesisStream.Arn
      PointInTimeRecoverySpecification:
        PointInTimeRecoveryEnabled: true
      AttributeDefinitions:
        - AttributeName: "partition_key"
          AttributeType: "S"
      KeySchema:
        - AttributeName: "partition_key"
          KeyType: "HASH"
      ProvisionedThroughput:
        ReadCapacityUnits: 1
        WriteCapacityUnits: 1
      TableName: !Ref TableName

While the rest of the template deals with configs related to the DynamoDB itself, we are mainly concerned with the KinesisStreamSpecification and PointInTimeRecoverySpecification.

  1. KinesisStreamSpecification- Deals with streaming the CDC for the DynamoDB table via Kinesis. We have added the Kinesis Stream ARN in the StreamArn parameter; the details of the Kinesis Stream are shared in the next section of this article. Once you provide a particular Kinesis Stream in this config parameter, the CDC from the DynamoDB table will be streamed over that stream only, and it is left up to you how you manage the data further downstream- you can connect the Kinesis Stream to a Firehose, add a Lambda, and so on.
  2. PointInTimeRecoverySpecification- While this has nothing to do with the streaming of data, setting the PointInTimeRecoveryEnabled parameter to true will allow you to take a backup of the DynamoDB table. This may be needed in case of-
    * a failure in streaming, where you may need to capture the data in the DynamoDB table at that particular instant in time.
    * a table that was created before streaming started- you might want to take a backup of the data at the moment you begin streaming the CDC, and use that backup to fill the Data Lake with the data that was present before streaming started (a rough sketch of such a backfill follows below).
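If you do go down the backfill route, DynamoDB’s export-to-S3 capability (which relies on point-in-time recovery being enabled) can be triggered along these lines; the table ARN, bucket, and prefix below are placeholders:

import boto3

dynamodb = boto3.client("dynamodb")

# Export the table's current state to S3 (requires PointInTimeRecovery to be enabled),
# so the Data Lake can be seeded with the data that existed before streaming began.
dynamodb.export_table_to_point_in_time(
    TableArn="arn:aws:dynamodb:ap-south-1:123456789012:table/my_table",  # placeholder ARN
    S3Bucket="my-data-lake-bucket",                                      # placeholder bucket
    S3Prefix="dynamo-backfill/database/table_name/",                     # placeholder prefix
    ExportFormat="DYNAMODB_JSON",
)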

Kinesis Stream and Firehose

Since we chose Kinesis Streams for the DynamoDB CDC, we will be creating a Kinesis Stream and a Kinesis Firehose. We use CloudFormation templates for creating and updating all our AWS resources, so I will be sharing those here and walking through the various parameters that I passed. In case you are doing the same via the AWS console, you will be able to choose the corresponding options in the UI.

The template shared below is for the Kinesis Stream. It is the simplest template in this project. The only value we set is the number of shards, in ShardCount. You could also use the StreamEncryption parameter to enable server-side encryption using an AWS KMS key. Another parameter is StreamModeDetails, which can be set to on-demand or provisioned. Since we have already set the number of shards in our case, the Kinesis Stream will automatically be treated as provisioned, even though we haven’t added this parameter. You can set it to on-demand in case you want AWS to automatically scale the number of shards based on the load.

---
AWSTemplateFormatVersion: 2010-09-09
Parameters:
  ShardCount:
    Type: Number
    Description: Number of shards allotted to the Kinesis Stream- you can use more for higher throughput
Resources:
  KinesisStream:
    Type: AWS::Kinesis::Stream
    Properties:
      ShardCount: !Ref ShardCount
Outputs:
  KinesisStream:
    Value: !Ref KinesisStream

You can see that we have exported the value of the Kinesis Stream resource, since this is fed to the DynamoDB table to enable it to stream its data changes over this particular stream (as shown in the previous section).

Next, we have the Kinesis Firehose, which is what takes the data from the Kinesis Stream, processes it, and puts it in your Data Lake (S3 bucket). There are several important parameters in this case, which we will go over one by one.

  1. DeliveryStreamType- This can be “DirectPut” in case the source pushes data to the Firehose directly, or “KinesisStreamAsSource” when the data comes from a Kinesis Stream. We have used the latter, for obvious reasons.
  2. KinesisStreamSourceConfiguration- Since we have set the delivery stream type to KinesisStreamAsSource, we need to provide the parameters required for it.
    * KinesisStreamARN- The ARN of the Kinesis Stream that we created earlier.
    * RoleARN- The ARN of the role that provides Firehose access to the source Kinesis data stream.
  3. ExtendedS3DestinationConfiguration- Since we are using S3 as the destination for our Kinesis Firehose, we need to add the configuration for it. Using ExtendedS3DestinationConfiguration is a better option than S3DestinationConfiguration since it provides you with more options.
    * BucketARN- ARN of the bucket where you want to dump your data.
    * BufferingHints- This consists of the sub-parameters IntervalInSeconds and SizeInMBs. These values are treated as hints, and Firehose may use different values if they are more optimal. Also, make sure you give values for either both or neither; giving one and ignoring the other will result in errors in the template.
    * CloudWatchLoggingOptions- This is extremely important since things will fail, code will break, and you will eventually need to debug your cloud resources. You will need to set Enabled to true and mention the values for LogGroupName and LogStreamName. Examples are provided in the template below.
    * CompressionFormat- Options can be UNCOMPRESSED, GZIP, ZIP, Snappy, or HADOOP_SNAPPY. We have used UNCOMPRESSED in our template.
    * ErrorOutputPrefix- This is the folder structure that is used to save errors that occur while data is being streamed and processed. You can see that there are some placeholders. The firehose:error-output-type is used to denote the different error types (a folder is created for each of them). These are usually processing or format conversion errors. We have used partitioning based on year, month, and date after the error type.
    * Prefix- This is the folder structure inside the S3 bucket that you chose, where the streamed data will be stored as files (in a format like Parquet). We used a similar partitioning format here as well.
    * RoleARN- This vital role is required so that you can give the KinesisDeliveryStream the access to write to the S3 bucket. You may also need encryption-decryption access using the KMS key, based on your encryption settings.
    * ProcessingConfiguration- This is the configuration that we create for processing the data that flows from the Kinesis Stream to Kinesis Firehose. In our case, we have used a Lambda function as a processor. The Lambda code is shown and explained later in this article. You need to provide the values of some specific parameters such as LambdaArn, NumberOfRetries, BufferSizeInMBs, and BufferIntervalInSeconds, all of which are self-explanatory. Something to remember here is that Kinesis Data Firehose supports a Lambda invocation time of up to 5 minutes.
    * DataFormatConversionConfiguration- This is mainly used to specify whether we want to convert the data coming in from Kinesis, which is in JSON format, to Parquet or ORC. In our case, we are converting the data to Parquet.
    The first thing you need to fill in is the SchemaConfiguration, in which you point to the Glue Table and Database being used, along with a few more parameters corresponding to them.
    Next, we have the InputFormatConfiguration, in which you need to mention the Deserializer. We have used the OpenX JSON SerDe since it provides a few parameters to take care of inconsistencies in the data, namely ignore.malformed.json, dots.in.keys, and case.insensitive. You can read more about these here.
    Finally, we have the OutputFormatConfiguration, where we have used the ParquetSerDe for converting our files to Parquet format in the Data Lake S3 bucket. The compression chosen here is GZIP, to keep it on par with the other data streams in our Data Lake (we have a DMS stream that streams data from Postgres tables as well).
    * S3BackupMode- This needs to be set to Enabled if you want to keep a copy of your untransformed JSON stream from DynamoDB, which helps when handling errors or breakdowns. I personally recommend keeping this on.
    * S3BackupConfiguration- This is where you need to mention the folders that will be used for the backup in case you turned on S3BackupMode. You have to mention folders for the errors as well as the data. The format is similar to what we used in the Prefix and ErrorOutputPrefix earlier. The bucket ARN as well as the IAM role that will enable the Kinesis Firehose to write to the bucket also need to be given. Something to note here is that if you are using the same bucket to dump the data and keep the backup, you can use the same role as what you used in ExtendedS3DestinationConfiguration.RoleARN.
---
AWSTemplateFormatVersion: 2010-09-09
Parameters:
  DataLakeBucket:
    Type: String
    Description: Data Lake Bucket
  ProcessDynamoEntriesLambda:
    Type: String
    Description: Process Dynamo Entries Lambda ARN
  KinesisStream:
    Type: String
    Description: Kinesis Stream
  GlueDataBase:
    Type: String
    Description: Glue Database
  GlueTable:
    Type: String
    Description: Glue Table
Resources:
  FirehoseDeliveryStream:
    Type: AWS::KinesisFirehose::DeliveryStream
    Properties:
      DeliveryStreamType: KinesisStreamAsSource
      KinesisStreamSourceConfiguration:
        KinesisStreamARN: !Sub "arn:aws:kinesis:${AWS::Region}:${AWS::AccountId}:stream/${KinesisStream}"
        # FirehoseDeliveryStreamRole and DeliveryRole are IAM roles (not shown here) that grant
        # Firehose access to the Kinesis Stream, the S3 bucket, and the Glue catalogue.
        RoleARN: !GetAtt FirehoseDeliveryStreamRole.Arn
      ExtendedS3DestinationConfiguration:
        BucketARN: !Sub "arn:aws:s3:::${DataLakeBucket}"
        BufferingHints:
          IntervalInSeconds: 60
          SizeInMBs: 64
        CloudWatchLoggingOptions:
          Enabled: true
          LogGroupName: /aws/kinesisfirehose/table_name
          LogStreamName: KinesisFireHoseTableName
        CompressionFormat: UNCOMPRESSED
        ErrorOutputPrefix: dynamo-cdc/database/error/table_name/!{firehose:error-output-type}/year=!{timestamp:YYYY}/month=!{timestamp:MM}/day=!{timestamp:dd}/
        Prefix: dynamo-cdc/database/table_name/year=!{timestamp:YYYY}/month=!{timestamp:MM}/day=!{timestamp:dd}/
        RoleARN: !GetAtt FirehoseDeliveryStreamRole.Arn
        ProcessingConfiguration:
          Enabled: true
          Processors:
            - Type: Lambda
              Parameters:
                - ParameterName: LambdaArn
                  ParameterValue: !Ref ProcessDynamoEntriesLambda
                - ParameterName: NumberOfRetries
                  ParameterValue: 2
                - ParameterName: BufferSizeInMBs
                  ParameterValue: 3
                - ParameterName: BufferIntervalInSeconds
                  ParameterValue: 60
        DataFormatConversionConfiguration:
          Enabled: True
          SchemaConfiguration:
            CatalogId: !Ref AWS::AccountId
            VersionId: LATEST
            RoleARN: !GetAtt DeliveryRole.Arn
            DatabaseName: !Ref GlueDataBase
            TableName: !Ref GlueTable
            Region: !Ref AWS::Region
          InputFormatConfiguration:
            Deserializer:
              OpenXJsonSerDe:
                CaseInsensitive: true
                ConvertDotsInJsonKeysToUnderscores: false
          OutputFormatConfiguration:
            Serializer:
              ParquetSerDe:
                Compression: GZIP
        S3BackupMode: Enabled
        S3BackupConfiguration:
          BucketARN: !Sub "arn:aws:s3:::${DataLakeBucket}"
          ErrorOutputPrefix: dynamo-backup/database/table_name/!{firehose:error-output-type}/year=!{timestamp:YYYY}/month=!{timestamp:MM}/day=!{timestamp:dd}/
          Prefix: dynamo-backup/database/table_name/year=!{timestamp:YYYY}/month=!{timestamp:MM}/day=!{timestamp:dd}/
          RoleARN: !GetAtt FirehoseDeliveryStreamRole.Arn

A lot of the parameters mentioned above have default values, which are used in case you do not specify them. However, it is recommended that you type out each of them, even if you use the default values, so that you can come back later to check the settings or change anything if required.
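If you would rather not rely on memory, you can also pull the effective configuration back out of AWS and see what defaults Firehose filled in; a rough sketch (the delivery stream name is a placeholder):

import boto3

firehose = boto3.client("firehose")

# Fetch the delivery stream's current configuration, including any defaults applied by Firehose.
description = firehose.describe_delivery_stream(
    DeliveryStreamName="dynamo-cdc-delivery-stream"  # placeholder name
)
destination = description["DeliveryStreamDescription"]["Destinations"][0]
print(destination["ExtendedS3DestinationDescription"]["BufferingHints"])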

Glue Database and Table

As shown above, a Glue Database and a Table inside that Database have to be fed to the Firehose template to enable conversion of the JSON data to Parquet before it is stored in the S3 bucket. Below are the templates for the Database and the corresponding Table.

---
AWSTemplateFormatVersion: 2010-09-09
Parameters:
  DatabaseName:
    Type: String
    Description: Name of the Glue Database
Resources:
  GlueDataBase:
    Type: AWS::Glue::Database
    Properties:
      CatalogId: !Ref AWS::AccountId
      DatabaseInput:
        Description: "Glue database for dataLake"
        Name: !Ref DatabaseName

The template for the Glue Database is pretty simple. All you need to provide is the AccountId (to set the value of CatalogId) and the Name; we have also provided a small description. While we are showing how to create a new database, if you already have a database that you use for cataloguing tables streamed from other transactional databases, it is recommended that you use that same database.

Next in line is the Glue Table, which is referred to by the Kinesis Firehose when saving the data in the S3 bucket.

  1. You will need to set the CatalogId as the AccountId here as well, and also mention the DatabaseName. Here, we have mentioned the DataBase that we just created. You can also import and pass a pre-existing database.
  2. Next, we come to TableInput where we need to mention the various metadata related to the table.
    * Name is the first value that needs to be passed. This is the table name that will be referred to when you query your data through Athena. Remember to use “_” rather than “-” in the names of both databases and tables. While “-” is technically allowed, you will face problems later on when running queries via Athena; we learned that the hard way so that you don’t need to.
    * StorageDescriptor contains a set of parameters essential to a Glue Table such as the Location of the data in the table (The S3 bucket and folder structure where the data corresponding to this catalogue table resides). You also need to mention the columns and the data types of each column. In case you want to learn more about the data types, here is an AWS article to help you out. Next, you need to set the input and output format for the table. We have set both based on the fact that we have the table data saved in Parquet format in an S3 bucket. Compressed is set as false since the data in the bucket is not compressed in our case.
    * SerdeInfo is another important parameter under StorageDescriptor. It is used to define the serialization/deserialization library that will be used during format conversion; we have used the ParquetHiveSerDe. We have also set serialization.format, a parameter under SerdeInfo, to “1”, which is the widely used default; it denotes the field delimiter used between two column fields of the table when a row is serialized. We have left BucketColumns and SortColumns as empty arrays since we did not need them in our use case, and set StoredAsSubDirectories as false since our data is not saved in that manner. Then we have PartitionKeys, where we need to mention the partition keys and their data types; we have used year, month, and day for partitioning and have passed the same here. At the very end, we have TableType, which can be EXTERNAL_TABLE, VIRTUAL_VIEW, or others. Whenever you add data to an S3 bucket and need Glue to catalogue it in a Glue Table, the TableType will be “EXTERNAL_TABLE”.
---
AWSTemplateFormatVersion: 2010-09-09
Parameters:
  DataLakeBucket:
    Type: String
    Description: Data Lake Bucket
  GlueDataBase:
    Type: String
    Description: Glue Data Base
Resources:
  GlueTable:
    Type: AWS::Glue::Table
    Properties:
      CatalogId: !Ref AWS::AccountId
      DatabaseName: !Ref GlueDataBase
      TableInput:
        Name: database_table_name
        StorageDescriptor:
          Location: !Sub "s3://${DataLakeBucket}/dynamo-cdc/database/table_name/"
          Columns:
            - Name: uuid
              Type: string
            - Name: op
              Type: string
            - Name: time_stamp
              Type: timestamp
            - Name: data
              Type: string
          InputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat
          OutputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat
          Compressed: false
          SerdeInfo:
            SerializationLibrary: org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe
            Parameters:
              serialization.format: "1"
          BucketColumns: []
          SortColumns: []
          StoredAsSubDirectories: false
        PartitionKeys:
          - Name: year
            Type: string
          - Name: month
            Type: string
          - Name: day
            Type: string
        TableType: EXTERNAL_TABLE
Outputs:
  GlueDataBase:
    Value: !Ref GlueDataBase
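Once the stack is deployed, it is worth double-checking that the catalogue entry looks the way Firehose expects. A quick sketch for that (the database name here is a placeholder; the table name is the one from the template above, so adjust both to your own):

import boto3

glue = boto3.client("glue")

# Fetch the table definition and print the column schema that Firehose will convert against.
table = glue.get_table(DatabaseName="my_glue_database", Name="database_table_name")
for column in table["Table"]["StorageDescriptor"]["Columns"]:
    print(column["Name"], column["Type"])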

Preprocessing of the data using a Lambda function

In the ProcessingConfiguration of the Firehose template, in the field Type of Processor, we provided the value “Lambda”, and in parameters, right after that, we passed the LambdaArn as well.

The Lambda code shown below is the Lambda whose ARN is exported and then used in the Firehose template. I’ll explain this code with an example so that it is easier to understand. Let’s say I had a row in my table-

{
  "name": "Abhisek Roy",
  "city": "Bengaluru"
}

where name is the partition key. Now, due to the Covid situation, I returned to my hometown, Kolkata, which is why the data got updated to-

{
  "name": "Abhisek Roy",
  "city": "Kolkata"
}

Let’s see what data enters this Lambda and what its output is when this change occurs in the database for a single row. In the first log statement in the Lambda handler, this is what the event looks like-

{
  'invocationId': 'ae298f43-e471-4098-b665-0c13af6ccc2a',
  'sourceKinesisStreamArn': 'arn:aws:kinesis:ap-south-1:42339:stream/repo-name-kinesisstream-1UF-KinesisStream-uhcEC2',
  'deliveryStreamArn': 'arn:aws:firehose:ap-south-1:42339:deliverystream/repo-name-firehosedeliverystream-LYUHPS-FGai-NkoasdJ',
  'region': 'ap-south-1',
  'records': [
    {
      'recordId': '496200',
      'approximateArrivalTimestamp': 1649719712500,
      'data': 'eyJhd3NSZWdpb24iOiAiYXAtc291dGgtMSIsICJldmVudElEIjogIjdkNWFzZDEtZGFzZGFzZDVlLTRkYXNkYXNkZDQtYTFhc2QyZi05YXNkMTk4MDIiLCAiZXZlbnROYW1lIjogIk1PRElGWSIsICJ1c2VySWRlbnRpdHkiOiBudWxsLCAicmVjb3JkRm9ybWF0IjogImFwcGxpY2F0aW9uL2pzb24iLCAidGFibGVOYW1lIjogInRhYmxlX25hbWUiLCAiZHluYW1vZGIiOiB7IkFwcHJveGltYXRlQ3JlYXRpb25EYXRlVGltZSI6IDE2NDk3MTk3MTI0NzMsICJLZXlzIjogeyJuYW1lIjogeyJTIjogIjk4ZGZlMy0xYzgtNGRiMy1iZjJlLTMyYTc5YWMifX0sICJOZXdJbWFnZSI6IHsibmFtZSI6IHsiUyI6ICJBYmhpc2VrIFJveSJ9LCAiY2l0eSI6IHsiUyI6ICJLb2xrYXRhIn19LCAiT2xkSW1hZ2UiOiB7Im5hbWUiOiB7IlMiOiAiQWJoaXNlayBSb3kifSwgImNpdHkiOiB7IlMiOiAiQmVuZ2FsdXJ1In19LCAiU2l6ZUJ5dGVzIjogMTIzfSwgImV2ZW50U291cmNlIjogImF3czpkeW5hbW9kYiJ9',
      'kinesisRecordMetadata': {
        'sequenceNumber': '41230',
        'subsequenceNumber': 0,
        'partitionKey': 'D3A9',
        'shardId': 'shardId-000000000001',
        'approximateArrivalTimestamp': 1649719712500
      }
    }
  ]
}

Now, you will mainly be dealing with the value of “records” here, since this is where the changes in the DB are present. Also remember that in case multiple rows are added or edited at the same time, this records array will have more than one entry. The Lambda shown here can process one or more entries- basically whatever is thrown at it.

So, we start by looping over the records and formatting the data. The first data point that we extract is the “time_stamp”, which we populate using the value of “approximateArrivalTimestamp”. Next, we pick up the data field. This might look like gibberish, but it is just base64-encoded JSON. We use the decode_message function to decode it (remember to use “utf-8”; I used “ascii” in the beginning, but that doesn’t support all the characters you are likely to encounter in a JSON). The decoded message for the above example will look like this-

{
  "awsRegion": "ap-south-1",
  "eventID": "7d5asd1-dasdasd5e-4dasdasdd4-a1asd2f-9asd19802",
  "eventName": "MODIFY",
  "userIdentity": null,
  "recordFormat": "application/json",
  "tableName": "table_name",
  "dynamodb": {
    "ApproximateCreationDateTime": 1649719712473,
    "Keys": {
      "name": {
        "S": "98dddfe3-1ac8-4db3-bf2e-39f992a179ac"
      }
    },
    "NewImage": {
      "name": {
        "S": "Abhisek Roy"
      },
      "city": {
        "S": "Kolkata"
      }
    },
    "OldImage": {
      "name": {
        "S": "Abhisek Roy"
      },
      "city": {
        "S": "Bengaluru"
      }
    },
    "SizeBytes": 123
  },
  "eventSource": "aws:dynamodb"
}

These fields are self-explanatory, and you can see how the city has changed from OldImage to NewImage. Since we are only capturing the change itself, we save only the NewImage; the OldImage would already have been saved during its own insert or modify operation (when it was first inserted, or when it resulted from modifying an earlier insert).

Once we decode the message, we create a uuid, which is just the sha256 hash of the NewImage in the message shown above. There is a reason behind this step, which I will come to later.

Next, we look at the event type. Since we will only be dealing with inserts and updates, the possible eventNames we can get are “INSERT” or “MODIFY”. Accordingly, we set the value of the key “op” to “I” or “U”. Why do we do this? Well, this is exactly how DMS (Database Migration Service), a managed AWS service, handles CDC before storing it in the Data Lake. Since we are already streaming other databases like Postgres using DMS, we wanted to keep the format of the data, and the columns, the same.

Next, we fetch the NewImage, convert the DynamoDB JSON to a normal JSON using the dynamodb_json library, and save it under the data key. So you have 4 columns- uuid, time_stamp, op, and data. Once this is done, we convert the data field to a string using json.dumps, and then convert the entire row (the new JSON we created) into a string, again using json.dumps. We then encode this message using the encode_message function- call the result base64_message. Before you return this information to be saved in the Data Lake, you need to create a JSON with 3 fields-

  1. data- where you put the base64_message that you created earlier.
  2. recordId- where you put the recordId of the particular record that you were processing.
  3. result- where you put the status of the formatting.

All these parameters are important and you can read more about them here.

import base64
import hashlib
import json
import logging
import uuid
from datetime import datetime
from dynamodb_json import json_util as util

logger = logging.getLogger()
logger.setLevel(logging.DEBUG)


def decode_message(base64_message):
    base64_bytes = base64_message.encode('utf-8')
    message_bytes = base64.b64decode(base64_bytes)
    message = message_bytes.decode('utf-8')
    return message


def encode_message(message):
    message_bytes = message.encode('utf-8')
    base64_bytes = base64.b64encode(message_bytes)
    base64_message = base64_bytes.decode('utf-8')
    return base64_message


def lambda_handler(event, context):
    logger.info("Captured event:- %s", event)
    new_records = []
    for record in event["records"]:
        new_data = {"time_stamp": record.get("approximateArrivalTimestamp")}
        base64_message = record.get("data")
        message = decode_message(base64_message)

        # Since the message is in the form of a stringified JSON, you have to change it to a dictionary.
        message = json.loads(message)

        new_data["uuid"] = str(hashlib.sha256(str(message.get("dynamodb").get("NewImage")).encode('utf-8')).hexdigest())

        if message.get("eventName") == "INSERT":
            new_data["op"] = "I"
        elif message.get("eventName") == "MODIFY":
            new_data["op"] = "U"
        dynamo_entry = util.loads(message.get("dynamodb").get("NewImage"))

        if dynamo_entry is None:
            # If data is None (happens when a delete operation occurs), then no need to add the entry.
            continue

        # Convert the dynamo entry to a stringified JSON.
        new_data["data"] = json.dumps(dynamo_entry)

        # Convert the row you intend to add to the Data Lake into a stringified JSON.
        new_message = json.dumps(new_data)
        logger.info("Updated record %s to record:- %s", message, new_message)

        # Encode the message back to base64 before sending it to the data lake.
        base64_message = encode_message(new_message)

        # Each record must have all these 3 fields, and their values too must follow the norms laid out.
        new_records.append({
            "result": "Ok",
            "data": base64_message,
            "recordId": record.get("recordId")
        })
    logger.info("Sending records:- %s", new_records)

    return {"records": new_records}

You might have noticed that we skipped over the line where we check whether dynamo_entry is None. This situation occurs when there is a DELETE operation. However, we are not expecting any DELETE operations in our table, which is why we skip such records in case they do show up.

Let’s come back to the uuid that we created using a sha256 hash of the NewImage. We did this because, when using a Kinesis Stream, there is a chance of duplicate messages in the stream due to failures in communication between the producer and the consumer. To handle such scenarios, having a uuid helps, since you can select the distinct uuids for any primary id and get only the unique CDC rows for it. This failsafe was created to handle an extreme case and may not be required if you are processing the data separately after fetching it from the Data Lake, since you may be able to filter out the duplicates there as well.
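For completeness, this is roughly how that de-duplication could look once you have pulled rows out of the Data Lake (a minimal sketch; in practice you might do this in your query layer or in a dataframe instead):

def deduplicate_cdc_rows(rows):
    """Drop duplicate CDC rows, keeping the first occurrence of each uuid."""
    seen_uuids = set()
    unique_rows = []
    for row in rows:
        if row["uuid"] in seen_uuids:
            continue  # duplicate delivery from the Kinesis stream
        seen_uuids.add(row["uuid"])
        unique_rows.append(row)
    return unique_rows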

Conclusion

This brings us to the end of Part I of this series, where we discussed how exactly to stream CDC from a DynamoDB table to a Data Lake. In Part 2, we will deep dive into things like querying the data, updating partitions, getting older data into the Data Lake, and more.

Over and out.
