An End-to-End Data Engineering Project Tutorial: Creating a Data Funnel Using AWS Kinesis, Python, and an S3 Bucket

Amos Eda
13 min read · Oct 1, 2023


In this age of digital information, there is no lack of data. Information is flowing everywhere, and as a business entity it's important to make sure that this information is organized. Let's take e-commerce as an example, and consider the use case of a business owner who wants to compare sales trends for summer versus fall.

To create such a visualization, a wealth of data must undergo extensive number crunching in the background. It's essential to organize the data with timestamps; otherwise, working with it would be a time-consuming and challenging task.

Keeping data organized helps us be more precise and selective, and lets us drill down to the specific information we need when deriving insights.

Organized folders based on timestamps

How can we organize this data when real-time information is streaming into your cloud infrastructure?
In this tutorial we are going to work on an AWS architecture where we funnel the information into different compartments based on timestamps to aid data analytics and visualization.

The Architecture

Architecture to drill down details

A quick exercise for you, the reader. Observe the above architecture, take a good look at it, and try to guess what each component is responsible for. What would happen if a specific component were not present, or if you added another element to the mix? There are different phases to this architecture, and we will delve into each section to see how it contributes to data funneling.

(Optional) The inner workings behind the architecture

Data Producer

Data producers are entities that send information to your cloud infrastructure. These data producers could be something as simple as IoT devices that are sending data from various nodes to the cloud, or even real-time stock market data that is getting sent through API hooks.
The data producers do not care how the data gets processed, or whether there needs to be a delay between each packet (the unit of information being sent) to avoid concurrency issues. A good analogy would be coal miners working in a mine to gather coal. From their perspective it is irrelevant what gets done with the coal; their primary focus is to make sure the coal gets dumped into the wagon.

Kinesis Data Streams

Data Streams is akin to a wagon or train; it is responsible for gathering all the data that gets dumped on its doorstep. Once the data lands in a data stream, it is arranged into a queue, where it waits to get picked up by another agent. A good analogy would be the coal wagons being transported out of the mine.

Kinesis Firehose and S3

Firehose is similar to the vendors or contractors. These coal vendors have different contracts with government entities such as the electricity or gas departments (the folders within the S3 bucket). Firehose is responsible for making sure that data gets delivered to the appropriate folders/partitions within the S3 bucket. Once the information is collected, it gets dumped into different compartments based on timestamps.

With that said, let's get started with the tutorial!
Note: Before starting the tutorial, it is assumed that you already have an AWS console account registered. If you haven't done so, please go to the following link and get yourself registered.

— — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — —

Getting Started

Step 1 — Download the JSON file from the repository

For this tutorial I have created JSON data that consists of e-commerce products for a demo website. Download the file from the following GitHub repository.
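To give you an idea of the shape of the data, here is a hypothetical record similar to what the file contains. The date field is the one we will rely on later as the partition key; the other field names are illustrative assumptions, so check the actual file in the repository.

[
  {
    "product_id": "P-1001",
    "product_name": "Beach Umbrella",
    "category": "Outdoor",
    "price": 24.99,
    "date": "2023-07-15"
  }
]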

Step 2 — Creating credentials to enable the AWS SDK

We first need to create credentials for the AWS SDK. The AWS SDK lets you call AWS services from your own code, but in order to do so, you need to prove to AWS that it's actually you using the SDK and not some impostor. To do that, we generate credentials, and the first step is to create a user with the relevant permissions.
1) Log in to your AWS online console and navigate to IAM Users

2) Create a new user

3) Make sure to select the appropriate options as shown below

4) Review your information carefully and select Create user

5) Once you create the user, you should be able to see the following information

Store these details in a safe location, perhaps in a text file or notepad. You will be using them again at a later phase.

6) Return to the Users dashboard in IAM.

You should be able to view the IAM user that you created. Click on it.

7) Open the Security credentials tab

8) Scroll down to the Access keys section and click on Create access key.

You will be using these access keys in your local code to access AWS services programmatically.

9) Select Local code, tick the checkbox acknowledging that you understand what you are signing up for, and click "Next"

10) Choose a tag key that you want to associate the access keys with

11) This should take you to the access keys page, where you will be able to view your access key and your secret access key

Make sure to save these details in a text file or notepad, since we will be using them in our data producer's code.

Step 3 — Python Script for Data Production

This part of the tutorial assumes that you have Python installed as a prerequisite. If you don't, please navigate to the following link and follow the instructions.

  1. Open a command prompt and install boto3 (the open-source AWS SDK for Python):
pip install boto3

2. Create a new Python file (with a .py extension), preferably using Visual Studio Code.

3. In the Python file, import the relevant libraries:

import boto3
import json
import time

4. Instantiate the AWS-related variables:

aws_access_key_id = '<your access key that you stored from earlier>'
aws_secret_access_key = '<your secret access key>'
aws_region = 'us-east-2'

In case you are unsure about your AWS region, you can easily check it in your AWS console.
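As a side note, hardcoding credentials in a script is risky if the file ever gets shared or committed to version control. A minimal alternative sketch, assuming you have exported AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY and (optionally) AWS_REGION as environment variables beforehand:

import os

# Read the credentials from environment variables instead of hardcoding them
aws_access_key_id = os.environ['AWS_ACCESS_KEY_ID']
aws_secret_access_key = os.environ['AWS_SECRET_ACCESS_KEY']
aws_region = os.environ.get('AWS_REGION', 'us-east-2')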

5. Keep everything in the same folder.

Make sure that the `ecommerce_data.json` file you downloaded from GitHub earlier is in the same location as the Python file.

6. Load the JSON data:

with open('ecommerce_data.json', 'r') as json_file:
    product_data = json.load(json_file)

Step 4 — Kinesis

In this phase we are going to configure Kinesis Data Streams and ensure that the data from the data producers gets tunneled to the intended destination.

1) Within the same file, initialize the Kinesis client:
kinesis_client = boto3.client(
    'kinesis',
    region_name=aws_region,
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key
)

2) Create a new stream on Kinesis programmatically

stream_name = 'data-streamer'
shard_count = 1  # You can adjust the number of shards as needed

try:
    kinesis_client.create_stream(
        StreamName=stream_name,
        ShardCount=shard_count
    )
    print(f"Stream '{stream_name}' created successfully with {shard_count} shard(s).")
except kinesis_client.exceptions.ResourceInUseException:
    print(f"Stream '{stream_name}' already exists.")
except Exception as e:
    print(f"Error creating stream: {str(e)}")

3) Add each product into the stream

# Send each product record to the Kinesis stream
for product in product_data:
    # Convert the product record to JSON
    product_json = json.dumps(product)

    # Send the JSON record to the stream
    response = kinesis_client.put_record(
        StreamName=stream_name,
        Data=product_json,
        PartitionKey=product['date']  # Use the record's date as the partition key
    )

    print(f"Record sent to Kinesis: {response['SequenceNumber']},{response['ShardId']}")

    # To avoid sending records too quickly (e.g., for rate limiting), add a small delay
    time.sleep(0.1)  # Sleep for 100 milliseconds between each record (adjust as needed)
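If your dataset grows large, sending records one at a time becomes slow. As an optional variation (not part of the original walkthrough), Kinesis also accepts batches of up to 500 records per call through put_records; a sketch under that assumption:

# Optional: send the products in batches of up to 500 records per API call
batch_size = 500
for i in range(0, len(product_data), batch_size):
    batch = product_data[i:i + batch_size]
    response = kinesis_client.put_records(
        StreamName=stream_name,
        Records=[
            {'Data': json.dumps(p), 'PartitionKey': p['date']}
            for p in batch
        ]
    )
    # FailedRecordCount reports how many records in this batch were rejected
    print(f"Batch sent, failed records: {response['FailedRecordCount']}")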

Now we're going to check in the AWS console whether the data was properly inserted.

4) After executing the above code, you should see the following output.

Copy one of the record sequence numbers along with its shard ID. Example (the first record):
- 49644981891148681367608698114075671093958433950903828482
- shardId-000000000000

5) Go to the AWS console and navigate to Amazon Kinesis Data Streams.

6) Select Data streams.

7) Check the streams and click the one that you created.

8) Check the Data viewer section.

9) Select the shard ID and the sequence number that you saved earlier.

If everything is done right, you should be able to see the data that has been uploaded.
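If you prefer to verify from code rather than the console, here is a minimal sketch that reads the records back from the shard. It assumes the stream has only the single shard we created and starts reading from the oldest record (TRIM_HORIZON):

# Get an iterator pointing at the oldest record in the single shard
shard_iterator = kinesis_client.get_shard_iterator(
    StreamName=stream_name,
    ShardId='shardId-000000000000',
    ShardIteratorType='TRIM_HORIZON'
)['ShardIterator']

# Fetch a batch of records and print their payloads
records = kinesis_client.get_records(ShardIterator=shard_iterator, Limit=10)
for record in records['Records']:
    print(record['Data'].decode('utf-8'))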

Step 5 — Creating S3 Bucket

In this phase we are going to configure the S3 bucket and make sure it's ready to catch the data getting thrown at it from Kinesis Firehose.

1) Navigate to the AWS console and select S3.

2) Create a bucket.

3) Type a unique name.

4) Select "Create bucket".
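If you would rather create the bucket programmatically, the same thing can be done with boto3. A sketch, assuming the hypothetical bucket name my-data-funnel-bucket (bucket names are globally unique, so pick your own) and the aws_region variable from earlier:

s3_client = boto3.client('s3', region_name=aws_region,
                         aws_access_key_id=aws_access_key_id,
                         aws_secret_access_key=aws_secret_access_key)

# Regions other than us-east-1 require an explicit LocationConstraint
s3_client.create_bucket(
    Bucket='my-data-funnel-bucket',  # hypothetical name; use your own
    CreateBucketConfiguration={'LocationConstraint': aws_region}
)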

Step 6 — Kinesis Firehose

In this phase we will configure Kinesis Firehose and make sure that appropriate partition keys are in place. This will make compartmentalization easy.

1) Navigate to Kinesis Firehose.

2) Create a new delivery stream.

3) For the source, select Kinesis Data Streams.

4) For the destination, select S3.

5) For the source settings, choose the data stream that you created earlier.

6) For Destination settings -> S3 bucket, select "Create".

7) Select the bucket that we created earlier, and click on "Choose".

8) Enable Dynamic Partitioning
Explanation (optional read): You might wonder why this is important. We require dynamic partitioning to ensure that the information is dropped into different compartments. Take our current use case as an example: by ensuring that all the data lands in the right folders, we make data analytics much easier.

For another use case, assume that you are a data analytics engineer and you were asked to check the sales trends for the month of October. It wouldn't take much effort to write code that selects data only from the folders belonging to October. Bottom line: our life gets a lot easier with dynamic partitioning, especially in terms of organizing our data and retrieving relevant information.

Enable dynamic partitioning
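As a concrete illustration (the exact layout depends on the prefix we configure in a later step, so treat this as a sketch with made-up dates), the objects delivered by Firehose end up under date-based prefixes like these, which turns the October query above into a simple prefix filter:

raw_test/2023-10-01/<object delivered by Firehose>
raw_test/2023-10-02/<object delivered by Firehose>
raw_test/2023-10-03/<object delivered by Firehose>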

9) Enable inline JSON parsing.

10) Choose the keys for dynamic partitioning.
Explanation (optional): The following is sample JSON data.

I would like to organize the data according to the date field, so I'm going to use it as the basis for dynamic partitioning.

Add date as key name
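For reference, inline parsing in Firehose extracts partition keys from each record using JQ expressions. The configuration for our case would look roughly like the following (the key name is my choice to match the date field; verify it against your own console screen):

Key name:       date
JQ expression:  .date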

11) Add an S3 bucket prefix to route data to specific folders.
Explanation (optional): Since we want the information to get dumped into a specific folder, raw_test, we add that as the prefix.

Type raw_test as the prefix and click on Apply dynamic partitioning.
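In case the console wording differs from my screenshots, the idea is that the S3 prefix can reference the dynamic partitioning key through the partitionKeyFromQuery namespace. A sketch of the prefixes (the error prefix name here is just an example):

S3 bucket prefix:        raw_test/!{partitionKeyFromQuery:date}/
S3 bucket error prefix:  error/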

12) You should be able to see the following information

13) For the S3 bucket error output, type the following folder name.

You can name it however you would like, but make sure to distinguish between the main target folder and the error folder.

14) Finally, click on "Create Delivery Stream".

Finally

We have completed the end-to-end implementation of the data funnel architecture.

Let's start testing our new toy and see how it's performing.

End to End Testing

I have run the data streaming code snippet in my Python Jupyter notebook.

Let's check whether the records were added to the data stream. I have navigated to my "data-streamer" stream in Kinesis and entered the sequence number of my first record.

We can see that the data is properly reflected.
Now, when you navigate to Firehose, you should see an error logs status like this.

Debugging error logs (optional):
NOTE: Go through this step if you are seeing error logs and it's unclear what exactly caused them.
Don't worry even if you are getting destination error logs; you can always debug them in CloudWatch. How do we get to the CloudWatch logs?

Once you navigate to CloudWatch, you should see Log groups in the left panel.

Select your Firehose delivery stream's log group. Mine is as follows:

Under Log streams, you should be able to see these two log streams. If you check "DestinationDelivery", you should see the logs for the Firehose funnel.
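If you would rather pull these logs from code, here is a sketch using boto3's CloudWatch Logs client, assuming the default log group naming of /aws/kinesisfirehose/<your-delivery-stream-name> (substitute your own stream name):

logs_client = boto3.client('logs', region_name=aws_region,
                           aws_access_key_id=aws_access_key_id,
                           aws_secret_access_key=aws_secret_access_key)

# Fetch recent events from the DestinationDelivery log stream
events = logs_client.filter_log_events(
    logGroupName='/aws/kinesisfirehose/<your-delivery-stream-name>',
    logStreamNamePrefix='DestinationDelivery'
)
for event in events['events']:
    print(event['message'])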

Alright, you still with me? If you are, then good job. You are doing great :D. Back to the main course of action, let's check the S3 bucket.

You should be able to see the following results

Within the raw_test folder
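To double-check from code that Firehose actually delivered the files, here is a small sketch that lists whatever landed under the raw_test/ prefix, reusing the s3_client from the earlier sketch and the same hypothetical bucket name:

# List the objects that Firehose delivered under the raw_test/ prefix
response = s3_client.list_objects_v2(
    Bucket='my-data-funnel-bucket',  # hypothetical name; use your own bucket
    Prefix='raw_test/'
)
for obj in response.get('Contents', []):
    print(obj['Key'], obj['Size'])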

Limitations

The only limitations of this project are:

  1. You need to operate within the limits of the AWS Free Tier.
  2. The data must be consistent with the format you specified in the dynamic partitioning option in Firehose; as long as it is, you should be good to go.

Conclusion

And that's all, folks! If you were able to follow all the instructions and you understood the purpose of each task, then congratulations! 🎉🎉

You just created your first data engineering project using AWS tools!

One of the advantages of this architecture is that you can add as many data producers as you need (as long as you operate within the limitations I mentioned earlier). No matter how many data producers you add, your AWS architecture will be able to herd them all like a sheepdog herding its flock.

Hope this tutorial was informative. Please reach out if you have any questions or suggestions. Until then,

To infinity and beyond…
