Temporal Convolutional Neural Network with Conditioning for Broad Market Signals

Adam
Call For Atlas
Published in
32 min readFeb 9, 2024
DALL·E 2 2024 — Interpretation of a TCN

There is a naive strategy amongst the alleged day-traders, that if you buy a levered ETF at open, and sell at close, and do it multiple time you’d be wealthy in no time. This is an interesting statement that ought to be challenged with deep learning networks. In this article, we will build one called a Temporal Convolution Network (TCN) for binary classification, which will learn and predict the market direction day-by-day if its bullish (and profitable) or bearish.

Convolutional neural networks (CNNs) are commonly used for image classification, as the spectral graph convolutions can extract latent features and patterns. The same is applicable to timeseries' as proven by the seminal paper from Oord et al. describing Google Deepmind’s WaveNet architecture (paper and code in the reference section). We will borrow concepts from WaveNet to teach our model to identify deep structures in the market, and try to emulate a profitable day trader.

NB: This is a long article, to skip to the network’s architecture, see section: Temporal Convolution Neural Network (TCN). The code in this article is not executable, get the notebook from my github.

Financial Data

Using yfinance, we will pull 20 years of market data, specifically:

  • SPY ETF — which will be our target label.
  • CBoE’s VIX as a proxy for market sentiment and volatility.
  • The Russell 2000 small caps index, as a proxy for speculation.
  • The Gold Futures Index as proxy for investor uncertainty.
  • The 10 year US Treasury Note as proxy for inflation and rates.
from datetime import datetime
from scipy.stats import skew, kurtosis
from pandas.tseries.offsets import BDay

START_DATE = "1999-01-01"
END_DATE = pd.Timestamp(datetime.now() - BDay(1)).strftime('%Y-%m-%d')
DATA_DIR = "data"
INDEX = "Date"
TARGET_ETF = "SPY" # S&P 500
RATES_INDEX = "^TNX" # 10 Year Treasury Note Yield
VOLATILITY_INDEX = "^VIX" # CBOE Volatility Index
SMALLCAP_INDEX = "^RUT" # Russell 2000 Index
GOLD_INDEX = "GC=F" # Gold futures
tickers_symbols = [
TARGET_ETF,
VOLATILITY_INDEX,
RATES_INDEX,
SMALLCAP_INDEX,
GOLD_INDEX,
]
INTERVAL = "1d"

def get_tickerdata(tickers_symbols, start=START_DATE, end=END_DATE, interval=INTERVAL, datadir=DATA_DIR):
tickers = {}
earliest_end= datetime.strptime(end,'%Y-%m-%d')
latest_start = datetime.strptime(start,'%Y-%m-%d')
os.makedirs(DATA_DIR, exist_ok=True)
for symbol in tickers_symbols:
cached_file_path = f"{datadir}/{symbol}-{start}-{end}-{interval}.csv"

try:
if os.path.exists(cached_file_path):
df = pd.read_csv(cached_file_path, index_col=INDEX)
df.index = pd.to_datetime(df.index)
assert len(df) > 0
else:
df = yf.download(
symbol,
start=START_DATE,
end=END_DATE,
progress=False,
interval=INTERVAL,
)
assert len(df) > 0
df.to_csv(cached_file_path)
min_date = df.index.min()
max_date = df.index.max()
nan_count = df["Close"].isnull().sum()
skewness = round(skew(df["Close"].dropna()), 2)
kurt = round(kurtosis(df["Close"].dropna()), 2)
outliers_count = (df["Close"] > df["Close"].mean() + (3 * df["Close"].std())).sum()
print(
f"{symbol} => min_date: {min_date}, max_date: {max_date}, kurt:{kurt}, skewness:{skewness}, outliers_count:{outliers_count}, nan_count: {nan_count}"
)
tickers[symbol] = df

if min_date > latest_start:
latest_start = min_date
if max_date < earliest_end:
earliest_end = max_date
except Exception as e:
print(f"Error with {symbol}: {e}")

return tickers, latest_start, earliest_end

print(f"Date ranges {latest_start} - {earliest_end}")
for ticker in tickers_symbols:
df = tickers.get(ticker)
df = df[(df.index >= latest_start) & (df.index <= earliest_end)]

assert len(df) > 0 and not df.isna().any().any()

tickers[ticker] = df

percentage_returns = {}

for ticker in tickers_symbols:
df = tickers.get(ticker)
percentage_returns[ticker] = ((1 + df["Close"].pct_change()).cumprod() - 1) * 100
plt.figure(figsize=(16, 6))
for ticker, pr in percentage_returns.items():
plt.plot(pr, label=ticker, alpha=0.75)

plt.legend(loc="upper left")
plt.grid(True)
plt.tight_layout()
plt.show()

The Finfluencer Baseline Strategy

Buy at open, sell at close — easy right? Or that is what the financial gurus on youtube want to persuade us to believe in, and buy their courses.

Let’s do a quick and dirty test to test their hypothesis:

TARGET_FACTOR = 55/100/100  # assume 50BP spread on average
print(f"target factor used: {TARGET_FACTOR} for window: {WINDOW_SIZE} predicting: {PREDICTION_HORIZON} step")

target_etf = tickers.get(TARGET_ETF)

next_close = (target_etf["Close"].shift(-1) * (1- TARGET_FACTOR))
days_rising = target_etf[next_close > target_etf["Close"]]
days_not_rising = target_etf[next_close <= target_etf["Close"]]
days_rising_count = days_rising.groupby(days_rising.index.year).size()
days_not_rising_count = days_not_rising.groupby(days_not_rising.index.year).size()

total_days = target_etf.groupby(target_etf.index.year).size()
percentage_rising = (days_rising_count / total_days) * 100

yearly_counts = pd.DataFrame(
{"Rising": days_rising_count, "Not Rising": days_not_rising_count}
)
yearly_counts.plot(kind="bar", color=["green", "red"], figsize=(16, 4))
plt.title("Days Rising vs Not Rising per Year")
plt.xlabel("Year")
plt.ylabel("Count")
plt.tick_params(axis="x", rotation=45)
plt.legend()
plt.tight_layout()
plt.show()

print(f"Baseline Accuracy: {percentage_rising.mean():0.2f}%")

Looking at these numbers, we have 29% of making a profit with this strategy, such a low ROI is onlytrue because we are factoring in a 50 basis points spread, and various fees and commissions the broker would charge us. Therefore the hypothesis is rejected, and the alternative is true, you’ll be unprofitable and even poorer if you bought their course — unless you leverage some state-of-the-art neural network (NN)

We know that this NN precision score should outperform the baseline’s 29% to be successful.

Feature Selection and Engineering

Before giving the financial timeseries to the model, we need to do some wrangling and feature engineering. Neural Networks (NN) see numbers differently than us, they are interested in magnitudes, cycles and distances. This means we have to encode our data in a way that it makes a NN learn better, be more efficient, and therefore more accurate.

Throughout the experiments we used log returns (capturing percentage moves):

first order of integration (differencing the values of today with yesterday):

and finally normalizing the raw data:

We chose the first order differencing — normalized for our data’s preprocessing.

For temporal data, we took a different approach. Since time is cyclic (even years, you can think of these as business cycles), it can be encoded into sin/cosine waves, for example, days can be encoded in the following equation:

Though a more accurate representation of time is available through Radial Basis Functions (RBF):

This is more accurate as distances are measured from a focal point, therefore Sunday is larger than Monday although in the days’ sequence they are side-by-side.

In a cyclic function, the distance of Sunday is not well represented as it is closer to Monday rather than further, while the RBF gives the right distance representation:

def create_time_features(data_df):
"""
Encodes time cyclic features for a dataset with monthly sampling.
Including cyclic encoding for day and year.
:param data_df: The timeseries with a date in the format YYYY-MM-DD as index.
:return: data_df with added wave features for month, day, and year.
"""
if not isinstance(data_df.index, pd.DatetimeIndex):
raise ValueError("The DataFrame index must be a DateTimeIndex.")

def _sin_transformer(period):
return FunctionTransformer(lambda x: np.sin(x / period * 2 * np.pi))

def _cos_transformer(period):
return FunctionTransformer(lambda x: np.cos(x / period * 2 * np.pi))

def rbf_transform(x, period, input_range):
x_normalized = (x - input_range[0]) / (input_range[1] - input_range[0]) * period
return np.exp(-0.5 * ((x_normalized - period / 2) / 1.0) ** 2)

