Data Transfer: DynamoDB to Redshift
Table of Contents
- Introduction
- System Architecture
- Architecture Breakdown
- DDB Streams
- Lambda function
- Writing to Firehose
- Summary
Introduction
Today I would like to talk about a simple data transfer use case: moving data from a real-time database (DynamoDB) to a data warehouse (Redshift). We generally extract the real-time data, transform it as required, and load it into the data warehouse for purposes such as business analytics or transposing it into views that cater to different stakeholders (sales, marketing, etc.). This operation is called ETL.
This is similar to my last blog, where a Lambda function was triggered by an SNS notification and the data was pushed to Redshift using Firehose. Here we are going to see how to enable DynamoDB Streams and use them to push real-time data to Redshift using Firehose.
System Architecture
Architecture Breakdown
- Enable DynamoDB Streams for a given DDB table.
- Set up the DynamoDB stream as a trigger to invoke the Lambda function.
- The Lambda function processes the events, parses the DynamoDB record, and pushes the data into AWS Redshift through Firehose.
Prerequisites
The Firehose configuration required can be found in my previous blog here.
- Firehose: we need a Firehose delivery stream that delivers the incoming DDB stream data to Redshift. Firehose internally stages the incoming data in S3, buffers it, and then delivers it to Redshift.
- Redshift: we need a Redshift cluster set up, with the required table created in Redshift that the DDB stream data will be written to. A minimal sketch of such a table follows this list.
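For reference, here is a minimal sketch of what the target table could look like, with columns matching the CSV record the Lambda function builds later in this post. The table name, column names, and the use of psycopg2 to connect to the cluster are my assumptions, not part of the original setup; the actual Firehose and Redshift configuration is covered in the previous blog.

import psycopg2

# Connect to the Redshift cluster (endpoint and credentials are placeholders).
conn = psycopg2.connect(
    host='<redshift-cluster-endpoint>',
    port=5439,
    dbname='<database>',
    user='<user>',
    password='<password>'
)

# Column order matches the CSV line produced by the Lambda function:
# productCode, MfgDate, BestByDate, upc
with conn, conn.cursor() as cur:
    cur.execute("""
        CREATE TABLE IF NOT EXISTS product_item (
            product_code VARCHAR(32),
            mfg_date     BIGINT,
            best_by_date BIGINT,
            upc          VARCHAR(16)
        )
    """)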
Let's take a look at DDB Streams and the Lambda function below.
DDB Streams
You can enable DynamoDB Streams from the DDB console as follows:
- In the DDB console you can find all your DDB tables.
- Select the table for which you want to enable streams.
- On the right pane, go to the Overview tab.
- Click on Manage Stream.
Manage Stream lets you choose one of four view types (KEYS_ONLY, NEW_IMAGE, OLD_IMAGE, or NEW_AND_OLD_IMAGES), which determines what information is written to the stream whenever data in the table is modified. The link below has more details about streams and these options.
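If you prefer to enable the stream programmatically rather than through the console, a minimal boto3 sketch looks like the following (the table name ProductItem is taken from the sample logs later in this post; substitute your own).

import boto3

dynamodb = boto3.client('dynamodb')

# Enable a stream on an existing table; NEW_AND_OLD_IMAGES matches the
# view type shown in the sample logs in this post.
dynamodb.update_table(
    TableName='ProductItem',
    StreamSpecification={
        'StreamEnabled': True,
        'StreamViewType': 'NEW_AND_OLD_IMAGES'
    }
)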
Lambda function
You can create a simple Lambda function in the console as follows. I have already discussed Lambda in my previous blog here.
- In the Lambda console, create a function, specify the language runtime, and add the required permissions.
- Select the DDB table as a trigger for the Lambda function and save the function (a programmatic sketch of this step follows the list).
- You can choose to write the code in the console or upload a ZIP file.
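The trigger can also be wired up programmatically through an event source mapping. A minimal sketch, assuming a function named DDB_Stream_Lambda (the name that appears in the error log later in this post) and using a placeholder for your table's stream ARN:

import boto3

lambda_client = boto3.client('lambda')

# Map the DynamoDB stream to the Lambda function, equivalent to adding
# the trigger in the console. The ARN and batch size are placeholders.
lambda_client.create_event_source_mapping(
    EventSourceArn='<your-ddb-table-stream-arn>',
    FunctionName='DDB_Stream_Lambda',
    StartingPosition='LATEST',
    BatchSize=100
)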
Lambda code
I have written a small piece of code in the console itself to log the event processed by the Lambda function.
import json
import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):
    for record in event['Records']:
        logger.info(record)
        ddbRecord = record['dynamodb']
        logger.info('DDB Record: ' + json.dumps(ddbRecord))
    return 'processed {} records.'.format(len(event['Records']))
Below are the logs printed by the above code, fetched from the CloudWatch logs:
[INFO] 2019-03-18T03:44:37.82Z aa89962a-5145-4868-97a9-f04efbfd2c07 {'eventID': '8ba0242407cd4361080e6168e9b18701', 'eventName': 'INSERT', 'eventVersion': '1.1', 'eventSource': 'aws:dynamodb', 'awsRegion': 'us-east-1', 'dynamodb': {'ApproximateCreationDateTime': 1552880676.0, 'Keys': {'productCode': {'S': 'AE9QFENBDN'}}, 'NewImage': {'productCode': {'S': 'AE9QFENBDN'}, 'MfgDate': {'N': '1531591730'}, 'upc': {'S': '858081006202'}, 'BestByDate': {'N': '1544810930'}}, 'SequenceNumber': '1089333300000000003999920026', 'SizeBytes': 86, 'StreamViewType': 'NEW_AND_OLD_IMAGES'}, 'eventSourceARN': 'arn:aws:dynamodb:us-east-1:865876670085:table/ProductItem/stream/2019-03-18T03:19:03.890'}
You can use record['eventName'], record['dynamodb'], record['eventSource'], etc., to fetch the required information from the event. Since we are interested in the DynamoDB record, I have also logged record['dynamodb'] in the above code and dumped it in JSON format. Output of the log below:
[INFO] 2019-03-18T03:44:37.82Z aa89962a-5145-4868-97a9-f04efbfd2c07 DDB Record:
{
    "ApproximateCreationDateTime": 1552880676,
    "Keys": {
        "productCode": {
            "S": "AE9QFENBDN"
        }
    },
    "NewImage": {
        "productCode": {
            "S": "AE9QFENBDN"
        },
        "MfgDate": {
            "N": "1531591730"
        },
        "upc": {
            "S": "858081006202"
        },
        "BestByDate": {
            "N": "1544810930"
        }
    },
    "SequenceNumber": "1089333300000000003999920026",
    "SizeBytes": 86,
    "StreamViewType": "NEW_AND_OLD_IMAGES"
}
Converting to Firehose record
The next step is to transform the DynamoDB stream image into a Firehose record. Code below:
def convertToFirehoseRecord(ddbRecord):
    # Parse the NewImage json element
    newImage = ddbRecord['NewImage']
    # Construct the Firehose record (a CSV line) from NewImage
    firehoseRecord = '{},{},{},{}'.format(newImage['productCode']['S'],
                                          newImage['MfgDate']['N'],
                                          newImage['BestByDate']['N'],
                                          newImage['upc']['S']) + '\n'
    return firehoseRecord
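As a quick sanity check, here is what the function returns for the sample stream record shown earlier (the sample dict below is hand-built from that record; the column order should match the target Redshift table and the delimiter configured in the Firehose COPY options).

sample = {'NewImage': {'productCode': {'S': 'AE9QFENBDN'},
                       'MfgDate': {'N': '1531591730'},
                       'BestByDate': {'N': '1544810930'},
                       'upc': {'S': '858081006202'}}}
print(convertToFirehoseRecord(sample))
# AE9QFENBDN,1531591730,1544810930,858081006202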
Writing to Firehose
The next step is to create the Firehose client and write the transformed record to the Firehose delivery stream. Please find the details about creating Firehose and the required configuration in my previous blog here.
The full working code is below. Please handle corner cases as required; for example, NewImage will not be available for all event types. It is present only when eventName is INSERT or MODIFY (a sketch of such a guard follows the code).
import json
import logging
import boto3

logger = logging.getLogger()
logger.setLevel(logging.INFO)

firehose = boto3.client('firehose')
deliveryStreamName = '<firehose-stream-name>'

def lambda_handler(event, context):
    for record in event['Records']:
        logger.info(record)
        ddbRecord = record['dynamodb']
        logger.info('DDB Record: ' + json.dumps(ddbRecord))
        firehoseRecord = convertToFirehoseRecord(ddbRecord)
        logger.info('Firehose Record: ' + firehoseRecord)
        firehose.put_record(DeliveryStreamName=deliveryStreamName,
                            Record={'Data': firehoseRecord})
    return 'processed {} records.'.format(len(event['Records']))

def convertToFirehoseRecord(ddbRecord):
    # Parse the NewImage json element
    newImage = ddbRecord['NewImage']
    # Construct the Firehose record (a CSV line) from NewImage
    firehoseRecord = '{},{},{},{}'.format(newImage['productCode']['S'],
                                          newImage['MfgDate']['N'],
                                          newImage['BestByDate']['N'],
                                          newImage['upc']['S']) + '\n'
    return firehoseRecord
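One way to handle the NewImage corner case mentioned above is to skip events that are not INSERT or MODIFY. A minimal sketch of the handler with that guard, reusing the client, stream name, and convertToFirehoseRecord defined above:

def lambda_handler(event, context):
    for record in event['Records']:
        # NewImage is only present for INSERT and MODIFY events,
        # so skip everything else (e.g. REMOVE).
        if record['eventName'] not in ('INSERT', 'MODIFY'):
            continue
        ddbRecord = record['dynamodb']
        firehoseRecord = convertToFirehoseRecord(ddbRecord)
        firehose.put_record(DeliveryStreamName=deliveryStreamName,
                            Record={'Data': firehoseRecord})
    return 'processed {} records.'.format(len(event['Records']))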
Summary
We were able to create all the required components to perform this ETL operation quite easily through the AWS console itself. In this use case I used boto3, the AWS SDK for Python, to create the Firehose client and call its put_record function. You can find good documentation below.
You might run into permission issues after adding the put_record call to your Lambda function, as below:
[ERROR] ClientError: An error occurred (AccessDeniedException) when calling the PutRecord operation: User: arn:aws:sts::865876670085:assumed-role/lambda_basic_execution/DDB_Stream_Lambda is not authorized to perform: firehose:PutRecord on resource: arn:aws:firehose:us-east-1:865876670085:deliverystream/DDB_Redshift
All you have to do is attach the Firehose access policy to the Lambda role (defined when we created the Lambda function); a programmatic sketch follows.
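If you want to do the same with boto3 instead of the IAM console, here is a minimal sketch. The role name and delivery stream name are taken from the error message above; the policy name and account ID placeholder are my assumptions.

import json
import boto3

iam = boto3.client('iam')

# Inline policy that lets the Lambda execution role write to the delivery stream.
policy = {
    'Version': '2012-10-17',
    'Statement': [{
        'Effect': 'Allow',
        'Action': ['firehose:PutRecord', 'firehose:PutRecordBatch'],
        'Resource': 'arn:aws:firehose:us-east-1:<account-id>:deliverystream/DDB_Redshift'
    }]
}

iam.put_role_policy(
    RoleName='lambda_basic_execution',
    PolicyName='firehose-put-record',
    PolicyDocument=json.dumps(policy)
)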
Feel free to share your use cases and experiences with AWS components. Also, please share your comments and critiques about this blog.