We are going to use PyTorch to build an RNN model step by step. Then we will train the model on the MNIST training data and evaluate it on the test data.
Import libraries
import torch
Check available device
# Device configuration
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device
Output: device(type='cpu')
Download MNIST dataset
What is MNIST dataset?
The MNIST database (Modified National Institute of Standards and Technology database) is a large database of handwritten digits that is commonly used for training various image processing systems. The database is also widely used for training and testing in the field of machine learning.
The MNIST database contains 60,000 training images and 10,000 testing images.
PyTorch domain libraries provide a number of pre-loaded datasets (such as FashionMNIST, MNIST etc…) that subclass torch.utils.data.Dataset and implement functions specific to the particular data. They can be used to prototype and benchmark your model. In this example we are using MNIST dataset.
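To make the idea of subclassing torch.utils.data.Dataset concrete, here is a minimal sketch. The class name RandomDigits and its random contents are hypothetical and stand in for real data; the only point is to show the two methods a map-style dataset typically implements.

```python
import torch
from torch.utils.data import Dataset


class RandomDigits(Dataset):
    """Hypothetical toy dataset: random 28x28 'images' with random labels 0-9."""

    def __init__(self, num_samples=8, seed=0):
        gen = torch.Generator().manual_seed(seed)
        self.images = torch.rand(num_samples, 1, 28, 28, generator=gen)
        self.labels = torch.randint(0, 10, (num_samples,), generator=gen)

    def __len__(self):
        # Number of samples in the dataset
        return len(self.labels)

    def __getitem__(self, idx):
        # Return one (image, label) pair, one sample at a time
        return self.images[idx], int(self.labels[idx])


toy_data = RandomDigits()
img, label = toy_data[0]
print(len(toy_data), tuple(img.shape), label)
```

Indexing the dataset returns a single (image, label) pair, which is the same access pattern used later with train_data[sample_idx].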
Download MNIST dataset in local system
from torchvision import datasets
from torchvision.transforms import ToTensor

train_data = datasets.MNIST(
    root = 'data',
    train = True,
    transform = ToTensor(),
    download = True,
)

test_data = datasets.MNIST(
    root = 'data',
    train = False,
    transform = ToTensor()
)
Print train_data and test_data size
print(train_data)
Output:
print(test_data)
Output:
print(train_data.data.size())
Output: torch.Size([60000, 28, 28])
print(train_data.targets.size())
Output: torch.Size([60000])
print(train_data.data[0])
Output:
tensor([[ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
[ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
… …..
[ 0, 0, 0, 0, 0, 0, 0, 0, 30, 36, 94, 154, 170, 253,
253, 253, 253, 253, 225, 172, 253, 242, 195, 64, 0, 0, 0, 0],
[ 0, 0, 0, 0, 0, 0, 0, 49, 238, 253, 253, 253, 253, 253,
253, 253, 253, 251, 93, 82, 82, 56, 39, 0, 0, 0, 0, 0],
[ 0, 0, 0, 0, 0, 0, 0, 18, 219, 253, 253, 253, 253, 253,
198, 182, 247, 241, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
[ 0, 0, 0, 0, 0, 0, 0, 0, 80, 156, 107, 253, 253, 205,
11, 0, 43, 154, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
[ 0, 0, 0, 0, 0, 0, 0, 0, 0, 14, 1, 154, 253, 90,
…….
[ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
45, 186, 253, 253, 150, 27, 0, 0, 0, 0, 0, 0, 0, 0],
[ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 16, 93, 252, 253, 187, 0, 0, 0, 0, 0, 0, 0, 0],
[ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 249, 253, 249, 64, 0, 0, 0, 0, 0, 0, 0],
[ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
46, 130, 183, 253, 253, 207, 2, 0, 0, 0, 0, 0, 0, 0],
[ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 39, 148,
229, 253, 253, 253, 250, 182, 0, 0, 0, 0, 0, 0, 0, 0],
[ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 24, 114, 221, 253,
…….
[ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
[ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
[ 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]],
dtype=torch.uint8)
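The raw pixels above are uint8 values between 0 and 255, while the ToTensor() transform hands the model floating-point values between 0 and 1 with a leading channel dimension. The sketch below imitates that conversion with plain tensor operations on a random stand-in image; it is an approximation for illustration, not the torchvision implementation.

```python
import torch

# Stand-in for one raw MNIST image: uint8 values in [0, 255], shape (28, 28)
raw = torch.randint(0, 256, (28, 28), dtype=torch.uint8)

# Rough equivalent of what ToTensor() produces for such an image:
# float32 values in [0, 1] with a leading channel dimension
scaled = raw.to(torch.float32).div(255).unsqueeze(0)

print(raw.dtype, tuple(raw.shape))
print(scaled.dtype, tuple(scaled.shape))
```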
Visualization of MNIST dataset
Plot one train_data
import matplotlib.pyplot as plt

plt.imshow(train_data.data[0], cmap='gray')
plt.title('%i' % train_data.targets[0])
plt.show()
Output:
Plot multiple train_data
figure = plt.figure(figsize=(10, 8))
cols, rows = 5, 5
for i in range(1, cols * rows + 1):
    sample_idx = torch.randint(len(train_data), size=(1,)).item()
    img, label = train_data[sample_idx]
    figure.add_subplot(rows, cols, i)
    plt.title(label)
    plt.axis("off")
    plt.imshow(img.squeeze(), cmap="gray")
plt.show()
Output:
Preparing data for training with DataLoaders
The Dataset retrieves our dataset’s features and labels one sample at a time. While training a model, we typically want to pass samples in “minibatches”, reshuffle the data at every epoch to reduce model overfitting, and use Python’s multiprocessing to speed up data retrieval.
DataLoader is an iterable that abstracts this complexity for us in an easy API.
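As a minimal sketch of this batching behaviour, the example below wraps a small synthetic TensorDataset instead of MNIST. The names features, targets and toy_loader are illustrative only.

```python
import torch
from torch.utils.data import DataLoader, TensorDataset

# Synthetic stand-in for MNIST: 250 "images" and labels
features = torch.rand(250, 1, 28, 28)
targets = torch.randint(0, 10, (250,))
toy_loader = DataLoader(TensorDataset(features, targets), batch_size=100, shuffle=True)

batch_shapes = [tuple(x.shape) for x, _ in toy_loader]
print(len(toy_loader))   # number of mini-batches per epoch
print(batch_shapes)      # the last batch holds the remaining 50 samples
```

With 250 samples and a batch size of 100, one pass over the loader yields three mini-batches, and the order of samples changes on every pass because shuffle=True.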
from torch.utils.data import DataLoader

loaders = {
    'train' : torch.utils.data.DataLoader(train_data,
                                          batch_size=100,
                                          shuffle=True,
                                          num_workers=1),
    'test' : torch.utils.data.DataLoader(test_data,
                                         batch_size=100,
                                         shuffle=True,
                                         num_workers=1),
}
loaders
Output:
Define the Recurrent Neural Network model
class torch.nn.RNN(*args, **kwargs)
Applies a multi-layer Elman RNN with tanh or ReLU non-linearity to an input sequence.
Parameters:
input_size — The number of expected features in the input x
hidden_size — The number of features in the hidden state h
num_layers — Number of recurrent layers. E.g., setting num_layers=2 would mean stacking two RNNs together to form a stacked RNN, with the second RNN taking in outputs of the first RNN and computing the final results. Default: 1
nonlinearity — The non-linearity to use. Can be either ‘tanh’ or ‘relu’. Default: ‘tanh’
bias — If False, then the layer does not use bias weights b_ih and b_hh. Default: True
batch_first — If True, then the input and output tensors are provided as (batch, seq, feature). Default: False
dropout — If non-zero, introduces a Dropout layer on the outputs of each RNN layer except the last layer, with dropout probability equal to dropout. Default: 0
bidirectional — If True, becomes a bidirectional RNN. Default: False
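The parameters above can be tried out on a random batch. This is a small sketch with arbitrarily chosen sizes (a hidden size of 16 and a batch of 4); it only shows which shapes come back when batch_first=True.

```python
import torch
from torch import nn

rnn = nn.RNN(input_size=28, hidden_size=16, num_layers=2, batch_first=True)
x = torch.rand(4, 28, 28)   # (batch, seq, feature)

out, h_n = rnn(x)
print(tuple(out.shape))     # (batch, seq, hidden_size)
print(tuple(h_n.shape))     # (num_layers, batch, hidden_size)
```

Note that batch_first only affects the input and output tensors; the hidden state h_n keeps the layer dimension first.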
from torch import nn
import torch.nn.functional as F
Declare hyper-parameters
sequence_length = 28
input_size = 28
hidden_size = 128
num_layers = 2
num_classes = 10
batch_size = 100
num_epochs = 2
learning_rate = 0.01
Create a class
Step 1: Create a class
Create a class called RNN that inherits from nn.Module, PyTorch's base class for all neural network modules.
class RNN(nn.Module):
    pass

model = RNN().to(device)
print(model)
Step 2: Add constructor in RNN class
We are passing the input dimension, hidden dimension, number of layers and num of classes as input parameters.
Input dimension — represents the size of the input at each time step
Hidden dimension — represents the size of the hidden state and cell state at each time step
Number of layers — the number of LSTM layers stacked on top of each other
Num of classes — require an output layer with 10 nodes in order to predict the probability distribution of an image belonging to each of the 10 classes.
class RNN(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super(RNN, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, num_classes)

model = RNN(input_size, hidden_size, num_layers, num_classes).to(device)
print(model)
Output: RNN(
(lstm): LSTM(28, 128, num_layers=2, batch_first=True)
(fc): Linear(in_features=128, out_features=10, bias=True)
)
Step 3: Add forward method to the class
We have to define the forward() method inside the class. The forward function is executed sequentially, therefore we’ll have to pass the inputs and the zero-initialized hidden state through the RNN layer first, before passing the RNN outputs to the fully-connected layer. We are using the layers which we have defined in the constructor.
class RNN(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super(RNN, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        pass

model = RNN(input_size, hidden_size, num_layers, num_classes).to(device)
print(model)
Output: RNN(
(lstm): LSTM(28, 128, num_layers=2, batch_first=True)
(fc): Linear(in_features=128, out_features=10, bias=True)
)
Step 4: Set initial hidden state and cell state
We have to initialize a hidden state and a cell state for the LSTM, as this is the first cell. The hidden state and cell state are stored in a tuple with the format (hidden_state, cell_state).
class RNN(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super(RNN, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        # Set initial hidden and cell states
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(device)
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(device)

model = RNN(input_size, hidden_size, num_layers, num_classes).to(device)
print(model)
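The initial states have the shape (num_layers, batch_size, hidden_size). The sketch below, using a small LSTM with arbitrarily chosen sizes, checks that passing explicit zero states gives the same output as omitting them, since nn.LSTM falls back to zeros when no states are supplied.

```python
import torch
from torch import nn

torch.manual_seed(0)
lstm = nn.LSTM(input_size=28, hidden_size=16, num_layers=2, batch_first=True)
x = torch.rand(4, 28, 28)

# Explicit zero states: (num_layers, batch_size, hidden_size)
h0 = torch.zeros(2, x.size(0), 16)
c0 = torch.zeros(2, x.size(0), 16)

out_explicit, _ = lstm(x, (h0, c0))
out_default, _ = lstm(x)   # states omitted: PyTorch uses zeros
same = torch.allclose(out_explicit, out_default)
print(tuple(h0.shape), same)
```

Creating the states explicitly, as this tutorial does, makes their shape and the tuple format visible, which helps when a non-zero initial state is needed later.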
Step 5: Pass the input and hidden state into the model
class RNN(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super(RNN, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        # Set initial hidden and cell states
        h0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(device)
        c0 = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(device)
        # Pass the input and the initial states into the LSTM
        # out: tensor of shape (batch_size, seq_length, hidden_size)
        out, hidden = self.lstm(x, (h0, c0))
        # Feed the output of the last time step to the fully connected layer
        out = self.fc(out[:, -1, :])
        return out

model = RNN(input_size, hidden_size, num_layers, num_classes).to(device)
print(model)
Output: RNN(
(lstm): LSTM(28, 128, num_layers=2, batch_first=True)
(fc): Linear(in_features=128, out_features=10, bias=True)
)
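The indexing out[:, -1, :] keeps only the last time step of the LSTM output. As a small sketch with arbitrarily chosen sizes, the example below shows the resulting shapes and checks that, for a unidirectional LSTM, this slice matches the final hidden state of the top layer.

```python
import torch
from torch import nn

torch.manual_seed(0)
lstm = nn.LSTM(input_size=28, hidden_size=16, num_layers=2, batch_first=True)
fc = nn.Linear(16, 10)
x = torch.rand(4, 28, 28)

out, (h_n, c_n) = lstm(x)
last_step = out[:, -1, :]    # (batch, hidden_size)
logits = fc(last_step)       # (batch, num_classes)

matches = torch.allclose(last_step, h_n[-1])
print(tuple(out.shape), tuple(last_step.shape), tuple(logits.shape), matches)
```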
Define loss function
loss_func = nn.CrossEntropyLoss()
loss_func
Output: CrossEntropyLoss()
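nn.CrossEntropyLoss expects raw, unnormalized scores (logits) and integer class labels, which is why the model above ends in a plain linear layer without a softmax. The sketch below uses made-up logits to check that the loss equals log-softmax followed by the negative log-likelihood loss.

```python
import torch
from torch import nn
import torch.nn.functional as F

# Made-up logits for 2 samples and 3 classes, with their true class indices
logits = torch.tensor([[2.0, 0.5, -1.0],
                       [0.1, 0.2, 3.0]])
labels = torch.tensor([0, 2])

loss = nn.CrossEntropyLoss()(logits, labels)
manual = F.nll_loss(F.log_softmax(logits, dim=1), labels)
print(loss.item(), manual.item())
```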
Define an Optimization Function
lr(Learning Rate): Rate at which our model updates the weights in the cells each time back-propagation is done.
from torch import optim

optimizer = optim.Adam(model.parameters(), lr = 0.01)
optimizer
Output:
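To see what the learning rate does to a weight, here is a minimal sketch that swaps Adam for plain SGD, because a single SGD step is simply weight minus learning rate times gradient. The tensor w and the quadratic loss are made up for illustration; Adam additionally rescales each step using running gradient statistics.

```python
import torch

w = torch.tensor([1.0, -2.0], requires_grad=True)
sgd = torch.optim.SGD([w], lr=0.1)

loss = (w ** 2).sum()   # the gradient of this loss is 2 * w
loss.backward()
sgd.step()              # w <- w - lr * grad

print(w.detach())
```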
Train the model
Step 1: Create a function called train() that takes the number of epochs, the model and the data loaders
num_epochs: Number of times our model will go through the entire training dataset
def train(num_epochs, model, loaders):
    print(f"num_epochs: {num_epochs}")
    print(f"model: {model}")
    print(f"loaders['train']: {loaders['train']}")

train(num_epochs, model, loaders)
Output:
Step 2: Iterate num of epochs
def train(num_epochs, model, loaders):
    print(f"num_epochs: {num_epochs}")
    print(f"model: {model}")
    print(f"loaders['train']: {loaders['train']}")
    print("Started epoch: ")
    for epoch in range(num_epochs):
        print("epoch: ", epoch)
    print("Ended epoch: ")

train(num_epochs, model, loaders)
Output:
Step 3: Iterate training data loader inside num of epochs
def train(num_epochs, model, loaders):
    print(f"num_epochs: {num_epochs}")
    print(f"model: {model}")
    print(f"loaders['train']: {loaders['train']}")
    print("Started epoch: ")
    for epoch in range(num_epochs):
        for i, (images, labels) in enumerate(loaders['train']):
            print("epoch: ", epoch)
            print(f"images: {images}")
            print(f"labels: {labels}")
    print("Ended epoch: ")

train(num_epochs, model, loaders)
Step 4: Reshape images
def train(num_epochs, model, loaders):
    print(f"num_epochs: {num_epochs}")
    print(f"model: {model}")
    print(f"loaders['train']: {loaders['train']}")
    print("Started epoch: ")
    for epoch in range(num_epochs):
        for i, (images, labels) in enumerate(loaders['train']):
            print("epoch: ", epoch)
            print(f"images: {images}")
            print(f"labels: {labels}")
            images = images.reshape(-1, sequence_length, input_size).to(device)
            labels = labels.to(device)
            print(f"images after reshape: {images}")
    print("Ended epoch: ")

train(num_epochs, model, loaders)
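The reshape in this step turns each image into a sequence: the loader delivers batches of shape (batch, 1, 28, 28), and the LSTM expects (batch, sequence_length, input_size). The sketch below uses a random batch as a stand-in to show that each image row becomes one time step.

```python
import torch

sequence_length, input_size = 28, 28
images = torch.rand(100, 1, 28, 28)   # shape of one batch from the DataLoader

as_sequence = images.reshape(-1, sequence_length, input_size)
print(tuple(as_sequence.shape))

# Row 5 of the first image is the input at time step 5 of the first sequence
row_matches = torch.equal(as_sequence[0, 5], images[0, 0, 5])
print(row_matches)
```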
Step 4: Pass images to the model
def train(num_epochs, model, loaders):
    print(f"num_epochs: {num_epochs}")
    print(f"model: {model}")
    print(f"loaders['train']: {loaders['train']}")
    print("Started epoch: ")
    for epoch in range(num_epochs):
        for i, (images, labels) in enumerate(loaders['train']):
            print("epoch: ", epoch)
            print(f"images: {images}")
            print(f"labels: {labels}")
            images = images.reshape(-1, sequence_length, input_size).to(device)
            labels = labels.to(device)
            print(f"images after reshape: {images}")
            # Forward pass
            outputs = model(images)
    print("Ended epoch: ")

train(num_epochs, model, loaders)
Step 5: Pass outputs to the loss function
def train(num_epochs, model, loaders):
    print(f"num_epochs: {num_epochs}")
    print(f"model: {model}")
    print(f"loaders['train']: {loaders['train']}")
    print("Started epoch: ")
    for epoch in range(num_epochs):
        for i, (images, labels) in enumerate(loaders['train']):
            print("epoch: ", epoch)
            print(f"images: {images}")
            print(f"labels: {labels}")
            images = images.reshape(-1, sequence_length, input_size).to(device)
            labels = labels.to(device)
            print(f"images after reshape: {images}")
            # Forward pass
            outputs = model(images)
            loss = loss_func(outputs, labels)
    print("Ended epoch: ")

train(num_epochs, model, loaders)
Step 6: Clear existing gradients from the previous batch, run backpropagation (compute gradients) and apply the gradients
In PyTorch, we need to set the gradients to zero before starting backpropagation because PyTorch accumulates the gradients on subsequent backward passes. This is convenient while training RNNs. So, the default action is to accumulate (i.e. sum) the gradients on every loss.backward() call.
Because of this, when you start your training loop, ideally you should zero out the gradients so that you do the parameter update correctly.
def train(num_epochs, model, loaders):
    print(f"num_epochs: {num_epochs}")
    print(f"model: {model}")
    print(f"loaders['train']: {loaders['train']}")
    print("Started epoch: ")
    for epoch in range(num_epochs):
        for i, (images, labels) in enumerate(loaders['train']):
            print("epoch: ", epoch)
            print(f"images: {images}")
            print(f"labels: {labels}")
            images = images.reshape(-1, sequence_length, input_size).to(device)
            labels = labels.to(device)
            print(f"images after reshape: {images}")
            # Forward pass
            outputs = model(images)
            loss = loss_func(outputs, labels)
            # Backward and optimize
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
    print("Ended epoch: ")

train(num_epochs, model, loaders)
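The accumulation behaviour described in this step can be observed on a single scalar. In this minimal sketch the tensor w is made up, and the gradient is reset by hand with w.grad.zero_(), which has a similar effect to calling optimizer.zero_grad() on a model's parameters.

```python
import torch

w = torch.tensor(3.0, requires_grad=True)

(2 * w).backward()
first = w.grad.item()          # gradient of 2 * w is 2

(2 * w).backward()             # no reset in between: gradients are summed
accumulated = w.grad.item()

w.grad.zero_()                 # reset before the next backward pass
(2 * w).backward()
after_reset = w.grad.item()

print(first, accumulated, after_reset)
```

Without the reset, the second backward pass doubles the stored gradient, so the following optimizer step would use a gradient that mixes two batches.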
Step 7: Print epochs, batches and losses
def train(num_epochs, model, loaders):
    # Train the model
    total_step = len(loaders['train'])
    print(f"num_epochs: {num_epochs}")
    print(f"model: {model}")
    print(f"loaders['train']: {loaders['train']}")
    print("Started epoch: ")
    for epoch in range(num_epochs):
        for i, (images, labels) in enumerate(loaders['train']):
            print("epoch: ", epoch)
            print(f"images: {images}")
            print(f"labels: {labels}")
            images = images.reshape(-1, sequence_length, input_size).to(device)
            labels = labels.to(device)
            print(f"images after reshape: {images}")
            # Forward pass
            outputs = model(images)
            loss = loss_func(outputs, labels)
            # Backward and optimize
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            if (i+1) % 100 == 0:
                print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'
                      .format(epoch + 1, num_epochs, i + 1, total_step, loss.item()))
    print("Ended epoch: ")

train(num_epochs, model, loaders)
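The value of total_step and the steps at which the loss is printed follow directly from the dataset size and the batch size. The sketch below uses a placeholder dataset of zeros with the same length as the MNIST training set, so nothing is downloaded.

```python
import math
import torch
from torch.utils.data import DataLoader, TensorDataset

num_samples, batch_size = 60000, 100

# Placeholder with the same number of samples as the MNIST training set
placeholder = TensorDataset(torch.zeros(num_samples, 1))
total_step = len(DataLoader(placeholder, batch_size=batch_size))

# Steps at which the condition (i + 1) % 100 == 0 is true
logged_steps = [i + 1 for i in range(total_step) if (i + 1) % 100 == 0]

print(total_step, math.ceil(num_samples / batch_size))
print(logged_steps)
```

With these settings each epoch runs 600 steps and prints the loss six times.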
Step 8: Final code of train model
def train(num_epochs, model, loaders):
    # Train the model
    total_step = len(loaders['train'])
    for epoch in range(num_epochs):
        for i, (images, labels) in enumerate(loaders['train']):
            images = images.reshape(-1, sequence_length, input_size).to(device)
            labels = labels.to(device)
            # Forward pass
            outputs = model(images)
            loss = loss_func(outputs, labels)
            # Backward and optimize
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            if (i+1) % 100 == 0:
                print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'
                      .format(epoch + 1, num_epochs, i + 1, total_step, loss.item()))

train(num_epochs, model, loaders)
Output:
Evaluate the model on test data
We must call model.eval() to set dropout and batch normalization layers to evaluation mode before running inference.
model.train() tells the model that it is being trained, so layers such as dropout and batch normalization, which behave differently during training and testing, can act accordingly.
You can call either model.eval() or model.train(mode=False) to tell the model that you are testing it.
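The difference between the two modes is easiest to see on a single dropout layer. This is a small sketch on a standalone nn.Dropout with a made-up input of ones; the LSTM model built above does not itself contain a dropout layer.

```python
import torch
from torch import nn

torch.manual_seed(0)
layer = nn.Dropout(p=0.5)
x = torch.ones(1000)

layer.train()
train_out = layer(x)    # some entries zeroed, the rest scaled by 1 / (1 - p)

layer.eval()            # same effect as layer.train(mode=False)
eval_out = layer(x)     # dropout does nothing in evaluation mode

num_zeroed = (train_out == 0).sum().item()
unchanged = torch.equal(eval_out, x)
print(layer.training, num_zeroed, unchanged)
```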
# Test the model
model.eval()
with torch.no_grad():
    correct = 0
    total = 0
    for images, labels in loaders['test']:
        images = images.reshape(-1, sequence_length, input_size).to(device)
        labels = labels.to(device)
        outputs = model(images)
        _, predicted = torch.max(outputs.data, 1)
        total = total + labels.size(0)
        correct = correct + (predicted == labels).sum().item()
print('Test Accuracy of the model on the 10000 test images: {} %'.format(100 * correct / total))
Output: Test Accuracy of the model on the 10000 test images: 96.68 %
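The accuracy calculation relies on torch.max returning, for each row, the largest value and its index; the index is the predicted class. The sketch below applies the same steps to four made-up rows of scores.

```python
import torch

# Made-up scores for 4 samples and 3 classes, with their true labels
outputs = torch.tensor([[0.1, 2.0, 0.3],
                        [1.5, 0.2, 0.1],
                        [0.0, 0.1, 0.9],
                        [0.8, 0.7, 0.1]])
labels = torch.tensor([1, 0, 2, 1])

_, predicted = torch.max(outputs, 1)   # index of the largest score per row
correct = (predicted == labels).sum().item()
accuracy = 100 * correct / labels.size(0)
print(predicted.tolist(), correct, accuracy)
```

Three of the four predictions match the labels here, so the accuracy is 75 %.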
Print 10 predictions from test data
sample = next(iter(loaders['test']))
imgs, lbls = sample
…
test_output = model(imgs[:10].view(-1, 28, 28).to(device))
predicted = torch.max(test_output, 1)[1].data.cpu().numpy().squeeze()
labels = lbls[:10].numpy()
print(f"Predicted number: {predicted}")
print(f"Actual number: {labels}")
Output: