Markov Chains — Detailed Exploration

Tech Future
6 min read · Jan 5, 2024

Markov chains are a nice example of, and the root of, many probability-related technologies. They are not the most evolved or the best system, yet their core idea is very simple, and we use it every day in our lives or see it without being conscious of it.

A Markov chain is essentially a probabilistic circulation between categorical values. We can define a state space and assign transition probabilities to it, for example for eating habits, sunny or warm days, or anything else.

It can be very complex or very simple. We can use it on actions, such as observable states in reinforcement learning, or on stock prices. Most games can be modeled with Markov chains, as can next-word prediction and similar cases.

So I prepared a categorical prediction transformation for Markov chains:

import random
import matplotlib.pyplot as plt
import numpy as np
from sklearn.metrics import mean_squared_error
from sklearn.linear_model import Ridge
from sklearn.model_selection import train_test_split
from sklearn.tree import DecisionTreeRegressor
import pandas as pd

class MarkovChainCreator:
    """Build a random Markov chain over categorical states and try to predict it.

    The class assigns a random transition-probability row to every state,
    samples a chain from those rows, maps the chain to numbers through a
    per-state score, and offers several predictors over the result.

    Attributes set by ``__init__``:
        states: the sequence of state labels (used as dictionary keys).
        state_number: ``len(states)``.
        state_possibilities: ``{state: [p_0, ..., p_k]}``; row ``state`` holds
            the weights used to draw the state that follows ``state``.
        scores: one integer per state, ``base - 50 * position`` where ``base``
            is drawn once from ``random.randint(250, 300)``.

    Other attributes (``chainList``, ``numericData``, ``train``, ``test``,
    ``avg_data``, ``predictions``, ``predictionNumericData``,
    ``markovPredictions``, ``countsScores``) are created by the methods below.
    """

    def __init__(self, states):
        self.states = states
        self.state_number = len(states)
        self.state_possibilities = {}
        base_score = random.randint(250, 300)
        self.scores = [base_score - 50 * position for position in range(len(states))]

    def assign_train(self):
        """Split ``numericData`` 80/20 (in order) into ``train`` and ``test``."""
        self.train_size = int(len(self.numericData) * 0.8)
        self.train = self.numericData[:self.train_size]
        self.test = self.numericData[self.train_size:]

    def _random_probability_row(self):
        """Return one random row of non-negative weights that sums to 1.

        The row is built by "stick breaking": every draw is limited to the
        probability mass that is still unassigned, and the last entry takes
        whatever is left. Draws are never allowed to exceed the remaining
        mass, so no entry can become negative.
        """
        count = self.state_number
        floor = 1 / count  # preferred lower bound for a single draw

        row = [random.uniform(floor, 1 - floor * (count - 2))]
        for position in range(1, count):
            remaining = max(0.0, 1 - sum(row))
            if position == count - 1:
                # The last state completes the row to a total of 1.
                row.append(remaining)
                continue
            upper = min(remaining, 1 - floor)
            # Honour the floor only while enough mass is left for it.
            lower = floor if upper > floor else 0.0
            row.append(random.uniform(lower, upper))

        total = sum(row)
        return [weight / total for weight in row]

    def assign_possibilities(self):
        """Give every state a fresh random transition row.

        With no states the mapping is simply left empty.
        """
        self.state_possibilities = {
            state: self._random_probability_row() for state in self.states
        }

    def show_possibilities(self):
        """Return the ``{state: transition row}`` mapping."""
        return self.state_possibilities

    def create_chain(self, n):
        """Sample a chain of ``n + 1`` states and store it in ``chainList``.

        The first state is drawn uniformly; each following state is drawn
        with the weights of the current state's transition row, so
        ``assign_possibilities`` must have been called first.
        """
        current = random.choice(self.states)
        chain = [current]
        for _ in range(n):
            current = random.choices(
                population=self.states,
                weights=self.state_possibilities[current],
            )[0]
            chain.append(current)

        self.chainList = chain
        return self.chainList

    def countDictScores(self):
        """Return, per state, how often it occurs in the chain and its score.

        The result is also stored in ``countsScores``. The keys ``'counts:'``
        and ``'scores:'`` (with the trailing colon) are kept as they are,
        because callers may read them.
        """
        self.countsScores = {}
        for state in self.states:
            self.countsScores[state] = {
                'counts:': self.chainList.count(state),
                # Use the instance's own states, not a module-level variable.
                'scores:': self.scores[self.states.index(state)],
            }

        return self.countsScores

    def _position_weighted_scores(self, sequence):
        """Map each state in ``sequence`` to ``position * score_of_state``."""
        return [
            position * self.scores[self.states.index(state)]
            for position, state in enumerate(sequence)
        ]

    def transform_numeric(self):
        """Convert ``chainList`` to numbers and refresh the train/test split.

        Entry ``i`` of ``numericData`` is ``i`` times the score of the state
        at position ``i`` (so entry 0 is always 0).
        """
        self.numericData = self._position_weighted_scores(self.chainList)
        self.assign_train()

    def transform_predictions(self):
        """Convert ``predictions`` to numbers the same way as the chain."""
        self.predictionNumericData = self._position_weighted_scores(self.predictions)

    def get_average_predictor(self):
        """Build ``avg_data``: one row per entry of ``numericData``.

        Row ``i`` holds, for every state except the last, ``i`` times the
        mean of that state's score and the next state's score; the entry for
        the last state is always 0.
        """
        midpoints = [
            (self.scores[j] + self.scores[j + 1]) / 2
            for j in range(len(self.scores) - 1)
        ]
        last_entry = [0] if self.scores else []
        self.avg_data = [
            [position * midpoint for midpoint in midpoints] + last_entry
            for position in range(len(self.numericData))
        ]

    def recursive_state_selection(self, remaining_states, remaining_probs, max_prob_val, sum_other_probs):
        """Pick an alternative state from the non-favourite candidates.

        Candidates are visited from most to least probable. At every step the
        candidate's probability is multiplied by a running factor (initially
        ``sum_other_probs``, afterwards the previous weighted value) and the
        threshold (initially ``max_prob_val``) is squared. The first candidate
        whose weighted value reaches the threshold is returned; ``None`` is
        returned when the candidates run out.

        The argument lists are copied, so the caller's lists are not modified.
        """
        candidates = list(remaining_states)
        weights = list(remaining_probs)
        threshold = max_prob_val
        factor = sum_other_probs

        while candidates:
            top = weights.index(max(weights))
            weighted = factor * weights[top]
            threshold = threshold * threshold
            if weighted >= threshold:
                return candidates[top]
            candidates.pop(top)
            weights.pop(top)
            factor = weighted

        return None

    def _most_likely_next(self, state):
        """Return ``(index, label)`` of the most probable successor of ``state``."""
        row = self.state_possibilities[state]
        best_index = row.index(max(row))
        return best_index, self.states[best_index]

    def _choose_state(self, previous_state, value, avg_row, history):
        """Predict one state; shared by both categorical predictors.

        Starts from the most probable successor of ``previous_state``. When
        the other successors together outweigh the comparison probability, an
        alternative is looked up with ``recursive_state_selection`` and
        replaces the favourite if its weighted difference is at least
        ``min(history)``. ``history`` being ``None`` or empty disables that
        override.
        """
        row = self.state_possibilities[previous_state]
        best_index, selected = self._most_likely_next(previous_state)

        other_states = [state for state in self.states if state != selected]
        other_probs = [row[self.states.index(state)] for state in other_states]
        # NOTE(review): this is the favourite's own self-transition weight,
        # not row[best_index]; kept as in the original - confirm the intent.
        max_prob = self.state_possibilities[selected][best_index]
        sum_other_probs = sum(other_probs)

        if sum_other_probs <= max_prob:
            return selected

        alternative = self.recursive_state_selection(
            other_states, other_probs, max_prob, sum_other_probs
        )
        if alternative is None or history is None or len(history) == 0:
            return selected

        diff_new = (
            (value - avg_row[self.states.index(alternative)])
            * self.state_possibilities[alternative][best_index]
        )
        return alternative if diff_new >= min(history) else selected

    def predictor(self):
        """Predict a state for every position and store them in ``predictions``.

        Rebuilds ``avg_data`` first and finishes by filling
        ``predictionNumericData``. Position ``i`` is predicted from
        ``chainList[i - 1]`` and ``numericData[i - 1]``.

        NOTE(review): at position 0 the index ``-1`` wraps around to the last
        element of the chain; kept as in the original.
        """
        self.predictions = []
        self.get_average_predictor()

        for position in range(len(self.numericData)):
            # Position 1 never overrides the favourite (as in the original).
            history = None if position == 1 else self.numericData[0:position - 1]
            self.predictions.append(
                self._choose_state(
                    self.chainList[position - 1],
                    self.numericData[position - 1],
                    self.avg_data[position - 1],
                    history,
                )
            )

        self.transform_predictions()

    def markovPredictor(self):
        """Predict every position as the most probable successor of its predecessor.

        Results go to ``markovPredictions``. Position 0 uses the last chain
        element as predecessor because index ``-1`` wraps around.
        """
        self.markovPredictions = [
            self._most_likely_next(self.chainList[position - 1])[1]
            for position in range(len(self.chainList))
        ]

    def _fit_and_plot(self, model, label):
        """Fit ``model`` on index -> value, print its MSE, plot it, return test predictions."""
        # The only feature is the position in the chain.
        X = np.arange(len(self.chainList)).reshape(-1, 1)
        y = np.array(self.numericData)

        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

        model.fit(X_train, y_train)
        y_pred = model.predict(X_test)

        mse = mean_squared_error(y_test, y_pred)
        print(f'Mean Squared Error ({label}): {mse}')

        # The test split is shuffled; sort by index so the line is drawn left
        # to right. Only the plot is sorted, the returned array is not.
        order = np.argsort(X_test[:, 0])
        plt.scatter(X, y, color='blue', label='Original data')
        plt.plot(X_test[order], y_pred[order], color='red', linewidth=3, label=label)
        plt.xlabel('Index')
        plt.ylabel('Value')
        plt.legend()
        plt.show()

        return y_pred

    def fit_ridge_regression(self):
        """Fit a ridge regression (alpha=0.01) and return its test-set predictions."""
        alpha = 0.01  # Regularization strength (adjust as needed)
        return self._fit_and_plot(Ridge(alpha=alpha), 'Ridge Regression')

    def fit_decision_tree(self):
        """Fit a depth-5 decision tree regressor and return its test-set predictions."""
        max_depth = 5  # Adjust the depth as needed
        return self._fit_and_plot(DecisionTreeRegressor(max_depth=max_depth), 'Decision Tree')

    def prophet_predictor(self):
        """Forecast ``len(predictions)`` further values with Prophet.

        Requires ``predictor`` to have run, since ``predictions`` is read.

        NOTE(review): ``Prophet`` is not imported anywhere in the visible
        file; the name must be provided by the surrounding module (presumably
        ``from prophet import Prophet``) or this method raises ``NameError``.
        """
        # Prepare the data for Prophet
        df = pd.DataFrame({'ds': range(len(self.numericData)), 'y': self.numericData})

        # Create and fit the Prophet model
        model = Prophet()
        model.fit(df)

        # Make future dataframe for predictions
        future = model.make_future_dataframe(periods=len(self.predictions))

        # Predict with Prophet and keep only the forecast part
        forecast = model.predict(future)
        prophet_predictions = forecast['yhat'].tail(len(self.predictions))

        return prophet_predictions

    def transform_numeric_to_category(self, numeric_values, default_category=None):
        """Turn numeric model output back into state labels.

        The values are treated as belonging to the last
        ``len(numeric_values)`` positions of the chain. ``default_category``
        is accepted for compatibility and is not used.

        ``avg_data`` is built on demand if ``predictor`` has not run yet, and
        an empty history (position 0) keeps the favourite instead of raising.
        """
        if not hasattr(self, 'avg_data'):
            self.get_average_predictor()

        start = len(self.numericData) - len(numeric_values)
        predictions = []
        for offset, value in enumerate(numeric_values):
            position = start + offset
            # Position 1 never overrides the favourite (as in the original).
            history = None if position == 1 else self.numericData[0:position]
            predictions.append(
                self._choose_state(
                    self.chainList[position - 1],
                    value,
                    self.avg_data[position],
                    history,
                )
            )

        return predictions

    def plot_data(self, data):
        """Show ``data`` as a simple line plot."""
        plt.plot(data)
        plt.xlabel('Index')
        plt.ylabel('Value')
        plt.title('Numeric List Plot')
        plt.show()

