How 3DFY.ai Built a Multi-Cloud, Distributed Training Platform Over Spot Instances with TorchElastic and Kubernetes

Assaf Pinhasi
PyTorch
Published in
9 min readJun 17, 2021

--

Deep Learning development is becoming more and more about minimizing the time from idea to trained model.

To shorten this lead time, researchers need access to a training environment that supports running multiple experiments concurrently, each utilizing several GPUs.

Until recently, training environments with tens or hundreds of GPUs were the sole property of the largest and richest technology companies. However, recent advances in the open-source community have helped close this gap, making this technology accessible even for small startups.

In this series, we will share our experience in building out a scalable training environment using TorchElastic and Kubernetes, utilizing spot instances and deployed in multiple cloud providers.

Part I of this series (this post) will cover our requirements, main design decisions, as well as quantified benefits to our training speed and cost.

Part II will be a step-by-step guide for building a simplified version of this environment.

Let’s go!

Background and requirements

This section was written in collaboration with Tal Kenig, VP R&D of 3DFY.ai

At 3DFY.ai, we work on challenging geometric Deep Learning problems, most notably 3D reconstructions from very sparse input data in the wild. With time, our model training needs have increased significantly due to a number of reasons.

First, a typical computational pipeline we use contains 7–10 different DL models. In addition, we employ a data-centric approach which practically means we retrain our models often on new datasets. Coupled with the fact our models are relatively large and our datasets ever-growing, this led our R&D efforts to be limited by training throughput.

Our training infrastructure started off like many other startups: with little automation and a lot of painful manual work. Whenever we wanted to run a training job, we would spin up an (on-demand) instance, install CUDA drivers, set up Python virtual environments, pull our code, and kick off the training process by hand, logging into the machine to look at logs and Tensorboard to track progress.

Our typical training job would use a machine with 4–8 GPUs and, given our training dynamics and data size, the training would take days to weeks.

Being fortunate enough to participate in several cloud providers’ incentive programs, we found ourselves repeating this work on several different clouds, and still, our cloud bills were growing month-over-month.

At a certain point, we decided to invest in building a proper training infrastructure, which would:

  • Speed up training by an order of magnitude to allow us to experiment faster
  • Make experimentation easy by providing a simple API for kicking off and monitoring many experiments concurrently
  • Reduce training costs by using cheaper cloud resources
  • Be cloud-provider agnostic to enable us to move between providers
  • Keep a low operational footprint and allow us to focus on research and not DevOps
  • Require little or no changes to our training code which is PyTorch native

Speeding up training by breaking the single machine barrier with PyTorch

Some of our models are relatively large — 170MM parameters is not an uncommon size for us.

Even when using machines with 8 GPUs, our batch size was limited to ~128, which meant completing epochs wasn’t fast enough. We wanted to increase our batch size up to 512 which would require 32–64 GPUs, and practically this meant training a single model on multiple machines.

Luckily, PyTorch provides a very useful module called DistributedDataParallel that enables just that. In a nutshell, DistributedDataParallel (or DDP for short) is a variation on DataParallel, which can run on multiple machines.

DistributedDataParallel training process. image by the author

DDP doesn’t cover concerns like cluster management, or how to start the training processes on different nodes. The worker discovery functionality is also pluggable, and by default, workers get passed the list of IPs explicitly.

Since these aspects need to be covered in order to build a functioning system, we set off looking for open source tools that will cover these requirements.

We evaluated Kubeflow, which is a popular solution for running training jobs on Kubernetes. While Kubeflow supports running DDP jobs using its PyTorchOperator, it is also a complex project with many moving parts, and we wanted to keep our solution slim. We looked at other solutions too, like Ray and Horovod, but they were either too new or not native to PyTorch.

Even more importantly, most of these solutions didn’t address the challenges of running on cheaper compute — i.e. spot instances.

Reducing cloud costs by running on spot instances with TorchElastic

GPU is by far the biggest expense in our monthly cloud bill, and using spot instances instead of on-demand instances offers an average saving of 70%, which is very significant. However, using spots means dealing with node interruptions. As training jobs take hours to days to complete, interruptions are pretty much guaranteed, especially on GCP where instances get interrupted every 24 hours.

We needed an automatic way to recover from interruptions without losing training progress. And here, TorchElastic came to the rescue.

TorchElastic

TorchElastic is a relatively new PyTorch module, which extends DDP and adds support for training elasticity, i.e. making training jobs resilient to workers terminating and joining mid-training, and generally coping with changes in the number of training workers during a single job.

TorchElastic also includes a module for integrating it with Kubernetes. Looking at the code, we were impressed by its lightweight and elegant approach and decided to adopt it in our training infrastructure.

Main concepts

TorchElastic introduces the following main concepts on top of DDP:

  • ElasticAgent — the “control plane” of the distributed training.
    An agent is a “supervisor” process, in charge of spawning, monitoring, coordinating and restarting the (DDP) training worker processes.
    There is one agent per machine, and it spawns a worker per each of the machine’s GPU.
  • Rendezvous — a synchronization barrier and worker discovery protocol. Agent processes use it to discover other agents running on other machines, and determine the rank of each worker; this is achieved by a communication protocol over a service discovery mechanism such as etcd. The list of discovered peers is made available to the DDP worker.
TorchElastic distributed process orchestration. credit: https://pytorch.org/elastic/0.2.0rc1/agent.html

