Streaming data from DynamoDB to your Data Lake (Part- II)

Abhisek Roy
Credit Saison (India)
12 min readApr 21, 2022

While in Part I, we discussed the resources and the whys and the hows of streaming CDC from DynamoDB to the Data Lake, in this part, we shall look at the accessories and major pointers that we need to keep in mind while doing so. Some of these may be optional for you or already known, but have still been jotted down so that parts I and II together create a complete and beautiful picture of the final goal.

Athena, Glue, and S3 buckets

Glue Tables, S3 buckets, and Athena

While we explained how we will be setting up our infra such that data flows from the DynamoDB table to the Data Lake (our S3 bucket), querying the data is just as important and those unfamiliar with Athena may not be sure how it works. Let me show how Glue and data in the S3 bucket are used together to get you the results of queries.

When you go into specific folders based on what prefix you mentioned in the Kinesis Firehose, of the S3 bucket that you are using as your Data Lake, you will see a view such as this. Multiple parquet files are there in the folder structure and partition folders as you specified.

The S3 bucket and location where you have the data flowing in from the DynamoDB Table

Next, we come to the Glue Catalogue Table. You can see the columns and the data types. These are exactly what you defined in the Cloudformation template for the Glue Table. You can see that the partitions are also showing up as columns. Even when you query the data via Athena, you will be getting the partition value in the form of columns.

The Glue Table that you created

We just saw that both our S3 bucket and our Glue catalogue are set up and ready. Now you can go ahead and query your data. Select the correct data source and Glue Database (that you just created) on the left-hand side of the console, type your query and run it. Once you run it you will be shown the results below in a table format just the way you open a file in Microsoft Excel. The data can then be downloaded in a CSV file if you want.

The Athena interface

Athena Queries are pure SQL queries with a few quirks and quips that you will pick up along the way once you start using it on a day-to-day basis and write more complex queries.

Updating partitions in Glue table.

Everything is ready, and you have data getting streamed from your DynamoDB table in real-time. However, whenever you query your data through Athena, you are unable to find data for the latest date. What you realise is that the partitions for the Glue table have not been updated. You manually run

MSCK REPAIR TABLE table_name;

and everything is good until the next time.

However, suppose you want a permanent solution. You want the partitions to be there when you open Athena because let’s be honest, you are just as lazy as me, which is why you want to automate the hell out of every single thing.

So, there are multiple ways that you can go about this, but I chose this as the most lightweight solution where there are no failure scenarios. I set up a lambda which will be triggered everyday at 10.30 AM in the morning (IST). When it is triggered, it will create the partition for the next day.

import boto3
import logging
import os
from datetime import date, timedelta
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
# Set log levels for default libraries
logging.getLogger('boto3').setLevel(logging.ERROR)
logging.getLogger('botocore').setLevel(logging.ERROR)
logging.getLogger('s3transfer').setLevel(logging.ERROR)
logging.getLogger('urllib3').setLevel(logging.ERROR)
GLUE_DATABASE = os.getenv('GLUE_DATA_LAKE_DATABASE')
ATHENA_RESULTS_BUCKET = 's3://' + os.getenv('ATHENA_RESULTS_BUCKET')+"/"
TABLE_NAME = "table_name"client = boto3.client('athena')def lambda_handler(event, context):
tomorrow = date.today() + timedelta(days=1)
tomorrow_date, tomorrow_month, tomorrow_year = \
tomorrow.strftime("%d"), tomorrow.strftime("%m"), tomorrow.strftime("%Y")
logger.info("Captured Date %s", tomorrow_date) query = f"ALTER TABLE {TABLE_NAME} ADD IF NOT EXISTS PARTITION (year=\'{tomorrow_year}\', month=\'{tomorrow_month}\', day = \'{tomorrow_date}\')"
logger.info("Running query- %s", query)
response = client.start_query_execution(
QueryString=query,
QueryExecutionContext={
'Database': GLUE_DATABASE
},
ResultConfiguration={
'OutputLocation': ATHENA_RESULTS_BUCKET
},
)
logger.info(response)
return True

While the idea initially was to create a partition for the same day, at exactly midnight, we chose this approach because we did not want to take any chances, of say data entering the data lake at exactly 12, someone running an Athena query at the same time, and the partitioning lambda executing just a millisecond later. Hence, we are creating partitions in the table, a day in advance.

