Cutting the Time of Building Neural Models through Distributed Training

Samuel Knight
Net AI Insights
Published in
7 min readApr 12, 2023

One must experience being locked in a trance watching the hypnotically slow updates of a progress bar to truly know the pain of long-running compute jobs. Luckily for most of us in the machine learning space, there is a simple way to reduce this headache: distributed training on multiple GPUs. While incurring a higher capital expenditure, due to the extra hardware required, faster experiment iteration can translate to higher flexibility and productivity for any ML team. At Net AI, we use distributed training workflows to train cutting-edge traffic decomposition and forecasting models, which mobile network operators can use to improve the management of resources in their infrastructure.

This technical blog focuses on migrating from single GPU training to synchronous distributed training on a single machine with multiple GPUs using TensorFlow’s MirroredStrategy. MirroredStrategy replicates model-related variables (parameters, optimizer, loss function, etc.) across all GPUs. TensorFlow also supports asynchronous distributed training via ParameterServerStrategy and multi-worker distributed training via MultiWorkerMirroredStrategy for other use cases.

💡 A best-practice training workflow should include a number of additional steps (data exploration, model validation, early-stopping, hyper-parameter tuning, etc.) that are skipped in this tutorial for brevity.

Dataset and Model Setup:

Following the toy example laid out in the TensorFlow documentation, we will be training a CNN to classify five types of flowers. The ResNet architecture was chosen for our CNN, as it is quick to set up and sufficiently complex to demonstrate performance benefits with multiple GPUs. The training server we are using is equipped with four NVIDIA V100 GPUs.

First, we create an object classification dataset using TensorFlow’s dataset utilities:

dataset_url = "https://storage.googleapis.com/download.tensorflow.org/example_images/flower_photos.tgz"
data_dir = tf.keras.utils.get_file('flower_photos', origin=dataset_url, untar=True)
data_dir = pathlib.Path(data_dir)

batch_size = 64
img_height = 224
img_width = 224

train_ds = tf.keras.utils.image_dataset_from_directory(
data_dir,
seed=123,
image_size=(img_height, img_width),
batch_size=batch_size
)

Next, we initialise a ResNet50 CNN model for training:

num_classes = len(train_ds.class_names)

def get_model():
resnet = tf.keras.applications.resnet.ResNet50(
weights=None,
include_top=True,
classes=num_classes,
classifier_activation=None
)
return tf.keras.Sequential([
tf.keras.layers.Lambda(tf.keras.applications.resnet50.preprocess_input, input_shape=(224, 224, 3)),
resnet
])

def get_compiled_model():
model = get_model()
model.compile(
optimizer='adam',
loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
metrics=['accuracy']
)
return model

model = get_compiled_model()

Single GPU Training:

TensorFlow automatically detects that GPU compute capability is available and will pick a single GPU to use by default. We call model.fit() to train our model, recording the time taken:

# Run single epoch of training to avoid cold start affecting time
model.fit(train_ds, epochs=1)

start_time = time.time()
model.fit(train_ds, epochs=EPOCHS_TO_TRAIN)
print(f"Training for {EPOCHS_TO_TRAIN} epochs took {time.time() - start_time}s")

Training the network for 10 epochs takes just under 2 minutes:

Epoch 10/10
58/58 [======================] - 11s 189ms/step - loss: 0.6831 - accuracy: 0.7499
Training for 10 epochs took 110.58388566970825s

We can verify that the training process is only utilizing a single GPU by invoking nvidia-smi:

nvidia-smi output demonstrating that a single GPU was utilised during training.

Distributed training:

To fully utilise the compute resources at our disposal, we will distribute work across the four GPUs on our training machine. The high-level approach for each step of training is detailed as follows:

Distribute an equal portion of the ‘global’ training batch to each of the GPUs.

💡 One of the core assumptions of this default distribution approach is that by splitting the global batch into equally sized sub-batches, the time taken for each GPU to process its sub-batch will be approximately equal.

The global batch is split up into sub-batches of equal size and distributed across the GPUs.

Each GPU executes the training loop with its local sub-batch of data, acquiring local losses and computing local gradients.

⚠️ As the local gradients will be summed to acquire the global gradient, it is important to ensure that the local losses are scaled appropriately by dividing by the global batch size. model.fit() handles this automatically, but we will have to do it ourselves later when defining a custom training loop.

Each GPU executes a local training loop on its sub-batch, acquiring a local loss value and obtaining gradients from the loss.

The local gradients are summed to produce a global gradient which is broadcast to all GPUs.

Local gradients are summed and reduced to a single global gradient which is distributed to all GPUs.

Finally, all GPUs update their model parameters using the global gradients, resulting in identical model parameters across all GPUs. This approach is defined as synchronous precisely because the GPUs must wait for each other to complete all of the above steps before beginning the next training step.

Adjusting the batch size:

Training on four GPUs using the above approach means that each GPU will receive sub-batches of size batch_size / 4. To keep comparisons fair, we resample our dataset with a global batch size of batch_size × 4.

num_gpus = len(tf.config.list_physical_devices('GPU'))
global_batch_size = batch_size * num_gpus

multi_gpu_train_ds = tf.keras.utils.image_dataset_from_directory(
data_dir,
seed=123,
image_size=(img_height, img_width),
batch_size=global_batch_size
)

Using model.fit():

To replicate model parameters and other variables (optimizer, loss, etc.) across all GPUs, the model needs to be created inside the scope of MirroredStrategy. The model.fit() call is strategy aware, meaning it automatically distributes the data across GPUs during training if the model has been defined in the strategy’s scope.

# Train utilising all GPUs on the system
strategy = tf.distribute.MirroredStrategy()
with strategy.scope():
multi_gpu_model = get_compiled_model()

# Run single epoch of training to avoid cold start affecting time
multi_gpu_model.fit(multi_gpu_train_ds, epochs=1)

start_time = time.time()
multi_gpu_model.fit(multi_gpu_train_ds, epochs=10)
print(f"Training for {EPOCHS_TO_TRAIN} epochs took {time.time() - start_time}s")

Now that we are distributing training across four GPUs, we see training time fall to 41.66 seconds, a decrease of approximately 61%.

Epoch 10/10
15/15 [======================] - 4s 232ms/step - loss: 0.4177 - accuracy: 0.8466
Training for 10 epochs took 41.65961089134216s

💡 Many factors can affect the reductions in training time when distributing across GPUs including sub-batch balancing, CPU bottlenecks in data processing, the saturation of GPU memory, overhead in cross-GPU communication, etc.

nvidia-smi allows us to verify that all the GPUs are being used.

nvidia-smi output demonstrating that all four GPUs were utilised during training.

Using a custom training loop:

In some cases, it is useful to have finer-grained control over the training procedure than the model.fit() API provides. In order to take such control, we will need to write a custom training loop.

⚠️ There are three requirements that need to be satisfied to distribute training using MirroredStrategy in a custom training loop:

  1. Creating variables in the strategy’s scope: Any variable that will need to be used across GPUs should be instantiated within the strategy.scope context. This includes models, optimizers, metrics, and losses.
  2. Correctly calculating losses: Losses need to be scaled by the global batch size so that the gradients derived from these losses can be sum aggregated across GPUs.
  3. Manually distributing the dataset: The distribution strategy’s experimental_distribute_dataset function ensures that each batch of data is distributed equally across GPUs during training.

We start by initialising the variables that we need to be distributed across all GPUs inside of the strategy’s scope. The loss object is defined with no reduction and our loss function scales by the global batch size to ensure losses are correctly calculated.

strategy = tf.distribute.MirroredStrategy()
with strategy.scope():
model = get_convolutional_model()
optimizer = tf.keras.optimizers.Adam()
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='train_accuracy')

loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True, reduction=tf.keras.losses.Reduction.NONE)
def loss_fn(labels, predictions):
per_example_loss = loss_object(labels, predictions)
return tf.nn.compute_average_loss(per_example_loss, global_batch_size=global_batch_size)

Next, we define our training functions:

def train_step(train_x, train_y):
with tf.GradientTape() as tape:
y_pred = model(train_x, training=True)
loss = loss_fn(train_y, y_pred)
grads = tape.gradient(loss, model.trainable_variables)
optimizer.apply_gradients(zip(grads, model.trainable_variables))
train_accuracy.update_state(train_y, y_pred)
return loss

@tf.function
def distributed_training_epoch(dataset):
total_loss = 0.0
num_batches = 0.0
for train_x, train_y in dataset:
per_replica_losses = strategy.run(train_step, args=(train_x, train_y))
total_loss += strategy.reduce(
tf.distribute.ReduceOp.SUM, per_replica_losses, axis=None
)
num_batches += 1
epoch_loss = total_loss / num_batches
print(f"Epoch loss: {epoch_loss}")
return epoch_loss

We manually create a distributed dataset with the default data distribution approach:

💡 If this approach does not adequately balance sub-batches with respect to per-GPU execution time, use strategy.distribute_datasets_from_function() to manually control how data is batched across GPUs.

train_dataset_distributed = strategy.experimental_distribute_dataset(multi_gpu_train_ds)

Finally, we construct our training loop:

# Run single epoch of training to avoid cold start affecting time
train_loss = distributed_training_epoch(train_dataset_distributed)
train_accuracy.reset_states()

start_time = time.time()
for epoch in range(EPOCHS_TO_TRAIN):
train_loss = distributed_training_epoch(train_dataset_distributed)
train_accuracy.reset_states()
print(f"Training for {EPOCHS_TO_TRAIN} epochs took {time.time() - start_time}s")

10 epochs of our custom training loop take 35.88s representing a 67.9% reduction in training time compared to using a single GPU.

Epoch 10/10 -- loss: 0.5216162800788879 - accuracy: 0.8000000011920929
Training for 10 epochs took 35.87821805477142s

Closing Remarks:

This tutorial has demonstrated that distributed training with TensorFlow can be straightforward to implement and lead to significant reductions in training time. At Net AI, we utilise distributed training to decrease experimentation time, enabling us to innovate faster, as we continue to create cutting-edge solutions for mobile network operators.

Source Code:

The code for the above tutorial can be found in this Google Colab notebook.

Useful TensorFlow Documentation:

Distributed training: https://www.tensorflow.org/guide/distributed_training

Custom training loop: https://www.tensorflow.org/guide/keras/writing_a_training_loop_from_scratch

--

--