Supporting elasticity and resilience

TorchElastic agents are able to detect and respond to changes in the number of workers during the training job, for example when nodes get terminated mid-training. Typically, after an “elasticity event” such as a worker departing or joining the job, the ElasticAgents will restart the workers.

In order not to lose state, TorchElastic requires a small adaptation in the training code which will ensure it loads the last state from disk when (re-)started; “state” here includes model checkpoints or any other training specific state.

This design decision makes a lot of sense. While we may lose a bit of training progress since the last checkpoint, we benefit from a simple, straightforward and battle-proof solution for state management.

TorchElastic on Kubernetes

The TorchElastic Kubernetes module contains a CRD describing TorchElastic training Jobs, and a Controller that is able to manage these jobs.

Each training worker is deployed as a Kubernetes pod, with a single container. The container’s entry-point starts the TorchElastic agent, which in turn starts the DDP worker process(es).

A TorchElastic training job specifies how many workers it requires — by providing both the desired number and a min/max range. The controller’s job is to maintain the training job’s target size throughout its lifetime, including after interruptions, similar to a ReplicaSet.

Here is an imagenet training deployment from the TorchElastic examples.

TorchElastic Imagenet training job deployment .yaml source: https://github.com/pytorch/elastic/blob/v0.2.2/kubernetes/config/samples/imagenet.yaml

Note these fields which are unique to the TorchElastic CRD:

  • line 7: rdzvEndpoint — to make sure the pod knows where to find the etcd server used for coordination
  • lines 8–9: min/max replicas — the elasticity boundaries for the job.

Elasticity with TorchElastic and Cluster autoscaler

We needed our Kubernetes clusters to automatically scale up and down in response to TorchElastic training jobs, and we achieved this by integrating with a cluster autoscaler.

When jobs are submitted, TorchElastic creates pods for the training workers. The autoscaler’s job is to allocate sufficient compute to satisfy the scheduling requirements from these pods.

When the job completes and pods exit, the autoscaler detects a period of low utilization on the nodes and scales the nodes down.

In the case of a node interruption, the TorchElastic controller creates replacement pods. These pods will again trigger the cluster autoscaler to create (replacement) nodes.

The integration between TorchElastic and the Kubernetes autoscaler means we have a completely elastic infrastructure, where we pay for what we use and are also able to leverage the massive spot discount since we are agnostic to interruptions.

Cluster autoscaler in action — scale-out, handle interruption, scale in

Evaluating training speed

Since we train large networks of about 200MM parameters (~X5 than ResNet-50), distributed training can involve a fair amount of communication between GPUs on different machines. We wanted to make sure the communication overhead doesn’t slow down the training to the point of cancelling out the benefit of running on more GPUs.

Single node setup

On AWS, we set up a 4gdn.12xl, which has 4xT4 GPUs, 48 CPU cores and 192 GB memory.

TorchElastic on Kubernetes setup

We used 4 x g4dn.2xl, each with 8 cores and 32 GB per machine. In total this setup had smaller compute power than the single node.

Results

We discovered that we needed to tweak the number of dataloader processes per GPU when running in the distributed setup, to refrain from data loading and (CPU) preprocessing becoming bottlenecks. Once we did that, we reached comparable results to those of a single node setup.

Benchmarking larger experiments

For larger experiments, we couldn’t easily compare to a single node setup. Instead, we decided to find out what kind of relative speedup factor we could achieve by adding more GPUs, the theoretical ideal being linear scaling.

We measured the speedup factor with respect to a full epoch training time. We tested on two GPU types, scaling the GPUs from 4 to 64, and using each card’s DDP training speed on 4 GPUs as the baseline.

Results

We saw x8–10 speedup by scaling from 4 to 64 GPUs.

Unsurprisingly, adding more GPUs means a sub-linear speedup in training, because, at a certain point, the network communication does become more significant and the NICs on the smaller spot machines can get saturated.

Below is a diagram describing the scaling efficiency factor, by plotting relative speedup as a function of adding more GPUs. The ideal is linear speedup.

image by the author

Cost

For training jobs that use up to 16 GPUs, where we achieved close to linear speedup in training, we save 60–70% of the experiment cost by running on spot instances — close to the full spot savings factor.

For experiments using 32+ GPUs, we save about 40% of the cost since the scaling factor is sub-linear — but benefit from up to 10X speedup, which of course has significant benefits in itself.

Managing TorchElastic jobs from the CLI

We wanted to offer our researchers a simple interface for kicking off training jobs, viewing their status, logs and managing them in general. We created a simple CLI tool that uses the Kubernetes Python Client to communicate with Kubernetes.

Researchers can specify the number and type of GPU for the job, name the experiment and pass any parameters to their training script. The tool then generates a deployment YAML and submits it to the cluster. This makes launching and managing jobs easy.

Conclusion

We set out to build a multi-cloud, distributed training solution over spot instances — an infrastructure that only a couple of years ago would have been impossible to build unless you are a large company with an army of engineers.

Choosing TorchElastic to handle the training orchestration enabled us to have a working solution on both AWS and GCP within a couple of weeks. Having this infrastructure in place resulted in a drastic reduction of our time-to-training results as well as cloud costs. As a small startup, this is a game-changer in our ability to move fast and develop our product.

In part II of this series, we will walk you through the process of creating a basic version of this environment.

Hope you enjoyed this post, and happy training!

--

--