Markov Chains — Detailed Exploration
Markov chains are a nice example of, and the root of, many probability-related technologies. They are not the most advanced or the best system, yet their core idea is very simple, and we use it every day in our lives, or see it without being conscious of it.
A Markov chain is essentially a probabilistic circulation between categorical values (states). We can define a state space and assign transition probabilities to it, for example for eating habits, sunny or warm days, or anything else.
It can be very complex or very simple. We can use it on actions, such as observable states in reinforcement learning, or on stock prices. Most games can be modeled with Markov chains, as can next-word prediction and similar cases.
So I prepared a categorical prediction transformation for Markov chains.
import random
import matplotlib.pyplot as plt
import numpy as np
from sklearn.metrics import mean_squared_error
from sklearn.linear_model import Ridge
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeRegressor
import pandas as pd
class MarkovChainCreator:
    """Generate a random Markov chain over categorical states and compare predictors.

    Typical call order: ``assign_possibilities()`` -> ``create_chain(n)`` ->
    ``transform_numeric()`` -> ``predictor()`` / ``markovPredictor()`` ->
    the ``fit_*`` regressors -> ``transform_numeric_to_category()``.

    Attributes set in ``__init__``:
        states: the categorical states, in the order used for every probability row.
        state_number: ``len(states)``.
        state_possibilities: ``{state: [P(state -> states[0]), P(state -> states[1]), ...]}``.
        scores: one integer per state, starting at a random value in [250, 300]
            and decreasing by 50 per position.
    """

    def __init__(self, states):
        self.states = states
        self.state_number = len(states)
        self.state_possibilities = {}
        top_score = random.randint(250, 300)
        self.scores = [top_score - 50 * rank for rank in range(len(states))]

    def assign_train(self):
        """Split ``numericData`` 80/20 (in order) into ``self.train`` and ``self.test``."""
        self.train_size = int(len(self.numericData) * 0.8)
        self.train = self.numericData[:self.train_size]
        self.test = self.numericData[self.train_size:]

    def _random_transition_row(self):
        """Return one random probability row: non-negative entries that sum to 1."""
        count = self.state_number
        floor = 1 / count  # preferred minimum share while enough mass is left
        row = [random.uniform(floor, 1 - floor * (count - 2))]
        for position in range(1, count):
            # Clamp: float rounding may push the running sum a hair above 1.
            remaining = max(0.0, 1 - sum(row))
            if position == count - 1:
                # The last state takes whatever mass is left.
                row.append(remaining)
            else:
                upper = min(remaining, 1 - floor)
                # Never draw more than what is left. Without this clamp the
                # range inverts once `remaining < floor`, the sum overshoots 1
                # and later draws become negative.
                lower = min(floor, upper)
                row.append(random.uniform(lower, upper))
        total = sum(row)
        return [share / total for share in row]

    def assign_possibilities(self):
        """Fill ``state_possibilities`` with one random transition row per state.

        With no states the table is simply left empty.
        """
        if not self.states:
            self.state_possibilities = {}
            return
        self.state_possibilities = {
            state: self._random_transition_row() for state in self.states
        }

    def show_possibilities(self):
        """Return the transition table built by ``assign_possibilities``."""
        return self.state_possibilities

    def create_chain(self, n):
        """Sample a chain of ``n + 1`` states and store it in ``self.chainList``.

        The first state is chosen uniformly; each of the ``n`` following states
        is drawn using the current state's row as weights.

        Raises:
            KeyError: if ``assign_possibilities`` has not been called yet.
        """
        current = random.choice(self.states)
        chain = [current]
        for _ in range(n):
            weights = self.state_possibilities[current]
            current = random.choices(population=self.states, weights=weights)[0]
            chain.append(current)
        self.chainList = chain
        return self.chainList

    def countDictScores(self):
        """Return ``{state: {'counts:': occurrences in chain, 'scores:': state score}}``."""
        self.countsScores = {}
        for state in self.states:
            self.countsScores[state] = {
                'counts:': self.chainList.count(state),
                # Was the module-level `states`; it must be the instance's own list.
                'scores:': self.scores[self.states.index(state)],
            }
        return self.countsScores

    def _position_weighted_scores(self, sequence):
        """Return ``position * score(state)`` for every state in ``sequence``.

        This is what the original nested loop computed: it added the score of
        the state at ``position`` to itself ``position`` times.
        """
        return [
            position * self.scores[self.states.index(state)]
            for position, state in enumerate(sequence)
        ]

    def transform_numeric(self):
        """Encode ``chainList`` as ``numericData`` and refresh the train/test split."""
        self.numericData = self._position_weighted_scores(self.chainList)
        self.assign_train()

    def transform_predictions(self):
        """Encode ``self.predictions`` as ``predictionNumericData`` (same encoding)."""
        self.predictionNumericData = self._position_weighted_scores(self.predictions)

    def get_average_predictor(self):
        """Build ``avg_data``: one row per element of ``numericData``.

        Row ``i`` holds ``i * (scores[j] + scores[j + 1]) / 2`` for every state
        ``j`` except the last, whose entry stays 0. Row 0 is all zeros.
        """
        width = len(self.scores)
        pair_means = [
            (left + right) / 2 for left, right in zip(self.scores, self.scores[1:])
        ]
        self.avg_data = []
        for position in range(len(self.numericData)):
            if position == 0 or width == 0:
                row = [0] * width
            else:
                row = [position * mean for mean in pair_means] + [0]
            self.avg_data.append(row)

    def recursive_state_selection(self, remaining_states, remaining_probs, max_prob_val, sum_other_probs):
        """Pick a fallback state from the candidates, or return ``None``.

        Each round takes the most probable remaining candidate, scales its
        probability by the running factor (initially ``sum_other_probs``) and
        compares it with the threshold, which is squared every round (initially
        ``max_prob_val``). The first candidate that reaches the threshold is
        returned; otherwise it is dropped and its scaled probability becomes
        the factor for the next round.

        The input lists are not modified.
        """
        states = list(remaining_states)
        probs = list(remaining_probs)
        threshold = max_prob_val
        factor = sum_other_probs
        while states:
            best = probs.index(max(probs))
            scaled = factor * probs[best]
            threshold = threshold * threshold
            if scaled >= threshold:
                return states[best]
            states.pop(best)
            probs.pop(best)
            factor = scaled
        return None

    def _select_candidates(self, previous_state):
        """Return ``(selected, selected_other, next_state_index)`` for one step.

        ``selected`` is the most probable successor of ``previous_state``;
        ``selected_other`` is the fallback from ``recursive_state_selection``
        (or ``None``); ``next_state_index`` is the index of ``selected``.
        """
        row = self.state_possibilities[previous_state]
        next_state_index = row.index(max(row))
        selected = self.states[next_state_index]
        remaining_states = [state for state in self.states if state != selected]
        remaining_probs = [row[self.states.index(state)] for state in remaining_states]
        # NOTE(review): this reads the *selected* state's self-transition
        # probability, not P(previous_state -> selected) == max(row).
        # Kept as in the original; confirm which one is intended.
        max_prob = self.state_possibilities[selected][next_state_index]
        sum_other_probs = sum(remaining_probs)
        selected_other = None
        if sum_other_probs > max_prob:
            selected_other = self.recursive_state_selection(
                remaining_states, remaining_probs, max_prob, sum_other_probs
            )
        return selected, selected_other, next_state_index

    def predictor(self):
        """Predict every chain position and store the result in ``self.predictions``.

        Also rebuilds ``avg_data`` and ``predictionNumericData``.

        NOTE(review): position 0 has no predecessor; ``i - 1`` wraps to the
        last chain element there (negative indexing), as in the original.
        """
        self.predictions = []
        self.get_average_predictor()
        for i in range(len(self.numericData)):
            selected, selected_other, next_state_index = self._select_candidates(
                self.chainList[i - 1]
            )
            if selected_other is not None:
                other_index = self.states.index(selected_other)
                diff_new = (
                    self.numericData[i - 1] - self.avg_data[i - 1][other_index]
                ) * self.state_possibilities[selected_other][next_state_index]
                history = self.numericData[0:i - 1]
                # An empty history (i == 1, or a one-element chain) keeps the
                # default choice instead of calling min() on nothing.
                if history and diff_new >= min(history):
                    selected = selected_other
            self.predictions.append(selected)
        self.transform_predictions()

    def markovPredictor(self):
        """Baseline: predict the most probable successor of the previous state.

        Stores the result in ``self.markovPredictions``. Position 0 wraps to
        the last chain element, as in ``predictor``.
        """
        best_next = {
            state: self.states[row.index(max(row))]
            for state, row in self.state_possibilities.items()
        }
        self.markovPredictions = [
            best_next[self.chainList[i - 1]] for i in range(len(self.chainList))
        ]

    def _fit_and_plot(self, model, label):
        """Fit ``model`` on index -> ``numericData``, report MSE, plot, return test predictions."""
        X = np.arange(len(self.chainList)).reshape(-1, 1)
        y = np.array(self.numericData)
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42
        )
        model.fit(X_train, y_train)
        y_pred = model.predict(X_test)
        mse = mean_squared_error(y_test, y_pred)
        print(f'Mean Squared Error ({label}): {mse}')
        # The split shuffles the rows; sort by index for drawing only, so the
        # prediction line does not zig-zag. The returned array keeps split order.
        order = np.argsort(X_test[:, 0])
        plt.scatter(X, y, color='blue', label='Original data')
        plt.plot(X_test[order], y_pred[order], color='red', linewidth=3, label=label)
        plt.xlabel('Index')
        plt.ylabel('Value')
        plt.legend()
        plt.show()
        return y_pred

    def fit_ridge_regression(self):
        """Fit Ridge (alpha=0.01) on the numeric chain; return the test-set predictions."""
        alpha = 0.01  # Regularization strength (adjust as needed)
        return self._fit_and_plot(Ridge(alpha=alpha), 'Ridge Regression')

    def fit_decision_tree(self):
        """Fit a depth-5 decision tree on the numeric chain; return the test-set predictions."""
        max_depth = 5  # Adjust the depth as needed
        return self._fit_and_plot(
            DecisionTreeRegressor(max_depth=max_depth), 'Decision Tree'
        )

    def prophet_predictor(self):
        """Forecast ``len(self.predictions)`` further values of ``numericData`` with Prophet.

        Requires ``predictor()`` to have been called (it sets ``self.predictions``).

        Raises:
            ImportError: if neither ``prophet`` nor ``fbprophet`` is installed.
        """
        # Prophet was referenced but never imported; resolve it lazily so the
        # rest of the class works without the optional dependency.
        try:
            from prophet import Prophet
        except ImportError:
            try:
                from fbprophet import Prophet
            except ImportError as err:
                raise ImportError(
                    "prophet_predictor requires the 'prophet' package"
                ) from err
        horizon = len(self.predictions)
        # Prophet expects `ds` to be dates. Plain integers are read as
        # nanosecond timestamps, which does not match the daily steps that
        # make_future_dataframe appends. The start date itself is arbitrary.
        df = pd.DataFrame({
            'ds': pd.date_range(start='2000-01-01', periods=len(self.numericData), freq='D'),
            'y': self.numericData,
        })
        model = Prophet()
        model.fit(df)
        future = model.make_future_dataframe(periods=horizon)
        forecast = model.predict(future)
        return forecast['yhat'].tail(horizon)

    def transform_numeric_to_category(self, numeric_values, default_category=None):
        """Map numeric predictions for the tail of the chain back to states.

        ``numeric_values[k]`` is treated as the value at chain position
        ``len(numericData) - len(numeric_values) + k``.

        Args:
            numeric_values: numeric predictions, at most as long as ``numericData``.
            default_category: unused; kept for backward compatibility.

        Returns:
            A list with one predicted state per numeric value.
        """
        # avg_data is normally built by predictor(); build it here when it is
        # missing or stale so this method also works on its own.
        if len(getattr(self, 'avg_data', ())) != len(self.numericData):
            self.get_average_predictor()
        start = len(self.numericData) - len(numeric_values)
        predictions = []
        for offset, value in enumerate(numeric_values):
            i = start + offset
            selected, selected_other, next_state_index = self._select_candidates(
                self.chainList[i - 1]
            )
            if selected_other is not None:
                other_index = self.states.index(selected_other)
                diff_new = (
                    value - self.avg_data[i][other_index]
                ) * self.state_possibilities[selected_other][next_state_index]
                history = self.numericData[0:i]
                # The original guarded `i == 1`, copied from predictor(); with
                # this slice the empty case is i == 0, where min() would raise.
                if history and diff_new >= min(history):
                    selected = selected_other
            predictions.append(selected)
        return predictions

    def plot_data(self, data):
        """Line-plot ``data`` against its index."""
        plt.plot(data)
        plt.xlabel('Index')
        plt.ylabel('Value')
        plt.title('Numeric List Plot')
        plt.show()