Getting your old data in case the DynamoDB table was created beforehand

This is an issue many of you might face. We are using quite a few DynamoDB tables in our cloud but we hadn’t streamed any of them into the Data Lake earlier. It is only when one of our Product Managers came to inform us that this data was required in the Data Lake so that he can perform a join with another table and look at multiple data points at the same time, that we started thinking about how we can create a solution.

Again there may be multiple possibilities here. The most recommended one might be for you to create a backup copy of the existing data by going to the Backups Option for your table.

When PITR is disabled for a Dynamo Table

But remember that unless you have PITR (Point in time recovery) enabled, you will be shown the error message given above and won’t be allowed to create one. It is only when you enable it that you will be given this option-

Creating backups for a Dynamo Table

You can also create multiple backups at different points in time, and use the latest one, right after you deploy the DynamoDB->Data Lake infra. This way you would have a copy of the existing data, while you stream the fresh data.

You can create multiple backups at different points in time

The existing data will however be in a JSON format and you will need to process the data by writing a script of your own and save it to a parquet file in a partition in the folder that you are using for the current table. Remember when we set the value of prefix in the Firehose?

Prefix: dynamo-cdc/database/table_name/year=!{timestamp:YYYY}/month=!{timestamp:MM}/day=!{timestamp:dd}/

You will need to create a folder in this format (you can create a folder for the T-1 date where T is the date when you start streaming data into Data Lake). We chose T date- i.e. we put the earlier data also in the same partition as the date when we started streaming since there was data of the latest day as well.

However, we did not go via this backup mode. What we did is-

  1. Created a DynamoDb with the same configuration in our Development environment and set up the same DynamoDB -> Data Lake infra.
  2. Downloaded prod data in CSV format (there is an option in every DynamoDB Table).
  3. Wrote a script to process this data and add it to the table we created in the Development environment (please remember that you will need to do some preprocessing. Form data will need to be converted from DynamoDB JSON to normal JSON. Negative numbers will have a “ ` ” added before them. Example -10 will be there as `-10 in the CSV. All these need to be corrected before you load the data to the DynamoDB table)
  4. We made a minor change in the Lambda- we changed the time_stamp column to pick up the value of the updated_at column that we had in our row entries (we always keep a created_at and an updated_at column for any table that we create as a best practice).
  5. The data got pumped to the Data Lake in dev in Parquet format. We copied it and put it in the partition for Day T in production (the day our DynamoDB -> Data Lake went live on production).

…and that was it, the old data was now in our Data Lake and queryable via Athena. One thing to note here is that for the data that was already in our DynamoDB table, all the entries will be “INSERT”, even if any of the rows have been edited n number of times since we do not have the CDC from the time before we set up the DynamoDB->Data Lake stream.

Why not DMS…and What’s the difference?

This is the first question and not the last one in my list of questions when I went about working on this particular problem statement. Why not use Database Migration System that is provided as a managed service by AWS to do this job. Well, the simple answer is that there is no support for DynamoDB. The fact that it is a NoSQL database might be the reason since no other NoSQL database is possibly supported as a source in DMS.

This brings us to another important fact. As you might have seen, we need to define the schema of the DynamoDB table in the Glue Table that we create and whose reference we pass into Kinesis. So basically your schema must be defined before the data has started pumping in. This can cause multiple issues-

  1. Say you have passed a row with 5 new columns (which you can easily do since this is a NoSQL database). The data pumped into the Data Lake will not have those 5 columns since those weren’t there in the Glue table.
  2. In case some changes are made to the existing columns (like say you changed the data type of a column)…data flow to Data Lake would stop and you will find errors being added to the error folder that you defined in the Kinesis Firehose.
  3. This is very different from how we create a DataBase->Data Lake stream using DMS, where we first let the data flow in, then run Glue Crawlers, and create the corresponding data catalogues via Glue Tables. The reason behind this is that Kinesis Firehose needs to know the schema so that it can do a JSON->Parquet conversion.

So what we realised is that if we create a Glue table corresponding to the exact schema that we have in our glue table, we will need to make frequent updates to the schema as well as write data correction scripts when we miss the schema updates and need to update incomplete data that crept into our data lake.

