Algorithmic Stock Trading with XGBoost and Kalman Filters – Strategy

Sam Bennett
6 min read · Mar 6, 2023


Algorithm paper trading on OANDA

Ever since I learnt about the biases in human thinking in my first statistics class, I have been aware that the decisions people make are rarely purely logical and often veer into the illogical. To mitigate this in my own life, I have found building statistical models very useful. In this article, I would like to share how I followed a passion of mine and wrote a stock trading algorithm to remove the bias from my own trading. My partner for this project was Tom Miller; please read his article about the use of indicators and more from our stock trading project.

Model Choice

In my previous project, building a football betting algorithm, I found XGBoost to be a highly precise and accurate machine learning model, so it seemed natural to make it the backbone of my stock prediction algorithm. I also tried other approaches such as neural networks, SVM and EMD; however, these did not appear to be as profitable in my simulations. Wanting to reduce noise in my training data, I started by fitting linear models to the data to find coefficients representing how much of the current price is determined by previous time steps. In this modelling, it seemed that 99% of the current price can be determined by the previous two time points. I labelled this process ‘Markov’ because of its similarity to Markov chains, although it uses the previous two points, rather than only the previous one, to describe the current price.

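import numpy as np
import matplotlib.pyplot as plt

# Least-squares coefficients of close on its two lags (no intercept)
lr = np.linalg.lstsq(data_close[['lag_1', 'lag_2']], data_close['close'], rcond=None)[0]
print("lr")
print(lr)

# Fitted values from the two-lag linear model
data_close['Markov'] = np.dot(data_close[['lag_1', 'lag_2']], lr)

fig, ax = plt.subplots()
# Plot the actual close against the fitted series
ax.plot(data_close['close'], label='actual')
ax.plot(data_close['Markov'], label='Markov')
ax.legend(loc="upper left")
plt.show()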
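To make the lag regression above easier to try out, here is a minimal sketch on a synthetic random walk rather than real market data. The column names lag_1 and lag_2 follow the snippet above; the seed, series length and starting level are assumptions chosen purely for illustration.

```python
import numpy as np
import pandas as pd

# Synthetic random-walk "prices" (an assumption for illustration, not market data).
rng = np.random.default_rng(0)
close = 100 + np.cumsum(rng.normal(0, 1, 500))

data_close = pd.DataFrame({"close": close})
data_close["lag_1"] = data_close["close"].shift(1)
data_close["lag_2"] = data_close["close"].shift(2)
data_close = data_close.dropna()

# Least-squares fit of close on its two lags (no intercept, as in the article).
X = data_close[["lag_1", "lag_2"]].to_numpy()
y = data_close["close"].to_numpy()
lr = np.linalg.lstsq(X, y, rcond=None)[0]

# Share of the variance in price levels explained by the two lags.
fitted = X @ lr
r_squared = 1 - np.sum((y - fitted) ** 2) / np.sum((y - y.mean()) ** 2)
print("coefficients:", lr)
print("R^2 on price levels:", round(r_squared, 4))
```

Because a random walk's level is always close to its previous level, a fit like this tends to look very strong even when the next move is unpredictable, so a high figure on price levels is worth reading with that in mind.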

My plan was to build a machine learning model by shifting the time points back by one step for the training data while leaving the test data unshifted; ideally, if the model is accurate enough, it would then be able to predict the price movement.

#Define target and features

X_train = train_data[['lag_1','lag_2']]

y_train = train_datax['close']

X_test = test_data[['lag_1','lag_2']]

y_test = test_datax['close']

#Standard Scaler

X_train = X_train.apply(lambda x: (x - x.mean()) / (x.std()))

X_test = X_test.apply(lambda x: (x - x.mean()) / (x.std()))
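The scaling above standardises the test set with its own mean and standard deviation. A common alternative, sketched here as a suggestion rather than as the method used in this project, is to compute the statistics on the training window only and reuse them for the test window, so both sets share one scale and nothing from the test period feeds into preprocessing. The numbers below are made up.

```python
import pandas as pd

# Toy feature frames (values are made up for illustration).
X_train = pd.DataFrame({"lag_1": [10.0, 11.0, 12.0, 13.0],
                        "lag_2": [9.0, 10.0, 11.0, 12.0]})
X_test = pd.DataFrame({"lag_1": [14.0, 15.0],
                       "lag_2": [13.0, 14.0]})

# Compute the statistics once, on the training window only.
train_mean = X_train.mean()
train_std = X_train.std()

# Apply the same shift and scale to both sets.
X_train_scaled = (X_train - train_mean) / train_std
X_test_scaled = (X_test - train_mean) / train_std

print(X_test_scaled)
```

With this approach a test price above anything seen in training stays above the training range after scaling, instead of being re-centred around zero.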

Smoothing

Because this is an attempt to predict the future, a large amount of noise is involved and accuracy will be low. I believed that if the general trend of the stock movement could be modelled, then smoothing the predictions with Kalman and Savitzky-Golay (Savgol) filters, as shown below, could remove some of the louder noise.

import pandas as pd
import xgboost as xgb

# Define the model (an XGBoost regressor, despite the variable name)
classifier = xgb.XGBRegressor(random_state=30)

# Train the model
classifier.fit(X_train, y_train)

# Predict on the test set
y_pred = classifier.predict(X_test)
y_pred = y_pred.flatten()

combined = pd.DataFrame(dict(actual=y_test, XGBoost=y_pred))



from scipy.signal import savgol_filter



data['smooth'] = savgol_filter(data['XGBoost'],window_length=3, polyorder=2)



#data['smooth'] = savgol_filter(data['smooth'],window_length=5, polyorder=3)
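One detail of the Savgol call above is worth checking: with window_length=3 and polyorder=2, the fitted quadratic passes exactly through all three points in each window, so the filter returns its input unchanged. The sketch below demonstrates this on a synthetic series; the wider window of 7 is only an illustrative choice, not a value taken from this project.

```python
import numpy as np
from scipy.signal import savgol_filter

rng = np.random.default_rng(1)
noisy = np.cumsum(rng.normal(0, 1, 50))  # synthetic series, not market data

# window_length=3 with polyorder=2: the quadratic interpolates all three points.
same = savgol_filter(noisy, window_length=3, polyorder=2)
print("unchanged with window 3:", np.allclose(same, noisy))

# A wider window (illustrative choice) leaves fewer degrees of freedom per point,
# so the output differs from the input.
smoother = savgol_filter(noisy, window_length=7, polyorder=2)
print("unchanged with window 7:", np.allclose(smoother, noisy))
```

For the filter to smooth at all, window_length needs to exceed polyorder + 1.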

import numpy as np

from pykalman import KalmanFilter



# Define the observation matrix, which is taken as an identity matrix in this example

observation_matrix = np.identity(1)



# Estimate the initial state mean and initial state covariance based on historical data

initial_state_mean = np.mean(data['smooth'])

initial_state_covariance = np.cov(data['smooth'])



# Define the transition matrix, which assumes a linear relationship between the state at time t and t-1

transition_matrix = np.array([[1]])



# Define the process noise covariance and observation noise covariance (small diagonal values in this example).
# Note: these two values are defined here but are not passed to KalmanFilter below, so the filter does not use them.

process_noise_covariance = np.array([[1e-5]])

observation_noise_covariance = np.array([[1e-3]])



# Create a KalmanFilter object

kf = KalmanFilter(
    transition_matrices=transition_matrix,
    observation_matrices=observation_matrix,
    initial_state_mean=initial_state_mean,
    initial_state_covariance=initial_state_covariance,
    #process_noise_covariance=process_noise_covariance
)



#Fit the Kalman filter to the financial data

filtered_state_means, filtered_state_covariances = kf.filter(data['smooth'])

data['Kalman'] = pd.DataFrame(filtered_state_means, index=data['smooth'].index)



# Create a figure and an axis

fig, ax = plt.subplots()

# Plot the first dataset

ax.plot(data['close'] , label='actual')



ax2 = ax.twinx()



#ax2.plot(data['ma3XGBoost'],'k',label='ma3XGBoost')

ax2.plot(data['smooth'],'c',label='smooth')



ax3 = ax.twinx()

ax3.plot(data['Kalman'],'m', label='Kalman filter')

#ax2.plot(data['fourier'],'m',label='fourier')



plt.title('Stock Movement')

ax.legend()

ax2.legend(loc='upper left')

ax3.legend(loc='lower left')

plt.show()
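To show how the process and observation noise values influence the amount of smoothing, here is a hand-written scalar Kalman filter for a random-walk state in plain NumPy. It is a sketch and not pykalman's implementation: the function name scalar_kalman, the synthetic series and the second parameter pair are assumptions for illustration, while 1e-5 and 1e-3 are the values defined in the code above. In pykalman, as far as I am aware, the corresponding constructor arguments are named transition_covariance and observation_covariance.

```python
import numpy as np

def scalar_kalman(observations, q, r, x0, p0):
    """Filter a 1-D series with a random-walk state model.

    q: process noise variance, r: observation noise variance,
    x0 / p0: initial state mean and variance.
    """
    x, p = x0, p0
    means = np.empty(len(observations))
    for t, z in enumerate(observations):
        # Predict: the state carries over, uncertainty grows by q.
        p = p + q
        # Update: blend prediction and observation using the Kalman gain.
        k = p / (p + r)
        x = x + k * (z - x)
        p = (1 - k) * p
        means[t] = x
    return means

rng = np.random.default_rng(2)
trend = np.linspace(100, 110, 200)
noisy = trend + rng.normal(0, 0.5, 200)  # synthetic data

heavy = scalar_kalman(noisy, q=1e-5, r=1e-3, x0=noisy[0], p0=1.0)
light = scalar_kalman(noisy, q=1e-3, r=1e-5, x0=noisy[0], p0=1.0)

# Mean absolute step size as a simple roughness measure.
rough_heavy = np.abs(np.diff(heavy)).mean()
rough_light = np.abs(np.diff(light)).mean()
print("roughness with q << r:", rough_heavy)
print("roughness with q >> r:", rough_light)
```

The ratio q / r is what matters: a small ratio makes the filter trust its own state and smooth heavily, while a large ratio makes it follow each observation closely.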

Splitting

When the code runs, it starts by collecting data for the specified time period (more on this in an article I will write on collecting and cleaning stock data). It then splits off the most recent day of price data as test data (10%), while the other 90% is used to train the machine learning model. We then run the algorithm described above.
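A chronological 90/10 split of this kind can be sketched as follows. The frame name prices, the minute-bar frequency and the synthetic values are assumptions for illustration; train_data and test_data match the names used earlier in the article.

```python
import numpy as np
import pandas as pd

# Synthetic minute bars (illustrative only, not market data).
index = pd.date_range("2023-01-02 10:00", periods=1000, freq="min")
rng = np.random.default_rng(3)
prices = pd.DataFrame({"close": 100 + np.cumsum(rng.normal(0, 0.1, 1000))}, index=index)

# Keep time order: the oldest 90% trains the model, the newest 10% tests it.
split_at = int(len(prices) * 0.9)
train_data = prices.iloc[:split_at]
test_data = prices.iloc[split_at:]

print(len(train_data), len(test_data))
print("train ends before test starts:", train_data.index.max() < test_data.index.min())
```

Splitting by position rather than shuffling keeps every test bar later than every training bar, which matters for time series because a random split would let the model train on data from the future.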

Trading Indicators & Simulation

My trading indicators were based on the idea that the general trend of my XGBoost-generated curve was useful, so I used the first and second derivatives of that curve as trading indicators. At certain values of the derivatives (chosen by trial and error to maximise profit), we generate buy signals for long and short positions, each with a complementary sell signal. Using the test data, I wrote a simulation to work out the profit that would be achieved. The trading costs I incorporated were those of a micro ES contract, simply to standardise my simulations.

#Backtesting



#data['Gradient'] = data['XGBoost'] - data['XGBoost'].shift(1)
data['Gradient'] = data['Kalman'] - data['Kalman'].shift(1)
data['2Gradient'] = data['Kalman'] - data['Kalman'].shift(2)
data['SecondDeriv'] = data['Gradient'] - data['Gradient'].shift(1)

data['hour'] = data.index.hour
data['Trading'] = np.where(np.logical_and(data['hour'] >= 10, data['hour'] < 16), 1, 0)
#buy when gradient > 0.2, sell if gradient < 0 and buy = True

#Long Trades
#data['trades_L'] = np.where(np.logical_and(0.13<data['Gradient'],data['Gradient']<0.18),1,0)

"""Current best strategies:

#1 - ES
data['trades_Buy_L'] = np.where(np.logical_and(data['SecondDeriv']<-0.03,data['Gradient']>0.04),1,0)
data['trades_Sell_L'] = np.where(np.logical_and(data['SecondDeriv']>-0.01,data['Gradient']<0.03),-1,0)

"""

data['trades_Buy_L'] = np.where(data['Trading']==1, np.where(np.logical_and(data['SecondDeriv']<-0.03, data['Gradient']>0.02),1,0), 0)
data['trades_Sell_L'] = np.where(data['Trading']==1, np.where(np.logical_and(data['SecondDeriv']>-0.01, data['Gradient']<0),-1,0), 0)

#Short Trades

data['trades_Buy_S'] = np.where(data['Trading']==1, np.where(np.logical_and(data['SecondDeriv']>0.03, data['Gradient']<-0.04),1,0), 0)
data['trades_Sell_S'] = np.where(data['Trading']==1, np.where(np.logical_and(data['SecondDeriv']<0.01, data['Gradient']>-0.03),-1,0), 0)





# Position state: 1 after a buy signal, 0 after a sell signal, carried forward in between
data['Holding_L'] = np.where(data['trades_Buy_L'] == 1, 1, np.where(data['trades_Sell_L'] == -1, 0, np.nan))
data['Holding_L'] = data['Holding_L'].ffill()
data['prev_holding_L'] = data['Holding_L'].shift(1)

data['Holding_S'] = np.where(data['trades_Buy_S'] == 1, 1, np.where(data['trades_Sell_S'] == -1, 0, np.nan))
data['Holding_S'] = data['Holding_S'].ffill()
data['prev_holding_S'] = data['Holding_S'].shift(1)

#Calculating where trades are made
data['change_L'] = np.where((data['Holding_L'] == 1) & (data['prev_holding_L'] == 0), 1, np.where((data['Holding_L'] == 0) & (data['prev_holding_L'] == 1), -1, 0))
data['change_S'] = np.where((data['Holding_S'] == 1) & (data['prev_holding_S'] == 0), 1, np.where((data['Holding_S'] == 0) & (data['prev_holding_S'] == 1), -1, 0))

# Boolean masks for the bars where a position is opened
hold_mask_L = data['change_L'] == 1
hold_mask_S = data['change_S'] == 1

# Boolean masks for the bars where a position is closed
not_hold_mask_L = data['change_L'] == -1
not_hold_mask_S = data['change_S'] == -1



""" Plotting """

# Plot the Close values where a long (green) or short (black) position is opened
plt.plot(data[hold_mask_L].index, data[hold_mask_L]['close'], 'g.', label='Bought_L', markersize=10)
plt.plot(data[hold_mask_S].index, data[hold_mask_S]['close'], 'k.', label='Bought_S', markersize=10)

# Plot the Close values where a long (red) or short (magenta) position is closed
plt.plot(data[not_hold_mask_L].index, data[not_hold_mask_L]['close'], 'r.', label='Sold_L', markersize=10)
plt.plot(data[not_hold_mask_S].index, data[not_hold_mask_S]['close'], 'm.', label='Sold_S', markersize=10)

plt.plot(data['close'], label='Close', dashes=[3, 1])



# Add a legend to the plot

plt.legend()







#Calculation of profit
data['profit_L'] = data['Holding_L'] * (data['close'] - data['close'].shift(1))
profit_L = data['profit_L'].sum()

data['profit_S'] = data['Holding_S'] * (data['close'].shift(1) - data['close'])
profit_S = data['profit_S'].sum()
data['cumprofit'] = data['profit_S'].cumsum() + data['profit_L'].cumsum()

Long_profit = data['profit_L'].sum()*50
Short_profit = data['profit_S'].sum()*50

print("Long Profit multiplier", data['profit_L'].sum())
print("Short Profit multiplier", data['profit_S'].sum())
print("Long Profit", Long_profit)
print("Short Profit", Short_profit)

#data.to_csv("UnFunctioned_V1.1.csv")
Number_of_trades_L = (data['trades_Buy_L'].sum())
Number_of_trades_S = (data['trades_Buy_S'].sum())
print("Number of trades (Long)", Number_of_trades_L)
print("Number of trades (Short)", Number_of_trades_S)

Trading_costs_micro = (Number_of_trades_L+Number_of_trades_S)*0.25

Total_profit = (Long_profit + Short_profit) - Trading_costs_micro
print("Profit", Total_profit)
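The position logic in the backtest is easier to follow on a tiny example. The sketch below uses seven hand-made bars with simplified column names (trades_Buy, trades_Sell, Holding), all of which are illustrative assumptions. It also compares the number of raw buy signals with the number of actual entries, since a signal that fires on consecutive bars opens only one position.

```python
import numpy as np
import pandas as pd

# Hand-made signals for seven bars (illustrative values only).
data = pd.DataFrame({
    "close":       [100.0, 101.0, 102.0, 103.0, 102.0, 101.0, 102.0],
    "trades_Buy":  [0, 1, 1, 0, 0, 0, 1],
    "trades_Sell": [0, 0, 0, 0, -1, 0, 0],
})

# 1 on a buy signal, 0 on a sell signal, NaN otherwise; then carry the last state forward.
state = np.where(data["trades_Buy"] == 1, 1.0,
                 np.where(data["trades_Sell"] == -1, 0.0, np.nan))
data["Holding"] = pd.Series(state, index=data.index).ffill().fillna(0.0)

# An entry is a 0 -> 1 transition of the position.
data["prev_holding"] = data["Holding"].shift(1).fillna(0.0)
entries = int(((data["Holding"] == 1) & (data["prev_holding"] == 0)).sum())
buy_signals = int(data["trades_Buy"].sum())

# Bar-to-bar profit as in the article: position on this bar times the price change into it.
profit_points = (data["Holding"] * data["close"].diff()).sum()

print("buy signals:", buy_signals)
print("entries:", entries)
print("profit in points:", profit_points)
```

If trading costs are charged per entry, counting transitions rather than raw signals may give a different cost figure. A further variation some backtests use is to shift the position by one bar before multiplying, so that a signal is not credited with the price move that produced it.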

The image below shows the profit achieved by the simulation over the days specified. V1 is the algorithm described above with the Savgol filter but without the Kalman filter. The values on the left are from trading Microsoft stock and the values on the right are from trading ES futures. TP represents the total profit.

Simulated profits when running the algorithm based on my simulation

External Simulation

After this, I connected my code to OANDA, using its free API for paper trading, and I am collecting results over a longer period to establish the effectiveness of the trading algorithm. A snippet of the code can be seen below. Using external simulation software allows me to validate the algorithm properly, rather than only on a simulation I created myself.


#Sell
if signal == 1:
    #mo is market order
    mo = MarketOrderRequest(instrument="SPX500_USD", units=-1, takeProfitOnFill=TakeProfitDetails(price=TPSell).data, stopLossOnFill=StopLossDetails(price=SLSell).data)
    r = orders.OrderCreate(accountID, data=mo.data)
    rv = client.request(r)
    print(rv) #just to see that order has passed
#Buy
elif signal == 2:
    mo = MarketOrderRequest(instrument="SPX500_USD", units=1, takeProfitOnFill=TakeProfitDetails(price=TPBuy).data, stopLossOnFill=StopLossDetails(price=SLBuy).data)
    r = orders.OrderCreate(accountID, data=mo.data)
    rv = client.request(r)
    print(rv)



#trading_job()



scheduler = BlockingScheduler()

scheduler.add_job(trading_job, 'cron', day_of_week='mon-fri', hour='10-17',start_date='2022-02-13 10:00:00', timezone='Europe/London')#minute='1,16,31,46'

scheduler.start()

Once it seems apparent that the algorithm is viable and not susceptible to large losses (due to gambler’s ruin, etc.), it will be time to connect it to a broker and trade with real funds, starting with ES futures and then expanding.


Sam Bennett

3rd year statistics student. Sharing my interests & personal projects