Real-time processing with AWS Kinesis Data Streams, Firehose, and Lambda to store data in S3 and DynamoDB

Burak Dogu
Feb 11, 2024


In the world of big data, efficiently managing and processing data streams is paramount. AWS offers a plethora of services that can be woven together into a robust data processing pipeline. In this article, I will take you through a project where we leveraged AWS Kinesis Data Streams, AWS Lambda, DynamoDB, Kinesis Firehose, and S3 to build a seamless data processing architecture.

As a first step, data read from a JSON file was ingested into Kinesis Data Streams using the script below.

import json
import time
import boto3
from datetime import datetime

STREAM_NAME = "input_data"
KINESIS_CLIENT = boto3.client('kinesis')


def get_views_data(interval_seconds: int = 5, stream_name=STREAM_NAME, kinesis_client=KINESIS_CLIENT):
    current_dt_value = datetime.now()
    dt_value = current_dt_value.strftime("%Y-%m-%d %H:%M:%S")

    with open("./resources/product-views.json") as lines:
        try:
            for line in lines:
                views_data = json.dumps(line)
                print(views_data)
                kinesis_client.put_record(
                    StreamName=stream_name,
                    Data=views_data,
                    PartitionKey=f"{hash(dt_value)}"
                )
                time.sleep(interval_seconds)

        except Exception as e:
            print(f"Error publishing message: {e}")


get_views_data()
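
The script above assumes the input_data stream already exists. As a minimal sketch (a single shard is an assumption; size it to your expected throughput), the stream could be created with boto3 like this:

import boto3

kinesis_client = boto3.client('kinesis')

# Create the stream the producer writes to; one shard is assumed here.
kinesis_client.create_stream(StreamName="input_data", ShardCount=1)

# Wait until the stream is ACTIVE before publishing records.
kinesis_client.get_waiter('stream_exists').wait(StreamName="input_data")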

Raw Data Processing

An AWS Lambda function triggered by Kinesis Data Streams was created, along with a DynamoDB table to store the raw records.

Figure-1: Lambda function will be triggered by Kinesis Data Streams
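
As a rough sketch of this setup, the table and the trigger could be created with boto3 as below. The table name raw_views_data and its partition key pk_time match the function that follows; the Lambda function name, batch size, and ARN placeholders are assumptions.

import boto3

dynamodb = boto3.client('dynamodb')
lambda_client = boto3.client('lambda')

# Table that holds the raw records; pk_time is the partition key.
dynamodb.create_table(
    TableName='raw_views_data',
    AttributeDefinitions=[{'AttributeName': 'pk_time', 'AttributeType': 'S'}],
    KeySchema=[{'AttributeName': 'pk_time', 'KeyType': 'HASH'}],
    BillingMode='PAY_PER_REQUEST'
)

# Wire the Kinesis stream to the Lambda function (hypothetical function name).
lambda_client.create_event_source_mapping(
    EventSourceArn='arn:aws:kinesis:<region>:<account-id>:stream/input_data',
    FunctionName='raw-views-consumer',
    StartingPosition='LATEST',
    BatchSize=100
)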

Lambda Function — 1:

import json
import boto3
import base64
dynamodb = boto3.resource('dynamodb')
table_name = dynamodb.Table('raw_views_data')


def lambda_handler(event, context):
    for record in event['Records']:
        pk_time = record["kinesis"]["partitionKey"]
        msg_decode = base64.b64decode(record["kinesis"]["data"]).decode('utf-8')
        dict_msg = json.loads(msg_decode)
        print(dict_msg)

        table_name.put_item(Item={
            "pk_time": pk_time,
            "data": dict_msg
        })

    return {
        'statusCode': 200
    }

Output:

Figure-2: A screenshot from DynamoDB table
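
A quick way to verify the writes (assuming the same table, and that a handful of items is enough to inspect) is a small scan:

import boto3

table = boto3.resource('dynamodb').Table('raw_views_data')

# Fetch a few items to confirm the structure written above:
# {"pk_time": <partition key>, "data": <decoded record>}
response = table.scan(Limit=5)
for item in response['Items']:
    print(item['pk_time'], item['data'])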

Processed Data

In parallel, the raw data is sent to Kinesis Firehose, transformed by a second Lambda function, and finally written to S3.

There are a few important points at this stage. One is the timeout of the transformation Lambda function: increasing it beyond the default gives the function enough time to process larger batches. Another is the buffer size and buffer interval configured on Kinesis Firehose, which determine how much data is accumulated, and for how long, before it is delivered to S3.
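
To show where these settings live, here is a minimal sketch of creating the delivery stream with boto3. The stream, role, bucket, and Lambda ARNs are placeholders, and the buffer values are only example figures to tune:

import boto3

firehose = boto3.client('firehose')

firehose.create_delivery_stream(
    DeliveryStreamName='views-delivery-stream',
    DeliveryStreamType='KinesisStreamAsSource',
    KinesisStreamSourceConfiguration={
        'KinesisStreamARN': 'arn:aws:kinesis:<region>:<account-id>:stream/input_data',
        'RoleARN': 'arn:aws:iam::<account-id>:role/<firehose-role>'
    },
    ExtendedS3DestinationConfiguration={
        'RoleARN': 'arn:aws:iam::<account-id>:role/<firehose-role>',
        'BucketARN': 'arn:aws:s3:::<output-bucket>',
        # How much data (MB) and how long (seconds) Firehose buffers before writing to S3.
        'BufferingHints': {'SizeInMBs': 1, 'IntervalInSeconds': 60},
        'ProcessingConfiguration': {
            'Enabled': True,
            'Processors': [{
                'Type': 'Lambda',
                'Parameters': [
                    {'ParameterName': 'LambdaArn',
                     'ParameterValue': 'arn:aws:lambda:<region>:<account-id>:function:<transform-function>'},
                    # Buffering applied to the transformation Lambda itself.
                    {'ParameterName': 'BufferSizeInMBs', 'ParameterValue': '1'},
                    {'ParameterName': 'BufferIntervalInSeconds', 'ParameterValue': '60'}
                ]
            }]
        }
    }
)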

Lambda Function — 2:

import base64
import json
import ast


def lambda_handler(event, context):
    output = []  # collected per invocation so warm starts do not re-send old records

    for record in event['records']:
        payload = base64.b64decode(record['data']).decode('utf-8')
        print('payload:', payload)

        payload_str = json.loads(payload)

        payload_dict = ast.literal_eval(payload_str)
        print('payload_dict', type(payload_dict))  # parse the string into a dict

        msg = {
            "event": payload_dict.get('event', ''),
            "messageid": payload_dict.get('messageid', ''),
            "userid": payload_dict.get('userid', ''),
            "productid": payload_dict.get('properties', {}).get('productid', ''),
            "source": payload_dict.get("context", {}).get("source", '')
        }

        payload_str = json.dumps(msg)  # back to str, so a newline can be appended
        print('payload_str:', payload_str)
        row_w_newline = payload_str + "\n"
        row_w_newline_encoded = base64.b64encode(row_w_newline.encode('utf-8')).decode('utf-8')

        output_record = {
            'recordId': record['recordId'],
            'result': 'Ok',
            'data': row_w_newline_encoded
        }
        output.append(output_record)

    print('Processed {} records.'.format(len(event['records'])))

    return {'records': output}

Output:

Figure-3: Output views data in S3 bucket
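
As a small check (the bucket name is a placeholder), reading one of the delivered objects shows the newline-delimited JSON produced by the transformation Lambda:

import boto3

s3 = boto3.client('s3')

# List a few of the objects Firehose delivered and print their rows.
objects = s3.list_objects_v2(Bucket='<output-bucket>', MaxKeys=5)
for obj in objects.get('Contents', []):
    body = s3.get_object(Bucket='<output-bucket>', Key=obj['Key'])['Body'].read()
    for line in body.decode('utf-8').splitlines():
        print(line)  # one transformed record per line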

You can make various changes to this pipeline: write to S3 in CSV format, query the data with Athena, or send it from Firehose directly to Redshift.
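
For instance, once an external table is defined over the output bucket, a hedged sketch of querying it through the Athena client could look like this; the database, table name, and result location are assumptions:

import boto3

athena = boto3.client('athena')

# Ad-hoc query over the transformed data; the names below are placeholders.
athena.start_query_execution(
    QueryString="SELECT event, productid, source FROM product_views LIMIT 10",
    QueryExecutionContext={'Database': 'views_db'},
    ResultConfiguration={'OutputLocation': 's3://<query-results-bucket>/athena/'}
)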

Benefits and Learnings

  • Scalability: Each component of our solution is designed to scale with our data, ensuring that we can handle increasing volumes without a hitch.
  • Flexibility: By leveraging Lambda functions, we could easily modify our data processing logic as our requirements evolved.
  • Durability: With DynamoDB and S3, we ensured that our data was stored securely and reliably, with easy access for real-time and historical analysis.

Conclusion

Building a data processing pipeline using AWS services like Kinesis Data Streams, Lambda, DynamoDB, Firehose, and S3 can seem daunting at first. However, by breaking down the architecture into manageable components and understanding how each service contributes to the overall solution, it becomes a powerful way to handle big data workloads.

I hope this walkthrough inspires you to leverage these AWS services in your projects and opens up new possibilities for data processing and analysis. Remember, the key to a successful implementation is understanding your data and how you intend to use it, then choosing the right tools for the job.
