Building RNN, LSTM, and GRU from Scratch

Ebad Sayed
9 min read · Jun 26, 2024

https://arxiv.org/pdf/1808.10511

In my previous article, we explored the theoretical foundations of RNNs, LSTMs, and GRUs. Now, we move from theory to practice, focusing on the hands-on implementation and training of these powerful models using PyTorch.

Previous article: Mastering RNN, LSTM and GRU

We will build and train these models on a sentiment analysis task, using the IMDB dataset that ships with Keras. The IMDB dataset is a popular benchmark for sentiment analysis in NLP. It consists of movie reviews labeled as positive or negative according to the sentiment expressed in the review text, and it is widely used to train and evaluate models for sentiment classification and text analysis.

Download the IMDB Dataset

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from tensorflow.keras.datasets import imdb
from tensorflow.keras.preprocessing import sequence
import numpy as np

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Load the IMDB dataset
max_features = 5000
max_len = 500

(x_train, y_train), (x_test, y_test) = imdb.load_data(num_words=max_features)

max_features: defines the maximum number of words to consider as features. Only the top max_features most frequent words will be kept.

max_len: specifies the maximum length of each sequence (or text) after padding or truncation.

imdb.load_data(): This function loads the IMDB dataset from Keras. The num_words parameter ensures that only the top max_features most frequent words are kept; rarer words are replaced by a special out-of-vocabulary index. Each review in the dataset is represented as a sequence of integers, where each integer corresponds to a specific word in the review.

The function returns the data already split into a training set and a test set of 25,000 reviews each, which we unpack into (x_train, y_train) and (x_test, y_test).
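
To make the integer encoding concrete, here is a toy sketch in plain Python. It is not the Keras implementation: encode_review and toy_rank are hypothetical names, and the reserved indices assume the Keras defaults (0 for padding, 1 for the start of a review, 2 for out-of-vocabulary words, word ranks shifted by 3).

```python
# Toy re-creation of the encoding convention (not the Keras source code).
PAD, START, OOV = 0, 1, 2   # reserved indices under the assumed Keras defaults
INDEX_FROM = 3              # word ranks are shifted by this offset

def encode_review(tokens, word_rank, num_words):
    """Encode tokens as integers, mapping rare or unknown words to OOV."""
    encoded = [START]
    for token in tokens:
        rank = word_rank.get(token)  # rank 1 = most frequent word
        index = rank + INDEX_FROM if rank is not None else OOV
        encoded.append(index if index < num_words else OOV)
    return encoded

# Hypothetical frequency ranks, for illustration only.
toy_rank = {"the": 1, "was": 13, "movie": 17, "sesquipedalian": 90000}
encoded = encode_review(
    ["the", "movie", "was", "sesquipedalian", "qwerty"], toy_rank, num_words=5000
)
print(encoded)  # [1, 4, 20, 16, 2, 2]
```

Because every index stays below num_words, an embedding table with max_features rows is large enough to look up any token.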

# Pad sequences to ensure uniform length
x_train = sequence.pad_sequences(x_train, maxlen=max_len)
x_test = sequence.pad_sequences(x_test, maxlen=max_len)

# Convert to PyTorch tensors
x_train = torch.tensor(x_train, dtype=torch.long)
y_train = torch.tensor(y_train, dtype=torch.float32)
x_test = torch.tensor(x_test, dtype=torch.long)
y_test = torch.tensor(y_test, dtype=torch.float32)

The sequence.pad_sequences() function pads or truncates the sequences so that they all have the same length (max_len).
This step is necessary because the reviews are stacked into a single tensor and processed in batches, which requires every sequence to have the same length. With the default settings, sequences shorter than max_len are padded with zeros at the beginning, and sequences longer than max_len are truncated from the beginning, so their last max_len tokens are kept.
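
The default behavior can be sketched in a few lines of plain Python. pad_pre is a hypothetical helper that mimics pad_sequences with its default "pre" padding and "pre" truncation; it is only an illustration, not the Keras code.

```python
def pad_pre(sequences, maxlen, value=0):
    """Left-pad short sequences; keep only the last `maxlen` items of long ones."""
    padded = []
    for seq in sequences:
        kept = list(seq)[-maxlen:]  # truncate from the front
        padded.append([value] * (maxlen - len(kept)) + kept)  # pad at the front
    return padded

toy = [[11, 12, 13], [21, 22, 23, 24, 25, 26]]
result = pad_pre(toy, maxlen=4)
print(result)  # [[0, 11, 12, 13], [23, 24, 25, 26]]
```

Padding at the front keeps the real tokens at the end of the sequence, which suits models that read their prediction from the last time step.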

The sequences x_train and x_test and the labels y_train and y_test are converted from NumPy arrays to PyTorch tensors.
The long datatype is used for x_train and x_test because they contain word indices, which the embedding layer expects as integers. The float32 datatype is used for y_train and y_test because BCELoss expects floating-point targets, so the integer labels become 0.0 for negative sentiment and 1.0 for positive sentiment.
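
A small sketch of why the two dtypes differ, using toy tensors rather than the IMDB data (all sizes below are arbitrary assumptions):

```python
import torch
import torch.nn as nn

# Embedding lookups take integer indices.
embedding = nn.Embedding(num_embeddings=10, embedding_dim=4)
indices = torch.tensor([[1, 2, 3]], dtype=torch.long)
vectors = embedding(indices)  # shape (1, 3, 4)

# BCELoss compares float probabilities with float targets.
probs = torch.tensor([0.9, 0.2])     # stand-in for sigmoid outputs
labels_int = torch.tensor([1, 0])    # integer labels, as stored in the dataset
labels = labels_int.float()          # cast to float32 before computing the loss
loss = nn.BCELoss()(probs, labels)

print(vectors.shape, labels.dtype, loss.item())
```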

# Create DataLoader
batch_size = 64

train_data = TensorDataset(x_train, y_train)
train_loader = DataLoader(train_data, batch_size=batch_size, shuffle=True)

test_data = TensorDataset(x_test, y_test)
test_loader = DataLoader(test_data, batch_size=batch_size, shuffle=False)

TensorDataset: This class is used to create a dataset from tensors (x_train, y_train) and (x_test, y_test), respectively.
DataLoader: This class is used to create batches of data from a dataset (train_data and test_data). It helps in efficiently iterating over batches during training and evaluation.
batch_size: This parameter specifies the number of samples in each batch.
shuffle=True for train_loader ensures that the data is shuffled randomly during training to improve model generalization.
shuffle=False for test_loader ensures that the data is not shuffled during testing for consistent evaluation.
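
As a quick illustration with toy tensors (the sizes are arbitrary), the number of batches is the dataset size divided by batch_size, rounded up, and the last batch may be smaller:

```python
import torch
from torch.utils.data import DataLoader, TensorDataset

toy_x = torch.arange(20).reshape(10, 2)  # 10 samples, 2 features each
toy_y = torch.zeros(10)
toy_loader = DataLoader(TensorDataset(toy_x, toy_y), batch_size=4, shuffle=False)

batch_sizes = [xb.shape[0] for xb, yb in toy_loader]
print(len(toy_loader), batch_sizes)  # 3 [4, 4, 2]
```

With 25,000 training reviews and a batch size of 64, the same arithmetic gives 391 batches per epoch, the last one holding 40 reviews.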

RNN Model

class RNNModel(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim, num_layers):
        super(RNNModel, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.rnn = nn.RNN(embedding_dim, hidden_dim, num_layers=num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_dim)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        x = self.embedding(x)
        x, _ = self.rnn(x)
        x = x[:, -1, :]
        x = self.fc(x)
        x = self.sigmoid(x)
        return x

# Model hyperparameters
vocab_size = max_features
embedding_dim = 128
hidden_dim = 128
output_dim = 1
num_layers = 3

# Initialize the model, criterion and optimizer
rnn_model = RNNModel(vocab_size, embedding_dim, hidden_dim, output_dim, num_layers).to(device)
criterion = nn.BCELoss()
optimizer = optim.Adam(rnn_model.parameters(), lr=0.001)

RNNModel is a subclass of nn.Module, which is the base class for all neural network modules in PyTorch.

Layers

self.embedding is the embedding layer that converts word indices into dense vectors of embedding_dim dimensions.
self.rnn is the RNN layer that processes input sequences. It takes embeddings of shape (batch_size, seq_len, embedding_dim) and returns a tuple (output, h_n), where output has shape (batch_size, seq_len, hidden_dim) and holds the top layer's hidden state at every time step, and h_n has shape (num_layers, batch_size, hidden_dim) and holds each layer's hidden state at the last time step.
self.fc is the fully connected layer (linear transformation) that maps the RNN output from hidden_dim to output_dim.
self.sigmoid is the sigmoid activation function applied to the output to obtain probabilities (for binary classification).
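
The shapes described above can be traced with a small stand-alone sketch. The sizes are arbitrary toy values, not the article's hyperparameters. For a unidirectional RNN like this one, the last time step of output matches the last layer's entry in h_n.

```python
import torch
import torch.nn as nn

torch.manual_seed(0)
batch_size, seq_len, vocab, emb_dim, hid_dim, layers = 2, 5, 50, 8, 16, 3

embedding = nn.Embedding(vocab, emb_dim)
rnn = nn.RNN(emb_dim, hid_dim, num_layers=layers, batch_first=True)
fc = nn.Linear(hid_dim, 1)

tokens = torch.randint(0, vocab, (batch_size, seq_len))
embedded = embedding(tokens)          # (2, 5, 8)
output, h_n = rnn(embedded)           # output: (2, 5, 16), h_n: (3, 2, 16)
last_step = output[:, -1, :]          # (2, 16)
probs = torch.sigmoid(fc(last_step))  # (2, 1), values between 0 and 1

same = torch.allclose(last_step, h_n[-1])
print(embedded.shape, output.shape, h_n.shape, probs.shape, same)
```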

Hyperparameters

vocab_size, embedding_dim, hidden_dim, output_dim, num_layers are hyperparameters that define the architecture and behavior of the RNN model.
vocab_size is assigned from max_features, which defines the size of the vocabulary kept from the IMDB dataset.
embedding_dim is the dimensionality of the word embeddings, set to 128.
hidden_dim is the size of the hidden state of the RNN, also set to 128.
output_dim is the dimensionality of the output, which is 1 for binary classification (positive or negative sentiment).
num_layers is the number of RNN layers stacked on top of each other, set to 3.
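
To get a feel for what these hyperparameters imply, the sketch below counts trainable parameters by hand. recurrent_params is a hypothetical helper; it assumes the PyTorch layout in which each recurrent layer has an input-to-hidden matrix, a hidden-to-hidden matrix, and two bias vectors per gate block (1 block for nn.RNN, 4 for nn.LSTM, 3 for nn.GRU).

```python
def recurrent_params(input_dim, hidden_dim, num_layers, gates):
    """Count weights and biases of a stacked, unidirectional recurrent layer."""
    total = 0
    for layer in range(num_layers):
        in_dim = input_dim if layer == 0 else hidden_dim
        total += gates * (in_dim * hidden_dim + hidden_dim * hidden_dim + 2 * hidden_dim)
    return total

vocab_size, embedding_dim, hidden_dim, output_dim, num_layers = 5000, 128, 128, 1, 3

# Embedding table plus the final linear layer (weights and bias).
shared = vocab_size * embedding_dim + (hidden_dim * output_dim + output_dim)

counts = {
    name: shared + recurrent_params(embedding_dim, hidden_dim, num_layers, gates)
    for name, gates in [("RNN", 1), ("LSTM", 4), ("GRU", 3)]
}
print(counts)  # {'RNN': 739201, 'LSTM': 1036417, 'GRU': 937345}
```

Under these assumptions, the embedding table (640,000 parameters) dominates all three models. The recurrent part grows fourfold for the LSTM and threefold for the GRU.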

Initialization

rnn_model creates an instance of RNNModel with the specified hyperparameters and moves it to the device (either CPU or GPU).
criterion defines the loss function for binary cross-entropy (BCELoss), suitable for binary classification tasks.
optimizer uses the Adam optimizer to update the model parameters (rnn_model.parameters()) during training, with a learning rate (lr) of 0.001.
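
For intuition about the loss, here is binary cross-entropy written out in plain Python. binary_cross_entropy is a hypothetical helper that averages over the batch, as BCELoss does by default, and it assumes every probability lies strictly between 0 and 1.

```python
import math

def binary_cross_entropy(probs, targets):
    """Mean of -(t*log(p) + (1-t)*log(1-p)) over all samples."""
    losses = [
        -(t * math.log(p) + (1 - t) * math.log(1 - p))
        for p, t in zip(probs, targets)
    ]
    return sum(losses) / len(losses)

targets = [1.0, 0.0]
confident_right = binary_cross_entropy([0.9, 0.1], targets)
confident_wrong = binary_cross_entropy([0.1, 0.9], targets)
print(round(confident_right, 4), round(confident_wrong, 4))  # 0.1054 2.3026
```

Confident wrong predictions are penalized far more heavily than confident correct ones are rewarded.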

# Training loop
num_epochs = 5

for epoch in range(num_epochs):
    rnn_model.train()
    total_loss = 0
    for inputs, targets in train_loader:
        inputs, targets = inputs.to(device), targets.to(device)
        optimizer.zero_grad()
        outputs = rnn_model(inputs)
        loss = criterion(outputs.squeeze(), targets)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()

    avg_loss = total_loss / len(train_loader)
    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {avg_loss:.4f}')

num_epochs is the number of complete passes over the training set; it is set to 5.
In each epoch we iterate over the batches from train_loader and move the inputs and labels to the device.
optimizer.zero_grad() clears the gradients of all optimized parameters before backpropagation; otherwise PyTorch would add the new gradients to those of the previous batch.
We then run the forward pass of the input data through the model to obtain the predicted outputs, and calculate the loss between the predicted outputs and the actual labels.
loss.backward() computes the gradient of the loss with respect to the model parameters.
optimizer.step() updates the model parameters based on the computed gradients.
We then add the current batch loss, loss.item(), to total_loss.
Finally, we calculate the average loss across all batches in the current epoch as total_loss / len(train_loader).
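
The need to clear gradients can be seen on a single scalar parameter. In this minimal sketch, w.grad.zero_() plays the role that optimizer.zero_grad() plays in the loop.

```python
import torch

w = torch.tensor(2.0, requires_grad=True)

(3 * w).backward()
first = w.grad.item()        # 3.0: d(3w)/dw

(3 * w).backward()           # no reset in between
accumulated = w.grad.item()  # 6.0: the new gradient was added to the old one

w.grad.zero_()               # reset, as the optimizer does before each batch
cleared = w.grad.item()      # 0.0

print(first, accumulated, cleared)
```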

# Evaluate the model
rnn_model.eval()
correct = 0
total = 0
with torch.no_grad():
    for inputs, targets in test_loader:
        inputs, targets = inputs.to(device), targets.to(device)
        outputs = rnn_model(inputs)
        predicted = (outputs.squeeze() >= 0.5).float()
        total += targets.size(0)
        correct += (predicted == targets).sum().item()

accuracy = correct / total
print(f'Test Accuracy: {accuracy:.4f}')

rnn_model.eval() sets the model to evaluation mode. This matters for layers such as dropout or batch normalization, which behave differently during training and evaluation. This model uses neither, but calling eval() before testing is a good habit.
correct and total are counters for the correctly predicted samples and for all samples, respectively.
The torch.no_grad() context manager disables gradient calculation, which reduces memory consumption and speeds up computation.
The loop follows the same procedure as training, but on the test set and with only the forward pass.
predicted = (outputs.squeeze() >= 0.5).float() converts model outputs to binary predictions (>= 0.5 threshold) and casts them to float.
Finally, we calculate the accuracy as correct / total, the number of correctly predicted samples divided by the total number of samples.
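
Here is the thresholding step on a hand-made batch, with values chosen only for illustration. The sketch uses squeeze(1) rather than squeeze(): this is a suggestion rather than the article's code, since squeeze() with no argument would also drop the batch dimension if a batch ever contained a single sample.

```python
import torch

outputs = torch.tensor([[0.9], [0.4], [0.5], [0.1]])  # shape (4, 1)
targets = torch.tensor([1.0, 0.0, 1.0, 1.0])

predicted = (outputs.squeeze(1) >= 0.5).float()       # tensor([1., 0., 1., 0.])
accuracy = (predicted == targets).sum().item() / targets.size(0)
print(accuracy)  # 0.75

# Difference between the two squeeze variants on a batch of one.
single = torch.tensor([[0.7]])
shape_all = tuple(single.squeeze().shape)    # (): batch dimension lost
shape_dim1 = tuple(single.squeeze(1).shape)  # (1,): batch dimension kept
print(shape_all, shape_dim1)
```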

LSTM Model

class LSTMModel(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim, num_layers):
        super(LSTMModel, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, num_layers=num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_dim)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        x = self.embedding(x)
        x, _ = self.lstm(x)
        x = x[:, -1, :]
        x = self.fc(x)
        x = self.sigmoid(x)
        return x

# Model hyperparameters
vocab_size = max_features
embedding_dim = 128
hidden_dim = 128
output_dim = 1
num_layers = 3

# Initialize the model, criterion and optimizer
lstm_model = LSTMModel(vocab_size, embedding_dim, hidden_dim, output_dim, num_layers).to(device)
criterion = nn.BCELoss()
optimizer = optim.Adam(lstm_model.parameters(), lr=0.001)

self.lstm: nn.LSTM() defines the LSTM layer. It takes the embedded inputs (embedding_dim) and processes them through num_layers LSTM layers. batch_first=True indicates that the input and output tensors are provided as (batch_size, seq_length, features). The remaining layers are the same as in the RNN model.

x, _ = self.lstm(x) passes the embedded tensor x through the LSTM layers. The output x is a tensor of shape (batch_size, seq_length, hidden_dim), and _ contains the final hidden state and cell state of the LSTM, which we don’t use in this case.
x = x[:, -1, :] extracts the output of the last time step from the LSTM output tensor x. This step captures the final representation of the input sequence.
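
A short sketch of what nn.LSTM returns, using arbitrary toy sizes. Unlike nn.RNN, the second return value is a pair (h_n, c_n) holding the final hidden state and the final cell state.

```python
import torch
import torch.nn as nn

torch.manual_seed(0)
lstm = nn.LSTM(input_size=8, hidden_size=16, num_layers=3, batch_first=True)
x = torch.randn(2, 5, 8)       # (batch_size, seq_length, features)

output, (h_n, c_n) = lstm(x)
last_step = output[:, -1, :]   # what the model feeds to the linear layer

print(output.shape)  # torch.Size([2, 5, 16])
print(h_n.shape)     # torch.Size([3, 2, 16])
print(c_n.shape)     # torch.Size([3, 2, 16])
print(torch.allclose(last_step, h_n[-1]))  # True
```

For a unidirectional LSTM, taking the last time step of output is therefore equivalent to taking the top layer's final hidden state.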

# Training loop
num_epochs = 5

for epoch in range(num_epochs):
    lstm_model.train()
    total_loss = 0
    for inputs, targets in train_loader:
        inputs, targets = inputs.to(device), targets.to(device)
        optimizer.zero_grad()
        outputs = lstm_model(inputs)
        loss = criterion(outputs.squeeze(), targets)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()

    avg_loss = total_loss / len(train_loader)
    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {avg_loss:.4f}')

# Evaluate the model
lstm_model.eval()
correct = 0
total = 0
with torch.no_grad():
    for inputs, targets in test_loader:
        inputs, targets = inputs.to(device), targets.to(device)
        outputs = lstm_model(inputs)
        predicted = (outputs.squeeze() >= 0.5).float()
        total += targets.size(0)
        correct += (predicted == targets).sum().item()

accuracy = correct / total
print(f'Test Accuracy: {accuracy:.4f}')

GRU Model

class GRUModel(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, output_dim, num_layers):
        super(GRUModel, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.gru = nn.GRU(embedding_dim, hidden_dim, num_layers=num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_dim, output_dim)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        x = self.embedding(x)
        x, _ = self.gru(x)
        x = x[:, -1, :]
        x = self.fc(x)
        x = self.sigmoid(x)
        return x

# Model hyperparameters
vocab_size = max_features
embedding_dim = 128
hidden_dim = 128
output_dim = 1
num_layers = 3

# Initialize the model, criterion and optimizer
gru_model = GRUModel(vocab_size, embedding_dim, hidden_dim, output_dim, num_layers).to(device)
criterion = nn.BCELoss()
optimizer = optim.Adam(gru_model.parameters(), lr=0.001)

# Training loop
num_epochs = 5

for epoch in range(num_epochs):
    gru_model.train()
    total_loss = 0
    for inputs, targets in train_loader:
        inputs, targets = inputs.to(device), targets.to(device)
        optimizer.zero_grad()
        outputs = gru_model(inputs)
        loss = criterion(outputs.squeeze(), targets)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()

    avg_loss = total_loss / len(train_loader)
    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {avg_loss:.4f}')

# Evaluate the model
gru_model.eval()
correct = 0
total = 0
with torch.no_grad():
    for inputs, targets in test_loader:
        inputs, targets = inputs.to(device), targets.to(device)
        outputs = gru_model(inputs)
        predicted = (outputs.squeeze() >= 0.5).float()
        total += targets.size(0)
        correct += (predicted == targets).sum().item()

accuracy = correct / total
print(f'Test Accuracy: {accuracy:.4f}')
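
Since the training and evaluation loops are identical for all three models, they could be factored into two helpers. The sketch below is one possible refactoring, not the code from the article: train_one_epoch, evaluate, and TinyGRU are hypothetical names, and the data is random toy data used only to show that the helpers run.

```python
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset

def train_one_epoch(model, loader, criterion, optimizer, device):
    """Run one pass over `loader` and return the average batch loss."""
    model.train()
    total_loss = 0.0
    for inputs, targets in loader:
        inputs, targets = inputs.to(device), targets.to(device)
        optimizer.zero_grad()
        loss = criterion(model(inputs).squeeze(1), targets)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    return total_loss / len(loader)

def evaluate(model, loader, device):
    """Return the fraction of samples classified correctly at a 0.5 threshold."""
    model.eval()
    correct, total = 0, 0
    with torch.no_grad():
        for inputs, targets in loader:
            inputs, targets = inputs.to(device), targets.to(device)
            predicted = (model(inputs).squeeze(1) >= 0.5).float()
            correct += (predicted == targets).sum().item()
            total += targets.size(0)
    return correct / total

class TinyGRU(nn.Module):
    """Small stand-in with the same structure as the models above."""
    def __init__(self):
        super().__init__()
        self.embedding = nn.Embedding(20, 8)
        self.gru = nn.GRU(8, 8, batch_first=True)
        self.fc = nn.Linear(8, 1)

    def forward(self, x):
        x, _ = self.gru(self.embedding(x))
        return torch.sigmoid(self.fc(x[:, -1, :]))

# Random toy data, only to exercise the helpers.
torch.manual_seed(0)
toy_device = torch.device("cpu")
toy_x = torch.randint(0, 20, (32, 6))
toy_y = torch.randint(0, 2, (32,)).float()
toy_loader = DataLoader(TensorDataset(toy_x, toy_y), batch_size=8)

toy_model = TinyGRU().to(toy_device)
toy_optimizer = torch.optim.Adam(toy_model.parameters(), lr=0.001)
toy_loss = train_one_epoch(toy_model, toy_loader, nn.BCELoss(), toy_optimizer, toy_device)
toy_accuracy = evaluate(toy_model, toy_loader, toy_device)
print(f"loss={toy_loss:.4f}, accuracy={toy_accuracy:.4f}")
```

With helpers like these, each of the three models would only need its own optimizer and a short loop over epochs.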

Visualize the Networks

!pip install torchviz
from torchviz import make_dot

torchviz is a Python library that provides utilities for visualizing PyTorch computational graphs. It allows you to create visual representations of neural network models built with PyTorch, which can help in understanding the flow of data and operations within the model.

# Dummy input
example_input = torch.randint(0, vocab_size, (1, max_len)).to(device)

# Models output
rnn_o = rnn_model(example_input)
lstm_o = lstm_model(example_input)
gru_o = gru_model(example_input)

# Create visualization
rnn_viz = make_dot(rnn_o, params=dict(rnn_model.named_parameters()))
lstm_viz = make_dot(lstm_o, params=dict(lstm_model.named_parameters()))
gru_viz = make_dot(gru_o, params=dict(gru_model.named_parameters()))

# Save to file
rnn_viz.render('visualization/rnn_model', format='png')
lstm_viz.render('visualization/lstm_model', format='png')
gru_viz.render('visualization/gru_model', format='png')

example_input is a dummy input tensor created with torch.randint. It contains random integers from 0 up to, but not including, vocab_size, in a tensor of shape (1, max_len).
rnn_o, lstm_o and gru_o store the outputs that rnn_model, lstm_model and gru_model produce for example_input.
The make_dot function generates a visualization of the PyTorch computation graph that produced a given output tensor.
params=dict(model.named_parameters()) passes the model's named parameters, which are used to label the corresponding nodes in the graph with the parameter names.
The .render method saves the generated graph visualization to a file in the specified format.
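
The dictionary passed as params can also be inspected directly, without torchviz. This minimal sketch uses a small stand-alone nn.RNN with arbitrary sizes (toy_rnn is a hypothetical name) and lists the parameter names and shapes that named_parameters() yields.

```python
import torch.nn as nn

toy_rnn = nn.RNN(input_size=8, hidden_size=16, num_layers=2, batch_first=True)

# Same construction as the `params` argument above, keeping only the shapes.
param_shapes = {name: tuple(p.shape) for name, p in toy_rnn.named_parameters()}
for name, shape in param_shapes.items():
    print(name, shape)
```

Each layer contributes an input-to-hidden weight, a hidden-to-hidden weight, and two biases, for example weight_ih_l0 with shape (16, 8) and weight_ih_l1 with shape (16, 16).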

Below are the images of RNN, LSTM and GRU models that we built.

[Three figures: computation graphs of the RNN, LSTM, and GRU models. Images by Author]

All the code is available on my GitHub.

I have also implemented these three models to predict the BTC price by training them on data from the last 8 years, spanning from May 2016 to May 2024. You can view that project here.

Ebad Sayed

I am currently a final year undergraduate at IIT Dhanbad, looking to help out aspiring AI/ML enthusiasts with easy AI/ML guides.