A quick guide to distributed training with TensorFlow and Horovod on Amazon SageMaker

Learn how distributed training works and how Amazon SageMaker makes it as easy as training on your laptop

Shashank Prasanna
Towards Data Science
11 min read · Mar 14, 2020


Distribute training across multiple GPUs using Horovod and Amazon SageMaker for faster training and increased productivity

In deep learning, more is better. More data, more layers, and more compute power usually lead to higher accuracy and more robust trained models.

I may not be able to help you collect more data, but I can show you how to run distributed training on a large number of machines so you can train faster, run more experiments, and increase your productivity.

In this blog post, I’m going to cover how you can run distributed training without managing infrastructure: no instances to launch, no clusters to set up, no storage volumes to manage, and no containers to build. Bring in your training scripts, specify the number of GPUs, and let Amazon SageMaker handle the rest.

In the first part of this guide, I’ll provide step-by-step instructions for updating your training scripts to use the Horovod library. For distributed training to work, training processes on different GPUs need to communicate. Horovod enables this seamless communication and provides a convenient API to prepare your training scripts for distributed training. The changes you make are agnostic to the number of GPUs, so it’s a one-time effort.

In the second part of this guide, I’ll show how you can take your updated training scripts and run them at scale using Amazon SageMaker on as many GPUs as you want, or as few as you need, just by changing a single line of code.

Want to follow along and run the examples as you read? The Jupyter notebook and training scripts are available here:

https://github.com/shashankprasanna/distributed-tensorflow-horovod-sagemaker

Horovod and the ring all-reduce approach

Horovod is a distributed deep learning framework that supports popular deep learning frameworks: TensorFlow, Keras, PyTorch, and Apache MXNet. The example in this guide uses TensorFlow and Keras. If you’re a PyTorch or MXNet user, updating your scripts follows a very similar process to the one described here. The Horovod documentation page also includes plenty of examples for other frameworks.

During distributed training, multiple processes need to communicate with each other. To enable communication between training processes, Horovod uses a communication protocol called Message Passing Interface (MPI). And to average gradients and update all copies of the models, it uses an approach called ring-allreduce (we’ll come back to this). These approaches are not new and have been used for many years by scientists, researchers and engineers working in High-Performance Computing (HPC) to solve problems in computational fluid dynamics, molecular dynamics, computer graphics and others.

MPI itself defines basic primitives for sending and receiving information between multiple processes in a cluster, such as allreduce, allgather, and broadcast. As you may have deduced from their names: allgather gathers data (in the case of deep learning, gradients) from all processes; broadcast sends data (gradients) from one process to every other process; and allreduce, conceptually speaking, combines the two: it gathers data from all processes, performs a reduction operation (for example, averaging gradients), and then broadcasts the result (the averaged gradients).
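To make these primitives concrete, here’s a minimal sketch (not from the blog’s training scripts) using Horovod’s TensorFlow ops; the tensor values are made up, and every worker runs the exact same code, for example launched with horovodrun:

import tensorflow as tf
import horovod.tensorflow as hvd

hvd.init()

# Each worker holds its own local "gradient" tensor (values here are made up).
local_grad = tf.constant([1.0, 2.0]) * float(hvd.rank() + 1)

avg_grad  = hvd.allreduce(local_grad)                       # gather from all workers and average
all_grads = hvd.allgather(tf.reshape(local_grad, [1, -1]))  # stack every worker's tensor
from_root = hvd.broadcast(local_grad, root_rank=0)          # copy rank 0's tensor to every worker

# In TF 1.x graph mode these are symbolic ops you evaluate in a session;
# in TF 2.x eager mode they execute directly.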

Illustration of ring-allreduce with 6 processes on 3 machines with 2 GPUs each. Rank is a globally unique ID and local rank is a locally unique ID for each GPU

As you increase the number of training processes, inter-process communication increases, and communication overhead starts affecting scaling efficiency.

The ring-allreduce approach improves upon vanilla allreduce by making communication cost independent of the number of processes in the system. It does this by arranging the processes in a logical ring where each process only receives data from its “left” neighbor and sends data to its “right” neighbor, as illustrated in the accompanying figure.

The ring-allreduce process for deep learning is described in further detail in the Horovod blog post and the Horovod paper. To use the Horovod library, you don’t really need to know how ring-allreduce works, but it always helps to have an intuition about how the algorithms and libraries you use work.

To update your training script to use the Horovod library, you primarily need to know the following key concepts:

  • Size: the total number of processes/GPUs. This is equal to the number of compute instances in your cluster multiplied by the number of GPUs per instance. For example, if you have 2 x p3.16xlarge EC2 instances, size would be 2 (instances) x 8 (GPUs) = 16.
  • Rank: a unique process ID, ranging from 0 to size - 1. Each GPU’s process knows its unique rank.
  • Local rank: a unique process ID within a machine. For example, the local rank of each GPU in a p3.16xlarge EC2 instance with 8 GPUs ranges from 0 to 7. The short sketch after this list shows how to query these values.
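Here’s a tiny, hypothetical probe script (say, check_ranks.py, not part of the repo) that prints these three values; launched with one process per GPU, each process reports a different rank and local rank:

import horovod.tensorflow.keras as hvd

hvd.init()

# Example launch: horovodrun -np 16 -H host1:8,host2:8 python check_ranks.py
print('size:',       hvd.size(),         # 16 in the 2 x p3.16xlarge example above
      'rank:',       hvd.rank(),         # 0..15, unique across the whole cluster
      'local rank:', hvd.local_rank())   # 0..7, unique within each instance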

What happens during distributed training?

For the purpose of illustration, let’s take the example of a distributed training job on 2 GPUs. These could be on the same or different systems; it doesn’t matter. Here’s what happens behind the scenes:

Illustration of what happens during distributed training with 2 training processes

Step 1: During the forward pass, it’s business as usual. Each copy of the model does a forward pass on the batch of data it receives.

Step 2: A backward pass is then performed to compute the gradients. But the gradients are NOT used to update the weights yet.

Step 3: Horovod now performs an allreduce operation (averages the gradients and then broadcasts them) across all processes; in this example, across both GPUs.

Step 4: The allreduced gradients are now used to update each copy of the model.

By letting each GPU train on a different batch of data and allreducing the gradients, you’re effectively training on a larger batch and therefore speeding up training.

Updating your training script to use the Horovod API

For this demo, I’ll use the CIFAR-10 dataset, which consists of 60,000 32x32 images belonging to 10 different classes (6,000 images per class). The training script is available in the blog post’s GitHub repository along with a Jupyter notebook to run the full example:

https://github.com/shashankprasanna/distributed-tensorflow-horovod-sagemaker

This section describes the changes made to the training script and the model definition (model_def.py) to prepare them for distributed training.

To make it easier for you to follow along, I’ve included each change heading below as a comment in the scripts. Look for “Change <number>” in the code as you read this section.

Change 1: Import horovod and keras backend

Put these at the top of your training script to import horovod.

import horovod.tensorflow.keras as hvd
import tensorflow.keras.backend as K

Change 2: Initialize horovod and get the size of the cluster

Initialize Horovod and get the total number of GPUs (training processes) in your cluster. If you’re only running this on CPUs, this will be equal to the total number of instances.

hvd.init()
size = hvd.size()

Change 3: Pin GPU to local process (one GPU per process)

Pin a GPU to the current process, using its local rank so that each process uses exactly one GPU.

config = tf.ConfigProto()
config.gpu_options.allow_growth = True                           # allocate GPU memory only as needed
config.gpu_options.visible_device_list = str(hvd.local_rank())   # expose exactly one GPU to this process
K.set_session(tf.Session(config=config))

Change 4: Scale the learning rate using the size of the cluster (total number of workers)

Update the learning rate by scaling it by the number of GPUs. The effective batch size during distributed training is batch_size times hvd.size(). This change is in model_def.py.

By increasing the learning rate, you compensate for the effective increase in batch size.

opt = SGD(lr=lr * size, decay=weight_decay, momentum=momentum)

Change 5: Wrap your Keras optimizer using Horovod to make it a distributed optimizer

The distributed optimizer does the magic of averaging gradients and broadcasting them using allreduce or allgather, and then updating the weights with the averaged gradients. This change is in model_def.py.

opt = hvd.DistributedOptimizer(opt)

Change 6: Add callbacks for syncing initial state, and for saving checkpoints only on the first worker (rank 0)
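The exact callback code is in the training script in the repo. A minimal sketch of the idea, using Horovod’s built-in Keras callbacks (the checkpoint filename below is illustrative):

import tensorflow as tf

callbacks = []

# Broadcast the initial variable states from rank 0 to all other processes,
# so every worker starts training from identical weights.
callbacks.append(hvd.callbacks.BroadcastGlobalVariablesCallback(0))

# Average metrics across workers so the logged values reflect the whole job.
callbacks.append(hvd.callbacks.MetricAverageCallback())

# Save checkpoints on the first worker only, so workers don't overwrite each other's files.
if hvd.rank() == 0:
    callbacks.append(tf.keras.callbacks.ModelCheckpoint('checkpoint-{epoch}.h5'))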

Change 7: Update the number of steps/epoch

You’ll need to divide the number of steps per epoch (the total number of images divided by the batch size) by the number of GPUs, since each GPU now sees only a fraction of the dataset per epoch.
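For example (the per-GPU batch size and the exact train/validation split are assumptions here, not necessarily the values used in the repo):

num_train_images = 50000   # CIFAR-10 training images, assuming the standard 50k/10k split
batch_size = 128           # per-GPU batch size (assumed)

# Each worker covers only its share of the dataset per epoch,
# so divide the usual number of steps by the number of workers.
steps_per_epoch = (num_train_images // batch_size) // hvd.size()

# ...and pass steps_per_epoch to model.fit() in the training script.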

Change 8: Update script to accept hyperparameters as command line arguments

Amazon SageMaker will pass these values to the script when it launches a distributed training job.
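A minimal sketch of what this looks like in the training script; the hyperparameter names are illustrative and must match the keys you later pass to the SageMaker estimator, while SM_CHANNEL_* and SM_MODEL_DIR are environment variables SageMaker sets inside the training container:

import argparse
import os

if __name__ == '__main__':
    parser = argparse.ArgumentParser()

    # Hyperparameters arrive as command-line arguments.
    parser.add_argument('--epochs',        type=int,   default=30)
    parser.add_argument('--learning-rate', type=float, default=0.001)
    parser.add_argument('--batch-size',    type=int,   default=128)

    # Dataset and output locations come from SageMaker environment variables.
    # Channel names ('train', 'validation') must match those passed to fit().
    parser.add_argument('--train',      type=str, default=os.environ.get('SM_CHANNEL_TRAIN'))
    parser.add_argument('--validation', type=str, default=os.environ.get('SM_CHANNEL_VALIDATION'))
    parser.add_argument('--model-dir',  type=str, default=os.environ.get('SM_MODEL_DIR'))

    args, _ = parser.parse_known_args()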

Run distributed training on Amazon SageMaker

You’re now done with the hard part: modifying your training script to make it ready for distributed training.

The rest of the process — distributed training — is relatively straightforward using Amazon SageMaker.

To run a distributed training job using Amazon SageMaker, download and install the SageMaker Python SDK. For a more convenient experience, you can also launch an Amazon SageMaker notebook instance, which comes with a Jupyter notebook server, the SageMaker Python SDK, and popular deep learning frameworks pre-installed.

Running a SageMaker training job involves only two key steps which I’ll highlight below:

  1. Creating a SageMaker TensorFlow Estimator
  2. Calling the fit() function

The code excerpts below are from this Jupyter notebook in the blog post’s repository:

https://github.com/shashankprasanna/distributed-tensorflow-horovod-sagemaker/blob/master/cifar10-sagemaker-distributed.ipynb

Using the SDK, you need to specify the following details so that Amazon SageMaker can provision the requested resources and prepare for training:

  • Your training script
  • Directory with training script dependencies
  • Location to save trained models
  • Type of CPU or GPU instance you want to train on
  • Number of GPUs per instance
  • TensorFlow version
  • Distribution type: MPI (used by Horovod) or parameter server (an alternative approach to distributed training)

There are many more options you can specify in the SageMaker TensorFlow estimator, and you can find the full list in the documentation: https://sagemaker.readthedocs.io/en/stable/index.html

The full estimator definition is in the notebook linked above.
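As a sketch, here’s roughly what it looks like using the SageMaker Python SDK v1 argument names that were current when this post was written (SDK v2 renames train_instance_count, train_instance_type, and distributions to instance_count, instance_type, and distribution). The entry point name, framework version, instance counts, and hyperparameter values below are illustrative:

from sagemaker.tensorflow import TensorFlow

estimator = TensorFlow(
    entry_point='cifar10-tf-horovod.py',        # your Horovod-updated training script (name is illustrative)
    source_dir='code',                          # directory with the training script and its dependencies
    role=role,                                  # IAM execution role, assumed to be defined earlier in the notebook
    framework_version='1.15.2',
    py_version='py3',
    hyperparameters={'epochs': 30,
                     'learning-rate': 0.001,
                     'batch-size': 128},
    output_path='s3://{}/output'.format(bucket_name),  # bucket_name assumed to be defined earlier
    train_instance_count=2,                     # number of instances in the training cluster
    train_instance_type='ml.p3.16xlarge',       # 8 GPUs per instance, so 16 GPUs in total
    distributions={'mpi': {'enabled': True,
                           'processes_per_host': 8}})   # launch one MPI process per GPU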

After defining the estimator, specify where your training, validation, and test datasets are stored in Amazon S3, and pass those locations to the estimator’s fit function.
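A sketch of the fit call follows; the S3 prefixes and channel names here are illustrative, and the channel names determine the SM_CHANNEL_* environment variables your training script reads:

data_channels = {
    'train':      's3://{}/cifar10/train'.format(bucket_name),
    'validation': 's3://{}/cifar10/validation'.format(bucket_name),
    'eval':       's3://{}/cifar10/eval'.format(bucket_name),
}

# This provisions the cluster, runs the distributed training job, and streams logs until it finishes.
estimator.fit(data_channels)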

And you’re done! Sit back and wait for the distributed training job to complete.

You can (and should) monitor progress, which I’ll cover in the next section, but first, let’s take a closer look at what’s happening behind the scenes.

Illustration of the Amazon SageMaker workflow

Amazon SageMaker does several things for you automatically so that you don’t have to worry about infrastructure level details. Very briefly, SageMaker will:

  1. Pick up your training script and dependencies
  2. Provision the specified number of instances in a fully managed cluster
  3. Pull the specified TensorFlow container image
  4. Instantiate containers on every instance.
  5. Download the training code into the instance and make it available in the container
  6. Copy training dataset from Amazon S3 and make it available in the container
  7. Initiate training using MPI

When training is initiated, Amazon SageMaker runs the exact same copy of your Horovod-updated training script on each instance. Each copy knows its unique local rank using hvd.local_rank(), and a GPU is pinned to that particular process. Horovod then takes care of performing ring-allreduce and updating the weights on each GPU with the averaged gradients.

Illustration showing every GPU running the exact same copy of the training script. Each training process is uniquely identified by its rank

Once training is done, SageMaker will automatically:

  • Upload training artifacts such as trained models, checkpoints, and TensorBoard logs to the Amazon S3 bucket you specify
  • Tear down the training cluster so you don’t incur additional costs

Monitoring training progress

There are a couple of different options for monitoring jobs:

  • Amazon SageMaker Console
  • TensorBoard

Amazon SageMaker Console

If you head over to AWS Console > Amazon SageMaker > Training Jobs you can see a list of currently running jobs and jobs you’ve run in the past. Click on a job and you can see details such as progress status, type of instance, hyperparameters, S3 location for datasets and model artifacts and so on.

Screenshot showing training job on the Amazon SageMaker console

Scroll further down and you can see CPU, GPU and other resource utilizations.

Screenshot showing job monitoring on the Amazon SageMaker console

You’ll also find a link to Amazon CloudWatch dashboard where you can monitor training job logs for all instances. This comes in handy for debugging and diagnosing when something isn’t looking right.

Screenshot showing training job logs on Amazon CloudWatch

TensorBoard

In the training script, you’ll notice there are two Keras callbacks for logging: the first saves TensorBoard log files locally in the container, and the second syncs those logs to the Amazon S3 location you specified when invoking the SageMaker estimator.

callbacks.append(TensorBoard(log_dir=logdir))                        # write TensorBoard logs locally in the container
callbacks.append(Sync2S3(logdir=logdir, s3logdir=tensorboard_logs))  # custom callback from the repo that copies the logs to S3

You can now run TensorBoard anywhere you like (your laptop, desktop, EC2 instance) and point it to your Amazon S3 location with your TensorBoard logs. You’ll need to make sure that you have permission to access Amazon S3 and you can set that up using the AWS CLI.

Run the following command on a machine that has TensorBoard installed and has S3 read permissions:

S3_REGION=us-west-2 tensorboard --logdir s3://{bucket_name}/tensorboard_logs/

Live monitoring of distributed training progress of multiple models. Logs are saved in Amazon S3, which allows you to run TensorBoard and monitor progress on any machine with access to your S3 bucket.

What’s not to love about saving time?

Distributed training can save you a lot of time when dealing with large models and datasets. With libraries like Horovod and services like Amazon SageMaker, you can scale training with very little effort. In this blog post, I covered two key concepts:

  1. How to take your existing training scripts and update them with Horovod to make them ready for distributed training.
  2. How to take your Horovod-updated training scripts and run distributed training using Amazon SageMaker, without having to set up and manage clusters.

I’ll leave you with a few guidelines for choosing the right instances for distributed training:

For better performance, always favor a single instance with multiple GPUs over multiple instances with a single GPU each.

On AWS you can get access to instances with 1 GPU (p3.2xlarge), 4 GPUs (p3.8xlarge), and 8 GPUs (p3.16xlarge and p3dn.24xlarge).

If you want to run distributed training with 4 GPUs, prefer a single p3.8xlarge instance over 4 x p3.2xlarge instances. The benefit is that when processes need to communicate to perform the allreduce operation, they don’t have to cross the network to reach other instances’ CPUs and GPUs, which would add communication latency and could hurt training performance. Similarly, if you want to distribute training across 8 GPUs, choose a single p3.16xlarge or p3dn.24xlarge with 8 GPUs rather than 8 x p3.2xlarge or 2 x p3.8xlarge. These multi-GPU instances include NVIDIA NVLink technology, which enables high-bandwidth inter-GPU communication and speeds up the allreduce operations performed by Horovod.

Thanks for reading, I hope you enjoyed this guide. All the code and examples are available on GitHub here:

https://github.com/shashankprasanna/distributed-tensorflow-horovod-sagemaker

If you have questions about this guide, suggestions on how to improve it, or ideas for new guides, please reach out to me on Twitter (@shshnkp), LinkedIn, or leave a comment below. Enjoy!
