Reduce Time and Cost by Running Distributed Elastic PyTorch Jobs on Kubernetes

Elena Neroslavskaya
PyTorch
Published in
9 min readAug 13, 2021
TorchElastic Scaling

Modern deep learning models are getting larger and more complex. Latest State-of- the-art NLP models have billions of parameters and training them could take days and even weeks on one machine. Distributing your deep learning model training on high performance computing instances can significantly reduce training time. PyTorch as a model deep learning framework supports DistributedDataParallel approach that implements distributed training using data parallelism which can run across multiple machines.

Graph of model complexity — number of parameters
ML Models growing complexity

And that is where Kubernetes — a popular tool to orchestrate clusters of powerful GPU instances — comes in. Kubernetes provides simple way to manage the compute and distribute ML training jobs across the nodes. In this article we will look at elastic ML training utilizing Azure Kubernetes Service (AKS) — fully managed Kubernetes service provided on Azure which removes the complexity of managing infrastructure and allows developer and data scientists to focus on ML models. Similar offerings are available on GCP and Amazon.

Running ML training jobs on multiple powerful GPU-enabled nodes while significantly decreasing training time could be rather costly. One way to reduce costs is to run the jobs on Azure Spot VMs that allow you to take advantage of the cloud’s unused capacity at a substantial cost savings; however, at any point in time when Azure needs the capacity back, Spot VMs could be evicted. Typical distributed training jobs are not fault tolerant, and a job cannot continue if a node fails or is reclaimed.

Why TorchElastic?

These limitations are addressed by TorchElastic, an open source library that has become part of PyTorch Core in the recent PyTorch 1.9 release. TorchElastic is runner and coordinator for distributed PyTorch training jobs that can gracefully handle scaling events, without disrupting the model training process. Typical use cases:

  • Fault Tolerance: jobs that run on infrastructure where nodes get replaced frequently, either due to flaky hardware or by design. Or mission critical production grade jobs that need to be run with resilience to failures.
  • Dynamic Capacity Management: jobs that run on leased capacity that can be taken away at any time (e.g., Azure VM spot instances) or shared pools where the pool size can change dynamically based on demand.

Moreover, TorchElastic is a native PyTorch library (it is now part of torch-1.9) and as such it does not require you to install and configure any additional frameworks (like Horovod, Dask or Ray). As part of the PyTorch Core framework, it is supported by the PyTorch Enterprise Program on Azure making it a production ready option.

To run elastic jobs only minimal code changes are required to support state management, as we will show in our example. You could also see details in the popular frameworks deepspeech.pytorch and pytorch-lightning that has adopted TorchElastic.

To add to the benefits, TorchElastic is developed with Kubernetes first strategy and comes with TorchElastic Kubernetes Controller which gives users an easy way to define distributed training jobs using Kubernetes native constructs and utilize strengths of the popular orchestrator.

Architecture for TorchElastic on AKS

Let’s take look how it all comes together. TorchElastic for Kubernetes consists of two main components — TorchElastic Kubernetes Controller that manages and monitors training jobs requests and Rendezvous component (built on ETCD key-value store) that is helping to organize training job workers and agree on list of participants, their roles, as well as making a collective decision on when training can begin/resume.

For the optimal setup, to be available for the whole duration of training main TorchElastic components need to be running on non-pre-emptive CPU nodes, while training workers will be scheduled on the GPU Spot nodes to save costs. Training data and job states are saved and loaded from mounted cloud storage.

In our example it translates to the architecture as shown below, with following Azure Kubernetes Services (AKS) cluster setup:

- GPU enabled Spot VM nodepool for running elastic training on preemptable nodes.

- CPU VM nodepool for running Controller and Rendezvous server — TorchElastic control plane

Azure Blob CSI driver mounting Azure Storage Account for hosting training data and model checkpoints.

TorchElastic Architecture on AKS
TorchElastic Setup on AKS

TorchElastic AKS Self-Paced Lab

To demonstrate the setup we have prepared a step by step walkthrough TorchElastic Lab that has Jupyter notebooks with scripts to install and setup AKS and all the required components, example with typical training script enabled to save/restore state and finally we simulate eviction for one of the nodes and scaling it back up to see the fault tolerance and elasticity in action.

  • Step 0: Environment Setup
  • Step 1: Infrastructure Setup (AKS + Spot VM Nodepool) and Torch Elastic
  • Step 2: Adjust script for Elastic training
  • Step 3: Run Torch Elastic ImageNet training on Spot VM Pool
  • Step 4: Simulate node eviction and verify training is not interrupted.

Training Script

