AWS Serverless workflow for monitoring Share Market fluctuations.

Manish Singh

9 min read · Jun 18, 2023


I warmly welcome you to my first blog post (hopefully not my last) on Medium. I will try to make it short, informative, and engaging. Forgive me if I fall short on any of those counts, or all of them; I'm just getting started. So, let's begin.

Table of Contents:

1. Back story

2. Overview

3. AWS Architecture for the project

4. Trigger the First Lambda Function

Purpose: extract data from Yahoo Finance and store it in DynamoDB.

Trigger: CloudWatch event and SQS (Simple Queue Service).

5. Trigger the Second Lambda Function

Purpose: calculate the change in stock prices.

Trigger: new data is stored in DynamoDB.

6. Email Notification

Purpose: notify the user about the change in stock prices.

Trigger: the calculated price change is above the specified threshold value.

1. Back story

I wanted to do a project after completing my AWS Cloud Practitioner exam (you know, certification alone is not enough!), so I started searching for project inspiration that aligned with my passion for the data field and the AWS cloud.

I came across a YouTube channel and a couple of Medium blogs that discussed how we can leverage AWS cloud automation to continuously monitor stock and crypto prices.

2. Overview

Market volatility is inevitable, and it's very difficult to time the market. I built a serverless architecture that sends an alert message or an email when there are price movements in my favorite stocks or crypto. This makes tedious, continuous monitoring of the market unnecessary.

In this project, I developed a data pipeline that collects live crypto and stock prices from Yahoo Finance, based on the ticker (a stock's short name), at a set time interval, and compares the latest data with the most recent previous data to detect price changes. If the price changes by a certain percentage, an email is sent to the user.

3. AWS Architecture for the project.

The architecture diagram was created in draw.io.

Multiple AWS services are used to build this serverless system on the cloud. These services include AWS Lambda, DynamoDB, SQS (message queuing service), SES (Simple Email Service), CloudWatch Events, IAM (Identity and Access Management), and S3.

What are these services, you might ask? Don't worry, I've got you covered:

AWS Lambda: Serverless computing service that allows you to run your code without provisioning or managing servers.

DynamoDB: Fully managed NoSQL database service for applications that need low latency, scalable, and consistent performance.

SQS: Message queuing service that decouples and scales microservices, distributed systems, and serverless applications.

SES: Email sending and receiving service to send transactional emails, marketing campaigns, and notifications.

CloudWatch Events: Event-driven service that monitors and responds to changes in AWS resources or custom events in real time.

IAM: Service for securely managing access to AWS resources by creating and managing users, groups, and permissions.

S3: Object storage service for storing and retrieving any amount of data from anywhere on the web.

4. Trigger the First Lambda Function

Let’s say you have some stocks in mind to monitor ( in my case, Google, TCS, & Reliance).

I scheduled a CloudWatch Event rule (say, every 5 minutes), which means that every five minutes the CloudWatch Event notifies SQS (a queuing service) to trigger a Lambda function (a serverless function).
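As a sketch, the schedule and the SQS hookup can be created with the AWS CLI. The rule name, region, account ID, and queue name below are illustrative assumptions, not the exact ones used in this project:

```shell
# Hypothetical names; substitute your own rule name and queue ARN.
aws events put-rule \
    --name stock-schedule \
    --schedule-expression "rate(5 minutes)"

# Point the rule at the SQS queue that will trigger the Lambda function.
aws events put-targets \
    --rule stock-schedule \
    --targets "Id=1,Arn=arn:aws:sqs:us-east-1:123456789012:stock-queue"
```

The same setup can also be done from the CloudWatch/EventBridge console, which is what the screenshots in this post show.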

CloudWatch Event Scheduled at 5 min, to trigger SQS.

When the 5-minute mark hits, SQS is notified to trigger the Lambda function. As we can see on the Lambda function dashboard, SQS is added as a trigger.

The SQS will trigger the Lambda function (named stock_api)

In the Lambda function, I wrote Python code and uploaded a Zip file to Layers containing the library packages needed to run the code, such as boto3, yahoo_fin, time, json, and os.

The lambda function layer is used to hold the necessary library.
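For reference, a Lambda layer for a Python runtime expects the packages inside a top-level `python/` directory in the Zip file. A sketch of how such a layer can be built locally (the file name is my own choice):

```shell
# Install the third-party dependency into a python/ directory,
# which is the layout Lambda expects for Python layers.
mkdir -p python
pip install yahoo_fin -t python/

# Zip it up; upload the resulting file as a Lambda layer
# matching the function's Python runtime version.
zip -r yahoo_fin_layer.zip python
```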

When the Lambda function is triggered, it runs the Python code and extracts live data from Yahoo Finance for the specified tickers (tickers are short names for stocks).

The Python code I wrote is commented to explain why I used each function and library:

import boto3    # Interact with AWS services
import time     # Current time for the timestamp
import json     # Serialize the price dictionary
import decimal  # Decimal arithmetic (DynamoDB does not accept floats)
from yahoo_fin.stock_info import get_live_price  # Live stock price retrieval

dynamodb = boto3.resource('dynamodb')  # Create a connection to DynamoDB
table_name = 'Ticker_table'  # Name of the DynamoDB table that stores the data
table = dynamodb.Table(table_name)  # Connect to the DynamoDB table

def lambda_handler(event, context):  # The AWS Lambda entry point
    tickers = ['TCS.NS', 'RELIANCE.NS', 'GOOG']  # Stock tickers to fetch data for
    fav_stocks = {}  # Dictionary to hold the fetched stock prices
    for symbol in tickers:
        price = get_live_price(symbol)  # Current price for each stock
        fav_stocks[symbol] = round(price, 2)  # Rounded to two decimals

    # Convert the float prices to Decimal values for storage in DynamoDB
    ddb_data = json.loads(json.dumps(fav_stocks), parse_float=decimal.Decimal)

    for key in ddb_data:  # Insert each ticker's price into DynamoDB
        table.put_item(
            Item={
                'ticker': key,  # The stock ticker
                'timestamp': int(round(time.time() * 1000)),  # Current time in ms
                'price': ddb_data[key]  # The current price of the stock
            }
        )
    return {
        'statusCode': 200,  # Success status code
        'body': 'Data stored in DynamoDB'
    }

The latest data values (which the Lambda function extracted) are stored every 5 minutes in DynamoDB (a NoSQL database offered by AWS). The values for our stock tickers are visible to us in tabular form.

Tabular representation of stock prices, stored in DynamoDB
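A note on the `json.loads(json.dumps(...), parse_float=decimal.Decimal)` round-trip in the code above: DynamoDB's Python SDK rejects plain floats, so the prices are converted to `Decimal` first. A minimal, locally runnable sketch of that conversion (the sample prices are made up):

```python
import json
import decimal

# Sample prices as Python floats (illustrative values).
fav_stocks = {"GOOG": 123.45, "TCS.NS": 3310.2}

# Serialize to JSON, then parse the floats back as decimal.Decimal,
# which is the numeric type boto3's DynamoDB resource accepts.
ddb_data = json.loads(json.dumps(fav_stocks), parse_float=decimal.Decimal)

print(repr(ddb_data["GOOG"]))  # Decimal('123.45')
```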

5. Trigger the Second Lambda Function

When new data (the latest values) is stored in DynamoDB, it triggers a second Lambda function. In this Lambda function, I wrote Python code to calculate the percentage change and compare it to the specified threshold percentage.

DynamoDB will trigger the Lambda function (named stream)

As you can see in the Layers section, this time I haven't uploaded any Zip files of library packages, because the calculation is simple and uses only Python's built-in functions.

The Python code in this Lambda function pulls the latest prices for the specified stocks and crypto along with the previous prices of the same tickers, then calculates whether the percentage of the price change exceeds the specified threshold.

import boto3
from boto3.dynamodb.conditions import Key
import os

# User input is read from environment variables instead of being hard-coded
table_name = os.environ['table_name']  # DynamoDB table name
SENDER = os.environ['sender']  # Sender email address (verified in SES)
RECIPIENT = os.environ['recipient']  # Recipient email address
AWS_REGION = os.environ['region']  # AWS region for SES
percent_change = int(os.environ['percent_change'])  # Threshold, in percent

def find_volatility(values):
    """Compare the two most recent prices for a ticker.

    Returns an alert message when the percentage change crosses the
    threshold, otherwise None.
    """
    item_values = values[:2]  # Latest and previous record for the ticker
    if len(item_values) < 2:
        return None  # Not enough history to compare
    ticker = item_values[0]['ticker']
    latest = float(item_values[0]['price'])
    previous = float(item_values[1]['price'])
    message = None
    if latest > previous:
        increase_percent = int(((latest - previous) / previous) * 100)
        if increase_percent >= percent_change:
            message = ("There is volatility in the market. The price of " + ticker
                       + " has risen by " + str(increase_percent) + "%"
                       + " with current price " + str(latest))
    elif latest < previous:
        decrease_percent = int(((previous - latest) / previous) * 100)
        if decrease_percent >= percent_change:
            message = ("There is volatility in the market. The price of " + ticker
                       + " has dropped by " + str(decrease_percent) + "%"
                       + " with current price " + str(latest))
    return message

def Email(values):
    mail = find_volatility(values) if len(values) > 1 else None
    if mail is None:
        return  # Change is below the threshold; no email is sent
    ses = boto3.client('ses', region_name=AWS_REGION)  # SES client for sending email
    CHARSET = "UTF-8"
    SUBJECT = "Serverless-Workflow for Stocks and Crypto Volatility"
    BODY = """
Hello,
Here's an update on your favorite Stock/Crypto price movements:
%s
""" % mail
    for item in values[:2]:  # Add the two most recent records to the body
        BODY += f"{item['ticker']} at {item['price']} as of {item['timestamp']}\n\t"
    BODY += """
Regards,
Manish
"""
    try:
        ses.send_email(
            Destination={
                'ToAddresses': [
                    RECIPIENT,  # Recipient email
                ],
            },
            Message={
                'Body': {
                    'Text': {
                        'Charset': CHARSET,
                        'Data': BODY,  # Email body
                    },
                },
                'Subject': {
                    'Charset': CHARSET,
                    'Data': SUBJECT,  # Email subject
                },
            },
            Source=SENDER,  # Sender email
        )
    except Exception as e:
        print(e)

def lambda_handler(event, context):
    for record in event['Records']:
        if record['eventName'] == "INSERT":  # React only to newly inserted rows
            newImage = record["dynamodb"]["NewImage"]
            ticker = newImage["ticker"]["S"]
            dynamodb = boto3.resource('dynamodb')
            table = dynamodb.Table(table_name)  # Get the DynamoDB table object
            response = table.query(
                KeyConditionExpression=Key('ticker').eq(ticker),  # Items with the same ticker
                ScanIndexForward=False  # Newest records first
            )
            Email(response['Items'])  # Email when the change crosses the threshold
User input is supplied through environment variables instead of being written directly into the code, for data privacy
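For reference, a DynamoDB stream delivers each new row in a typed "NewImage" format, which the handler above unpacks. A small, locally runnable sketch of that unpacking, using a made-up sample record:

```python
# A simplified DynamoDB Streams INSERT record (values are illustrative).
record = {
    "eventName": "INSERT",
    "dynamodb": {
        "NewImage": {
            "ticker": {"S": "GOOG"},              # string attribute
            "price": {"N": "123.45"},             # numbers arrive as strings
            "timestamp": {"N": "1687068000000"},  # ms epoch, also a string
        }
    },
}

if record["eventName"] == "INSERT":
    new_image = record["dynamodb"]["NewImage"]
    ticker = new_image["ticker"]["S"]
    price = float(new_image["price"]["N"])       # convert back from string
    timestamp = int(new_image["timestamp"]["N"])
    print(ticker, price, timestamp)  # GOOG 123.45 1687068000000
```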

6. Email Notification

If the rate of the price change is below the threshold percentage (which is 1%), then nothing happens and the cycle runs again after 5 minutes.

But if the rate of change is higher than 1%, then AWS SES automatically sends an email to the user describing the change in prices and the percentage by which they changed.
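The decision boils down to a simple percentage calculation. A minimal sketch of that check as a standalone function (the helper name is my own; the 1% default matches the threshold used in this project):

```python
def should_alert(latest, previous, threshold_percent=1.0):
    """Return True when the price moved by more than threshold_percent."""
    if previous == 0:
        return False  # avoid division by zero on bad data
    change_percent = abs(latest - previous) / previous * 100
    return change_percent > threshold_percent

print(should_alert(101.5, 100.0))  # True  (1.5% move)
print(should_alert(100.5, 100.0))  # False (0.5% move)
```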

The email I received when the price change exceeded the threshold

Conclusion

So, this brings us to the end of the project. I hope this blog gives you a proper understanding of how a serverless architecture can automate the manual monitoring of share and crypto prices.

To glue all the services together in AWS, we need authentication and authorization, which is taken care of by IAM roles. IAM roles grant these services permission to use one another to build the whole architecture. Without the right IAM roles and policies, we would not get past even the first step: notifying the SQS service to trigger the first Lambda function.
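As an illustration, the second Lambda's execution role needs (at minimum) permission to query the table and send email through SES. A minimal policy sketch, assuming the `Ticker_table` name used above; the account ID is a placeholder:

```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["dynamodb:Query"],
      "Resource": "arn:aws:dynamodb:*:111122223333:table/Ticker_table"
    },
    {
      "Effect": "Allow",
      "Action": ["ses:SendEmail"],
      "Resource": "*"
    }
  ]
}
```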

If you want to do some cloud-based projects, I recommend trying the AWS Free Tier. It is free (well, not completely) and has some limitations, but it is good for building projects.

References & Resources

[1]. Workflow for Crypto and Stock price volatility on AWS Cloud

[2]. AWS SQS + Lambda Setup Tutorial — Step by Step

[3]. Top 5 Use Cases For AWS Lambda

As this is my first experience writing a blog, please take a moment to share your valuable feedback; it will help me identify the areas where I need to improve.

Feel free to connect-

LinkedIn — Manish

Till then stay consistent & be happy!
