Oil Price Forecasting Using Conditional Generative Adversarial Networks (GANs) with Sentiment Analysis

M Alruqimi
Jul 3, 2024

Part 3: Build and train the model

This is the final article in this series. In the previous article, we prepared our dataset. In this article, I will create the GAN model for forecasting Brent oil prices.

Let’s start by creating the generator and discriminator. For more details and definitions, refer to Part 1: Introduction.

import torch
import torch.nn as nn


class Generator(nn.Module):
    def __init__(self, hidden_dim, feature_no, seq_len, output_dim, dropout):
        super().__init__()
        self.hidden_dim = hidden_dim
        self.layer_dim = 1
        self.input_dim = feature_no
        self.output_dim = output_dim
        self.dropout = dropout
        self.seq_len = seq_len

        # Bidirectional LSTM over the input window. The noise vector is appended to
        # every time step, so the input width is feature_no + noise_size.
        # `config` is the global Config defined under "Model configuration".
        # With a single layer, PyTorch applies no dropout here and warns about it.
        self.lstm = nn.LSTM(
            self.input_dim + config.noise_size, self.hidden_dim, self.layer_dim,
            batch_first=True, bidirectional=True, dropout=self.dropout
        )
        # Defined but not used in forward()
        self.gru = nn.GRU(self.hidden_dim * 2, self.hidden_dim, batch_first=True,
                          bidirectional=True, dropout=self.dropout)

        # Fully connected head
        self.fc_1 = nn.Linear(self.hidden_dim * 2, 12)
        self.fc_2 = nn.Linear(12, self.output_dim)
        self.relu = nn.ReLU()

    def forward(self, x, noise, batch_size):
        # x: (batch_size, seq_len, feature_no); noise: (batch_size, noise_size)
        # batch_size is kept for call compatibility; the size is read from x.

        # Initial hidden and cell states (2 directions * layer_dim)
        h0 = torch.zeros(self.layer_dim * 2, x.size(0), self.hidden_dim, device=x.device)
        c0 = torch.zeros(self.layer_dim * 2, x.size(0), self.hidden_dim, device=x.device)

        # Repeat the noise vector at every time step: (batch_size, seq_len, noise_size)
        noise = noise.to(x.device)
        noise_repeated = noise.unsqueeze(1).expand(-1, self.seq_len, -1)

        # Concatenate along the feature dimension:
        # (batch_size, seq_len, feature_no + noise_size)
        x_combined = torch.cat((x, noise_repeated), dim=-1)

        out, (hn, cn) = self.lstm(x_combined, (h0, c0))

        # Keep only the last time step: (batch_size, 2 * hidden_dim)
        out = out[:, -1, :]

        # Map to the forecast: (batch_size, output_dim)
        out = self.fc_1(out)
        out = self.relu(out)
        out = self.fc_2(out)
        return out

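class Discriminator(nn.Module):
    def __init__(self, seq_len, hidden_dim):
        super().__init__()
        self.discriminator_latent_size = hidden_dim
        self.x_batch_size = seq_len  # length of the conditioning window
        # GRU over a univariate sequence; batch_first is False, so it
        # expects input of shape (seq_len, batch, 1)
        self.input_to_latent = nn.GRU(input_size=1, hidden_size=hidden_dim)
        self.model = nn.Sequential(
            nn.Linear(in_features=hidden_dim, out_features=1),
            nn.Sigmoid()
        )

    def forward(self, prediction, x_batch):
        # Keep only the first feature (index 0) and ignore the external feature SENT
        x_batch = x_batch[:, :, 0]  # (batch, seq_len)

        # Append the real or generated value to the history: (batch, seq_len + 1)
        d_input = torch.cat((x_batch, prediction.view(-1, 1)), dim=1)
        d_input = d_input.view(-1, self.x_batch_size + 1, 1)
        d_input = d_input.transpose(0, 1)  # (seq_len + 1, batch, 1)
        d_latent, _ = self.input_to_latent(d_input)
        d_latent = d_latent[-1]  # GRU output at the last time step
        output = self.model(d_latent)  # probability that the sequence is real

        return output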
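
To make the tensor shapes in the two forward passes concrete, here is a minimal shape walk-through. It uses NumPy as a stand-in for the torch calls (`np.broadcast_to` for `expand`, `np.concatenate` for `torch.cat`), and the sizes (a batch of 4 with 2 features) are illustrative assumptions, not values taken from the dataset.

```python
import numpy as np

batch, seq_len, feature_no, noise_size = 4, 10, 2, 32  # illustrative sizes

x = np.zeros((batch, seq_len, feature_no))  # input window
noise = np.ones((batch, noise_size))        # one noise vector per sample
y = np.zeros((batch, 1))                    # real or generated next value

# Generator: repeat the noise at every time step, then append it to the features
noise_repeated = np.broadcast_to(noise[:, None, :], (batch, seq_len, noise_size))
x_combined = np.concatenate((x, noise_repeated), axis=-1)
print(x_combined.shape)  # (4, 10, 34)

# Discriminator: first feature only, with the target appended as an extra time step
history = x[:, :, 0]                                           # (4, 10)
d_input = np.concatenate((history, y.reshape(-1, 1)), axis=1)  # (4, 11)
d_input = d_input.reshape(-1, seq_len + 1, 1).transpose(1, 0, 2)
print(d_input.shape)  # (11, 4, 1): (time, batch, feature) for the GRU
```

The discriminator therefore never sees the noise or the sentiment feature; it judges only whether the final value is a plausible continuation of the preceding window.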

Then, I need to create some helper functions.

Load real samples:

def load_real_samples(batch_size, x_train, y_train):
    # Draw a random mini-batch (with replacement) using the shared RandomState `rs`
    idx = rs.choice(x_train.shape[0], batch_size)
    x_batch = x_train[idx]
    y_batch = y_train[idx]
    return x_batch, y_batch

Generate noise:

import numpy as np

rs = np.random.RandomState(4)

def generate_noise(noise_size, batch_size, noise_type, rs):
    if noise_type == 'normal':
        noise = rs.normal(0, 1, (batch_size, noise_size))
    elif noise_type == 'uniform':
        # Uniform between -1 and 1, drawn from the same seeded RandomState
        noise = rs.uniform(-1, 1, (batch_size, noise_size))
    else:
        raise ValueError(f"Unsupported noise type: {noise_type}")
    return torch.tensor(noise, dtype=torch.float32)

Generate fake samples:

def generate_fake_samples(generator, noise_size, x_batch):
    noise_batch = generate_noise(noise_size, x_batch.size(0), config.noise_type, rs)

    # detach() so that discriminator updates do not backpropagate into the generator
    y_fake = generator(x_batch, noise_batch, x_batch.size(0)).detach()
    return x_batch, y_fake

Compute the Continuous Ranked Probability Score (CRPS):

def calc_crps(ground_truth, predictions, predictions2):
    # Sample-based CRPS estimate: E|X - y| - 0.5 * E|X - X'|,
    # where X and X' are two independent sets of generated forecasts
    return np.absolute(predictions - ground_truth).mean() - 0.5 * np.absolute(predictions - predictions2).mean()
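
The estimator in calc_crps has the sample-based form E|X - y| - 0.5 * E|X - X'|, where X and X' are independent forecast draws and y is the observation. The first term measures accuracy, and the second subtracts half the spread of the forecasts. A small dependency-free sketch with made-up numbers shows how the two terms interact; the helper name `crps_from_samples` is hypothetical, and it scores a single observation, whereas calc_crps averages over the whole validation set.

```python
def crps_from_samples(y, draws_a, draws_b):
    """Sample-based CRPS for one observation y and two sets of forecast draws."""
    accuracy = sum(abs(a - y) for a in draws_a) / len(draws_a)
    spread = sum(abs(a - b) for a, b in zip(draws_a, draws_b)) / len(draws_a)
    return accuracy - 0.5 * spread

y = 10.0
# Confident and correct: every draw equals the observation
print(crps_from_samples(y, [10.0, 10.0], [10.0, 10.0]))  # 0.0
# Confident but biased: no spread, so the full error of 2 remains
print(crps_from_samples(y, [12.0, 12.0], [12.0, 12.0]))  # 2.0
# Uncertain but centred on the truth (all four pairings of the values 9 and 11)
print(crps_from_samples(y, [9.0, 9.0, 11.0, 11.0], [9.0, 11.0, 9.0, 11.0]))  # 0.5
```

Lower is better, which is why the training loop saves a checkpoint whenever the validation CRPS decreases.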

Train the model

The train function runs the adversarial loop for config.epochs iterations. Each iteration processes one randomly drawn mini-batch, so an "epoch" here is a single training step rather than a full pass over the data. The function starts by initializing the variables that track losses, gradient norms, and time. In each step, it loads a batch of real samples, computes the discriminator's loss on the real targets and on generated (fake) targets, and updates the discriminator's weights. It then generates a fresh batch of fake samples, computes the generator's loss from the discriminator's verdict, and updates the generator's weights. Every 100 steps, the function evaluates the generator on the validation set using the CRPS metric, saves a checkpoint whenever the CRPS improves, and logs the progress. After training, it plots the loss and gradient histories and returns the generator along with the runtime.

import time


def train(best_crps):
    start_time = time.time()  # Record the start time
    generator_losses, discriminator_losses = [], []
    generator_gradients, discriminator_gradients = [], []

    for step in range(config.epochs):
        # Load real samples
        # x_batch: (batch_size, seq_len, feature_no)
        # y_batch: (batch_size, pred_len)
        x_batch, y_batch = load_real_samples(config.batch_size, x_train, y_train)

        # Train the discriminator on real samples (label 1)
        discriminator.zero_grad()
        d_real_decision = discriminator(y_batch, x_batch)
        d_real_loss = adversarial_loss(d_real_decision, torch.ones_like(d_real_decision))
        d_real_loss.backward()

        # Train the discriminator on fake samples (label 0)
        x_batch, y_fake = generate_fake_samples(generator, config.noise_size, x_batch)
        d_fake_decision = discriminator(y_fake, x_batch)
        d_fake_loss = adversarial_loss(d_fake_decision, torch.zeros_like(d_fake_decision))
        d_fake_loss.backward()

        optimizer_d.step()
        # Average of the real and fake losses for this step only
        d_loss = (d_real_loss.item() + d_fake_loss.item()) / 2

        # Track discriminator gradients
        discriminator_gradients.append(
            torch.mean(torch.tensor([p.grad.norm() for p in discriminator.parameters() if p.grad is not None])).item()
        )

        # Train the generator
        generator.zero_grad()
        noise_batch = generate_noise(config.noise_size, config.batch_size, config.noise_type, rs)
        y_fake = generator(x_batch, noise_batch, config.batch_size)

        d_g_decision = discriminator(y_fake, x_batch)
        # Minimax form: the generator maximises the discriminator's loss on fakes
        g_loss = -1 * adversarial_loss(d_g_decision, torch.zeros_like(d_g_decision))

        g_loss.backward()
        optimizer_g.step()

        g_loss = g_loss.item()
        # Track generator gradients (the generator's parameters, not the discriminator's)
        generator_gradients.append(
            torch.mean(torch.tensor([p.grad.norm() for p in generator.parameters() if p.grad is not None])).item()
        )

        # Validation
        if step % 100 == 0:
            with torch.no_grad():
                generator.eval()
                predictions = []
                for _ in range(200):
                    noise_batch = generate_noise(config.noise_size, x_val.size(0), config.noise_type, rs)
                    predictions.append(generator(x_val, noise_batch, batch_size=1).detach().cpu().numpy())
                predictions = np.stack(predictions)
                generator.train()

            # Two independent sets of 100 forecasts each for the CRPS estimate
            crps = calc_crps(y_val, predictions[:100], predictions[100:])

            if crps <= best_crps:
                best_crps = crps
                torch.save({'g_state_dict': generator.state_dict()}, 'checkpoint.pt')
            print("step : {} , d_loss : {} , g_loss : {}, crps : {}, best crps : {}".format(
                step, d_loss, g_loss, crps, best_crps))
            generator_losses.append(g_loss)
            discriminator_losses.append(d_loss)

    end_time = time.time()  # Record the end time
    runtime = end_time - start_time  # Calculate the runtime

    plot_losses(generator_losses, discriminator_losses, False)
    plot_gradiants(generator_gradients, discriminator_gradients, False)

    return generator, runtime
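
The signs in the loss terms are easy to misread, so a small numeric check may help. With binary cross-entropy, a target of 0 gives BCE(p, 0) = -log(1 - p), so the generator loss in train(), -BCE(D(G(z)), 0), equals log(1 - D(G(z))). This is the original minimax objective, and it decreases as the discriminator is fooled. The sketch below uses plain Python rather than nn.BCELoss; the helper `bce` is a hypothetical single-element stand-in, and the non-saturating variant is shown only for comparison and is not used in this article.

```python
import math

def bce(p, target):
    """Binary cross-entropy for one probability p and a 0/1 target."""
    return -(target * math.log(p) + (1 - target) * math.log(1 - p))

for p in (0.1, 0.5, 0.9):  # p = discriminator's probability that a fake is real
    d_fake_loss = bce(p, 0)   # discriminator wants p -> 0
    g_loss = -1 * bce(p, 0)   # generator loss as written in train()
    g_loss_alt = bce(p, 1)    # common non-saturating alternative (not used here)
    print(f"p={p}: d_fake_loss={d_fake_loss:.3f}, g_loss={g_loss:.3f}, alt={g_loss_alt:.3f}")
```

Both generator losses fall as p rises, so either one pushes the generator towards samples the discriminator accepts; they differ in how strong the gradient is when p is close to 0.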

Model configuration

These hyperparameters are a reasonable starting point rather than a tuned optimum; a better configuration can certainly be found.

from collections import namedtuple

device = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")
torch.manual_seed(4)
rs = np.random.RandomState(4)

Config = namedtuple('Config', [
    'epochs', 'pred_len', 'seq_len', 'n_critic',
    'crps', 'optimiser', 'lr', 'dropout', 'batch_size',
    'noise_size', 'noise_type',
    'generator_latent_size', 'discriminator_latent_size',
])

config = Config(
    epochs=6500,
    pred_len=1,
    seq_len=10,
    n_critic=1,
    crps=0.5,
    optimiser=None,
    lr=0.0033,
    dropout=0.33,
    batch_size=32,
    noise_size=32,
    noise_type='normal',
    generator_latent_size=8,
    discriminator_latent_size=64,
)

# df is assumed to be the DataFrame prepared in Part 2
generator = Generator(hidden_dim=config.generator_latent_size, feature_no=len(df.columns),
                      seq_len=config.seq_len, output_dim=config.pred_len,
                      dropout=config.dropout).to(device)
discriminator = Discriminator(seq_len=config.seq_len,
                              hidden_dim=config.discriminator_latent_size).to(device)

optimizer_g = torch.optim.RMSprop(generator.parameters(), lr=config.lr)
optimizer_d = torch.optim.RMSprop(discriminator.parameters(), lr=config.lr)
adversarial_loss = nn.BCELoss().to(device)

is_train = True
best_crps = np.inf
Run the training

trained_model, runtime = train(best_crps)

# Reload the best generator saved during validation
checkpoint = torch.load('checkpoint.pt', map_location=device)
generator.load_state_dict(checkpoint['g_state_dict'])

x_test = torch.tensor(data['X_test'], device=device, dtype=torch.float32)

with torch.no_grad():
    generator.eval()
    # One noise draw per test window, i.e. a single sampled forecast for each
    noise_batch = generate_noise(config.noise_size, x_test.size(0), config.noise_type, rs)
    predictions = generator(x_test, noise_batch, batch_size=1).detach().cpu().numpy()

y_test = data['y_test'].flatten()
trues = y_test
preds = predictions.flatten()
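
The test code draws a single noise vector per window, so each prediction is one sample from the generator's predictive distribution. One option, mirroring the repeated draws used during validation, is to sample many forecasts per window and summarise them with a median and an interval. The sketch below is dependency-free: `sample_forecast` is a hypothetical stand-in for a generator call, and the Gaussian it samples from is made up.

```python
import random
import statistics

rng = random.Random(4)

def sample_forecast():
    """Hypothetical stand-in for one stochastic generator call on a fixed window."""
    return rng.gauss(80.0, 2.0)  # made-up predictive distribution

draws = [sample_forecast() for _ in range(200)]

# quantiles(n=20) returns 19 cut points at 5%, 10%, ..., 95%
cuts = statistics.quantiles(draws, n=20)
lower, upper = cuts[0], cuts[-1]
point = statistics.median(draws)

print(f"median={point:.2f}, 90% interval=({lower:.2f}, {upper:.2f})")
```

With the real model, the list of draws would come from calling the generator repeatedly on the same test window with fresh noise.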

Model Evaluation

Point-wise error metrics (such as MAE, MSE, RMSE, MAPE, and MSPE) are commonly used to evaluate time series forecasting models. Additionally, we use Kullback-Leibler divergence (KL divergence) to measure how similar the distribution of the generated data is to that of the actual data.
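
As a quick sanity check of the point-wise metrics, the following dependency-free sketch computes them by hand on two made-up price pairs. MAPE and MSPE divide by the true value, so they are expressed as fractions here (0.05 means 5%) and are undefined when a true value is zero.

```python
import math

trues = [80.0, 100.0]  # made-up actual prices
preds = [84.0, 95.0]   # made-up forecasts

errors = [p - t for p, t in zip(preds, trues)]       # [4.0, -5.0]
rel_errors = [e / t for e, t in zip(errors, trues)]  # [0.05, -0.05]

mae = sum(abs(e) for e in errors) / len(errors)
mse = sum(e ** 2 for e in errors) / len(errors)
rmse = math.sqrt(mse)
mape = sum(abs(r) for r in rel_errors) / len(rel_errors)
mspe = sum(r ** 2 for r in rel_errors) / len(rel_errors)

print(mae, mse, round(rmse, 4), round(mape, 4), round(mspe, 6))
# 4.5 20.5 4.5277 0.05 0.0025
```

MSE and RMSE weight the larger error (5) more heavily than MAE does, which is why RMSE comes out slightly above MAE.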

Let’s build some helper functions to plot the training losses, gradients, and prediction performance, followed by the evaluation functions.

import matplotlib.pyplot as plt

# Plotting
def plot_losses(gen_losses, critic_losses, path, save=False):
    plt.figure(figsize=(10, 5))
    plt.plot(gen_losses, label='Generator')
    plt.plot(critic_losses, label='Discriminator')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title('Generator and Discriminator Loss During Training')
    plt.legend()
    if save:
        plt.savefig(path + "/loss.png")
    plt.show()


def plot_gradiants(gen_gradients, critic_gradients, path, save=False):
    plt.figure(figsize=(10, 5))
    plt.plot(gen_gradients, label='Generator Gradients')
    plt.plot(critic_gradients, label='Discriminator Gradients')
    plt.xlabel('Epoch')
    plt.ylabel('Gradient Magnitude')
    plt.title('Gradient Magnitude per Epoch')
    plt.legend()
    if save:
        plt.savefig(path + '/grad.png')
    plt.show()


def plot_trues_preds(trues, preds, path=False):
    print(trues.shape, preds.shape)
    plt.plot(trues)
    plt.plot(preds)
    plt.title('Actual vs generated data')
    plt.legend(['Actual', 'Generated'], loc='upper left')
    if path:
        plt.savefig(path + '/line.png', bbox_inches='tight')
    plt.show()
# Evaluation metrics
from scipy.special import rel_entr
from sklearn.metrics import r2_score

def MAE(pred, true):
    return np.mean(np.abs(pred - true))

def MSE(pred, true):
    return np.mean((pred - true) ** 2)

def RMSE(pred, true):
    return np.sqrt(MSE(pred, true))

def MAPE(pred, true):
    return np.mean(np.abs((pred - true) / true))

def MSPE(pred, true):
    return np.mean(np.square((pred - true) / true))

def r_2(preds, trues):
    return r2_score(trues, preds)  # R-squared

def kl_divergence(preds, trues):
    # Normalise each series to sum to 1 so it can be treated as a probability
    # distribution; work on copies so the caller's arrays are not modified
    p = preds / preds.sum()
    q = trues / trues.sum()
    return np.sum(rel_entr(p, q))

def metric(trues, preds):
    preds = np.round(preds, 2)
    trues = np.round(trues, 2)
    mae = MAE(preds, trues)
    mse = MSE(preds, trues)
    rmse = RMSE(preds, trues)
    mspe = MSPE(preds, trues)
    mape = MAPE(preds, trues)
    r2 = r_2(preds, trues)
    kl = kl_divergence(preds, trues)

    print(" MAE: {:.6f} , MSE {:.6f}, RMSE {:.6f}, MSPE {:.6f}, MAPE {:.6f}, R2 {:.6f}, KL {:.6f}".format(
        mae, mse, rmse, mspe, mape, r2, kl))
    return {'mae': mae, 'mse': mse, 'rmse': rmse, 'mspe': mspe, 'mape': mape, 'r2': r2, 'kl': kl}

# Plot and evaluate the test-set predictions
plot_trues_preds(trues, preds, False)
metrics = metric(trues, preds)
# Note: train() updates only its local best_crps, so this is the value set before training
metrics['crps'] = best_crps

At the end, you should obtain results similar to the figure below.

Conclusion

In this series, we created a conditional GAN model for Brent oil price forecasting. We started by introducing the model architecture and flow, then moved on to dataset preparation, and finally trained and evaluated the model. Note that hyperparameter tuning is crucial for the model’s performance: in many cases, investing effort in tuning an existing model yields better results than developing a more complex one.

You can find the related articles and resources via these links:
Part 1: Introduction
Part 2: Dataset preparation
Brent Oil price: exploratory data analysis (EDA)
The full code and dataset (GitHub)