Tests

# Example usage

import random
import string

def random_chars(n):
    """Return ``n`` random lowercase words whose lengths are 0, 1, ..., n - 1.

    The word at index ``i`` consists of ``i`` letters drawn with
    ``random.choice`` from ``string.ascii_lowercase``, so the first word is
    always the empty string and all words have distinct lengths.
    """
    return [
        ''.join(random.choice(string.ascii_lowercase) for _ in range(length))
        for length in range(n)
    ]

# Example usage: ten random words of increasing length (the first one is empty).
result = random_chars(10)
print(result)


import string

# Twenty distinct random words serve as the state labels.
states = random_chars(20)
markov_chain_creator = MarkovChainCreator(states)
markov_chain_creator.assign_possibilities()
markov_chain_creator.show_possibilities()

# Sample a chain (300 transitions) and map it to numeric values.
chain_list = markov_chain_creator.create_chain(300)
print(markov_chain_creator.countDictScores())
markov_chain_creator.transform_numeric()
markov_chain_creator.plot_data(markov_chain_creator.numericData)

# Categorical predictors.
markov_chain_creator.predictor()
markov_chain_creator.markovPredictor()
markov_chain_creator.transform_predictions()
markov_chain_creator.plot_data(markov_chain_creator.predictionNumericData)

