Dynamic e-commerce product pricing adjustment using AWS Kinesis, Lambda (Python) & DynamoDB

Sid · Published in CodeX · 9 min read · May 10, 2024

Have you ever wondered why two people browsing the same product on the same website sometimes see two different prices?

This is quite a common scenario in the world of e-commerce, where companies dynamically adjust product prices based on a wide range of attributes, such as:

1: Location of the users — for example, a marketing campaign targeting specific cities or states in a country.

2: Their purchase history — for example, loyal/frequent users with high purchasing power.

3: Number of items in the cart — for example, if the shopping cart value exceeds a certain threshold, certain products can be sold at a discounted price.

There could be a plethora of other criteria here, but I will keep it simple in this article.

The most important aspects to keep in mind while designing a solution for such use cases, i.e. scenarios where the solution has a direct impact on the business, are:

1: Reliability and Fault Tolerance — The dynamic pricing solution must be highly reliable, ensuring continuous operation without failure, even under varying and potentially extreme conditions.

2: Scalability and Performance — The solution must efficiently handle varying levels of user demand without degradation in performance.

3: Data Security and Privacy — When implementing dynamic pricing strategies, it is imperative to consider the security and privacy of user data.

4: Analytical Capabilities — The ability to analyze the effectiveness of different pricing strategies is essential for continuous improvement.

The diagram below shows the AWS services we are going to use to implement this use case.

Picture a scenario where website clickstream events are being written into Kinesis (Data Stream-1). This stream contains details such as the pages users are currently browsing, the traffic source, the product_id they are looking at, and so on.
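For illustration, a single clickstream event might look something like this (a hypothetical example; the actual events.csv in the repo may contain additional fields, but the lambda code later in this article relies on user_id, event_type and uri, and the producer script uses session_id as the partition key):

{
    "session_id": "a1b2c3",
    "user_id": "user_101",
    "event_type": "product",
    "uri": "/product/2045",
    "traffic_source": "google_ads"
}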

As soon as data is ingested into this stream, a Lambda function listening on it will read and parse the events and look up that specific user_id in a DynamoDB table where we have stored the user's most recent purchase history. If the user who is currently browsing a specific product has recent purchases that meet a certain condition (more than 2 recent orders and more than 2 items purchased), we will give them a discount of 20% on the product they are currently looking at. Simple, right?

The Lambda function, after doing the lookup, will write the final output to a different Kinesis data stream (Data Stream-2), which will carry the user_id, product_id and the discount percentage.

After this point, you can imagine another microservice listening on Data Stream-2 that handles showing the discount to the user on the front end in near real time, as sketched below.
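To make that concrete, here is a minimal, hypothetical consumer sketch that simply polls Data Stream-2 with boto3 (it assumes the stream name user_product_adjusted_pricing that we create later in this article; a real front-end service would more likely use another Lambda or the Kinesis Client Library rather than raw polling):

import json
import time
import boto3

# Hypothetical polling consumer for the adjusted-pricing stream.
kinesis = boto3.client('kinesis', region_name='us-east-1')
stream_name = 'user_product_adjusted_pricing'

# Read from the first shard only, purely for illustration.
shard_id = kinesis.describe_stream(StreamName=stream_name)['StreamDescription']['Shards'][0]['ShardId']
iterator = kinesis.get_shard_iterator(
    StreamName=stream_name,
    ShardId=shard_id,
    ShardIteratorType='LATEST'
)['ShardIterator']

while True:
    result = kinesis.get_records(ShardIterator=iterator, Limit=100)
    for record in result['Records']:
        pricing = json.loads(record['Data'])
        # A real service would push this to the front end (e.g. via a WebSocket).
        print(f"User {pricing['user_id']} gets {pricing['discount']} off product {pricing['product_id']}")
    iterator = result['NextShardIterator']
    time.sleep(1)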

Alright, so that's the flow. Now let's begin.

Step-1: Create AWS Kinesis Data streams

We will start by creating 2 Kinesis data streams: one to store the clickstream data simulating a real-world website/application, and another to store the final output.

Head over to Kinesis and click on Create data stream.

Give it a name, i.e. user_click_streams, select the on-demand capacity mode and click Create data stream.

Similarly, create another data stream and call it user_product_adjusted_pricing.

So by now you should have 2 kinesis data streams.
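If you prefer to script this step instead of using the console, a minimal boto3 sketch (assuming the same stream names and on-demand mode) would look like this:

import boto3

kinesis = boto3.client('kinesis', region_name='us-east-1')

# Create both streams in on-demand capacity mode.
for stream_name in ['user_click_streams', 'user_product_adjusted_pricing']:
    kinesis.create_stream(
        StreamName=stream_name,
        StreamModeDetails={'StreamMode': 'ON_DEMAND'}
    )
    print(f'Created stream: {stream_name}')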

Step-2: Create a DynamoDB table to store recent customer purchases & ingest data

Head over to DynamoDB from your AWS console and create a table called recent_purchase_history with the partition key set to user_id. This table will store each customer's recent purchase history: the user_id, total_orders and total_items_purchased.

Click Create table.
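Alternatively, the same table can be created with boto3 (a sketch, assuming on-demand billing):

import boto3

dynamodb = boto3.client('dynamodb', region_name='us-east-1')

# user_id is the partition key; on-demand billing avoids capacity planning.
dynamodb.create_table(
    TableName='recent_purchase_history',
    KeySchema=[{'AttributeName': 'user_id', 'KeyType': 'HASH'}],
    AttributeDefinitions=[{'AttributeName': 'user_id', 'AttributeType': 'S'}],
    BillingMode='PAY_PER_REQUEST'
)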

Now, execute the below Python script from your local environment after running aws configure in your terminal. Since this script interacts with your AWS account, it will fail if you haven't authenticated your local environment with your AWS account.
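The script assumes data/user_purchase_history.csv has the columns user_id, total_orders and total_items_purchased; a few hypothetical rows for illustration (the actual file is in the repo linked at the end):

user_id,total_orders,total_items_purchased
user_101,3,7
user_102,1,1
user_103,5,12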

import boto3
import pandas as pd

# Load CSV data
data = pd.read_csv('data/user_purchase_history.csv')

# Connect to DynamoDB using boto3
dynamodb = boto3.resource('dynamodb', region_name='us-east-1')
table = dynamodb.Table('recent_purchase_history')

# Helper function to convert data to a suitable format and insert into DynamoDB
def write_to_dynamodb(row):
    try:
        table.put_item(
            Item={
                'user_id': str(row['user_id']),
                'total_orders': int(row['total_orders']),
                'total_items_purchased': int(row['total_items_purchased'])
            }
        )
        print(f"Inserted user_id {row['user_id']}")
    except Exception as e:
        print(f"Error inserting user_id {row['user_id']}: {str(e)}")

# Apply the function to each row in the dataframe
data.apply(write_to_dynamodb, axis=1)

By the end of the script execution, you should have a few users with purchase histories stored in the dynamo table.

(Screenshots: script execution output, and the DynamoDB data viewer confirming the data is ingested.)
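You can also verify the ingestion programmatically with a quick get_item lookup (a sketch; substitute a user_id that actually exists in your CSV, user_101 here is hypothetical):

import boto3

dynamodb = boto3.resource('dynamodb', region_name='us-east-1')
table = dynamodb.Table('recent_purchase_history')

# Fetch a single user's record to confirm the data landed as expected.
response = table.get_item(Key={'user_id': 'user_101'})  # hypothetical user_id
print(response.get('Item'))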

Step-3: Create an IAM role for the Lambda function to consume from Kinesis and access the DynamoDB table

We will now deploy a lambda function which will be triggered/invoked whenever data or events are pushed into Kinesis (user_click_streams).

Let's first create an IAM role for this Lambda function.

Under IAM, select Roles from the left navigation and click Create role. Select Lambda from the Service drop-down and click Next.

For policies, select the AWSLambdaBasicExecutionRole and AmazonKinesisFullAccess policies and create the IAM Role.

Now head back to the same role in order to add another inline policy for DynamoDB access.

Select the JSON tab and paste the below JSON, replacing the account-id with your AWS account id.

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": "dynamodb:GetItem",
            "Resource": "arn:aws:dynamodb:us-east-1:{account-id}:table/recent_purchase_history"
        }
    ]
}

Click Create policy

Below is what your IAM role should look like.

Step-4: Deploy a Lambda function to consume from Kinesis

The Lambda function itself is quite simple. It checks whether a given user (user_id) who is currently browsing a specific product has more than 2 recent orders and more than 2 items purchased according to the DynamoDB table. If yes, we offer a discount of 20% in real time on the retail price.

Create the Lambda function with a Python runtime, attach the IAM role from the previous step, and paste the below script as the function code.

import json
import base64
import logging
import re
import boto3
from decimal import Context, setcontext, ROUND_HALF_EVEN

logger = logging.getLogger()
logger.setLevel(logging.INFO)

setcontext(Context(rounding=ROUND_HALF_EVEN))
region_name = 'us-east-1'

def lambda_handler(event, context):
    kinesis_client = boto3.client('kinesis', region_name=region_name)
    dynamodb = boto3.resource('dynamodb', region_name=region_name)
    purchase_history_table = dynamodb.Table('recent_purchase_history')
    adjusted_pricing_stream_name = 'user_product_adjusted_pricing'

    record_count = 0
    error_count = 0

    for record in event['Records']:
        try:
            payload = base64.b64decode(record['kinesis']['data'])
            data_item = json.loads(payload)

            user_id = data_item.get('user_id')
            event_type = data_item.get('event_type')

            if user_id and event_type == 'product':
                # Extract product_id from the URI if the event type is 'product'
                match = re.search(r'/product/(\d+)', data_item.get('uri', ''))
                if match:
                    product_id = match.group(1)
                    # Look up the recent purchase history for this user
                    response = purchase_history_table.get_item(Key={'user_id': str(user_id)})
                    if 'Item' in response:
                        total_orders = int(response['Item']['total_orders'])
                        total_items_purchased = int(response['Item']['total_items_purchased'])

                        if total_orders > 2 and total_items_purchased > 2:
                            # Prepare a new payload with the discount
                            adjusted_pricing_data = {
                                'user_id': user_id,
                                'product_id': product_id,
                                'discount': '20%'
                            }

                            # Send it to the other Kinesis stream
                            kinesis_client.put_record(
                                StreamName=adjusted_pricing_stream_name,
                                Data=json.dumps(adjusted_pricing_data),
                                PartitionKey=str(user_id)  # Using user_id as the partition key
                            )
                            record_count += 1
        except Exception as e:
            logger.error(f"Error processing record: {e}")
            error_count += 1

    logger.info(f'Processed {record_count} records with {error_count} errors.')
    return {
        'statusCode': 200,
        'body': json.dumps(f'Processed {record_count} records with {error_count} errors.')
    }

Now select the configuration tab and edit the timeout to 15 minutes.

Finally, we set up the trigger so that this Lambda gets invoked whenever data (events) is written to the Kinesis data stream (user_click_streams).

Click Add trigger, select Kinesis from the drop-down, select user_click_streams as your data stream, and click Add.

Here, we set the batch size to 100, which means the Lambda will not be invoked for every single event that arrives in the data stream; instead, each invocation receives a batch of up to 100 records from Kinesis.
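The same trigger can also be created programmatically with an event source mapping; a boto3 sketch (the function name dynamic_pricing_lambda is hypothetical, so replace it with whatever you named your Lambda, and substitute your account id in the ARN):

import boto3

lambda_client = boto3.client('lambda', region_name='us-east-1')

# Wire the Kinesis stream to the Lambda with a batch size of 100.
lambda_client.create_event_source_mapping(
    EventSourceArn='arn:aws:kinesis:us-east-1:{account-id}:stream/user_click_streams',
    FunctionName='dynamic_pricing_lambda',  # hypothetical function name
    StartingPosition='LATEST',
    BatchSize=100
)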

That's it, we are almost done. Click Deploy, and all we need to do now is set these different components in motion.

Step-5: Write data into Kinesis

We now write data into our Kinesis data stream, which will in turn invoke the Lambda function.

For this we will read data from a CSV file (events.csv) using a simple python script and write it into the data stream user_click_streams.

The below script will read every row and write the data into user_click_streams at a random time interval between 5–10 seconds.

You can execute this code from your local system. But make sure you have executed aws configure from the command line and authenticated your local environment with your AWS account.

import json
import time
import random
import csv
import boto3
import logging

# Initialize logger
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def process_file(file_path, region_name='us-east-1'):
    """Process a local CSV file and send its rows to Kinesis."""
    kinesis_client = boto3.client('kinesis', region_name=region_name)
    try:
        # Open the local CSV file
        with open(file_path, 'r') as file_content:
            csv_reader = csv.DictReader(file_content)
            counter = 0
            for row in csv_reader:
                try:
                    # Convert the row to a JSON string
                    data_json = json.dumps(row)

                    # Send the record to the Kinesis data stream
                    response = kinesis_client.put_record(
                        StreamName="user_click_streams",
                        Data=data_json,
                        PartitionKey=str(row['session_id'])
                    )
                    counter += 1
                    if response['ResponseMetadata']['HTTPStatusCode'] != 200:
                        logger.error(f"Error sending message to Kinesis: {response}")
                except Exception as e:
                    logger.error(f"Error processing record {row}: {e}")
                time.sleep(random.randint(5, 10))  # Random sleep between 5 and 10 seconds
            logger.info(f"Finished processing. Total records sent: {counter}")
    except Exception as e:
        logger.error(f"Error opening file {file_path}: {e}")
        raise e

def main():
    file_path = 'data/events.csv'  # Ensure this path correctly points to your CSV file
    process_file(file_path)

if __name__ == "__main__":
    main()

In a couple of minutes, you should see the data in your Kinesis data stream. Navigate to Kinesis, click on user_click_streams, select the Data viewer tab, choose any shard from the drop-down and click Get records.

This also means that your Lambda will get invoked very soon. Head over to your Lambda function, select the Monitor tab and click on CloudWatch Logs.

Click on the log stream and you should be able to see the output logs.

Now, switch over to Kinesis and select the data stream user_product_adjusted_pricing, which is where we expect to see the user discounts.

PS: Since the Lambda function only acts on events with event_type == 'product', and since the data is written into Kinesis only every 5-10 seconds, it might take some time for records to arrive in user_product_adjusted_pricing.

In case you run out of patience, you can set the sleep interval to 1 second in the Python script so it writes data to Kinesis every second ;-). You will see the Lambda output right away.

Congratulations, you have successfully implemented an end-to-end event-driven pipeline using Lambda, DynamoDB and Kinesis.

FULL SOURCE CODE : https://github.com/sidoncloud/aws-de-usecases/tree/main/aws-event-driven-pricing-adjustment


Passionate data expert & Udemy instructor with 20k+ students, helping startups scale and derive value from data.