data_df[DAY_RBF] = rbf_transform(data_df.index.day, 31, (1, 31))
data_df[MONTH_RBF] = rbf_transform(data_df.index.month, 12, (1, 12))
data_df[Q_RBF] = rbf_transform((data_df.index.month - 1) // 3 + 1, 4, (1, 4))
min_year, max_year = data_df.index.year.min(), data_df.index.year.max()
data_df[BIZ_RBF] = rbf_transform(
data_df.index.year, max_year - min_year, (min_year, max_year)
)

return data_df

def create_features_df(tickers, data_df, target_label=TARGET_ETF):
"""
Create all exogenous features that lead to our target etf.
- if the trading day close is higher than the open.
- price log returns or 1D integrations.
:param tickers: All the timeseries with a date in the format YYYY-MM-DD as index.
:param data_df: Any pre-engineered features.
:return: data_df With TARGET_LABEL timeseries at column 0, and the rest are for conditioning.
"""

def _get_first_difference(data_df):
return data_df.pct_change().fillna(0)

def _get_log_returns(data_df):
return np.log(data_df / data_df.shift(1)).fillna(0)

IDX_COL = "Open"
price_transform = FunctionTransformer(_get_first_difference)

data_df[PRICE_FEATURES] = price_transform.fit_transform(data_df[PRICE_FEATURES])
rates_df = tickers.get(RATES_INDEX)

for index in EXOG_TS:
if index == target_label:
continue
index_df = tickers.get(index)
transformed_data = price_transform.fit_transform(index_df[[IDX_COL]])
data_df[index] = transformed_data

data_df = data_df.fillna(0)

return data_df

IS_CLASSIFICATION = True

def prepare_data(tickers, target_label=TARGET_ETF, classify=IS_CLASSIFICATION, to_normalize=True):
"""
Utility function to prepare the data.
:data_df dataframe: dataframe with `window_size` months of data to predict the `window_size`+`horizon`.
:param window_size: int, length of the input sequence
:param horizon: int, forecasting horizon, defaults to 1
:return: Array in the shape of (n_samples, n_steps, n_features)
"""
y_scaler = None, None
data_df = tickers.get(target_label).copy()
close_tomorrow_df = (data_df['Close'].shift(-1) * (1- TARGET_FACTOR))
if classify:
today_df = data_df['Close'] # data_df['Open'].shift(-1)
# Calculate the target label: 1 if next day's close is higher than its open.
data_df[TARGET_LABEL] = (close_tomorrow_df > today_df).astype(int)
else:
# A target variable with a large spread of values, in turn, may result in large error gradient values
# causing weight values to change dramatically. We predict tomorrows close.
y_scaler = MinMaxScaler(feature_range=(-1, 1))
y_scaled = y_scaler.fit_transform(close_tomorrow_df)
data_df[TARGET_LABEL] = y_scaled.flatten()

data_df = create_features_df(tickers, data_df, target_label=target_label)
data_df_normalized = None
if to_normalize:
data_df_normalized = normalize(data_df[TARGET_TS_FEATURES_NOTIME], norm="l2")
else:
data_df_normalized = data_df[TARGET_TS_FEATURES_NOTIME]
data_df_normalized = pd.DataFrame(
data_df_normalized, columns=TARGET_TS_FEATURES_NOTIME
)
data_df_normalized = pd.concat(
[
data_df[TARGET_LABEL].reset_index(drop=True),
data_df_normalized.reset_index(drop=True),
],
axis=1,
)
if any(feature in TIME_FEATURES for feature in TARGET_TS_FEATURES):
# Don't normalize these.
data_df = create_time_features(data_df)
data_df_normalized = pd.concat(
[
data_df[TIME_FEATURES].reset_index(drop=True),
data_df_normalized.reset_index(drop=True),
],
axis=1,
)
# we might drop the first and last row of data (shifts and difference => NAs)
return data_df_normalized.dropna(axis=0), y_scaler

Data Analyis & Feature Selection

let’s start with a simple check, if we are to predict the number of days the target instrument (SPY in our case) Closes higher than the Open (we know already its 29% of the time) and factor in spreads with fees, is our data set balanced?

No its not — this is can be a problem for the NN as it can get biased on the 0 label (bearish signal — not higher than the start).

In this case we have to use a specific loss function called Focal Loss as an alternative to the standard Cross-Entropy, more on this in the model training section.

Principal Component Analysis (PCA)

We carry out a PCA to assert which features have most info in relation to our target, and attempt to identify anomalies in our timeseries.

Let’s start with a timeseries definition: An interdependent multiplicative timeseries (as most financial timeseries are) is comprised of 4 components:

Where T is the trend, S is the seasonality, C is the cyclical trend (business cycles for us) and e is noise, at any point in time t. We know that all our data is (from De Prado’s countless literature on financial data):

  1. Autoregressive, serially correlated and very multicollinear
  2. Not stationary with a unit-root as its mean and standard deviation moves.
  3. Has trend, seasonality and all manner of noisome structures.

With this in mind, PCA will decompose our timeseries into components of meaningful variance. The cumulative variance shows which features are of importance in our data and which will contribute most information:

from sklearn.decomposition import PCA

MAX_VARIANCE = 0.95

data_df_pca = data_df.drop(columns=[TARGET_LABEL])
pca = PCA()
xdata = pca.fit_transform(data_df_pca)

cum_var_exp = np.cumsum(pca.explained_variance_ratio_)
num_components = np.argmax(cum_var_exp >= MAX_VARIANCE) + 1
print(f"Max components for {MAX_VARIANCE*100}% variance: {num_components} out of {data_df.shape[1]}")

eigenvectors = pca.components_
loadings_df = pd.DataFrame(eigenvectors, columns=data_df_pca.columns).T
summed_loadings = np.sum(np.abs(eigenvectors), axis=0)
summed_loadings_df = pd.DataFrame(summed_loadings, index=data_df_pca.columns, columns=["Sum"]).sort_values(by="Sum", ascending=False)

fig, axes = plt.subplots(nrows=3, ncols=1, figsize=(18, 10), gridspec_kw={'height_ratios': [1, 2, 4]})

summed_loadings_df.plot(kind="bar", legend=False, ax=axes[0])
axes[0].set_title("Summed Loadings Across All Principal Components")
axes[0].set_ylabel("Summed Loadings")
axes[0].set_xlabel("Features")
axes[0].tick_params(axis='x', labelrotation=45)


loadings_df.plot(kind="bar", legend=False, ax=axes[1])
axes[1].set_title(f"Loadings for Principal Components")
axes[1].set_ylabel("Loadings")
axes[1].set_xlabel("Features")
axes[1].tick_params(axis='x', labelrotation=45)
axes[1].legend(loc='upper left', bbox_to_anchor=(1, 1))

pca_reduced = PCA(n_components=num_components)
xdata_reduced = pca_reduced.fit_transform(data_df_pca)
xdata_projected = pca_reduced.inverse_transform(xdata_reduced)
error = np.sum((data_df_pca - xdata_projected) ** 2, axis=1)
anomaly_threshold = np.percentile(error, MAX_VARIANCE*100)
anomalies = error > anomaly_threshold

data_df_pca = data_df.copy()
anomaly_dates = data_df.index[np.where(anomalies)]
for i, column in enumerate(data_df_pca.columns):
if column == TARGET_LABEL or column in TIME_FEATURES or column == "Volume":
continue
axes[2].scatter(anomaly_dates, data_df_pca.loc[anomaly_dates, column], label='Anomaly' if i == 0 else "__nolegend__", color='k')
axes[2].plot(data_df_pca.index, data_df_pca[column], label=f'{column}', alpha=0.8)
axes[2].set_title('Time Series with Anomalies Highlighted')
axes[2].legend(loc='upper left', bbox_to_anchor=(1, 1))
plt.tight_layout()
plt.show()

By using PCA we can deconstruct and reconstruct with reduced components to indentify variance, and therefore anomalies. These are plotted in the chart above, with the black circles signalling outliers above the 95th percentile.

The anomaly chart is interesting, the market movements during the 2008 global recession were relatively uniform across all sectors (broad market downturn), returns moved together in a way that PCA considers normal. In contrast, the periods of 2000–2005 (DotCom bubble) and post-2019 (Covid) have exhibited more varied behaviors across these indices, leading to higher reconstruction errors.

We also use the Information Coefficient (IC), calculated here as the Spearman’s rank correlation to understand the features’ importance. It measures the strength and direction of a monotonic relationship between two variables. IC is useful because it can capture nonlinear relationships between variables. A high absolute value of the IC indicates a strong relationship, which could be either positive or negative. Spearman’s rank is represented by the following formula:

  • Where the sum of d represents the sum of the squared differences in ranks between each pair of observations.
  • n is the number of observations.
  • The constant 6 is part of the normalization factor that scales the sum of squared rank differences.

The Mutual Information (MI) also quantifies the amount of information obtained about one random variable through observing the other random variable. It’s measuring how much information each feature in our dataset provides about the target variable. Unlike correlation, MI can capture any kind of relationship between variables, not just linear or monotonic. MI between X and Y can be represented by the formula:

  • p(x,y) is the joint probability distribution function of X and Y, indicating the probability that X takes on value x and Y takes on value y simultaneously.
  • p(x) and p(y) are the marginal probability distribution functions of X and Y, respectively, indicating the probabilities of X taking on value x and Y taking on value y independently.

Many good things come from feature selection: less chance of over-fitting, faster training times, more accurate models — though sometimes at the cost of interpretability.

In our case all features are deemed important within an economic context, though we ought to finetune these in the next section. The graph below represents the IC and MI of our current features:

import seaborn as sns
import matplotlib.pyplot as plt
from scipy.stats import spearmanr
from sklearn.feature_selection import mutual_info_regression

ic = {}
for column in data_df.columns:
if column != TARGET_LABEL:
corr, p_val = spearmanr(data_df[TARGET_LABEL], data_df[column])
ic[column] = [corr, p_val]

ic_df = pd.DataFrame(ic, index=["IC", "p-value"]).T

mi = mutual_info_regression(X=data_df.drop(columns=[TARGET_LABEL]), y=data_df[TARGET_LABEL])
mi_series = pd.Series(mi, index=data_df.drop(columns=[TARGET_LABEL]).columns)
metrics = pd.concat(
[
mi_series.to_frame("Mutual Information"),
ic_df["IC"].to_frame("Information Coefficient"),
],
axis=1,
)
metrics = metrics.sort_values(by="Mutual Information", ascending=False)
ax = metrics.plot.bar(figsize=(18, 4), rot=45)
ax.set_xlabel("Features")
ax.set_ylabel("Scores")
ax.set_title("Feature Evaluation: MI & IC")
sns.despine()
plt.tight_layout()
plt.show()

Feature Selection

Let’s do a union of the important features from both PCA and IC/MI methods, with the assumption that only these are what the model needs.

There are other checks we can do, such as pearson correlation and cointegration, plus checking their T-scores. Sadly in earlier experiments, this feature set was too simplified for the model, causing it to underfit with high bias. We will train it with the full feature set:

MAX_FEATURES_COUNT = 10

features_pca = summed_loadings_df.head(MAX_FEATURES_COUNT).index.tolist()
features_miic = (metrics.head(MAX_FEATURES_COUNT).index.tolist())
print(F"Top {MAX_FEATURES_COUNT} PCA Loadings: {features_pca}")
print(F"Top {MAX_FEATURES_COUNT} MI/IC: {features_miic}")
SELECTED_FEATURES = list(set(features_pca) & set(features_miic))

print(f"Selected {len(SELECTED_FEATURES)} features: {SELECTED_FEATURES}")
Top 10 PCA Loadings: ['Open', 'Low', 'Close', '^RUT', 'High', '^TNX', 'GC=F', '^VIX', 'month_rbf', 'quart_rbf']
Top 10 MI/IC: ['day_rbf', 'month_rbf', 'Open', '^TNX', 'Low', '^VIX', 'GC=F', '^RUT', 'quart_rbf', 'biz_rbf']
Selected 8 features: ['quart_rbf', '^TNX', 'Open', '^RUT', 'GC=F', 'month_rbf', '^VIX', 'Low']
SELECTED_EXOG =  EXOG_TS # [feature for feature in SELECTED_FEATURES if feature in EXOG_TS] #
SELECTED_FEATURES = data_df.drop(columns=EXOG_TS+["Open", "Volume"], axis=1).columns.to_list() # [feature for feature in SELECTED_FEATURES if feature not in EXOG_TS +["Open", "Volume"])] #

SELECTED_EXOG, SELECTED_FEATURES
(['^VIX', '^TNX', '^RUT', 'GC=F'],
['day_rbf',
'month_rbf',
'quart_rbf',
'biz_rbf',
'Close_target',
'High',
'Low',
'Close'])

Finally, to make sure we have only features that contribute new information, we test for multicolinearity with Variance Inflation Factor (VIF):

Where Ri coefficient of determination of a regression. For feature not to be colinear, they have to score between 1 to 5 (ignoring the constant term which we placed there for an intercept to help in the variance calculation):

from statsmodels.stats.outliers_influence import variance_inflation_factor
from statsmodels.tools.tools import add_constant

data_with_constant = add_constant(data_df[SELECTED_FEATURES + SELECTED_EXOG])
vif_data = pd.DataFrame()
vif_data["feature"] = data_with_constant.columns
vif_data["VIF"] = [
variance_inflation_factor(data_with_constant.values, i)
for i in range(data_with_constant.shape[1])
]

print(vif_data)
feature       VIF
0 const 3.785721
1 day_rbf 1.001582
2 month_rbf 1.599624
3 quart_rbf 1.600373
4 biz_rbf 1.000497
5 Close_target 1.003715
6 High 2.247700
7 Low 2.953168
8 Close 2.392608
9 ^VIX 1.804647
10 ^TNX 1.074184
11 ^RUT 1.503361
12 GC=F 1.034004

Temporal Convolution Neural Network (TCN)

CNNs, RNNs and other stateful deeplearning models are used for timeseries (including transformers, as language is also a timeseries). The TCN is made specifically for timeseries problems, following the architecture inVan den Oord et al., we will build one that is conditioned for the capital markets.

We will use 2 Input 3D Tensors of shape (batch_size, window_size, n_features) for the target timeseries, and (batch_size, window_size, n_timeseries) for the conditioning timeseries sets.

An Output 2D tensor of shape (batch_size, horizon) with a single dense layer to output the next timestep according to the given horizon.

x1 conditioning layers:

  1. x1 Dilated 1D convolutions for the exogenous timeseries to condition the main with.
  2. x1 Dilated 1D convolutions for the main timeseries.
  3. x2 residual layers to add back historic data to the x2 convolution outputs.
  4. x2 Add layers to reinstate residuals.
  5. x1 concatenated layer to combine and output the conditioned main timeseries and residuals.

x4 hidden layers (from the paper’s code) comprised of:

  1. x2 1D convolutions with relu activation and a spatial dropouts.
  2. 1D Dilated Convolution to capture residuals with linear activation.
  3. An addition layer to add back the residuals into the next layers input

Training is done with an ADAM learning optimizer, early stop function, and BinaryFocalCrossentropy as the loss function.

The architecture is visualized in the graph below, taken from tensorboard’s graph plot (the arrows that point to the center, are going out of the block to the next one, its quirk from the tool):

import tensorflow as tf
from tensorflow.keras.layers import (
SpatialDropout1D,
Dense,
Conv1D,
Layer,
Add,
Input,
Concatenate,
Flatten,
)
from tensorflow.keras import Model

assert tf.__version__ >= "2.12"

NONLINEAR_ACTIVATION = "tanh" # LeakyReLU(alpha=0.01)


class TCNBlock(Layer):
"""
TCN Residual Block that uses zero-padding to maintain `steps` value of the ouput equal to the one in the input.
Residual Block is obtained by stacking togeather (2x) the following:
- 1D Dilated Convolution
- ReLu
- Spatial Dropout
And adding the input after trasnforming it with a 1x1 Conv
"""

def __init__(
self,
filters=1,
kernel_size=2,
dilation_rate=1,
kernel_initializer="glorot_normal",
bias_initializer="glorot_normal",
kernel_regularizer=None,
bias_regularizer=None,
use_bias=False,
dropout_rate=0.0,
layer_id=None,
**kwargs,
):
""" "
Arguments
filters: Integer, the dimensionality of the output space
(i.e. the number of output filters in the convolution).
kernel_size: An integer or tuple/list of a single integer,
specifying the length of the 1D convolution window.
dilation_rate: an integer or tuple/list of a single integer, specifying
the dilation rate to use for dilated convolution.
Usually dilation rate increases exponentially with the depth of the network.
activation: Activation function to use
If you don't specify anything, no activation is applied
(ie. "linear" activation: `a(x) = x`).
use_bias: Boolean, whether the layer uses a bias vector.
kernel_initializer: Initializer for the `kernel` weights matrix
bias_initializer: Initializer for the bias vector
kernel_regularizer: Regularizer function applied to the `kernel` weights matrix
bias_regularizer: Regularizer function applied to the bias vector
(see [regularizer](../regularizers.md)).
# Input shape
3D tensor with shape: `(batch, steps, n_features)`
# Output shape
3D tensor with shape: `(batch, steps, filters)`
"""
super(TCNBlock, self).__init__(**kwargs)
assert dilation_rate is not None and dilation_rate > 0 and filters > 0 and kernel_size > 0

self.filters = filters
self.kernel_size = kernel_size
self.dilation_rate = dilation_rate
self.kernel_initializer = kernel_initializer
self.bias_initializer = bias_initializer
self.kernel_regularizer = kernel_regularizer
self.bias_regularizer = bias_regularizer
self.use_bias = use_bias
self.dropout_rate = dropout_rate
self.layer_id = str(layer_id)

def get_config(self):
config = super(TCNBlock, self).get_config()
config.update({
'filters': self.filters,
'kernel_size': self.kernel_size,
'dilation_rate': self.dilation_rate,
'kernel_initializer': self.kernel_initializer,
'bias_initializer': self.bias_initializer,
'kernel_regularizer': self.kernel_regularizer,
'bias_regularizer': self.bias_regularizer,
'use_bias': self.use_bias,
'dropout_rate': self.dropout_rate,
})
return config

def build(self, inputs):
# Capture feature set from the input
self.conv1 = Conv1D(
filters=self.filters,
kernel_size=self.kernel_size,
use_bias=self.use_bias,
bias_initializer=self.bias_initializer,
bias_regularizer=self.bias_regularizer,
kernel_initializer=self.kernel_initializer,
kernel_regularizer=self.kernel_regularizer,
padding="causal",
dilation_rate=self.dilation_rate,
activation=NONLINEAR_ACTIVATION,
name=f"Conv1D_1_{self.layer_id}"
)
# Spatial dropout is specific to convolutions by dropping an entire timewindow,
# not to rely too heavily on specific features detected by the kernels.
self.dropout1 = SpatialDropout1D(
self.dropout_rate, trainable=True, name=f"SpatialDropout1D_1_{self.layer_id}"
)
# Capture a higher order feature set from the previous convolution
self.conv2 = Conv1D(
filters=self.filters,
kernel_size=self.kernel_size,
use_bias=self.use_bias,
bias_initializer=self.bias_initializer,
bias_regularizer=self.bias_regularizer,
kernel_initializer=self.kernel_initializer,
kernel_regularizer=self.kernel_regularizer,
padding="causal",
dilation_rate=self.dilation_rate,
activation=NONLINEAR_ACTIVATION,
name=f"Conv1D_2_{self.layer_id}"
)
self.dropout2 = SpatialDropout1D(
self.dropout_rate, trainable=True, name=f"SpatialDropout1D_2_{self.layer_id}"
)
# The skip connection is an addition of the input to the block with the output of the second dropout layer.
# Solves vanishing gradient, carries info from earlier layers to later layers, allowing gradients to flow across this alternative path.
# Does not learn direct mappings, but differences (residuals) while keeping temporal context.
# Note how it keeps dims intact with kernel 1.
self.skip_out = Conv1D(
filters=self.filters,
kernel_size=1,
activation="relu",
name=f"Conv1D_skipconnection_{self.layer_id}",
)
# This is the elementwise add for the residual connection and Conv1d 2's output
self.residual_out = Add(name=f"residual_Add_{self.layer_id}")

def call(self, inputs):
x = self.conv1(inputs)
x = self.dropout1(x)
x = self.conv2(x)
x = self.dropout2(x)

# Residual output by adding the inputs back
skip_out_x = self.skip_out(inputs)
x = self.residual_out([x, skip_out_x])
return x, skip_out_x

class ConditionalBlock(Layer):
"""
TCN condtioning Block that conditions a target timeseries to exogenous timeserieses.
The Block is obtained by stacking togeather the following:
- 1D Dilated Convolution for the main TS.
- 1D Dilated Convolution for the exog TSs.
- 1D Dilated skip layer for both to retain history.
- ReLu
- Spatial Dropout
And adding the input after trasnforming it with a 1x1 Conv
"""

def __init__(
self,
filters=1,
kernel_size=2,
kernel_initializer="glorot_normal",
bias_initializer="glorot_normal",
kernel_regularizer=None,
bias_regularizer=None,
use_bias=False,
dropout_rate=0.01,
layer_id=None,
**kwargs,
):
super(ConditionalBlock, self).__init__(**kwargs)

assert filters > 0 and kernel_size > 0

self.filters = filters
self.kernel_size = kernel_size
self.kernel_initializer = kernel_initializer
self.bias_initializer = bias_initializer
self.kernel_regularizer = kernel_regularizer
self.bias_regularizer = bias_regularizer
self.use_bias = use_bias
self.dropout_rate = dropout_rate
self.layer_id = str(layer_id)

def get_config(self):
config = super(ConditionalBlock, self).get_config()
config.update({
'filters': self.filters,
'kernel_size': self.kernel_size,
'kernel_initializer': self.kernel_initializer,
'bias_initializer': self.bias_initializer,
'kernel_regularizer': self.kernel_regularizer,
'bias_regularizer': self.bias_regularizer,
'use_bias': self.use_bias,
'dropout_rate': self.dropout_rate,
'id': self.layer_id
})
return config

def build(self, inputs):
self.main_conv = Conv1D(
filters=self.filters,
kernel_size=self.kernel_size,
use_bias=self.use_bias,
bias_initializer=self.bias_initializer,
bias_regularizer=self.bias_regularizer,
kernel_initializer=self.kernel_initializer,
kernel_regularizer=self.kernel_regularizer,
padding="causal",
activation=NONLINEAR_ACTIVATION,
name=f"Conv1D_Conditional_1",
)
self.dropout1 = SpatialDropout1D(
self.dropout_rate, trainable=True, name=f"SpatialDropout1D_1_{self.layer_id}"
)
self.main_skip_conn = Conv1D(
filters=self.filters,
kernel_size=1,
activation="relu",
name=f"Skip_Conditional_1",
)
self.cond_conv = Conv1D(
filters=self.filters,
kernel_size=self.kernel_size,
use_bias=self.use_bias,
bias_initializer=self.bias_initializer,
bias_regularizer=self.bias_regularizer,
kernel_initializer=self.kernel_initializer,
kernel_regularizer=self.kernel_regularizer,
padding="causal",
activation=NONLINEAR_ACTIVATION,
name=f"Conv1D_Conditional_2",
)
self.cond_skip_conn = Conv1D(
filters=self.filters,
kernel_size=1,
activation="relu",
name=f"Skip_Conditional_2",
)
self.dropout2 = SpatialDropout1D(
self.dropout_rate, trainable=True, name=f"SpatialDropout1D_2_{self.layer_id}"
)

def call(self, inputs):
"""
Will apply causal convolutions to every TS and concatenate the results.
:param inputs: Array
A list where inputs[0] is the main input and inputs[1] are the conditional inputs
:return: Array
Tensor of concatenated results.
"""
main_input, cond_input = inputs[0], inputs[1] if len(inputs) > 1 else None

x = self.main_conv(main_input)
x = self.dropout1(x)
skip_out_x = self.main_skip_conn(main_input)
x = Add()([x, skip_out_x])
if cond_input is not None:
cond_x = self.cond_conv(cond_input)
cond_x = self.dropout2(cond_x)
cond_skip_out_x = self.cond_skip_conn(cond_input)
cond_x = Add()([cond_x, cond_skip_out_x])

x = Concatenate(axis=-1)([x, cond_x])
return x


def TCN(
input_shape,
dense_units=None,
conditioning_shapes=None,
output_horizon=1,
filters=[32],
kernel_size=2,
dilation_rate=2,
kernel_initializer="glorot_normal",
bias_initializer="glorot_normal",
kernel_regularizer=None,
bias_regularizer=None,
use_bias=False,
dropout_rate=0.01,
):
"""
Tensorflow TCN Model builder.
see: https://www.tensorflow.org/api_docs/python/tf/keras/Model
see: https://www.tensorflow.org/guide/keras/making_new_layers_and_models_via_subclassing#the_model_class
see: https://www.tensorflow.org/api_docs/python/tf/keras/regularizers/L2

:param layers: int
Number of layers for the network. Defaults to 1 layer.
:param filters: int
the number of output filters in the convolution. Defaults to 32.
:param kernel_size: int or tuple
the length of the 1D convolution window
:param dilation_rate: int
the dilation rate to use for dilated convolution. Defaults to 1.
:param output_horizon: int
the output horizon.
"""
main_input = Input(shape=input_shape, name="main_input")
cond_input = (
Input(shape=conditioning_shapes, name="exog_input")
if conditioning_shapes is not None and len(conditioning_shapes) > 0
else None
)
x = main_input
if cond_input is not None:
x = ConditionalBlock(
filters=filters[0],
kernel_size=kernel_size,
kernel_initializer=kernel_initializer,
bias_initializer=bias_initializer,
kernel_regularizer=kernel_regularizer,
bias_regularizer=bias_regularizer,
use_bias=use_bias,
dropout_rate=dropout_rate,
)([main_input] + [cond_input])
skip_connections = []
for i, filter in enumerate(filters):
x, x_skip = TCNBlock(
filters=filter,
kernel_size=kernel_size,
dilation_rate=dilation_rate ** (i + 1),
kernel_initializer=kernel_initializer,
bias_initializer=bias_initializer,
kernel_regularizer=kernel_regularizer,
bias_regularizer=bias_regularizer,
use_bias=use_bias,
dropout_rate=dropout_rate,
layer_id=i,
)(x)
skip_connections.append(x_skip)
if skip_connections:
skip_connections.append(x)
aggregated = Concatenate(axis=-1, name=f"Final_Residuals")(skip_connections)
aggregated = Conv1D(filters[-1], kernel_size=1, activation='relu', padding='same')(aggregated)


if dense_units:
# Dense networks for deep learning ifrequired.
x = Flatten()(x)
# First layer
x = Dense(dense_units[0], input_shape=input_shape, activation="LeakyReLU", name=f"Dense_0")(x)
for i, units in enumerate(dense_units, start=1):
x = Dense(units , activation="LeakyReLU", name=f"Dense__{i}")(x)
# Last layer
x = Dense(input_shape[0], activation="sigmoid", name=f"Dense_Classifier")(x)
else:
x = Conv1D(filters=output_horizon, kernel_size=1, padding="causal", activation="sigmoid",name=f"Conv_Classifier")(x)
model = Model(
inputs=[main_input, cond_input] if cond_input is not None else [main_input],
outputs=x,
name="TCN_Conditional_Model",
)

return model

A convolution network’s operations over consecutive layers can be represented with the following equation:

Where:

  • xt_l is the output of the neuron at position t in the l-th layer.
  • K is the kernel’s with, determining the number of past time windows considered.
  • wlk is the weight for the k-th position in the kernel used to give importance to past data.
  • d is the dilation factor, or the space between inputs allowing the network to integrate historical information.
  • bl is the bias term.
  • g is ReLU defined as g(x) =max(0,x).

The NN has some special attributes worth talking about, these being dilations and causal layers, which can be visualized by this image taken from the WaveNet paper:

and compare it to what a typical convolution NN would look like:

Dilated Causal Convolutions

From the images above, note how at each layer of neurons, the connections skip a neuron or two — this a dilation factor. Dilations in the TCN increase exponentially its spectral field as itt gets deeper in the network, allowing the neurons to capture history, without leakage.

Additionally, we observe how the neurons are not fully-connected, or they are only connected to the previous neuron but not to the next — this is what is know as a causal padding, meaning no future information is leaked in to the current neuron that is the ‘now’.

Residuals and Skip Connections

Skip connections are a technique from Residual Networks (ResNets), that were used to mitigate the vanishing gradient problem and improve model performance.

A skip connection allows the gradient to be directly backpropagated to earlier layers. In our TCN architecture, skip connections are used to connect the output of each convolutional block directly to a final layer in the network (before the output layer), allowing the network to make use of features learned at various depths directly when making predictions. This is particularly useful for time series data where both recent, and more distant past information might be relevant for prediction.

The image from WaveNet illustrates this concept:

While their function are similar, these connections have slightly different roles:

  • Residual Connections is used within blocks to add the block’s input to its output for gradient flow.
  • Skip Connections skip multiple layers or blocks and are used to directly connect early layers to later ones, allowing the network to leverage features learned at various stages of the network directly in the output.

Training and Validating Model

Time to train and evaluate our model. We’ll use tensorbards, and run it with the command: tensorboard --logdir ./logs, or run the following code in your notebook %tensorboard --logdir logs for in-notebook magic (which I don't recommend outside of a Collab environment).

Additionally, we have to provide callsbacks for tensorflow to feed logs to tensorboards.

from tensorboard import program

tb = program.TensorBoard()
tb.configure(argv=[None, '--logdir', LOG_BASEPATH, '--bind_all'])
url = tb.launch()
print(f"TensorBoard started at {url}")

Some configurations to note in the training:

  • EarlyStopping: We configured a long 300-epoch horizon for training, this will not be used in its full extend in our training. If EarlyStopping detects a flattening gradient, it will stop the training and re-esteblish the better weights. In most of our runs it stopped between 15 to 20 epochs.
  • ReduceLROnPlateau: In addition to early stopping, we also want to decrease the learning alpha as the gradient start to plateauing to help it converge. We start with a large learning rate, and quarter it everytime progress stops, until the early stopping kicks in.
  • TensorBoard: The callback to give logs, images and data to tensorboard.
  • And a final LambdaCallback to print a confusion matrix on every epoch, to allow us to gauge the classification errors and their progression.

Let’s talk about the error function we chose to minimize: BinaryFocalCrossentropy.

The idea behind Focal Loss, and Binary Cross Entropy, is to dynamically scale the loss based on the correctness of the classification. It applies a modulating factor to the loss, reducing the contributions from easy examples and focusing the model on hard examples.

For a binary classification problem, the Binary Focal Loss is defined as:

Where:

  • (p_t) is the model’s predicted probability for the class with label (y=1). Specifically, (p_t = p) if (y=1), and (p_t = 1 — p) otherwise, where (p) is the predicted probability for the class with label (1).
  • (\alpha_t) is a weighting factor for the class with label (1), which helps to manage class imbalance by assigning more weight to the less frequent class.
  • (\gamma) is the focusing parameter that adjusts the rate at which easy examples are down-weighted. When (\gamma = 0), Focal Loss reduces to binary cross-entropy loss.
  • The term ((1 — p_t)^\gamma) effectively reduces the loss contribution from well-classified examples ((p_t \approx 1) for true positives and (p_t \approx 0) for true negatives), putting more focus on examples that are misclassified or classified with low confidence.
  • The (\log(p_t)) term is the log-likelihood of the true class, similar to what is used in cross-entropy loss.
  • The loss function focuses on hard examples only, by reducing the loss contribution from easy-to-classify examples, these being Bearish signals (which are a majority class in our imbalanced dataset), and focus on the hard, often misclassified examples (typically the minority class) which are our profitable bullish signals. This helps in learning discriminative features for the minority class without being overwhelmed by the majority class.

Another important metric that we’ll include is the Receiver Operating Characteristic (ROC) Curve. The models’ training process will provide the area under the ROC curve, which is the True Positive Rate (TPR, also known as sensitivity or recall) against the False Positive Rate (FPR, 1 — specificity) at various threshold settings. The image from wikipedia can help in disambiguating these concepts:

The integration of the following equations make up the ROC AUC metric:

The AUC measures the entire two-dimensional area underneath the curve from (0,0) to (1,1):

  • AUC = 1: The classifier is perfect, able to distinguish all positive and negative instances correctly.
  • 0.5 < AUC < 1: The classifier is better than random guessing, with higher values indicating better classification performance.
  • AUC = 0.5: The classifier’s performance is no better than random guessing.
  • AUC < 0.5: The classifier is performing worse than random guessing! Which can be reversed if we invert the predictions (ML hack!).

It’s important that True Positive (TP) accuracy is high, since we cannot have monetary losses on misclassification. This metric is common in financial and medical field of machine learning problems, where mistakes are costly.

from tensorflow.keras.regularizers import L2, L1L2
from tensorflow.keras.callbacks import EarlyStopping, TensorBoard,ModelCheckpoint,ReduceLROnPlateau,LambdaCallback
from tensorflow.keras.optimizers import Adam
from tensorflow.summary import create_file_writer
from tensorflow.debugging.experimental import enable_dump_debug_info
from tensorflow.math import confusion_matrix
import io

def plot_confusion_matrix(cm, labels, cm2=None, labels2=None):
plt.figure(figsize=(8 if cm2 is not None else 4, 4))
if cm2 is not None:
plt.subplot(1, 2, 1)
plt.imshow(cm, interpolation='nearest', cmap=plt.cm.Accent)

df_cm = pd.DataFrame((cm / np.sum(cm, axis=1)[:, None])*100, index=[i for i in labels], columns=[i for i in labels])
cm_plot1 = sns.heatmap(df_cm, annot=True, fmt=".2f", cmap='Blues', xticklabels=labels, yticklabels=labels).get_figure()
plt.xlabel('Predicted Labels')
plt.ylabel('True Labels')
plt.title('Confusion Matrix 1')
tick_marks = np.arange(len(labels))
plt.xticks(tick_marks, labels, rotation=45)
plt.yticks(tick_marks, labels)

cm_plot2=None
if cm2 is not None:
plt.subplot(1, 2, 2)
df_cm = pd.DataFrame((cm2 / np.sum(cm2, axis=1)[:, None])*100, index=[i for i in labels2], columns=[i for i in labels2])
cm_plot12 = sns.heatmap(df_cm, annot=True, fmt=".2f", cmap='Reds', xticklabels=labels, yticklabels=labels).get_figure()
plt.xlabel('Predicted Labels')
plt.title('Confusion Matrix 2')
plt.tight_layout()

return cm_plot1, cm_plot2

# enable_dump_debug_info(LOG_BASEPATH, tensor_debug_mode="FULL_HEALTH", circular_buffer_size=-1)

def build_tcn(
input_shape,
X, y,
Xt=None, yt=None,
conditioning_shapes=None,
val_split=VAL_SPLIT,
output_horizon=PREDICTION_HORIZON,
filters=FILTERS,
kernel_size=KERNEL_SIZE,
dilation_rate=DILATION_RATE,
kernel_regularizer=L1L2(l1=REG_WEIGHTS, l2=REG_WEIGHTS//10),
bias_regularizer=L1L2(l1=REG_WEIGHTS, l2=REG_WEIGHTS//10),
dropout_rate=DROPRATE,
dense_units=HIDDEN_DENSE,
lr=LEARN_RATE,
patience=PATIENCE_EPOCHS,
epochs=EPOCHS,
batch_size=BATCH_SIZE,
use_bias=BIAS,
loss=LOSS,
tb=True,
):
def log_confusion_matrix(epoch, logs):
def _plot_to_image(figure):
"""Converts the matplotlib plot specified by 'figure' to a PNG image and
returns it. The supplied figure is closed and inaccessible after this call."""
buf = io.BytesIO()
plt.savefig(buf, format='png')
plt.close(figure)
buf.seek(0)
image = tf.image.decode_png(buf.getvalue(), channels=4)
image = tf.expand_dims(image, 0)
return image
# model is global as is XT and yt
assert Xt is not None and len(Xt) > 1 and len(yt) > 1
ypred = model.predict(Xt)
cm = confusion_matrix(yt.flatten(), ypred.flatten())
figure, _ = plot_confusion_matrix(cm, labels=[1,0])
cm_image = _plot_to_image(figure)

file_writer_cm = create_file_writer(LOG_BASEPATH)
with file_writer_cm.as_default():
tf.summary.image("Confusion Matrix", cm_image, step=epoch)

assert len(X) > 1 and len(y) > 1 and input_shape is not None
global model, globalXt, globalyt
globalXt = Xt
globalyt = yt

model = TCN(
input_shape=input_shape,
conditioning_shapes=conditioning_shapes,
dense_units=dense_units,
output_horizon=output_horizon,
filters=filters,
kernel_size=kernel_size,
dilation_rate=dilation_rate,
kernel_regularizer=kernel_regularizer,
bias_regularizer=bias_regularizer,
use_bias=use_bias,
dropout_rate=dropout_rate,
)

model.compile(loss=loss, optimizer=Adam(lr), metrics=METRICS)
callbacks = [EarlyStopping(
patience=patience,
monitor=f"val_{TARGET_METRIC}",
restore_best_weights=True,
),
ReduceLROnPlateau(
monitor=f"val_{TARGET_METRIC}",
factor=0.3,
patience=patience//2,
verbose=1,
min_delta=0.00001,
)]
if tb:
callbacks.append(TensorBoard(log_dir=MODEL_LOG_DIR,
histogram_freq=1,
write_graph=True,
write_images=True,
update_freq='epoch',
profile_batch=2,
embeddings_freq=1))
if tb and IS_CLASSIFICATION:
callbacks.append(LambdaCallback(on_epoch_end=log_confusion_matrix))
if Xt is not None:
assert len(Xt) > 1 and len(yt) > 1
history = model.fit(
X,
y,
validation_data=(Xt, yt),
epochs=epochs,
batch_size=batch_size,
callbacks=callbacks,
verbose=1,
)
else:
assert val_split > 0 and Xt is None and yt is None
history = model.fit(
X,
y,
validation_split=val_split,
epochs=epochs,
batch_size=batch_size,
callbacks=callbacks,
verbose=0,
)
return model, history

Encoding Time Windows

Given all the special attributes we described for the TCN, you shouldn’t be surprised that the data needs special treatment. It needs to be encoded into what is known as time windows or time steps. The image from Gasparin et al. gives a good visualization of these time steps:

The data was normalised and encoded into 3 months-long windows, with a one-day lag from so we can predict the market direction of the next day:

def prepare_windows(
data_df,
target_df,
prime_ts=SELECTED_FEATURES,
exog_ts=SELECTED_EXOG,
window_size=WINDOW_SIZE,
horizon=PREDICTION_HORIZON,
):
"""
Create input and target windows suitable for TCN model.
:param data: DataFrame with shape (n_samples, n_features)
:param features: List of strings, names of the feature columns
:param target_df: Optional the labels if this encoding is for training.
:param window_size: int, length of the input sequence.
:param horizon: int, forecasting horizon.
:return: 3 Arrays in the shape of (n_samples, n_steps, n_features) for the training data, the exogenous data, and the labels (this last is optional)
"""
X, Xexog, y = [], [], []
for i in tqdm(
range(len(data_df) - window_size - horizon + 1), desc=f"Encoding Widows of {window_size}"
):
input_window = data_df[prime_ts].iloc[i : i + window_size].values
X.append(input_window)
input_window = data_df[exog_ts].iloc[i : i + window_size].values
Xexog.append(input_window)
if target_df is not None:
target_window = target_df.iloc[i : i + window_size].values
y.append(target_window)
return np.array(X), np.array(Xexog), np.array(y)

train_data, train_exog_data, ytrain_data = prepare_windows(data_x_df, data_x_df[TARGET_LABEL])
test_data, test_exog_data, ytest_data = prepare_windows(data_t_df, data_t_df[TARGET_LABEL])

assert not np.any(pd.isna(train_data)) and not np.any(pd.isna(train_exog_data))

print(f"Label shape encoded: {ytrain_data.shape}")
print(f"Data shapes for prime TS: {train_data.shape}, exog TS: {train_exog_data.shape}")
print(f"First window: {train_exog_data[:1][0]}")
print(f"First window: {train_data[:1][0]}")
print(f"First window targets: {ytrain_data[:1][0]}")

input_shape = (
WINDOW_SIZE,
1 if len(train_data.shape) < 3 else train_data.shape[2],
) # if we have no additonal features X.shape[1]
conditioning_shapes = (WINDOW_SIZE, train_exog_data.shape[2])
print(f"Model logs for Tensorboard available here: {MODEL_LOG_DIR}")
print(f"Input Shape: {input_shape} and Condtioning shapes: {conditioning_shapes}")

assert not np.any(np.isnan(train_data))
assert not np.any(np.isnan(train_exog_data))
assert not np.any(np.isnan(ytrain_data))

Before we train, we will set aside some out-of-sample data which the model will not see. This will be used to test the model’s generalization later on:

xtrain_oos, Xexog_oos, y_oos = prepare_windows(x_oos, x_oos[TARGET_LABEL])
y_oos

GridSearch and HyperParams

The model is complex, and to manually fine-tune every parameter is impossible. We’ll run a grid search which after a day or so of permuting parameter combinations, should identify good enough hyperparameters.

In our tests, we got this setup:

{
"best_params": {
"batch_size": 124,
"bias": true,
"dense_units": "126",
"dilation_rate": 1,
"dropout_rate": 0.05,
"epochs": 300,
"filters": "32_128",
"kernel_size": 2,
"learning_rate": 0.0025,
"patience": 15,
"reg_weight": 0.0002
},
"best_loss": 0.00413,
"best_metric": 0.8180,
}

The x_x_x hparams are our way of encoding an array in tensorflow’s hparam framework. These need to be decoded before being passed to the model:

# See: https://scikit-learn.org/stable/modules/generated/sklearn.model_selection.ParameterGrid.html
# See paper: https://www.mdpi.com/2076-3417/10/7/2322

HP_KERNEL_SIZE = hp.HParam("kernel_size", hp.Discrete([KERNEL_SIZE * 2, KERNEL_SIZE]))
HP_BATCH_SIZE = hp.HParam("batch_size", hp.Discrete([BATCH_SIZE]))
HP_EPOCHS = hp.HParam("epochs", hp.Discrete([EPOCHS]))
HP_DILATION_RATE = hp.HParam("dilation_rate", hp.Discrete([DILATION_RATE]))
HP_DROPOUT_RATE = hp.HParam("dropout_rate", hp.Discrete([DROPRATE, DROPRATE*2]))
HP_REG_WEIGHTS = hp.HParam("reg_weight", hp.Discrete([REG_WEIGHTS, REG_WEIGHTS*2]))
HP_LEARNING_RATE = hp.HParam("learning_rate", hp.Discrete([LEARN_RATE]))
HP_PATIENCE = hp.HParam("patience", hp.Discrete([PATIENCE_EPOCHS]))
HP_BIAS = hp.HParam("bias", hp.Discrete([BIAS, False]))
HP_HIDDEN_DENSE = hp.HParam("dense_units", hp.Discrete([
f"{WINDOW_SIZE}",
f"{WINDOW_SIZE*2}_{WINDOW_SIZE}",
]))
HP_FILTERS = hp.HParam("filters", hp.Discrete([
f"{MIN_FILTER}",
f"{MIN_FILTER}_{MIN_FILTER*2}",
f"{MIN_FILTER*2}_{MIN_FILTER*4}",
]))

HPARAMS = [
HP_FILTERS,
HP_KERNEL_SIZE,
HP_BATCH_SIZE,
HP_EPOCHS,
HP_DILATION_RATE,
HP_DROPOUT_RATE,
HP_REG_WEIGHTS,
HP_LEARNING_RATE,
HP_PATIENCE,
HP_BIAS,
HP_HIDDEN_DENSE
]

GRID_SEARCH_TRAIN = False

def grid_search_build_tcn(
input_shape, X, y, Xt=None, yt=None, hparams=HPARAMS, conditioning_shapes=None, file_name="best_params.json"
):
def _decode_arrays(config_str):
return [int(unit) for unit in config_str.split('_')]

def _save_best_params(best_params, best_loss, best_metric, file_name="best_params.json"):
os.makedirs(MODEL_DIR, exist_ok=True)
with open(f"{MODEL_DIR}/{file_name}", "w") as file:
json.dump({"best_params": best_params, "best_loss": best_loss, "best_metric": best_metric}, file)

with create_file_writer(f"{MODEL_LOG_DIR}/hparam_tuning").as_default():
hp.hparams_config(
hparams=hparams,
metrics=[hp.Metric(TARGET_METRIC, display_name=TARGET_METRIC)],
)
grid = list(ParameterGrid({h.name: h.domain.values for h in hparams}))
best_model = None
best_loss = np.inf
best_metric = -np.inf
best_params = None
best_history = None
for hp_values in tqdm(grid, desc="Grid Search.."):
dense_units = _decode_arrays(hp_values["dense_units"])
filters = _decode_arrays(hp_values["filters"])
k = hp_values["kernel_size"]
d = hp_values["dilation_rate"]
rw = hp_values["reg_weight"]
drop = hp_values["dropout_rate"]
print(f"Shapes{input_shape}{conditioning_shapes}: x{X[0].shape}xg{X[1].shape}y{y.shape}, filters {filters}, dense {dense_units}, k: {k}, d: {d}, rw: {rw}, drop: {drop}")
print(f"")

model, history = build_tcn(input_shape, X, y,
conditioning_shapes=conditioning_shapes,
output_horizon=PREDICTION_HORIZON,
Xt=Xt, yt=yt,
filters=filters,
kernel_size=k,
dilation_rate=d,
kernel_regularizer=L1L2(l1=rw, l2=rw),
bias_regularizer=L1L2(l1=rw, l2=rw),
dropout_rate=drop,
dense_units=dense_units)
loss = np.min(history.history[f"val_loss"])
metric = np.min(history.history[f"val_{TARGET_METRIC}"])
if (loss < best_loss) and (best_metric > metric):
print(f"best metric: {metric}")
print(f"best loss: {loss}")
print(f"best params: {hp_values}")
best_history = history
best_loss = loss
best_metric = metric
best_model = model
best_params = hp_values
_save_best_params(best_params, best_loss, best_metric, file_name)
return best_model, best_history

if GRID_SEARCH_TRAIN:
model, history = grid_search_build_tcn(input_shape, [train_data, train_exog_data], ytrain_data, Xt=[test_data, test_exog_data], yt=ytest_data, conditioning_shapes=conditioning_shapes))

Model Evaluations

Initially, we discussed how precision is our most important metric.

Another good evaluation criteria is the confusion matrix. In our binary classification context, the confusion matrix consists of 2 rows and 2 columns, each representing the instances in a predicted class versus instances in an actual class. Envision a square matrix of these:

  • True Positives (TP): The number of positive instances correctly classified as positive.
  • False Positives (FP): The number of negative instances incorrectly classified as positive.
  • True Negatives (TN): The number of negative instances correctly classified as negative.
  • False Negatives (FN): The number of positive instances incorrectly classified as negative.

And the results of our model below:

assert not np.array_equal(train_data, test_data) and not np.array_equal(train_exog_data, test_exog_data), "Training and test should not be identical."

y_pred = model.predict([train_data, train_exog_data])
yt_pred = model.predict([test_data, test_exog_data])

y_pred_orig = y_pred.copy()
yt_pred_orig = yt_pred.copy()
y_pred = (y_pred > 0.5).astype(int)
yt_pred = (yt_pred > 0.5).astype(int)

print(f"Prediction shape: {yt_pred.shape} vs test data shape: {ytest_data.shape}")
print(f"Test data 1 horizon sample: {ytest_data[0]}")
print(f"Prediction data 1 horizon sample: {yt_pred[0].T}")
print(f"Prediction 1 horizon sample: {yt_pred.flatten()[0]} VS {ytest_data.flatten()[0]}")

cm_train = confusion_matrix(ytrain_data.flatten(), y_pred.flatten())
cm_test = confusion_matrix(ytest_data.flatten(), yt_pred.flatten())

cm_train_np = cm_train.numpy()
cm_test_np = cm_test.numpy()

plot_confusion_matrix(cm_train_np, [0,1], cm_test_np, [0,1])
plt.show()

Confusion Matrix 2 is our validation dataset, and it has very few FP which is good.

The F1 Score is another metric we’ll look at, which is a statistical measure used to evaluate the performance of binary classification models in situations where the class distribution is imbalanced. It is the harmonic mean of precision and recall which signal the robustness of our classifier. T

he F1 Score reaches its best value at 1 (perfect precision and recall) and worst at 0, and is calculated as follows:

An out of the box F1 score assumes both precision and recall are important, which is not good for our use case. We will use a variant called the F-Beta Score, which defined as follows:

Where beta can be altered to give more importance to either of the metrics, weighting precision (<1) or recall (β>1).

from sklearn.metrics import (
accuracy_score, precision_score, recall_score,
fbeta_score, roc_auc_score, roc_curve, auc
)

def directional_accuracy(y, y_pred):
a = np.array(y).flatten()
p = np.array(y_pred).flatten()

a_dir = np.sign(np.diff(a))
p_dir = np.sign(np.diff(p))
correct_dirs = np.sum(a_dir == p_dir)
acc = correct_dirs / len(a_dir)

return acc


print(f"shapes y_pred: {y_pred.shape} and yt_pred: {yt_pred.shape}")

accuracy_test = accuracy_score(ytest_data.flatten(), yt_pred.flatten())
precision_test = precision_score(ytest_data.flatten(), yt_pred.flatten())
recall_test = recall_score(ytest_data.flatten(), yt_pred.flatten())
f1_test = fbeta_score(ytest_data.flatten(), yt_pred.flatten(), average="weighted", beta=0.5)
roc_auc_test = roc_auc_score(ytest_data.flatten(), yt_pred_orig.flatten(), average='weighted')
metrics_df = pd.DataFrame({
"Accuracy": [accuracy_test],
"Precision": [precision_test],
"Recall": [recall_test],
"F1b Score": [f1_test],
"ROC AUC": [roc_auc_test],
}, index=["Test"])

fig, axs = plt.subplots(3 if IS_CLASSIFICATION else 2, 1, figsize=(12, 10))
axs[0].plot(history.history["loss"], label="Train loss")
axs[0].plot(history.history["val_loss"], label="Validation loss")
axs[0].set_xlabel("Epochs")
axs[0].set_ylabel("Loss")
axs[0].legend()
axs[1].plot(
history.history[TARGET_METRIC],
label=f"Train {TARGET_METRIC}",
)
axs[1].plot(
history.history[f"val_{TARGET_METRIC}"],
label=f"Test {TARGET_METRIC}",
color="k",
)
axs[1].axhline(accuracy_test, color="r", linestyle="-.", label=f"Acc: {accuracy_test:.2f}")
axs[1].axhline(precision_test, color="g", linestyle="--", label=f"Prec: {precision_test:.2f}")
axs[1].axhline(recall_test, color="g", linestyle="-.", label=f"Rec: {recall_test:.2f}")
axs[1].axhline(f1_test, color="c", linestyle="--", label=f"Fbeta: {f1_test:.2f}")
axs[1].set_xlabel("Epochs")
axs[1].set_ylabel("Score")
axs[1].legend()


fpr, tpr, _ = roc_curve(ytest_data.flatten(), yt_pred_orig.flatten())
roc_auc_test = auc(fpr, tpr)
axs[2].plot(fpr, tpr, color='darkorange', lw=2, label=f'ROC curve (area = {roc_auc_test:.2f})')
axs[2].plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--', label='ROC Line')
axs[2].set_xlabel('False Positive Rate')
axs[2].set_ylabel('True Positive Rate')
axs[2].legend()

plt.show()

Walk Forwards on Out-of-Sample Data

Very good metrics, with an AUC of almost 1 — which is a must in this domain of prediction if we wish to avoid financial losses. let’s test the model on data it never saw with walk forwards on out-of-sample data which we have withheld before the whole training and validation cycles:

y_oos_pred_raw = model.predict([xtrain_oos, Xexog_oos])
y_oos_pred = (y_oos_pred_raw > 0.5).astype(int)

print(f"Prediction shape: {y_oos_pred.shape} vs test data shape: {y_oos.shape}")
print(f"Test data 1 horizon sample: {y_oos[0]}")
print(f"Predicted data 1 horizon sample: {y_oos_pred[0].T}")
print(f"Prediction 1 horizon sample: {y_oos_pred.flatten()[0]} VS {y_oos.flatten()[0]}")

metrics_oos_df = None
if IS_CLASSIFICATION:
accuracy_test = accuracy_score(y_oos.flatten(), y_oos_pred.flatten())
precision_test = precision_score(y_oos.flatten(), y_oos_pred.flatten())
recall_test = recall_score(y_oos.flatten(), y_oos_pred.flatten())
f1_test = fbeta_score(y_oos.flatten(), y_oos_pred.flatten(), average="weighted", beta=0.5)
roc_auc_test = roc_auc_score(y_oos.flatten(), y_oos_pred_raw.flatten())
metrics_oos_df = pd.DataFrame({
"Accuracy": [accuracy_test],
"Precision": [precision_test],
"Recall": [recall_test],
"F1b Score": [f1_test],
"ROC AUC": [roc_auc_test],
}, index=["Test"])

metrics_oos_df

Model Cross-Validation

With such good training/validation metrics and good out-of-sample metrics, the model is too good to be trusted!

We’ll need more rigorous testing, starting with a 3 fold cross validation using a TimeSeries splitter from Scikit-Learn to increase our trust in the Model. In addition, we will perturb our data with faint gaussian noise, this should destabilize a model that a weak classifier:

from sklearn.model_selection import TimeSeriesSplit
import json
import os

CV_MODEL = True
CV_SPLITS = 3

def train_cv_model(X, y, input_shape, conditioning_shapes=None, n_splits=5, perturb=True):
def _save_cv(results_df, file_name="cv_results.json"):
os.makedirs(MODEL_DIR, exist_ok=True)
file_path = os.path.join(MODEL_DIR, file_name)
with open(file_path, "w") as file:
json.dump({"CV results": results_df.to_dict(orient="records")}, file)

def _perturb_gaussiannoise(X, noise_level=0.1):
sigma = noise_level * np.std(X)
noise = np.random.normal(0, sigma, X.shape)
return X + noise

if perturb:
X = _perturb_gaussiannoise(X)

results = []
tscv = TimeSeriesSplit(n_splits=n_splits)

for train_index, test_index in tqdm(tscv.split(X), desc=f"CV Testing for n_splits: {n_splits}"):
X_train = X.iloc[train_index]
y_train = y.iloc[train_index]
X_test = X.iloc[test_index]
y_test = y.iloc[test_index]

X_train_windows, X_Exog_train, y_train_windows = prepare_windows(X_train, y_train)
X_test_windows, X_Exog_test, y_test_windows = prepare_windows(X_test, y_test)

try:
cv_model, _ = build_tcn(input_shape, [X_train_windows, X_Exog_train], y_train_windows, [X_test_windows, X_Exog_test], y_test_windows, conditioning_shapes=conditioning_shapes, tb=False)
result = cv_model.evaluate([X_test_windows, X_Exog_test], y_test_windows, verbose=0)
results.append(result)
except Exception as e:
print(f"CV error on fold with exception: {e}")

metrics_names = [metric.name for metric in cv_model.metrics]
results_df = pd.DataFrame(results, columns=metrics_names)
_save_cv(results_df)

return results_df


if CV_MODEL:
results_df = train_cv_model(data_df, data_df[TARGET_LABEL], input_shape, conditioning_shapes=conditioning_shapes)
print(results_df)
loss       auc  binary_crossentropy  binary_accuracy  accuracy
0 0.057554 0.950727 0.311374 0.845933 0.075082
1 0.027051 0.993098 0.120708 0.951535 0.041349
2 0.009376 0.999359 0.029629 0.989378 0.095756
3 0.005980 0.999570 0.017562 0.994214 0.109902
4 0.004065 0.999443 0.022164 0.991796 0.083787

Performance on Unseen Securities

The CV metrics are very promising, it does miss a lot of predictions, but those that it gets, is almost 100% correct!

Still not to be trusted until we test the model in the real world now.

Testing on the target index constituents first, as we assume the model has generalized well enough to have good performance:

assert model is not None

OHTER_ETFS = ["SSO", "SPXL", "BAC", "TQQQ", "AAPL", "HSBC", "TM"] # These are leveraged & constituents
LAST_TRADING_DAY_STR = LAST_TRADING_DAY.strftime('%Y-%m-%d')
FIRST_WINDOW_DAY_STR = FIRST_WINDOW_DAY.strftime('%Y-%m-%d')

def generalization_test(unseen_tickers_list, start, end, gen_model=None, input_shape=input_shape, conditioning_shapes=conditioning_shapes):
metrics_unseen = []
for ticker in tqdm(unseen_tickers_list, desc="Testing on Unseen TS"):
current_tickers = tickers.copy()
other_etf, latest_start, earliest_end = get_tickerdata([ticker], start=start, end=end)
current_tickers[TARGET_ETF] = other_etf[ticker]
print(f"Date ranges for {ticker} are {latest_start.strftime('%Y-%m-%d')} - {earliest_end.strftime('%Y-%m-%d')}")

for ticker_symbol in tickers_symbols:
df = current_tickers.get(ticker_symbol)
df = df[(df.index >= latest_start) & (df.index <= earliest_end)]
assert not df.empty and not df.isna().any().any() and len(df) > WINDOW_SIZE//2, f"Data validation failed for {ticker_symbol}"
current_tickers[ticker_symbol] = df
unseendata_df, _ = prepare_data(current_tickers, to_normalize=True)
assert not np.any(pd.isna(unseendata_df)) and len(unseendata_df) > WINDOW_SIZE//2, "Unseen data preparation failed"
unseen_X, unseen_Xexog, unseen_y = prepare_windows(unseendata_df, unseendata_df[TARGET_LABEL])

unseen_ypred_raw = gen_model.predict([unseen_X, unseen_Xexog])
unseen_pred = (unseen_ypred_raw > 0.5).astype(int)
metrics = {}
if IS_CLASSIFICATION:
metrics = {
"Accuracy": accuracy_score(unseen_y.flatten(), unseen_pred.flatten()),
"Precision": precision_score(unseen_y.flatten(), unseen_pred.flatten()),
"Recall": recall_score(unseen_y.flatten(), unseen_pred.flatten()),
"F1b Score": fbeta_score(y_oos.flatten(), y_oos_pred.flatten(), average="weighted", beta=0.5),
"ROC AUC": roc_auc_score(unseen_y.flatten(), unseen_pred.flatten(), average='weighted')
}
metrics_unseen.append({ticker: metrics})
metrics_unseen_df = pd.DataFrame.from_dict({list(d.keys())[0]: list(d.values())[0] for d in metrics_unseen}, orient='index')
return metrics_unseen_df

metrics_unseen_df = generalization_test(OHTER_ETFS, FIRST_WINDOW_DAY_STR, LAST_TRADING_DAY_STR, gen_model=model)
metrics_unseen_df

Black Swan Performance

Let’s also do some exotic tests, we will re-train the model and apply it to a black swan event (the 2019 pandemic), as we assume that an economy that is always on the rise is easy to predict, but a in market turmoil should cause the model to fail:

assert model is not None

COVID_REGIME_START = "2019-10-01"
COVID_REGIME_END = "2021-01-01"

current_tickers = tickers.copy()
other_etf, latest_start, earliest_end = get_tickerdata([ticker], start=START_DATE, end=COVID_REGIME_END)
current_tickers[TARGET_ETF] = other_etf[ticker]
print(f"Date ranges for {ticker} are {latest_start.strftime('%Y-%m-%d')} - {earliest_end.strftime('%Y-%m-%d')}")

for ticker_symbol in tickers_symbols:
df = current_tickers.get(ticker_symbol)
df = df[(df.index >= latest_start) & (df.index <= earliest_end)]
assert not df.empty and not df.isna().any().any() and len(df) > WINDOW_SIZE//2, f"Data validation failed for {ticker_symbol}"
current_tickers[ticker_symbol] = df

unseendata_df, _ = prepare_data(current_tickers, to_normalize=True)
unseen_X, unseen_Xexog, unseen_y = prepare_windows(unseendata_df, unseendata_df[TARGET_LABEL])
gen_model, gen_hist = build_tcn(input_shape, [unseen_X, unseen_Xexog], unseen_y, conditioning_shapes=conditioning_shapes, tb=False)
print(f'{TARGET_ETF} val AUC: {gen_hist.history[f"val_{TARGET_METRIC}"][-1]}')
Epoch 36: ReduceLROnPlateau reducing learning rate to 6.074999873817433e-06.
SPY val AUC: 0.9998917579650879
metrics_unseen_df = generalization_test(OHTER_ETFS, START_DATE, COVID_REGIME_END, gen_model=gen_model)
metrics_unseen_df

Low Correlation Securities

Performance is unaffected in a market downturn wtih high accuracy, high precision — what more do we want!

Let’s truly stress the model with predictions in the same chaotic conditions, on ‘market-neutral’ stocks — specifically the meme-stocks that came to existence in the covid regime:

OHTER_ETFS = ["ZM", "SHOP", "VIG", "VOO", "TSLA", "WDI.HM"]  # Including meme stocks and frauds

metrics_unseen_df = generalization_test(OHTER_ETFS, COVID_REGIME_START, COVID_REGIME_END, gen_model=gen_model)
metrics_unseen_df

The model really seems to be unchallenged!

Is this because the problem at hand is that easy to solve for a moderately complex NN? Or are we not seeing some deep issue with our data or architecture?

Conclusion

In this article, we tested a day-trader’s assumption that buying leveraged ETFs at start and selling at close is profitable — which we understood that it is an unprofitable strategy with only 29% chance of success. We then outperformed this naive strategy using deep learning, which had a true positive rate of 99%.

The deep learning model is one inspired by Google’s WaveNet from Van den Oord et al., which uses a temporal convolution network, with causal dilating layers, skip layers to preserve historic residuals & expedite gradient intercept, and finally conditioning of the input timeseries with exogenous multiple timeseries before the TCN’s main convolutions are done.

This is model, dataset and article are for research purposes only, but here is DALL·E 2’s depiction of AI beating a finfluencing youtuber, have fun!

References

Github

Article here is also available on Github

Kaggle notebook available here

Media

All media used (in the form of code or images) are either solely owned by me, acquired through licensing, or part of the Public Domain and granted use through Creative Commons License.

CC Licensing and Use

This work is licensed under a Creative Commons Attribution-NonCommercial 4.0 International License.

--

--

Adam
Call For Atlas

People, tech, and product focused senior technology leader with a penchant for AI in quant finance: https://www.linkedin.com/in/adam-darmanin/ #CallForAtlas