Data Transfer: DynamoDB to Redshift

ananthsrinivas

Introduction

Today I would like to talk about a simple data transfer use case: moving data from a real-time database (DynamoDB) to a data warehouse (Redshift). We generally extract the real-time data, transform it as required, and load it into a data warehouse for purposes like business analytics or transposing it into views that cater to different stakeholders (sales, marketing, etc.), and we call this operation ETL.

This is similar to my last blog, where a Lambda function is triggered by an SNS notification and the data is pushed to Redshift using Firehose. Here we are going to see how to enable DynamoDB Streams and use them to push real-time data to Redshift through Firehose.

System Architecture

(Architecture diagram: Data Transfer from DDB Streams to Redshift)

Architecture Breakdown

  1. Enable DynamoDB Streams for a given DDB table.
  2. Set up the DynamoDB stream as a trigger to invoke the Lambda function.
  3. The Lambda function processes the events, parses the DynamoDB record and, through Firehose, transfers the data into AWS Redshift.

Pre-requisites

The required Firehose configurations can be found in my previous blog here.

  1. Firehose — we need a Firehose delivery stream set up that delivers the incoming DDB stream data to Redshift. Firehose internally stages the incoming data in S3, buffers it, and delivers it to Redshift (a programmatic alternative to the console setup is sketched after this list).
  2. Redshift — we need a Redshift cluster set up, with the required table created in Redshift that the DDB stream data will be written to.
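
If you prefer to create the delivery stream programmatically instead of through the console, a minimal boto3 sketch is shown below. This is only an assumption-laden illustration: the role ARNs, cluster endpoint, staging bucket, Redshift table name and column list are placeholders for your own setup (the previous blog covers the same configuration via the console).

import boto3

firehose = boto3.client('firehose')

# Minimal sketch -- every ARN, endpoint and name below is a placeholder.
firehose.create_delivery_stream(
    DeliveryStreamName='DDB_Redshift',
    DeliveryStreamType='DirectPut',
    RedshiftDestinationConfiguration={
        'RoleARN': 'arn:aws:iam::<account-id>:role/<firehose-delivery-role>',
        'ClusterJDBCURL': 'jdbc:redshift://<cluster-endpoint>:5439/<database>',
        'CopyCommand': {
            'DataTableName': '<redshift-table>',
            'DataTableColumns': 'productcode,mfgdate,bestbydate,upc',
            'CopyOptions': "delimiter ','"
        },
        'Username': '<redshift-user>',
        'Password': '<redshift-password>',
        # Firehose stages incoming records in S3 before issuing the Redshift COPY
        'S3Configuration': {
            'RoleARN': 'arn:aws:iam::<account-id>:role/<firehose-delivery-role>',
            'BucketARN': 'arn:aws:s3:::<staging-bucket>',
            'BufferingHints': {'SizeInMBs': 1, 'IntervalInSeconds': 60},
            'CompressionFormat': 'UNCOMPRESSED'
        }
    }
)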

Let's take a look at DDB Streams and the Lambda function below.

DDB Streams

You can enable DynamoDB Streams from the DDB console as follows:

  1. In the DDB console you can find all your DDB tables.
  2. Select the table for which you want to enable streams.
  3. On the right pane, go to the Overview tab.
  4. Click on Manage Stream.

Manage Stream lets you choose one of four stream view types (KEYS_ONLY, NEW_IMAGE, OLD_IMAGE, NEW_AND_OLD_IMAGES), which determine what information is written to the stream whenever data in the table is modified. The DynamoDB Streams documentation has more details about streams and these options.
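
If you prefer to enable the stream programmatically instead of through the console, a minimal boto3 sketch looks like the following (the table name ProductItem is taken from the stream ARN logged later in this post):

import boto3

dynamodb = boto3.client('dynamodb')

# Minimal sketch: enable a stream that carries both the old and new item images.
dynamodb.update_table(
    TableName='ProductItem',
    StreamSpecification={
        'StreamEnabled': True,
        'StreamViewType': 'NEW_AND_OLD_IMAGES'   # or KEYS_ONLY / NEW_IMAGE / OLD_IMAGE
    }
)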

Lambda function

You can create a simple Lambda function in the console as follows. I have already discussed Lambda in my previous blog here.

  1. In the Lambda console, create a function, specify the language runtime and add the required permissions.
  2. Select the DDB stream as a trigger for the Lambda function and save the function (or wire it up with the SDK, as sketched after the screenshots below).
  3. You can choose to write the code in the console editor or upload a ZIP file.
(Screenshots: creating the Lambda function, selecting the DDB trigger, and the Lambda code.)
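
Step 2 above can also be wired up with the SDK instead of the console; a minimal boto3 sketch, with a placeholder stream ARN and the function name used in this post (DDB_Stream_Lambda), looks like this:

import boto3

lambda_client = boto3.client('lambda')

# Minimal sketch: the stream ARN is a placeholder for the one on your table.
lambda_client.create_event_source_mapping(
    EventSourceArn='arn:aws:dynamodb:us-east-1:<account-id>:table/ProductItem/stream/<timestamp>',
    FunctionName='DDB_Stream_Lambda',
    StartingPosition='LATEST',
    BatchSize=100,
    Enabled=True
)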

Lambda code

I have written a small piece of code in the console itself to log the events processed by the Lambda function.

import json
import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)

def lambda_handler(event, context):
    for record in event['Records']:
        logger.info(record)
        ddbRecord = record['dynamodb']
        logger.info('DDB Record: ' + json.dumps(ddbRecord))

    return 'processed {} records.'.format(len(event['Records']))
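
Before wiring up the real trigger you can exercise the handler locally with a hand-built event shaped like a DDB stream batch; the sketch below is only a test harness, with values mirroring the record logged further down.

# Hand-built test event mimicking a DynamoDB stream batch (values from the example item).
test_event = {
    'Records': [{
        'eventID': 'test-event-1',
        'eventName': 'INSERT',
        'eventSource': 'aws:dynamodb',
        'dynamodb': {
            'Keys': {'productCode': {'S': 'AE9QFENBDN'}},
            'NewImage': {
                'productCode': {'S': 'AE9QFENBDN'},
                'MfgDate': {'N': '1531591730'},
                'upc': {'S': '858081006202'},
                'BestByDate': {'N': '1544810930'}
            },
            'StreamViewType': 'NEW_AND_OLD_IMAGES'
        }
    }]
}

print(lambda_handler(test_event, None))   # -> processed 1 records.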

Below are the logs printed by the handler code above, fetched from the CloudWatch logs:

[INFO] 2019-03-18T03:44:37.82Z aa89962a-5145-4868-97a9-f04efbfd2c07 {'eventID': '8ba0242407cd4361080e6168e9b18701', 'eventName': 'INSERT', 'eventVersion': '1.1', 'eventSource': 'aws:dynamodb', 'awsRegion': 'us-east-1', 'dynamodb': {'ApproximateCreationDateTime': 1552880676.0, 'Keys': {'productCode': {'S': 'AE9QFENBDN'}}, 'NewImage': {'productCode': {'S': 'AE9QFENBDN'}, 'MfgDate': {'N': '1531591730'}, 'upc': {'S': '858081006202'}, 'BestByDate': {'N': '1544810930'}}, 'SequenceNumber': '1089333300000000003999920026', 'SizeBytes': 86, 'StreamViewType': 'NEW_AND_OLD_IMAGES'}, 'eventSourceARN': 'arn:aws:dynamodb:us-east-1:865876670085:table/ProductItem/stream/2019-03-18T03:19:03.890'}

You can do record['eventName'], record['dynamodb'], record['eventSource'], etc., to fetch the required information from the event. Since we are interested in the DynamoDB record here, I have also logged record['dynamodb'] in the above code and dumped it in JSON format. Output of the log below:

[INFO] 2019-03-18T03:44:37.82Z aa89962a-5145-4868-97a9-f04efbfd2c07 DDB Record:
{
    "ApproximateCreationDateTime": 1552880676,
    "Keys": {
        "productCode": {
            "S": "AE9QFENBDN"
        }
    },
    "NewImage": {
        "productCode": {
            "S": "AE9QFENBDN"
        },
        "MfgDate": {
            "N": "1531591730"
        },
        "upc": {
            "S": "858081006202"
        },
        "BestByDate": {
            "N": "1544810930"
        }
    },
    "SequenceNumber": "1089333300000000003999920026",
    "SizeBytes": 86,
    "StreamViewType": "NEW_AND_OLD_IMAGES"
}

Converting to Firehose record

The next step is to transform the DynamoDB stream image into a Firehose record. Code below:

def convertToFirehoseRecord(ddbRecord):
    # Parse the NewImage json element
    newImage = ddbRecord['NewImage']
    # Construct a comma-separated firehose record from NewImage
    firehoseRecord = '{},{},{},{}\n'.format(newImage['productCode']['S'],
                                            newImage['MfgDate']['N'],
                                            newImage['BestByDate']['N'],
                                            newImage['upc']['S'])
    return firehoseRecord
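
For the NewImage in the record logged above, this function returns the following CSV line (the column order has to line up with the column list configured for the Firehose COPY command):

AE9QFENBDN,1531591730,1544810930,858081006202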

Writing to Firehose

The next step is to create the Firehose client and write the transformed record into the Firehose delivery stream. Please find the details about creating Firehose and the required configurations in my previous blog here.

The full working code is below. Please handle corner cases as required; for example, NewImage is not available for all event types. It is present only for INSERT and MODIFY events, so REMOVE events need a guard.

import json
import logging
import boto3

logger = logging.getLogger()
logger.setLevel(logging.INFO)

firehose = boto3.client('firehose')
deliveryStreamName = '<firehose-stream-name>'

def lambda_handler(event, context):
    for record in event['Records']:
        logger.info(record)
        ddbRecord = record['dynamodb']
        logger.info('DDB Record: ' + json.dumps(ddbRecord))

        firehoseRecord = convertToFirehoseRecord(ddbRecord)
        logger.info('Firehose Record: ' + firehoseRecord)

        firehose.put_record(DeliveryStreamName=deliveryStreamName,
                            Record={'Data': firehoseRecord})

    return 'processed {} records.'.format(len(event['Records']))

def convertToFirehoseRecord(ddbRecord):
    # Parse the NewImage json element
    newImage = ddbRecord['NewImage']
    # Construct a comma-separated firehose record from NewImage
    firehoseRecord = '{},{},{},{}\n'.format(newImage['productCode']['S'],
                                            newImage['MfgDate']['N'],
                                            newImage['BestByDate']['N'],
                                            newImage['upc']['S'])
    return firehoseRecord

Summary

We were able to create all the required components to perform this ETL operation quite easily through the AWS console itself. In this use case I used boto3, the AWS SDK for Python, to create the Firehose client and call its put_record function; the boto3 Firehose reference has good documentation on this.

You might run into permission issues after adding the put_record call to your Lambda function, as shown below:

[ERROR] ClientError: An error occurred (AccessDeniedException) when calling the PutRecord operation: User: arn:aws:sts::865876670085:assumed-role/lambda_basic_execution/DDB_Stream_Lambda is not authorized to perform: firehose:PutRecord on resource: arn:aws:firehose:us-east-1:865876670085:deliverystream/DDB_Redshift

All you have to do is attach a Firehose access policy to the Lambda role (defined when we created the Lambda function).
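
If you prefer to grant the permission from code rather than the IAM console, a minimal boto3 sketch that attaches an inline policy to the role is shown below; the policy name is made up for illustration, the account id is a placeholder, and the role and delivery stream names are the ones from the error above.

import json
import boto3

iam = boto3.client('iam')

# Minimal sketch: allow the Lambda role to write to the Firehose delivery stream.
firehose_policy = {
    'Version': '2012-10-17',
    'Statement': [{
        'Effect': 'Allow',
        'Action': ['firehose:PutRecord', 'firehose:PutRecordBatch'],
        'Resource': 'arn:aws:firehose:us-east-1:<account-id>:deliverystream/DDB_Redshift'
    }]
}

iam.put_role_policy(
    RoleName='lambda_basic_execution',
    PolicyName='firehose-put-record',   # hypothetical policy name
    PolicyDocument=json.dumps(firehose_policy)
)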

Feel free to share your use cases and experiences with AWS components. Also, please share your comments and critiques of this blog.
