Obtaining EMA20 Rubber Band Theory Values Using Web Scraping and Pandas (Part 2)

Sze Zhong LIM
Data And Beyond
Published in
10 min readSep 3, 2023

One of the criteria I use to plan my Take Profit (TP) for stocks is to use the EMA20 and check how far away the price is away from the EMA20 indicator. The theory is also know as The Rubber Band Theory which can be found here.

You may find Part 1 here. This part will focus more on the code and how I scraped. Do take note that this is for academic purposes only and to do your due diligence to check on the terms and conditions of relevant parties before any scraping is done.

First, ensure to import all the libraries. If you don’t have them, you may pip install them individually.

# Import libraries
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from bs4 import BeautifulSoup as soup
from urllib.request import Request, urlopen

# Libraries for data_for_ticker function
import urllib.request
import io
from datetime import datetime, timedelta

#Libraries for styler
from IPython.display import display # For Spyder IDE

We then create the data_for_ticker function, which scrapes the information for one ticker, from Yahoo Finance. By default, I have set the ticker information to be by day, and to go all the way back to 40 years. This is to get the max record for the ticker. If you only need up to 5 years for instance, you could change the value accordingly.

def data_for_ticker(ticker,urlinterval="1d",yearsback=40,startdate="0"):
#startdate to be in format "dd-mm-yy"

# Get the unixtimestamp for the url
if startdate=="0":
current_datetime = datetime.now()
else:
try:
# Ensure startdate to be in format "dd-mm-yy" or it will be datetime.now()
current_datetime = datetime.strptime(startdate, "%d-%m-%y")
except Exception as e:
print("The datetime format is incorrect. The current date will be used.")
print(f"Error Type: {e}")
current_datetime = datetime.now()
else:
print(f"Date accepted as -> Day:{current_datetime.day}; Month:{current_datetime.strftime('%B')}; Year:{current_datetime.year}")
unix_timestamp1 = int(current_datetime.timestamp())
#print("Unix Timestamp:", unix_timestamp1)

# Get the unixtimestamp for the url start period. X years from now.
earlier_time = current_datetime - timedelta(days=365*yearsback)
unix_timestamp2 = int(earlier_time.timestamp())
#print("Unix Timestamp 40 years ago:", unix_timestamp2)

# Create the url
period_start = str(unix_timestamp2)
period_end = str(unix_timestamp1)
# 1 day = 1d; 1 week = 1wk; 1 month = 1mo. Default = 1d
interval_1d1w1mo = urlinterval
url = f"https://query1.finance.yahoo.com/v7/finance/download/{ticker}?period1={period_start}&period2={period_end}&interval={interval_1d1w1mo}&events=history&includeAdjustedClose=true"

# Send a GET request to download the CSV file
response = urllib.request.urlopen(url)
# Read the CSV file from the response content
csv_content = response.read().decode('utf-8')
# Create a DataFrame from the CSV content
df = pd.read_csv(io.StringIO(csv_content))

#Transform to datetime format and 2 decimal points for floats
# Can use print(df.dtypes) to find out which types.
df['Date'] = pd.to_datetime(df['Date'])
float_columns = df.select_dtypes(include=['float64']).columns
df[float_columns] = df[float_columns].round(2)
# Sort "Date" to be latest date first.
df = df.sort_values('Date', ascending=True)

return df

We then come out with a function to calculate the EMA value and append it into a column within that dataframe. I have done it such that we can have multiple EMA values instead of just 1 EMA value at any one time.

def calculate_and_add_ema(ema_period_list, df):
for period in ema_period_list:
column_name = f"EMA_{period}"
ema = df['Close'].astype(float).ewm(span=period, min_periods=period-1).mean()
ema = ema.astype(float).round(2)
df[column_name] = ema

The next function calculates the % difference between the price and the EMA line. Similar as the above function, it takes in a list of EMA values that we might want it to compute.

def percentage_diff(ema_period_list, df):
for period in ema_period_list:
ema_column_name = f"EMA_{period}"
column_name_high = f"EMA_{period}_high_per_diff"
column_name_low = f"EMA_{period}_low_per_diff"
df[column_name_high] = round((df["High"]-df[ema_column_name])/df[ema_column_name]*100,2)
df[column_name_low] = round((df["Low"]-df[ema_column_name])/df[ema_column_name]*100,2)

From the 3 functions above, we are able to get the % difference data for 1 ticker. We can have a preliminary try on the 3 functions above to see what kind of dataframe we can get.

df = data_for_ticker("AAPL",urlinterval="1d",yearsback=40,startdate="0")
EMA_LIST = [20]
calculate_and_add_ema(EMA_LIST,df)
percentage_diff(EMA_LIST,df)
display(df)

We can also use the below code to first filter the dataframe for the year 2022, and only include values which are > 0. After filtering, we can check out the distribution of the data.

# Filter for year 2022 and ignore negative % values
year_2022_high = df[
(df["Date"].dt.year == 2022) &
(df["EMA_20_high_per_diff"] > 0)]

# Create a histogram
plt.hist(year_2022_high["EMA_20_high_per_diff"], bins=20, color='blue', alpha=0.7)
plt.xlabel("High Difference Percentage")
plt.ylabel("Frequency")
plt.title("Distribution of Difference Percentages Between EMA20 and Price for Year 2022")
plt.show()

From the output above, we can see that for this ticker “AAPL”, in the year 2022, majority of the high % difference happens around 0–3%. To get the rubber band theory, we are interested to get the 80th percentile values (for our case). This is because we are interested to know the stretching limit of the rubber band.

We can improvise the previous code to include an additional filter to only take the 80th percentile values

# Filter top 80% of high difference percentages for year 2022
year_2022_high_80 = df[
(df["Date"].dt.year == 2022) &
(df["EMA_20_high_per_diff"] > 0)&
(df["EMA_20_high_per_diff"] > df["EMA_20_high_per_diff"].quantile(0.8))]

# Create a histogram
plt.hist(year_2022_high_80["EMA_20_high_per_diff"], bins=20, color='blue', alpha=0.7)
plt.xlabel("High Difference Percentage")
plt.ylabel("Frequency")
plt.title("Distribution of Difference Percentages Between EMA20 and Price for Year 2022 (Top 80%)")
plt.show()

As can be seen, the 80th percentile of values are from 6.5% onwards. However, if we want to consider the number of incidents, it would be better to consolidate the readings. What do i mean? I will give an example.

  • 1/March/2022 = 6.5%
  • 2/March/2022 = 7%
  • 3/March/2022 = 8%
  • 4/March/2022 = 6.5%
  • 5/March/2022 = 7%

I would consider the above as 1 event or 1 peak at 8%. This is in comparison with:

  • 1/March/2022 = 6.5%
  • 1/April/2022 = 7%
  • 1/May/2022 = 8%
  • 1/July/2022 = 6.5%
  • 1/September/2022 = 7%

The case above has 5 incidence of peaks, as compared to 1 incidence of peaks. We are interested to consolidate the peak and categorize them into instances because we would like to know the maximum stretch with regards to each incident, and not only the distribution. Put it this way, with the same distribution of data, it is possible to have:

  • Case A: 5 incidents of peaks at 8+%
  • Case B: 5 incidents of different peaks at 6%, 6%, 8+%, 8+%, 8+%.
  • Case C: 5 incidents of different peaks at 7%, 7%, 7%, 7%, 8+%.

We are more interested to get the MEDIAN of the incidents (instead of the median of the 80th percentile and above) to get a more appropriate estimation of the rubber band theory.

As such, we have to code the consolidation part. We will be using 5 days before and after the actual date to determine the peak of that period. The function takes in 2 different DataFrames. The original DataFrame, where the index of 5 days before and after will be checked; the filtered DataFrame, where the 80th percentile values are consolidated together with their index numbers from the original DF. We have also created a list to prevent any re-checking of the same figures.

def consolidated_high(filtered_df, original_df):
listofdates = []
listofvalues = []
newdict ={}
listofidxchecked=[]
for idx1, row1 in filtered_df.iterrows():
if idx1 not in listofidxchecked:
listofidxchecked.append(idx1)
temp_high = row1['EMA_20_high_per_diff']
temp_date = row1['Date']

# Cases where idx cannot -5 or +5
if idx1 < 5:
idx2min=0
else:
idx2min = idx1 - 5
idxlast = original_df.index[-1]
if idx1 > idxlast - 5:
idx2max = idxlast
else:
idx2max = idx1 + 5

for idx2 in range(idx2min,idx2max):
if idx2 not in listofidxchecked:
listofidxchecked.append(idx2)
if original_df.iloc[idx2]['EMA_20_high_per_diff'] <= temp_high:
pass
else:
temp_high = original_df.iloc[idx2]['EMA_20_high_per_diff']
temp_date = original_df.iloc[idx2]['Date']
listofdates.append(temp_date)
listofvalues.append(temp_high)
newdict['Dates']=listofdates
newdict['Values']=listofvalues
consolidated_df = pd.DataFrame.from_dict(newdict)
consolidated_df['Type'] = "Above_EMA"
return consolidated_df
def consolidated_low(filtered_df, original_df):
listofdates = []
listofvalues = []
newdict ={}
listofidxchecked=[]
for idx1, row1 in filtered_df.iterrows():
if idx1 not in listofidxchecked:
listofidxchecked.append(idx1)
temp_low = row1['EMA_20_low_per_diff']
temp_date = row1['Date']