This is why we have created a single column called data where we are storing the entire row in JSON form.

  1. This makes sure that no schema changes are required and no errors/incomplete data creep into the Data Lake.
  2. You will just need to query the data via Athena using the “json_extract_scalar” so that you can access columns that are inside the data column.

Let us elaborate a little on point 2 since there might be some who are unaware of this (even I was, until a week back). Say we have a table in Dynamo where we want to get all the names and ages. However, we have saved it in a single data column and are not sure how we will be writing the query. This is how our query will look-

SELECT json_extract_scalar(data, '$.name') AS user_name,
json_extract_scalar(data, '$.age') AS user_age
FROM user_table;

You can convert any SQL query in this way so that you can reach into the JSON data and query it via any of its sub-attributes (whether the attributes are at the top level or nested inside doesn’t matter).

The way I see it, formatting the data and keeping it as a single column has only advantages as compared to converting every column to a unique column in your Glue Table. The only difference is the slightly more complicated SQL queries which you can surely get used to.

Special mentions- IAM Roles, Crons, Partitioning, and Timestamps

IAM Roles

While I did share the Cloudformation templates, I did not share the IAM roles that have been used. However these IAM roles are vital and unless every resource has the precise permission, you will not be able to make things work. I will be sharing specific permissions that I had issues with so that you can get things done faster in case you are stuck too.

Policies:
- AWSLambdaBasicExecutionRole
- Version: 2012-10-17
Statement:
- Effect: Allow
Action:
- athena:StartQueryExecution
Resource: "*"
- Version: 2012-10-17
Statement:
- Effect: Allow
Action:
- s3:PutObject
- s3:GetObject
- s3:ListBucket
- s3:GetBucketLocation
- s3:PutObjectAcl
Resource:
- !Sub 'arn:aws:s3:::${AthenaResultsBucket}'
- !Sub 'arn:aws:s3:::${AthenaResultsBucket}/*'
- !Sub 'arn:aws:s3:::${DataLakeBucket}'
- !Sub 'arn:aws:s3:::${DataLakeBucket}/*'
- Version: 2012-10-17
Statement:
- Effect: Allow
Action:
- glue:GetTable
- glue:BatchCreatePartition
Resource:
- !Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:table/${GlueDataBase}/${GlueTable}"
- !Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:database/${GlueDataBase}"
- !Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:catalog"
- Version: 2012-10-17
Statement:
- Effect: Allow
Action:
- glue:GetDatabase
Resource:
- !Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:database/${GlueDataBase}"
- !Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:catalog"

The one above is the permission that we used for the Lambda that we use to create new partitions. I did not find any examples for this online and I needed to give precise permission (as recommended since it is the best practice).

Next, I have shared the role that I used as a Delivery Role in Firehose-

DeliveryRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Version: 2012-10-17
Statement:
- Sid: ''
Effect: Allow
Principal:
Service: firehose.amazonaws.com
Action: 'sts:AssumeRole'
Condition:
StringEquals:
'sts:ExternalId': !Ref 'AWS::AccountId'
Path: "/"
Policies:
- PolicyName: firehose_delivery_policy
PolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Action:
- 's3:AbortMultipartUpload'
- 's3:GetBucketLocation'
- 's3:GetObject'
- 's3:ListBucket'
- 's3:ListBucketMultipartUploads'
- 's3:PutObject'
Resource:
- !Sub 'arn:aws:s3:::${DataLakeBucket}'
- !Sub 'arn:aws:s3:::${DataLakeBucket}/*'
- Effect: Allow
Action: 'glue:GetTableVersions'
Resource: '*'
- Effect: Allow
Action: 'logs:PutLogEvents'
Resource: '*'

And here we have the Delivery Stream Role for Firehose. Make sure you don’t forget the lambda:InvokeFunction and lambda:GetFunctionConfiguration permissions in case you use a lambda function for your data processing. In case you don’t, you can remove these.

