Setting up multi GPU processing in PyTorch

Kaustav Mandal
Published in exemplifyML.ai · May 25, 2022

In this tutorial, we will see how to leverage multiple GPUs in a distributed manner on a single machine.

We will be using the Distributed Data-Parallel feature of pytorch.

Libraries used: Python 3.8, PyTorch 1.11, CUDA 11.4

We chose DistributedDataParallel (DDP) over DataParallel (DP) because DDP uses multiple processes, whereas DP uses multiple threads within a single process and therefore suffers from Python GIL contention.

Gradient sync — multi GPU training (Image by Author)

Each GPU holds a replica of the model and is assigned a subset of the data samples, based on the number of GPUs available.

For example, with a dataset of 100 samples and 4 GPUs, each GPU processes a subset of 25 samples per epoch.

Synchronization in DDP happens in the constructor, during the forward pass, and during the backward pass. On the backward pass, gradients are averaged across processes and the averaged gradients are propagated to each GPU.
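Conceptually, the averaging DDP performs during the backward pass is equivalent to an all-reduce of each parameter's gradient followed by a division by the number of processes. DDP does this for you; the sketch below is only an illustration of the idea, not code you need to write:

import torch.distributed as dist

# illustrative only: DDP already performs this averaging inside backward()
for param in model.parameters():
    if param.grad is not None:
        dist.all_reduce(param.grad, op=dist.ReduceOp.SUM)
        param.grad /= dist.get_world_size()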

For additional synchronization details, please view Writing Distributed Applications with PyTorch.

Spawning Processes:

For spawning multiple processes, we use the torch.multiprocessing framework.

Each spawned process receives, as its first argument, the index of the process, typically called the rank.

In the example below, all spawned processes call the same method (say, ‘train’) and receive rank values ranging from 0 to 3. We can use the rank to identify individual processes, with rank 0 acting as the base process.

import torch.multiprocessing as mp

# number of GPUs equals the number of processes
world_size = torch.cuda.device_count()
mp.spawn(<self contained method for each proc>, nprocs=world_size, args=(args,))

GPU Process Assignment:

Assign the GPU to each of the processes spawned for training.

import torch
import torch.distributed as dist

def train(self, rank, args):
    current_gpu_index = rank
    torch.cuda.set_device(current_gpu_index)

    dist.init_process_group(
        backend='nccl', world_size=args.world_size,
        rank=current_gpu_index,
        init_method='env://'
    )
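Note that init_method='env://' makes the process group read its rendezvous information from environment variables, so MASTER_ADDR and MASTER_PORT must be set before init_process_group is called. A minimal sketch for a single-machine setup (the port number below is an arbitrary free port, not from the original article):

import os

# single machine: the master is localhost; pick any free port
os.environ.setdefault('MASTER_ADDR', 'localhost')
os.environ.setdefault('MASTER_PORT', '12355')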

Multi Process Dataloader Setup:

For processing images, we will use the standard ImageFolder dataset, which requires the sample data to be organized in the following format.

<basedir>/testset/<categoryname>/<listofimages>
<basedir>/valset/<categoryname>/<listofimages>
<basedir>/trainset/<categoryname>/<listofimages>
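The train_transform used below is not defined in this article; a minimal sketch using standard torchvision transforms (the crop size and normalization statistics are the usual ImageNet defaults, assumed here) could be:

from torchvision import transforms

# assumed preprocessing pipeline; sizes and stats are ImageNet-style defaults
train_transform = transforms.Compose([
    transforms.RandomResizedCrop(224),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])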

Set up the dataset:

from torchvision.datasets import ImageFolder

train_dataset = ImageFolder(root=os.path.join(<basedir>, "trainset"), transform=train_transform)

Set up the DistributedSampler, which, when used in conjunction with DDP, provides a distinct subset of the data to each process/GPU.

from torch.utils.data import DistributedSampler

dist_train_samples = DistributedSampler(dataset=train_dataset, num_replicas=4, rank=rank, seed=17)

Assign the DistributedSampler to the DataLoader:

from torch.utils.data import DataLoader

train_loader = DataLoader(
train_dataset,
batch_size=self.BATCH_SIZE,
num_workers=4,
sampler=dist_train_samples,
pin_memory=True,
)
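As a quick sanity check (not part of the original article), each rank should now see roughly len(train_dataset) / num_replicas samples, matching the 100-samples-across-4-GPUs example above:

# e.g. with 100 samples and 4 replicas, each rank gets 25 samples per epoch
print(f"rank {rank}: {len(dist_train_samples)} samples, {len(train_loader)} batches")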

Multi Process Model Initialization:

Initialize the model and move it to its assigned GPU.

import torch.nn as nn
import torch.optim as optim
from torch.nn.parallel import DistributedDataParallel as DDP
from torchvision import models

model = models.resnet34(pretrained=True)
loss_fn = nn.CrossEntropyLoss()
model.cuda(current_gpu_index)
model = DDP(model, device_ids=[current_gpu_index])

loss_fn.cuda(current_gpu_index)
optimizer = optim.Adam(filter(lambda p: p.requires_grad, model.module.parameters()), lr=1e-3)
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=7)
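Note that the pretrained resnet34 ends in a 1000-class fully connected layer; the article later mentions that the FC layer was changed for this use case. A minimal sketch of that change, applied before wrapping the model in DDP (num_classes is a placeholder for your dataset's class count):

# hypothetical: replace the final FC layer to match the number of classes in our dataset
model.fc = nn.Linear(model.fc.in_features, num_classes)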

Once we start training, we need to set the epoch on the distributed sampler so that the data is reshuffled each epoch; otherwise the same ordering is used in every epoch.

for epoch in range(1, self.EPOCHS + 1):
    dist_train_samples.set_epoch(epoch)

For each batch in the dataloader, pass the inputs to the GPU and compute the gradients.

# inside the epoch loop, iterate over the batches
for inputs, labels in train_loader:
    inputs = inputs.cuda(current_gpu_index, non_blocking=True)
    labels = labels.cuda(current_gpu_index, non_blocking=True)

    # training phase
    optimizer.zero_grad(set_to_none=True)
    with torch.set_grad_enabled(True):  # True for the training phase; use False for validation
        outputs = model(inputs)
        _, preds = torch.max(outputs, 1)
        loss = loss_fn(outputs, labels)
        loss.backward()
        optimizer.step()
    # accumulate the running loss here

# after each epoch, step the LR scheduler
scheduler.step()

Compare the current epoch accuracy against the previous one and store the weights if it has improved.

# inside the epoch loop
if rank % args.n_gpus == 0:
    torch.save(model.module.state_dict(), os.path.join(os.getcwd(), "scripts/model", args.model_file_name))

At the end of training, we will have the model weights saved in a ‘.pth’ file, which we can load for inference on a CPU or GPU.
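Once training is done, each worker should also tear down the process group it joined; if the non-zero ranks need to wait for rank 0 to finish writing the checkpoint, a barrier can be placed before the teardown. A minimal sketch:

# optional: make every rank wait until the checkpoint has been written
dist.barrier()
# clean up the default process group created by init_process_group
dist.destroy_process_group()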

Load Model From File:

load_path = os.path.join(os.getcwd(), "scripts/model", args.model_file_name)

# ImageClassifier is an extension of the ResNet class, as we had changed the FC layer for our purposes
model_image_classifier = ImageClassifier()
model_image_classifier.load_state_dict(
    torch.load(load_path), strict=False
)
model_image_classifier.cuda(current_gpu_index)
model_image_classifier = DDP(model_image_classifier)
model_image_classifier = model_image_classifier.eval()
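A minimal inference sketch with the loaded model (the image path and the eval-time preprocessing below are assumptions, not from the original article) might look like:

from PIL import Image
from torchvision import transforms
import torch

# assumed eval-time preprocessing; sizes and stats are ImageNet-style defaults
eval_transform = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

image = Image.open("sample.jpg").convert("RGB")  # hypothetical input image
batch = eval_transform(image).unsqueeze(0).cuda(current_gpu_index)

with torch.no_grad():
    logits = model_image_classifier(batch)
    predicted_class = logits.argmax(dim=1).item()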

This should cover the basics of leveraging PyTorch's multiprocessing capability to reduce model training time.


References:

  1. PyTorch Documentation
  2. Kaiming He, Xiangyu Zhang, Shaoqing Ren, Jian Sun. (2015). Deep Residual Learning for Image Recognition
  3. Saining Xie, Ross Girshick, Piotr Dollár, Zhuowen Tu, Kaiming He. (2016). Aggregated Residual Transformations for Deep Neural Networks