Typically, when one of the distributed training participants is interrupted training code needs to re-initialize the “state”. TorchElastic helps abstract away the state management and makes it easy for developers. Follow the guidelines to implement save/resume checkpoints and TorchElastic will handle interruptions seamlessly and will resume the training from saved checkpoints. TorchElastic will handle the distribution of ranks across workers, communication between the nodes and GPU resources on the nodes. For more details and example based on Imagenet we use in our walkthrough refer to TorchElastic Github

Running A Training Job

Once training script is prepared, we rely on Kubernetes to orchestrate the training job. We submit ElasticJob configuration — Kubernetes’ custom resource definition (CRD). TorchElastic Controller watches for this type of resource being submitted to the cluster, creates training pods, passes required arguments to the PyTorch Elastic training launcher and ensures the job matches the desired state described in CRD.

Here is our ElasticJob definition for running on AKS and using CSI driver mounted Persistent Volume:

Note the following details:

Line 9-10: minReplicas and maxReplicas directs controller on number of desired workers and when changed controller scales up and down workers correspondingly without any job interruption. The more workers you have the less data to process by each worker and training completes faster

Line 19–25: to make sure training pods could be scheduled on Azure Spot instances set nodeselector and toleration for the taint “kubernetes.azure.com/scalesetpriority:spot”

Line 50: to utilize nodes with GPU, set resources requests/limits to nvidia.com/gpu: 1.

Line 8: point rdzvEndpoint to Kubernetes service for deployed ETCD service. For details refer to Running TorchElastic Job

Line 27 -30, 51–53: mount Azure Blob storage with CSI Blob driver using PersistentVolume (yml details here).

Line 47: save/load checkpoints to the mounted drive.

Submit defined job to the AKS cluster and monitor the logs

kubectl apply -f elasticjob.yml

Once ElasticJob is created we could see how controller schedules the worker pods and headless services for pod to pod communication:

kubectl describe elasticjob -n elastic-job
Status:
Conditions:
Message: ElasticJob imagenet is running.
Reason: ElasticJobRunning
Status: True
Type: Running
Replica Statuses:
Worker:
Events:
Type Reason Age From Message
---- ------ ---- ---- -------
Normal SuccessfulCreatePod < > elastic-job-controller Created pod: imagenet-worker-0
Normal SuccessfulCreatePod < > elastic-job-controller Created pod: imagenet-worker-1
Normal SuccessfulCreatePod < > elastic-job-controller Created pod: imagenet-worker-2
Normal SuccessfulCreateService < > elastic-job-controller Created service: imagenet-worker-0
Normal SuccessfulCreateService < > elastic-job-controller Created service: imagenet-worker-1
Normal SuccessfulCreateService < > elastic-job-controller Created service: imagenet-worker-2

Getting logs from worker pods shows how TorchElastic controller performs peer discovery and rank assignment and takes care of Rendezvous communication :

kubectl logs imagenet-worker-0 -n elastic-jobINFO] 2021-05-25 19:42:09,129 launch: Running torchelastic.distributed.launch with args: ['/opt/conda/lib/python3.7/site-packages/torchelastic/distributed/launch.py', '--rdzv_backend=etcd', '--rdzv_endpoint=etcd-service:2379', '--rdzv_id=imagenet', '--nnodes=1:3', '--nproc_per_node=1', '/workspace/examples/imagenet/main.py', '--arch=resnet18', '--epochs=3', '--batch-size=64', '--workers=0', '/workspace/data/tiny-imagenet-200', '--checkpoint-file=/mnt/blob/data/checkpoint.pth.tar']
INFO 2021-05-25 19:42:09,139 Etcd machines: ['http://0.0.0.0:2379']
INFO 2021-05-25 19:42:09,149 launch: Using nproc_per_node=1.
INFO 2021-05-25 19:42:09,879 Attempting to join next rendezvous
INFO 2021-05-25 19:42:09,971 Joined rendezvous version 3 as rank 0. Full state: {'status': 'joinable', 'version': '3', 'participants': [0]}
INFO 2021-05-25 19:42:10,348 Rank 0 finished join last call.
INFO 2021-05-25 19:42:10,348 Waiting for remaining peers.
INFO 2021-05-25 19:42:10,349 All peers arrived. Confirming membership.
INFO 2021-05-25 19:42:10,427 Rendezvous version 3 is complete. Final state: {'status': 'final', 'version': '3', 'participants': [0, 1, 2], 'keep_alives': ['/torchelastic/p2p/run_imagenet/rdzv/v_3/rank_2', '/torchelastic/p2p/run_imagenet/rdzv/v_3/rank_0', '/torchelastic/p2p/run_imagenet/rdzv/v_3/rank_1'], 'num_workers_waiting': 0}
[INFO] 2021-05-25 19:42:10,439 api: [default] Rendezvous complete for workers.
Result:
restart_count=0
group_rank=0
group_world_size=3
rank stride=1
assigned global_ranks=[0]
master_addr=imagenet-worker-0
[INFO] 2021-05-25 19:42:10,439 api: [default] Starting worker group
=> set cuda device = 0
=> creating model: resnet18
=> no workers have checkpoints, starting from epoch 0
=> start_epoch: 0, best_acc1: 0
Epoch: [0][ 0/521] Time 5.256 ( 5.256) Data 0.136 ( 0.136) Loss 6.9013e+00 (6.9013e+00) Acc@1 3.12 ( 3.12) Acc@5 4.69 ( 4.69)
Epoch: [0][ 10/521] Time 2.405 ( 2.542) Data 0.099 ( 0.106) Loss 5.4396e+00 (6.1555e+00) Acc@1 0.00 ( 0.43) Acc@5 3.12 ( 3.55)

