A quick guide to distributed training with TensorFlow and Horovod on Amazon SageMaker
Learn how distributed training works and how Amazon SageMaker makes it as easy as training on your laptop
Horovod and Amazon SageMaker for faster training and increased productivity
In deep learning, more is better. More data, more layers, and more compute power usually lead to higher accuracy and better robustness of trained models.
I may not be able to help you collect more data, but I can show you how to distribute training across a large number of machines so you can train faster, run more experiments, and increase your productivity.
In this blog post, I'm going to cover how you can run distributed training without managing infrastructure: no instances to launch, no clusters to set up, no storage volumes to manage, and no containers to build. Bring your training scripts, specify the number of GPUs, and let Amazon SageMaker handle the rest.
In the first part of this guide, I'll provide step-by-step instructions for updating your training scripts to use the Horovod library. For distributed training to work, training processes on different GPUs need to communicate with each other. Horovod enables this communication and provides a convenient API to prepare your training scripts for distributed training. The changes you make are agnostic to the number of GPUs, so it's a one-time effort.
In the second part of this guide, I'll show how you can take your updated training scripts and run them at scale using Amazon SageMaker, on as many GPUs as you want or as few as you need, just by changing a single line of code.
Want to follow along and run the examples as you read? The Jupyter notebook and training scripts are available here:
https://github.com/shashankprasanna/distributed-tensorflow-horovod-sagemaker
Horovod and the ring all-reduce approach
Horovod is a distributed deep learning framework that supports popular deep learning frameworks: TensorFlow, Keras, PyTorch, and Apache MXNet. The example in this guide uses TensorFlow and Keras. If you're a PyTorch or MXNet user, updating your scripts will follow a very similar process to the one described here. The Horovod documentation also includes plenty of examples for other frameworks.
During distributed training, multiple processes need to communicate with each other. To enable communication between training processes, Horovod uses a communication protocol called Message Passing Interface (MPI). And to average gradients and update all copies of the models, it uses an approach called ring-allreduce (we'll come back to this). These approaches are not new and have been used for many years by scientists, researchers, and engineers working in High-Performance Computing (HPC) to solve problems in computational fluid dynamics, molecular dynamics, computer graphics, and other fields.
MPI itself defines basic concepts for sending and receiving information between multiple processes in a cluster, such as allreduce, allgather, and broadcast. As you may have deduced from their names, allgather gathers data from all processes (in the case of deep learning, gradients), and broadcast broadcasts data (gradients) from one process to every other process. allreduce (conceptually speaking) combines these two operations: it gathers data from all processes, performs a reduction operation (for example, averaging gradients), and then broadcasts the result (the averaged gradients).
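As a conceptual illustration (plain single-process Python, not actual MPI or Horovod code), allreduce with an averaging reduction boils down to this:
import numpy as np

def allreduce_average(worker_gradients):
    # "allgather": collect the gradient arrays contributed by every worker
    stacked = np.stack(worker_gradients)
    # reduce: average the gradients across workers
    averaged = stacked.mean(axis=0)
    # "broadcast": every worker receives the same averaged gradients
    return [averaged.copy() for _ in worker_gradients]

# Gradients from 2 hypothetical workers
grads = [np.array([1.0, 2.0]), np.array([3.0, 4.0])]
print(allreduce_average(grads))  # both workers end up with [2.0, 3.0]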
As you increase the number of training processes, inter-process communication increases and the communication overhead starts to affect scaling efficiency.
The ring-allreduce approach improves upon vanilla allreduce by making the communication cost independent of the number of processes in the system. It does this by arranging processes in a logical ring where each process only receives data from its "left" neighbor and sends data to its "right" neighbor, as illustrated in the accompanying figure.
The ring-allreduce process for deep learning is described in further detail in the Horovod blog post and the Horovod paper. To use the Horovod library, you don't really need to know how ring-allreduce works, but it always helps to have an intuition about how the algorithms and libraries you use work.
To update your training script to use the Horovod library, you primarily need to know the following key concepts:
- Size: the total number of processes/GPUs. This is equal to the number of compute instances in your cluster times the number of GPUs per instance. For example, with 2 x p3.16xlarge EC2 instances, size would be 2 (instances) x 8 (GPUs) = 16.
- Rank: a unique process ID, from 0 to size - 1. Each process knows its unique rank.
- Local rank: a unique process ID within a machine. For example, the local ranks of the 8 GPUs in a p3.16xlarge EC2 instance range from 0 to 7.
The short snippet below shows how these concepts map to Horovod API calls.
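This is a minimal sketch, separate from the training script covered later; you would launch it with one process per GPU, for example via horovodrun or mpirun:
import horovod.tensorflow.keras as hvd

hvd.init()  # initialize Horovod before querying size/rank

print('Size (total number of processes):', hvd.size())                # e.g. 16 for 2 x p3.16xlarge
print('Rank (unique global process ID):', hvd.rank())                 # 0 to size - 1
print('Local rank (process ID within a machine):', hvd.local_rank())  # 0 to 7 on a p3.16xlarge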
What happens during distributed training?
For the purpose of illustration, let's take the example of a distributed training job on 2 GPUs; these could be on the same or different systems, it doesn't matter. Here's what happens behind the scenes:
Step 1: During the forward pass, it’s business as usual. Each copy of the model does a forward pass with a batch_size of data that it receives.
Step 2: A backward pass is then performed to compute the gradients. But the gradient is NOT used to update the weights yet.
Step 3: Horovod now does an allreduce operation (averages the gradients and then broadcasts them) across all processes, which in this example means both GPUs.
Step 4: The final allreduced gradients are now used to update each copy of the model.
By allowing each GPU to train on different batches of data and allreducing the gradients, you're effectively training on a larger batch and therefore speeding up training. For example, with a per-GPU batch size of 128 and 16 GPUs, the effective batch size is 128 x 16 = 2,048.
Updating your training script to use Horovod API
For this demo, I'll use the CIFAR-10 dataset, which consists of 60,000 32x32 images belonging to 10 different classes (6,000 images per class). The training script is available in the blog post's GitHub repository, along with a Jupyter notebook to run the full example:
https://github.com/shashankprasanna/distributed-tensorflow-horovod-sagemaker
This section describes the changes that were made to the following files to prepare them for distributed training:
- cifar10-tf-horovod-sagemaker.py: the main training script
- model_def.py: the model architecture definition script
To make it easier for you to follow along, I've included the exact same section headings as comments in the above scripts. Look for "Change NUMBER".
Change 1: Import horovod and keras backend
Put these lines at the top of your training script to import Horovod and the Keras backend.
import horovod.tensorflow.keras as hvd
import tensorflow.keras.backend as K
Change 2: Initialize horovod and get the size of the cluster
Initialize Horovod and get the total number of GPUs in your cluster. If you're running on CPUs only, this will be equal to the total number of instances.
hvd.init()
size = hvd.size()
Change 3: Pin GPU to local process (one GPU per process)
Pin a GPU to the current process using its local rank, so that each process drives exactly one GPU.
config = tf.ConfigProto()
config.gpu_options.allow_growth = True
config.gpu_options.visible_device_list = str(hvd.local_rank())
K.set_session(tf.Session(config=config))
Change 4: Scale the learning rate by the size of the cluster (total number of workers)
Update the learning rate by scaling it by the number of GPUs. The effective batch size during distributed training is batch_size times hvd.size(). This change is in model_def.py. By increasing the learning rate, you compensate for the effective increase in batch size.
opt = SGD(lr=lr * size, decay=weight_decay, momentum=momentum)
Change 5: Wrap your Keras optimizer using Horovod to make it a distributed optimizer
The distributed optimizer does the magic of averaging gradients and broadcasting them using allreduce or allgather, and then updating the weights with the averaged gradients. This change is in model_def.py.
opt = hvd.DistributedOptimizer(opt)
Change 6: Add callbacks for syncing initial state, and saving checkpoints only on 1st worker (rank 0)
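Here's a minimal sketch of these callbacks using the Horovod Keras API, building on the hvd import from Change 1 (the list name callbacks and the checkpoint path are illustrative, not necessarily the exact names used in the repository's script):
from tensorflow.keras.callbacks import ModelCheckpoint

callbacks = []
# Broadcast the initial variable states from rank 0 to all other processes,
# so every worker starts training from identical weights
callbacks.append(hvd.callbacks.BroadcastGlobalVariablesCallback(0))
# Save checkpoints only on the first worker (rank 0) to prevent
# all workers from writing the same files at the same time
if hvd.rank() == 0:
    callbacks.append(ModelCheckpoint('./checkpoint-{epoch}.h5'))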
Change 7: Update the number of steps/epoch
You'll need to divide the number of steps per epoch (the total number of images divided by the batch size) by the number of GPUs, since each GPU now only processes a fraction of the dataset every epoch.
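For example (a sketch with assumed variable names such as num_train_images and train_dataset, reusing the callbacks defined in Change 6):
# Each worker now sees only 1/size of the data per epoch
steps_per_epoch = (num_train_images // batch_size) // hvd.size()
model.fit(train_dataset,
          steps_per_epoch=steps_per_epoch,
          epochs=epochs,
          callbacks=callbacks,
          verbose=1 if hvd.rank() == 0 else 0)  # log progress only on the first worker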
Change 8: Update script to accept hyperparameters as command line arguments
Amazon SageMaker will pass these values to the script when it launches a distributed training job.
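A common way to do this is with argparse. SageMaker also exposes the data channel locations and the model output directory as environment variables (SM_CHANNEL_<NAME> and SM_MODEL_DIR), which make convenient defaults. A sketch (the exact argument names in the repository's script may differ):
import argparse
import os

parser = argparse.ArgumentParser()
# Hyperparameters passed by the SageMaker estimator
parser.add_argument('--epochs', type=int, default=10)
parser.add_argument('--batch-size', type=int, default=256)
parser.add_argument('--learning-rate', type=float, default=0.01)
# Paths injected by SageMaker as environment variables
parser.add_argument('--train', type=str, default=os.environ.get('SM_CHANNEL_TRAIN'))
parser.add_argument('--validation', type=str, default=os.environ.get('SM_CHANNEL_VALIDATION'))
parser.add_argument('--model-dir', type=str, default=os.environ.get('SM_MODEL_DIR'))
args, _ = parser.parse_known_args()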
Run distributed training on Amazon SageMaker
You're now done with the hard part: modifying your training script to make it distribution-ready.
The rest of the process — distributed training — is relatively straightforward using Amazon SageMaker.
To run a distributed training job using Amazon SageMaker, download and install the SageMaker Python SDK. For a more convenient experience, you can also launch an Amazon SageMaker notebook instance, which comes with a Jupyter notebook server, the SageMaker Python SDK, and popular deep learning frameworks pre-installed.
Running a SageMaker training job involves only two key steps, which I'll highlight below:
- Creating a SageMaker TensorFlow estimator
- Calling the fit() function
The following code excerpts are from the Jupyter notebook in the blog post's repository.
Using the SDK, you need to specify the following details so that Amazon SageMaker can provision the requested resources and prepare for training:
- Your training script
- Directory with training script dependencies
- Location to save trained models
- Type of CPU or GPU instance you want to train on
- Number of GPUs per instance
- TensorFlow version
- Distribution type — MPI (used by Horovod) or parameter server (an alternative approach to distributed training)
There are many more options you can specify in the SageMaker TensorFlow estimator, and you can find the full list in the documentation: https://sagemaker.readthedocs.io/en/stable/index.html
The implementation is as follows:
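Here's a minimal sketch of the estimator, assuming the SageMaker Python SDK 1.x parameter names that were current when this post was written (newer SDK versions rename some of them); the role, bucket, and hyperparameter values are placeholders:
from sagemaker.tensorflow import TensorFlow

estimator = TensorFlow(
    entry_point='cifar10-tf-horovod-sagemaker.py',    # your Horovod-updated training script
    source_dir='code',                                 # directory with dependencies such as model_def.py
    role=sagemaker_role,                               # IAM role with SageMaker permissions
    framework_version='1.15.2',                        # TensorFlow version
    py_version='py3',
    hyperparameters={'epochs': 30, 'batch-size': 256, 'learning-rate': 0.01},
    train_instance_count=2,                            # number of instances in the cluster
    train_instance_type='ml.p3.16xlarge',              # 8 GPUs per instance
    output_path='s3://{}/output'.format(bucket_name),  # where trained models are saved
    distributions={'mpi': {'enabled': True, 'processes_per_host': 8}})  # one MPI process per GPU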
After defining the estimator, you'll need to specify the Amazon S3 paths to your training, validation, and test datasets, and pass them to the estimator's fit() function.
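For example (channel names and bucket paths are placeholders; use the names your training script expects):
estimator.fit({
    'train': 's3://{}/cifar10/train'.format(bucket_name),
    'validation': 's3://{}/cifar10/validation'.format(bucket_name),
    'eval': 's3://{}/cifar10/eval'.format(bucket_name)})
Each key becomes a data channel, and its contents are made available to the training script through the corresponding SM_CHANNEL_* environment variable (for example, SM_CHANNEL_TRAIN).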
And you’re done! Sit back and wait for the distributed training job to complete.
You can (and should) monitor progress, which I’ll cover in the next section, but first, let’s take a closer look at what’s happening behind the scenes.
Amazon SageMaker does several things for you automatically so that you don’t have to worry about infrastructure level details. Very briefly, SageMaker will:
- Pick up your training script and dependencies
- Provision the specified number of instances in a fully managed cluster
- Pull the specified TensorFlow container image
- Instantiate containers on every instance
- Download the training code into the instance and make it available in the container
- Copy training dataset from Amazon S3 and make it available in the container
- Initiate training using MPI
When training is initiated, Amazon SageMaker runs the exact same copy of your Horovod-updated training script on each instance. Each copy knows its unique local rank using hvd.local_rank(), and a GPU is pinned to that particular process. Horovod then takes care of performing ring-allreduce and updating the weights on each GPU with the averaged gradients.
Once training is done, SageMaker will automatically:
- Upload training artifacts such as trained models, checkpoints, and TensorBoard logs to the Amazon S3 bucket you specify
- Tear down the training cluster so you don’t incur additional costs
Monitoring training progress
There are a couple of different options for monitoring jobs:
- Amazon SageMaker Console
- TensorBoard
Amazon SageMaker Console
If you head over to AWS Console > Amazon SageMaker > Training Jobs you can see a list of currently running jobs and jobs you’ve run in the past. Click on a job and you can see details such as progress status, type of instance, hyperparameters, S3 location for datasets and model artifacts and so on.
Scroll further down and you can see CPU, GPU and other resource utilizations.
You’ll also find a link to Amazon CloudWatch dashboard where you can monitor training job logs for all instances. This comes in handy for debugging and diagnosing when something isn’t looking right.
TensorBoard
In the training script you'll notice that there are two Keras callbacks for logging: the first saves TensorBoard log files locally in the container, and the second syncs those logs to the Amazon S3 location you specified when invoking the SageMaker estimator.
callbacks.append(TensorBoard(log_dir=logdir))
callbacks.append(Sync2S3(logdir=logdir, s3logdir=tensorboard_logs))
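Sync2S3 is a small custom callback defined in the repository's training script. As a rough illustration only (not the repository's exact implementation), such a callback could look like this:
import os
from tensorflow.keras.callbacks import Callback

class Sync2S3(Callback):
    """Sync local TensorBoard logs to S3 at the end of every epoch."""
    def __init__(self, logdir, s3logdir):
        super(Sync2S3, self).__init__()
        self.logdir = logdir      # local directory containing TensorBoard event files
        self.s3logdir = s3logdir  # destination, e.g. s3://bucket/tensorboard_logs/job-name

    def on_epoch_end(self, epoch, logs=None):
        # Requires the AWS CLI and S3 write permissions on the training instance
        os.system('aws s3 sync {} {} --quiet'.format(self.logdir, self.s3logdir))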
You can now run TensorBoard anywhere you like (your laptop, desktop, EC2 instance) and point it to your Amazon S3 location with your TensorBoard logs. You’ll need to make sure that you have permission to access Amazon S3 and you can set that up using the AWS CLI.
Run the following command on a machine that has tensorboard installed and has S3 read access permissions:
S3_REGION=us-west-2 tensorboard --logdir s3://{bucket_name}/tensorboard_logs/
What’s not to love about saving time?
Distributed training can save you time when dealing with large models and datasets. With libraries like Horovod and services like Amazon SageMaker, you can scale training with very little effort. In this blog post, I covered two key concepts:
- How to take your existing training scripts and update them with Horovod to make them distribution-ready
- How to take your Horovod-updated training scripts and run distributed training using Amazon SageMaker, without having to set up and manage clusters
I’ll leave you with a few guidelines for choosing the right instances for distributed training:
For better performance, always favor a single instance with multiple GPUs vs. multiple instances with a single GPU.
On AWS you can get access to instances with 1 GPU (p3.2xlarge), 4 GPUs (p3.8xlarge), and 8 GPUs (p3.16xlarge and p3dn.24xlarge).
Let's say you want to run distributed training with 4 GPUs: always prefer a single p3.8xlarge instance over 4 x p3.2xlarge instances. The benefit is that when processes need to communicate for the allreduce operation, they don't have to cross the network to reach CPUs and GPUs on other instances; crossing the network adds communication latency that can hurt training performance. Similarly, if you want to distribute training across 8 GPUs, choose a single p3.16xlarge or p3dn.24xlarge with 8 GPUs rather than 8 x p3.2xlarge or 2 x p3.8xlarge. These multi-GPU instances include NVIDIA NVLink technology, which enables high-bandwidth inter-GPU communication to speed up the allreduce operations performed by Horovod.
Thanks for reading, I hope you enjoyed this guide. All the code and examples are available on GitHub here:
https://github.com/shashankprasanna/distributed-tensorflow-horovod-sagemaker
If you have questions about this guide, suggestions on how to improve it, or ideas for new guides, please reach out to me on Twitter (@shshnkp) or LinkedIn, or leave a comment below. Enjoy!