# Numeric regressors, mapped back to categories afterwards.
decision_tree_pred = markov_chain_creator.fit_decision_tree()
ridge_reg_pred = markov_chain_creator.fit_ridge_regression()
dec_tree_preds_category = markov_chain_creator.transform_numeric_to_category(decision_tree_pred)
# Use the ridge output here; the decision-tree output was passed by mistake.
ridge_reg_pred_category = markov_chain_creator.transform_numeric_to_category(ridge_reg_pred)

Results

Markov Chain Mapped into Numeric Values
Prediction result
Decision Tree Result
Ridge Regression result
print(markov_chain_creator.numericData)
print(markov_chain_creator.predictionNumericData)
print(markov_chain_creator.avg_data)


def prediction_accuracy(predicted, actual):
    """Return the fraction of ``predicted`` labels that match ``actual``.

    A prediction list shorter than ``actual`` is compared against the last
    ``len(predicted)`` entries of ``actual``, because
    ``transform_numeric_to_category`` produces its labels for the tail
    positions of the chain. The hit count is divided by the number of labels
    actually compared. Empty input yields 0.0.
    """
    predicted = list(predicted)
    if not predicted:
        return 0.0
    reference = list(actual)[-len(predicted):]
    if not reference:
        return 0.0
    hits = sum(1 for guess, truth in zip(predicted, reference) if guess == truth)
    return hits / len(reference)


# Same order as before: custom predictor, plain Markov predictor,
# decision tree, ridge regression.
for candidate in (
    markov_chain_creator.predictions,
    markov_chain_creator.markovPredictions,
    dec_tree_preds_category,
    ridge_reg_pred_category,
):
    acc = prediction_accuracy(candidate, chain_list)
    print(acc)
Different Predictions

I tried a decision tree and ridge regression. I could have used any model for prediction, but I wanted to try transforming these values into numbers first. In the end, simply predicting the maximum-probability transition from the current state beats the other models, because when the level is categorical, fitting a model is kind of like predicting an accident from one specific data set.

zip longest

I used `zip_longest` for better matching, but the prediction accuracy is still pretty low.

Therefore I will try other ideas and keep working on this subject. So far I have made this progress in only one day, and it will evolve. As with a product, it is usually not wise to share unfinished, raw work, because that is the way things are done. Even though I say this, I am publishing this work anyway. If you are interested in this, please follow me.

Thanks

Sources:

--

--