Efficient training of Deep Learning jobs on the cloud with datasets consisting of many small files

Parijat Dube
AI Platforms Research
9 min read · Nov 20, 2018

Parijat Dube, Falk Pollok

Problem Statement

In this study we address the problem of how to efficiently train deep learning models on machine learning cloud platforms, e.g. IBM Watson Machine Learning, when the training dataset consists of a large number of small files (e.g. in JPEG format) and is stored in an object store like IBM Cloud Object Storage (COS). As an example, we train a PyTorch model using the Oxford flowers dataset. Although Oxford flowers is a small dataset, it is representative of the problems one will encounter with large datasets like ImageNet1K.

This work was initiated when the performance of training jobs with datasets of many small files stored on Cloud Object Storage (COS) was reported to be very poor. Both the initial set-up time (before the job starts running and producing logs) and the runtime of the job (e.g. the per-epoch time) were increased. In particular, the problem was observed when running PyTorch models with ImageNet1K stored as individual JPEG files on COS. The PyTorch model code is available here: https://github.com/pytorch/examples/tree/master/imagenet.

In the following section we first explain how data loading and batching work in PyTorch, then add shared memory to the training pod and show how the number of workers and the amount of available shared memory influence training time and performance. Finally, we migrate the dataset into a single file in the Hierarchical Data Format (HDF5) and remeasure the performance, which shows a >36x improvement. We discuss how this number could be further increased in the conclusion.

Steps in Training

There are two major steps involved in data loading and processing in the training phase. We need to first preprocess the data, then load the dataset, and finally iterate over the images in it during training. PyTorch provides many tools to facilitate data pre-processing, loading, and iteration, in particular through the classes torch.utils.data.Dataset and torch.utils.data.DataLoader (see https://pytorch.org/docs/stable/data.html). Dataset is an abstract class; a custom dataset class inherits from it and must override two methods (a minimal sketch follows the list below):

  • __len__, so that len(dataset) returns the size of the dataset
  • __getitem__, so that dataset[i] returns the i-th sample
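As an illustration only (a minimal sketch, not the code used in this study; the class name and file-listing logic are hypothetical), a custom Dataset over a flat folder of JPEG files could look like this:

import os
from PIL import Image
from torch.utils.data import Dataset

class JpegFolderDataset(Dataset):
    """Minimal sketch: a Dataset over a flat folder of JPEG files."""

    def __init__(self, root, transform=None):
        self.paths = sorted(
            os.path.join(root, f) for f in os.listdir(root) if f.endswith('.jpg')
        )
        self.transform = transform

    def __len__(self):
        # len(dataset) returns the number of images
        return len(self.paths)

    def __getitem__(self, index):
        # dataset[i] returns the i-th (optionally transformed) image
        image = Image.open(self.paths[index]).convert('RGB')
        if self.transform is not None:
            image = self.transform(image)
        return image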

Torchvision already provides Dataset implementations for the most popular datasets, cf. https://pytorch.org/docs/stable/torchvision/datasets.html:

All datasets in torchvision.datasets are subclasses of torch.utils.data.Dataset with preimplemented __getitem__ and __len__ methods.

In the PyTorch code for ImageNet, torchvision.datasets is invoked in line 119:

import torchvision.datasets as datasets
import torchvision.transforms as transforms

# normalize is a transforms.Normalize instance defined earlier in main.py
train_dataset = datasets.ImageFolder(
    traindir,
    transforms.Compose([
        transforms.RandomResizedCrop(224),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        normalize,
    ])
)

ImageFolder is a sub-class of Dataset provided by torchvision.datasets. Various transforms can be applied to the images when they are loaded through the Dataset. The class torchvision.transforms.Compose(transforms) takes a list of transform objects as input, which are applied sequentially to the images in traindir when the Dataset is sampled. Above, four transforms are applied to each image when creating the dataset:

  • RandomResizedCrop(224)
  • RandomHorizontalFlip()
  • ToTensor()
  • normalize

After train_dataset is instantiated, an instance of torch.utils.data.DataLoader is created in line 133:
train_loader = torch.utils.data.DataLoader(
    train_dataset, batch_size=args.batch_size, shuffle=(train_sampler is None),
    num_workers=args.workers, pin_memory=True, sampler=train_sampler
)

The for loop in line 185 iterates over the training dataset.

for i, (input, target) in enumerate(train_loader):
    # measure data loading time
    data_time.update(time.time() - end)
    target = target.cuda(non_blocking=True)

    # compute output
    output = model(input)
    loss = criterion(output, target)

    # measure accuracy and record loss
    prec1, prec5 = accuracy(output, target, topk=(1, 5))
    losses.update(loss.item(), input.size(0))
    top1.update(prec1[0], input.size(0))
    top5.update(prec5[0], input.size(0))

    # compute gradient and do SGD step
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    # measure elapsed time
    batch_time.update(time.time() - end)
    end = time.time()

The DataLoader class combines a dataset and a sampler. It is an iterator which supports batching, shuffling, and parallel data loading using multiple worker subprocesses. If shuffle is true, the dataset is reshuffled at the beginning of every training epoch. A batch of images is then loaded into memory by num_workers subprocesses, with num_workers=0 meaning the data is loaded in the main process. Before a batch of images is loaded into memory, all the transforms in transforms.Compose() are applied to each image. Each of these transforms adds to the data loading time. Furthermore, the time consumed by this step is linear in the number of images (the batch size) to be read from COS, transformed, and loaded at each iteration. After the data is loaded, a forward pass and a backward pass are executed.
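As a minimal sketch of this interplay (the parameter values here are illustrative, not the ones used in this study), creating and iterating a DataLoader looks like this:

import torch

# train_dataset is the ImageFolder instance created above
train_loader = torch.utils.data.DataLoader(
    train_dataset,        # any torch.utils.data.Dataset
    batch_size=64,        # number of images loaded per iteration
    shuffle=True,         # reshuffle the indices at the start of every epoch
    num_workers=4,        # 4 loader subprocesses; 0 = load in the main process
    pin_memory=True,      # page-locked host memory for faster GPU transfers
)

for images, targets in train_loader:   # one iteration = one batch
    images = images.cuda(non_blocking=True)
    # ... forward and backward pass ...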

To benchmark the performance of PyTorch on an image dataset, we first run main.py with the Oxford flowers dataset, which has 102 classes with 10 images per class in both the training and the validation set. The default model is resnet18. We instrumented main.py to capture the time spent in the following stages (a simplified sketch of these timing hooks follows the list):

  1. model_loading: Time to load the neural network model into GPU memory.
  2. data_preparation: Time to create instances of the Dataset and DataLoader classes.
  3. epoch_time: Time to finish one training epoch. At the beginning of each training epoch Python restarts the train_loader iterator by calling train_loader.__iter__(), which returns an object with a .next() method. Python then calls the .next() method of this object to get the first and subsequent values used by the for loop. In each iteration, after the data is loaded into memory, a forward and backward pass is executed. Thus, the time for one iteration (batch_time) includes both data loading and compute time. epoch_time is the sum of batch_time over all the batches in one epoch.
  4. data_loading: Time to sample the images for one iteration using the DataLoader iterator and load them into memory.
  5. batch_time: Time to load the data for one iteration plus the time to compute the forward and backward pass.
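The actual instrumented script is linked in the next section; conceptually, the hooks look roughly like the following simplified sketch (not the real main_jpg-timing.py; traindir, the transform pipeline, and the parameter values are assumed from the example above):

import time
import torch
import torchvision.datasets as datasets
import torchvision.models as models

start = time.time()
model = models.resnet18().cuda()                      # 1. model_loading
model_loading = time.time() - start

start = time.time()
train_dataset = datasets.ImageFolder(traindir, transform)
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=64,
                                           shuffle=True, num_workers=4)
data_preparation = time.time() - start                # 2. data_preparation

for epoch in range(20):
    epoch_start = time.time()
    end = time.time()
    for i, (input, target) in enumerate(train_loader):
        data_loading = time.time() - end              # 4. data_loading for this iteration
        # ... forward and backward pass ...
        batch_time = time.time() - end                # 5. batch_time = loading + compute
        end = time.time()
    epoch_time = time.time() - epoch_start            # 3. epoch_time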

Performance using dataset with JPEG files

We first quantify the performance of PyTorch with images as JPEG files. Each job is run for 20 epochs in a Kubernetes pod with 1 Nvidia Tesla P100 GPU, 8 CPUs and 24GiB of memory. The dataset is stored in a COS bucket which is locally mounted on the pod. We first tried to run main.py with default parameters using the command:

python main-timing.py --epochs 20 --batch-size 64 /mnt/oxford-flowers

The job crashed after the first iteration with the following error:

ERROR: Unexpected bus error encountered in worker. This might be caused by insufficient shared memory (shm).
Epoch: [0][0/16] Time 4.430 (4.430) Data 3.707 (3.707) Loss 6.9139 (6.9139) Prec@1 0.000 (0.000) Prec@5 0.000 (0.000)
ERROR: Unexpected bus error encountered in worker. This might be caused by insufficient shared memory (shm).
Traceback (most recent call last):
File "main-timing.py", line 333, in <module>
main()
File "main-timing.py", line 166, in main
train(train_loader, model, criterion, optimizer, epoch)
File "main-timing.py", line 205, in train
for i, (input, target) in enumerate(train_loader):
File "/opt/conda/lib/python2.7/site-packages/torch/utils/data/dataloader.py", line 280, in __next__
idx, batch = self._get_batch()
File "/opt/conda/lib/python2.7/site-packages/torch/utils/data/dataloader.py", line 259, in _get_batch
return self.data_queue.get()
File "/opt/conda/lib/python2.7/Queue.py", line 168, in get
self.not_empty.wait()
File "/opt/conda/lib/python2.7/threading.py", line 340, in wait
waiter.acquire()
File "/opt/conda/lib/python2.7/site-packages/torch/utils/data/dataloader.py", line 178, in handler
_error_if_any_worker_fails()
RuntimeError: DataLoader worker (pid 170) is killed by signal: Bus error.

Next, we set num_workers to 0 and ran the following command:

python main_jpg-timing.py --epochs 20 --workers 0 --batch-size 64 /mnt/oxford-flowers

This time the job ran to completion. The instrumented code main_jpg-timing.py is simply main.py with hooks to measure the time for each step; it can be downloaded from: https://github.ibm.com/dl-res/autolauncher/blob/master/main_jpg-timing.py

The table below shows total execution time, average epoch_time, average batch_time, data_loading and average GPU utilization for each run. Also shown are the top-1 and top-5 accuracy after each training epoch. The accuracy is calculated on the validation set which also has 1020 images (102 classes with 10 images each). Note that total_execution_time can be approximated as:

total_execution_time = model_loading + data_preparation + average_epoch_time * number_epochs
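For example, with purely hypothetical values of model_loading ≈ 5 sec, data_preparation ≈ 10 sec and average_epoch_time ≈ 150 sec, a 20-epoch job would take roughly 5 + 10 + 150 * 20 ≈ 3015 sec; the epoch term clearly dominates, which is why reducing the per-epoch data loading time matters most.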

So, with the default pod configuration we can only run the 0-workers setting with this dataset. The average GPU utilization is extremely low (<2%) and the job runtime is 3033.43 sec. The code supports multiprocessing in the data loading phase when workers > 0. On checking the shared memory of the pod (run df -h inside the pod), it turned out to be only 64MB. We next increased the shared memory of the pod by adding

spec:
  volumes:
  - name: shm
    emptyDir:
      medium: Memory
  containers:
  - image: pytorch/pytorch:0.4.1-cuda9-cudnn7-devel
    volumeMounts:
    - mountPath: /dev/shm
      name: shm

in the pod deployment YAML (with shared memory mount) and deployed the pod using kubectl create.
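To verify that the larger shared memory is visible to the container, one can check /dev/shm from inside the pod (the pod name here is just a placeholder):

kubectl exec -it my-pytorch-pod -- df -h /dev/shm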

Now we can run the job with multiple workers. The table below summarizes the results:

batch size = 64, epochs = 20, number of batches = 16

As we increase the number of workers, av_epoch_time gets smaller and hence so does the job's total_exec_time. For example, running with 32 workers we see a 91% drop in job execution time compared to running with 0 workers (from 2917.70 sec to 269.6 sec) and a 10.5x increase in average GPU utilization (from 1.46% to 15.30%).

Next we look at average epoch time. Observe that

average_epoch_time = average_train_time + average_test_time
average_train_time = average_train_batch_time * number_train_batches_per_epoch
average_test_time = average_test_batch_time * number_test_batches_per_epoch
train_batch_time = train_data_loading_time + train_compute_time
test_batch_time = test_data_loading_time + test_compute_time

The table below shows the decomposition of average_epoch_time into average_train_time and average_test_time. Both train and test time benefit similarly from the increase in workers: while the compute time (train_compute_time and test_compute_time) is not affected by more workers, the data loading time in both the train and the test phase drops significantly.

shared memory = 32G, batch size = 64, epochs = 20, number of batches = 16

Note that these numbers are all from a single run per configuration. For more accurate time estimates one would need to run several jobs per configuration and average the results.

Performance using HDF5 dataset

We next converted our JPEG dataset into a single HDF5 file. The core of the code to prepare the HDF5 file from the JPEG files is shown below:

import h5py

with h5py.File('dataset.hdf5', 'w') as hdf5_file:
    ...
    # one gzip-compressed HDF5 dataset per image
    hdf5_dataset = hdf5_file.create_dataset(
        name=str(current_name),
        data=image,
        shape=(image_height, image_width, image_channels),
        maxshape=(image_height, image_width, image_channels),
        compression="gzip",
        compression_opts=9
    )
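For context, a fuller end-to-end conversion might look like the sketch below; the directory layout (one sub-folder per class), the per-image dataset naming, and the use of PIL are assumptions for illustration, not necessarily what our conversion script does:

import glob
import os

import h5py
import numpy as np
from PIL import Image

source_dir = '/mnt/oxford-flowers/train'   # hypothetical: one sub-folder per class

with h5py.File('dataset.hdf5', 'w') as hdf5_file:
    for class_name in sorted(os.listdir(source_dir)):
        for jpeg_path in sorted(glob.glob(os.path.join(source_dir, class_name, '*.jpg'))):
            image = np.asarray(Image.open(jpeg_path).convert('RGB'))
            image_height, image_width, image_channels = image.shape
            # one gzip-compressed HDF5 dataset per image, keyed by class and file name
            hdf5_file.create_dataset(
                name='{}/{}'.format(class_name, os.path.basename(jpeg_path)),
                data=image,
                shape=(image_height, image_width, image_channels),
                compression="gzip",
                compression_opts=9,
            )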

For PyTorch to be able to load data from an HDF5 file, a new subclass of the Dataset class is needed which returns the images in the HDF5 file as tensors. The code for this can be found in main_hdf5-timing.py, which is available here: https://github.ibm.com/dl-res/autolauncher/blob/master/main_hdf5-timing.py
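As an illustration only (the key layout, label handling, and transform step are assumptions; the actual implementation is the one in main_hdf5-timing.py), such a subclass could look like this:

import h5py
from torch.utils.data import Dataset

class Hdf5ImageDataset(Dataset):
    """Sketch of a Dataset that serves images from a single HDF5 file."""

    def __init__(self, hdf5_path, transform=None):
        self.hdf5_path = hdf5_path
        self.transform = transform
        with h5py.File(hdf5_path, 'r') as f:
            # assume one dataset per image, grouped by class name
            self.keys = [(cls, name) for cls in f for name in f[cls]]
        self.classes = sorted({cls for cls, _ in self.keys})
        self.file = None

    def __len__(self):
        return len(self.keys)

    def __getitem__(self, index):
        # open the file lazily so that each DataLoader worker gets its own handle
        if self.file is None:
            self.file = h5py.File(self.hdf5_path, 'r')
        cls, name = self.keys[index]
        image = self.file[cls][name][()]        # HWC uint8 numpy array
        if self.transform is not None:
            image = self.transform(image)       # e.g. transforms.ToTensor()
        return image, self.classes.index(cls)

Opening the HDF5 file lazily inside __getitem__ avoids sharing a single file handle across the DataLoader worker processes.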

We first ran with default shared memory settings for 0 workers:

python main_hdf5-timing.py --epochs 20 --workers 0 --batch-size 64 /mnt/oxford-flowers

The job ran to completion with the instrumented code main_hdf5-timing.py linked above.

Next, when we tried to run with workers > 0, the job again crashed with the same insufficient shared memory (shm) error we got before with the JPEG dataset. Again, we deployed a pod with increased shared memory and ran jobs with multiple workers. The jobs ran fine for a small number of workers (0 and 1). The table below shows the results from these runs.

shared memory = 126G , batch size = 64, epochs = 20, number of batches = 16

Conclusion

The performance of training jobs depends on the storage and the file format. We found that with COS, a dataset stored in a single HDF5 file performs much better (higher GPU utilization and reduced runtime) than a dataset consisting of many small JPEG files; e.g., with one worker we achieved a gain of ~28x. The performance should improve further if we can run multiple workers with HDF5 files, which we are currently investigating. The goal is to prevent GPU starvation (achieve greater than 95% utilization) by increasing the number of workers. The findings in this study are based on the Oxford flowers dataset but are also applicable to larger image datasets and most likely to other modalities as well.