Once all peers were discovered distributed training have started.

Verifying Fault-tolerance and Elasticity

To see the uninterrupted training in action, simulate eviction event using Azure REST API as we demonstrated in the following script SimulateStop.ipynb

Once node is evicted, worker pods on the removed node are deleted, TorchElastic will readjust the training and re-run peer discovery for active workers and will resume training from saved checkpoint. Similarly, when we scale Kubernetes nodepool back up, TorchElastic controller will schedule more worker pods to match desired state, training job will be adjusted, and participants will join new Rendezvous session.

Notice restart_count, group_rank for this worker and process epoch in the logs:

kubectl logs imagenet-worker-1 -n elastic-jobINFO 2021-05-24 05:27:26,377 Attempting to join next rendezvous
INFO 2021-05-24 05:27:26,380 Observed existing rendezvous state: {'status': 'joinable', 'version': '10', 'participants': [0]}
INFO 2021-05-24 05:27:26,403 Joined rendezvous version 10 as rank 1. Full state: {'status': 'joinable', 'version': '10', 'participants': [0, 1]}
INFO 2021-05-24 05:27:26,462 All peers arrived. Confirming membership.
INFO 2021-05-24 05:27:26,545 Rendezvous version 4 is complete. Final state: {'status': 'final', 'version': '4', 'participants': [0, 1, 2], 'keep_alives': ['/torchelastic/p2p/run_imagenet/rdzv/v_4/rank_1', '/torchelastic/p2p/run_imagenet/rdzv/v_4/rank_2', '/torchelastic/p2p/run_imagenet/rdzv/v_4/rank_0'], 'num_workers_waiting': 0}
INFO 2021-05-24 05:27:26,545 Creating EtcdStore as the c10d::Store implementation
[INFO] 2021-05-24 05:27:26,568 api: [default] Rendezvous complete for workers.
Result:
restart_count=0
group_rank=1
group_world_size=3
rank stride=1
assigned global_ranks=[1]
master_addr=imagenet-worker-1
master_port=53663
[INFO] 2021-05-24 05:27:26,568 api: [default] Starting worker group=> set cuda device = 0
=> creating model: resnet18
=> loading checkpoint file: /mnt/blob/data/checkpoint.pth.tar
=> loaded checkpoint file: /mnt/blob/data/checkpoint.pth.tar
=> using checkpoint from rank: 2, max_epoch: 0
=> checkpoint broadcast size is: 93588276
=> done broadcasting checkpoint
=> done restoring from previous checkpoint
=> start_epoch: 1, best_acc1: 0.6399999856948853
Epoch: [1][ 0/521] Time 4.900 ( 4.900) Data 0.122 ( 0.122) Loss 4.3889e+00 (4.3889e+00) Acc@1 10.94 ( 10.94) Acc@5 18.75 ( 18.75 )
Epoch: [1][ 10/521] Time 2.104 ( 2.290) Data 0.154 ( 0.117) Loss 4.8242e+00 (4.4380e+00) Acc@1 7.81 ( 9.80) Acc@5 17.19 ( 24.43)

As we have seen from the logs, although one of the nodes in the cluster was evicted, controller has rescheduled the workers, reassigned the remaining training data and resumed the training job from saved by one of the workers checkpoint.

For more details on setup refer to the scripts and information in TorchElastic on AKS Walkthrough

Conclusion

We have demonstrated how Kubernetes and TorchElastic library (that has become part of PyTorch 1.9 Core and supported by PyTorch Enterprise Program on Azure) enables Data Scientists to run distributed training jobs in a fault tolerant and elastic manner. Whether nodes are evicted or fail or auto-scaled based on demand, TorchElastic maintains training job reliability, which makes it perfect option for running distributed training on preemptible instances and for production workloads. This approach helps to save time and money for training large scale models.

--

--

Elena Neroslavskaya
PyTorch
Writer for

Cloud Solution Architect and Technology Enthusiast