Tests
# Example usage
import random
import string
def random_chars(n):
    """Return ``n`` random lowercase words where word ``i`` has ``i`` letters.

    The first word is therefore the empty string. Letters are drawn one at a
    time with ``random.choice``, so results are reproducible under
    ``random.seed``.
    """
    # The original inner loop carried an unreachable `if j >= i: break`
    # (j comes from range(i)) and built each word with repeated `+=`.
    return [
        ''.join(random.choice(string.ascii_lowercase) for _ in range(length))
        for length in range(n)
    ]
# Example usage: build ten random words (word i has i letters, so the first is empty).
result = random_chars(10)
print(result)
import string
# Build a 20-state chain with random transition probabilities.
states = random_chars(20)
markov_chain_creator = MarkovChainCreator(states)
markov_chain_creator.assign_possibilities()
markov_chain_creator.show_possibilities()

# Sample 300 transitions and encode the chain numerically.
chain_list = markov_chain_creator.create_chain(300)
print(markov_chain_creator.countDictScores())
markov_chain_creator.transform_numeric()
markov_chain_creator.plot_data(markov_chain_creator.numericData)

# Categorical predictors. predictor() already calls transform_predictions(),
# so predictionNumericData is ready without a second call.
markov_chain_creator.predictor()
markov_chain_creator.markovPredictor()
markov_chain_creator.plot_data(markov_chain_creator.predictionNumericData)

# Numeric regressors, then map their outputs back to categories.
decision_tree_pred = markov_chain_creator.fit_decision_tree()
ridge_reg_pred = markov_chain_creator.fit_ridge_regression()
dec_tree_preds_category = markov_chain_creator.transform_numeric_to_category(decision_tree_pred)
# Was fed decision_tree_pred by mistake, so both models reported the same categories.
ridge_reg_pred_category = markov_chain_creator.transform_numeric_to_category(ridge_reg_pred)
Results
print(markov_chain_creator.numericData)
print(markov_chain_creator.predictionNumericData)
print(markov_chain_creator.avg_data)


def accuracy(predicted, actual):
    """Return the fraction of ``predicted`` states that match ``actual``.

    When ``predicted`` is shorter than ``actual`` it is compared with the tail
    of ``actual``: transform_numeric_to_category() produces predictions for
    the last ``len(predicted)`` chain positions. The original loops compared
    them with the head of the chain and divided by the full chain length.
    """
    if not len(predicted):
        return 0.0
    offset = max(0, len(actual) - len(predicted))
    hits = sum(
        1 for guess, truth in zip(predicted, actual[offset:]) if guess == truth
    )
    return hits / len(predicted)


acc = accuracy(markov_chain_creator.predictions, chain_list)
print(acc)
acc = accuracy(markov_chain_creator.markovPredictions, chain_list)
print(acc)
acc = accuracy(dec_tree_preds_category, chain_list)
print(acc)
acc = accuracy(ridge_reg_pred_category, chain_list)
print(acc)
I tried a decision tree and ridge regression. I could have used any model for prediction, but I tried transforming these values first. Even so, simply predicting the maximum-probability transition from each state beats the other models, because when the level is categorical it is rather like predicting an accident (a chance event) from one specific data set.
I used zip longest for better matching, but the prediction accuracy is still pretty low.
Therefore I will try other ideas and keep working on this subject. So far I have made this progress in only one day, and it will evolve. As with a product, it is usually not wise to share unfinished, rare work; that is the way it goes. Having said that, I am publishing this work anyway. If you are interested in it, please follow me.
Thanks
Sources:
- https://datascience.stackexchange.com/questions/54138/how-can-time-series-analysis-be-done-with-categorical-variables
- https://towardsdatascience.com/predicting-the-weather-with-markov-chains-a34735f0c4df
- https://web.itu.edu.tr/topcuil/ya/END332E_MC.pdf
- https://ericmjl.github.io/essays-on-data-science/machine-learning/markov-models/