# Cases where idx cannot -5 or +5
if idx1 < 5:
idx2min=0
else:
idx2min = idx1 - 5
idxlast = original_df.index[-1]
if idx1 > idxlast - 5:
idx2max = idxlast
else:
idx2max = idx1 + 5

for idx2 in range(idx2min,idx2max):
if idx2 not in listofidxchecked:
listofidxchecked.append(idx2)
if original_df.iloc[idx2]['EMA_20_low_per_diff'] >= temp_low:
pass
else:
temp_low = original_df.iloc[idx2]['EMA_20_low_per_diff']
temp_date = original_df.iloc[idx2]['Date']
listofdates.append(temp_date)
listofvalues.append(temp_low)
newdict['Dates']=listofdates
newdict['Values']=listofvalues
consolidated_df = pd.DataFrame.from_dict(newdict)
consolidated_df['Type'] = "Below_EMA"
return consolidated_df

One of the consolidation is for the low below the EMA, and one is for the high above the EMA. No doubt i could have coded it more efficiently such that it is combined into one function instead of two. But i felt this would provide more clarity and easier to check back.

We will then proceed with the filter function itself which will utilize this consolidation function. I have provided some configurations such as median parameter and quantile parameter within, for future adjustment purposes.

def filtered_median_quantile_min_max_year_df_high(original_df, column_to_filter, year, quantilepara=0.8, medianpara=0.5):
# Create Filter for the DF
# To get Median for whole year
filtered_df_for_median = original_df[
(original_df["Date"].dt.year == year) &
(original_df[column_to_filter] > 0) # Only positive percentage values
]

# To get Max, Min, Median for Quantile only.
filtered_df = original_df[
(original_df["Date"].dt.year == year) &
(original_df[column_to_filter] > 0) & # Only positive percentage values
(original_df[column_to_filter] > original_df[column_to_filter].quantile(quantilepara))
]

temp_df = consolidated_high(filtered_df, original_df)
quantile_min = temp_df['Values'].min()
quantile_max = temp_df['Values'].max()
quantile_median = round(temp_df['Values'].median(),2)

overall_median = round(filtered_df_for_median[column_to_filter].quantile(medianpara),2)
return overall_median, quantile_min, quantile_max, quantile_median
def filtered_median_quantile_min_max_year_df_low(original_df, column_to_filter, year, quantilepara=0.8, medianpara=0.5):
# Create Filter for the DF
# To get Median for whole year
filtered_df_for_median = original_df[
(original_df["Date"].dt.year == year) &
(original_df[column_to_filter] < 0) # Only positive percentage values
]

# To get Max, Min, Median for Quantile only.
filtered_df = original_df[
(original_df["Date"].dt.year == year) &
(original_df[column_to_filter] < 0) & # Only positive percentage values
(original_df[column_to_filter] < original_df[column_to_filter].quantile(1-quantilepara))
]

temp_df = consolidated_low(filtered_df, original_df)
quantile_min = temp_df['Values'].min()
quantile_max = temp_df['Values'].max()
quantile_median = round(temp_df['Values'].median(),2)

overall_median = round(filtered_df_for_median[column_to_filter].quantile(medianpara),2)
return overall_median, quantile_max, quantile_min, quantile_median

The above is basically how the computation for each ticker will work. You may use the code below to check it out.

a,b,c,d = filtered_median_quantile_min_max_year_df_high(df, "EMA_20_high_per_diff", 2022, quantilepara=0.8, medianpara=0.5)
print(f"Overall Median = {a}")
print(f"Quantile Max = {b}")
print(f"Quantile Min = {c}")
print(f"Quantile Median = {d}")

To consolidate everything, we will provide a function which takes an input of a list of tickers and comes out with a DataFrame that shows a summary of the parameter we require (Median / Max / Min etc).

# This function will create the final dataframe.
def final_df_tickers_perc(lstoftickers, ema_list, yearlist, quantileparameter=0.8):
qname = str(quantileparameter*100)
# Create the result df format
column_names = ["Ticker", "Year", "Type", "Overall_Median",f"{qname}th_PCTL_Min", f"{qname}th_PCTL_Max", f"{qname}th_PCTL_Median"]
result_df = pd.DataFrame(columns=column_names)