FirehoseDeliveryStreamRole:
Type: AWS::IAM::Role
Properties:
AssumeRolePolicyDocument:
Statement:
- Action: sts:AssumeRole
Effect: Allow
Principal:
Service: firehose.amazonaws.com
Version: '2012-10-17'
Description: Role for Firehose stream to receive data
Policies:
- PolicyDocument:
Statement:
- Action:
- logs:*
Effect: Allow
Resource: "*"
Version: '2012-10-17'
PolicyName: AllowLogging
- PolicyDocument:
Statement:
- Action:
- kms:Encrypt
- kms:Decrypt
- kms:ReEncrypt
- kms:GenerateDataKey*
- kms:DescribeKey
Effect: Allow
Resource: !ImportValue s3-encryption-key
Version: '2012-10-17'
PolicyName: kms
- PolicyDocument:
Statement:
- Action:
- s3:ListBucket
- s3:ListBucketByTags
- s3:GetBucketLocation
Effect: Allow
Resource:
- !Sub 'arn:aws:s3:::${DataLakeBucket}'
Version: '2012-10-17'
PolicyName: s3Permissions1
- PolicyDocument:
Statement:
- Action:
- s3:PutObject
- s3:ListObjects
- s3:PutObjectAcl
Effect: Allow
Resource:
- !Sub 'arn:aws:s3:::${DataLakeBucket}'
- !Sub 'arn:aws:s3:::${DataLakeBucket}/*'
Version: '2012-10-17'
PolicyName: s3Permissions2
- PolicyDocument:
Statement:
- Action:
- kinesis:DescribeStream
- kinesis:GetShardIterator
- kinesis:GetRecords
- kinesis:ListShards
Effect: Allow
Resource: !Sub 'arn:aws:kinesis:${AWS::Region}:${AWS::AccountId}:stream/${KinesisStream}'
Version: '2012-10-17'
PolicyName: KinesisPermissions
- PolicyDocument:
Statement:
- Action:
- lambda:InvokeFunction
- lambda:GetFunctionConfiguration
Effect: Allow
Resource: !Ref ProcessDynamoEntriesLambda
Version: '2012-10-17'
PolicyName: lambdaPermissions

Cron Jobs

We used a cron to trigger the lambda that updates partitions daily (It runs at 10:30 hours IST. For that, we used this particular event-

Events:
UpdateTablePartitions:
Type: Schedule
Properties:
Description: Update Table Partitions for DynamoDb table in Data Lake
Schedule: cron(0 5 * * ? *)
Input: ""
Enabled: true

Recommend you read up more on this here.

Partitioning

While you might ask why we have used partitioning, or whether it is a necessity or a good to have, I would highly recommend you use it and it is a must-have in the long term. It improves query efficiencies- queries run faster and don’t need to parse the entire data in the table (they can only go through the partitions mentioned in the query). However, to take full advantage of partitions, you must use them in your queries in the first place.

Timestamps

In case you remember the table schema that we defined in Glue, you might remember seeing a timestamp column. This column needs to have the data in only some specific formats. Instead of using the complicated date-time formats, I went for timestamps in milliseconds also called milliseconds since Unix Epoch. You can check the conversion from this value to your human-readable timestamp here.

Given that I always need to deep dive into whatever I build, I actually downloaded the parquet files from the S3 bucket and read them using the command-

parquet-tools head file_name.parquet

What I got as a result was-

uuid = bba817c04a8413274097737742e181050abcfa12c49d7288a2bcc335988dcb8b
op = I
time_stamp = AK7X7EwTAAAeiCUA
data = {"name": "Abhsiek Roy", "city":"Kolkata"}

As you see, the time_stamp seems to have been converted to some random string. I tried debugging this for 2 days while my boss was onto me as to why this was taking so long (just kidding). Finally, I let the data be as it was in the S3, set up Athena, and queried the data, and lo and behold, the timestamp column showed up perfectly.

The realisation here is that if you give the timestamp in an unsupported format, you will get an error in the error prefix folder in the S3 bucket. If the format is correct, it gets encoded while saving to Parquet and is decoded again when queries are run.

The only reason I explained this is so that others don’t waste the 2 days that I did in search of the bug that wasn’t.

Conclusion

While this set of 2 articles has grown to the size of a full-blown book chapter, the reason I went into the nitty-gritty is that I would love the next developer who works on a similar problem to have these findings with him or her, work on top of these and bring out new findings and information to the forefront for the developer community.

Over and out.

--

--