# Create a for loop to loop over the different tickers.
for ticker in lstoftickers:
df =data_for_ticker(ticker,urlinterval="1d",yearsback=40,startdate="0")
calculate_and_add_ema(EMA_LIST,df)
percentage_diff(EMA_LIST,df)

# Create a for loop to loop over the different years.
# This portion not curated for multiple EMA_List. Will improve when have time.
for year in yearlist:
if year in df["Date"].dt.year.values:
a,b,c,d = filtered_median_quantile_min_max_year_df_high(df,"EMA_20_high_per_diff",year, quantilepara=quantileparameter)
e,f,g,h = filtered_median_quantile_min_max_year_df_low(df,"EMA_20_high_per_diff",year, quantilepara=quantileparameter)
newlistabove = [ticker, year, "Above EMA", a, b, c, d]
newlistbelow = [ticker, year, "Below EMA", e, f, g, h]
result_df.loc[len(result_df)] = newlistabove
result_df.loc[len(result_df)] = newlistbelow
else:
print(f"Year {year} is not available for {ticker}")
pass

return result_df

You may check out the above function works, with the code below:

listofticker = ["TSLA","AAPL"]
EMA_LIST = [20]
listofyears = [2023]
new_df = final_df_tickers_perc(listofticker, EMA_LIST, listofyears)
display(new_df)

I would like to get the whole list of tickers which are Mid Cap and above. You may choose to scrape it from any website that allows. For academic purposes, I will just be showing the method of how to scrape from Finviz. Do check out their website terms and conditions on scraping. The code below is just a sample of how it could be done for other websites that have no rules against scraping.

# To scrape for the list of tickers (Mid Cap and Above)
def scrape_overall_list(baseurl):
start_index = 1
start_page = 1
combined_table = None

# To get total number of rows for the baseurl
counter_req = Request(baseurl, headers={'User-Agent': 'Mozilla/5.0'})
counter_webpage = urlopen(counter_req).read()
counter_html = soup(counter_webpage, "html.parser")
counter_text = counter_html.find(class_= "count-text").text
counter_number = counter_text.split("/")[1].split()[0]
counter_pages = int(counter_number)//20 + 1

print("Scraping for info:")
while start_index <= int(counter_number):
url = baseurl + '&r=' + str(start_index)
req = Request(url, headers={'User-Agent': 'Mozilla/5.0'})
webpage = urlopen(req).read()
html = soup(webpage, "html.parser")

table = pd.read_html(str(html), attrs = {'class': 'table-light'})[0]

if combined_table is None:
combined_table = table.iloc[1:]
combined_table = combined_table.set_axis(table.iloc[0], axis = 'columns',inplace = False)
else:
table.columns = combined_table.columns
combined_table = pd.concat([combined_table, table.iloc[1:]], ignore_index=True)

start_index += 20

print(f"Page {start_page} / {counter_pages} processed")
start_page += 1

## Transform the list
# Remove 'B' from Market Cap column and convert it to float with 3 decimal places
combined_table['Market Cap'] = combined_table['Market Cap'].str.replace('B', '').astype(float).round(3)
# Remove the percentage symbol from Change column and convert it to float with 2 decimal places
combined_table['Change'] = combined_table['Change'].str.replace('%', '').astype(float).round(2)
# Convert P/E, Price, and Change columns to float with 2 decimal places
columns_to_convert = ['P/E', 'Price']
combined_table[columns_to_convert] = combined_table[columns_to_convert].replace('-', '0').astype(float).round(2)
# Convert Volume and No. columns to integer
combined_table['Volume'] = combined_table['Volume'].astype(int)
combined_table['No.'] = combined_table['No.'].astype(int)

return combined_table

We can use the code below to run accordingly, and save the output DataFrame into a csv file for our use.

# Get the list of tickers
baseurl = 'https://finviz.com/screener.ashx?v=111&f=cap_midover&o=marketcap'
lstofticker = scrape_overall_list(baseurl)['Ticker'].sort_values()

# Set the EMA List and list of years.
EMA_LIST = [20]
listofyears = [2021,2022,2023]

# Come out with the final dataframe of tickers percentage
wanted_df = final_df_tickers_perc(lstofticker, EMA_LIST, listofyears)

# Save DataFrame into a CSV File with Date Time file name as record
current_datetime = datetime.now()
formatted_datetime = current_datetime.strftime("%Y%m%d_%H%M%S") # Format: YYYYMMDD_HHMMSS
file_name = f"{formatted_datetime}_EMA_Rubber_Band.csv"
wanted_df.to_csv(file_name)
print(f"DataFrame saved to {file_name}")

Should there be any question or comments on how I could have better improved, do feel free to contact me.

--

--