Communication Primitives in Deep Learning Frameworks

Yaroslav Bulatov
Published in South Park Commons
Oct 8, 2018

For large-scale training of neural networks you need to specify how the work is broken up across machines. This is where communication primitives come into play. There are three common approaches used in modern frameworks: MPI collectives, task-based, and computation-graph-based.

MPI-collectives

Examples: PyTorch, mpi4py, TensorFlow (in progress)

Instead of one machine you now have n identical copies. How do you make use of them? The simplest way is to run an identical script on each machine and have some commands that let each process communicate with its clones.

In practice this can be as simple as one extra line in your script. For instance, to do distributed data-parallel SGD, your training loop only needs a “Reduce” line after the call to “compute_grads”.

The “Reduce” line adds up the “grads” variable over all the workers and distributes the sum back to each of them. Before that line the workers may hold different values, since “compute_grads” is stochastic, but after it they are synchronized to identical states.
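A minimal sketch of such a loop, simulated inside one Python process so it runs without a cluster. The toy objective, the helper names `all_reduce_sum` and `train`, and the hyperparameters are assumptions made for illustration; in a real framework the reduce step would be the framework's own collective (for example `torch.distributed.all_reduce` in PyTorch).

```python
import random

def compute_grads(params, rng):
    # Noisy gradient of the toy objective 0.5 * w^2 per coordinate,
    # standing in for a stochastic minibatch gradient.
    return [w + rng.gauss(0.0, 0.1) for w in params]

def all_reduce_sum(per_worker_values):
    # Stand-in for "Reduce": sum element-wise over workers, then give
    # every worker its own copy of the total.
    total = [sum(vals) for vals in zip(*per_worker_values)]
    return [list(total) for _ in per_worker_values]

def train(num_workers=4, steps=20, lr=0.1, seed=0):
    # Every worker starts from the same parameters but has its own RNG.
    workers = [[1.0, -2.0, 3.0] for _ in range(num_workers)]
    rngs = [random.Random(seed + rank) for rank in range(num_workers)]
    for _ in range(steps):
        grads = [compute_grads(p, r) for p, r in zip(workers, rngs)]
        grads = all_reduce_sum(grads)  # the one extra line
        for params, g in zip(workers, grads):
            for i in range(len(params)):
                params[i] -= lr * g[i] / num_workers
    return workers

final = train()
```

Because every worker applies the same summed gradient, the parameter copies stay identical after each step even though the local gradients differ.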

Since the communication primitive is high-level, the framework can substitute an efficient implementation of the aggregation. Some examples:

tree-reduction
butterfly
segmented ring

The last approach, segmented ring, is what’s typically used for large-scale stochastic gradient descent.
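To make the ring idea concrete, here is a single-process simulation, assuming “segmented ring” refers to the usual ring all-reduce: each worker's vector is cut into n segments, a reduce-scatter phase passes partial sums around the ring, and an all-gather phase circulates the finished segments. The function name and the list-based "workers" are illustrative, not any framework's API.

```python
def ring_all_reduce(worker_data):
    n = len(worker_data)
    length = len(worker_data[0])
    assert length % n == 0, "sketch assumes the vector splits evenly"
    seg = length // n
    data = [list(v) for v in worker_data]

    def segment(rank, idx):
        return data[rank][idx * seg:(idx + 1) * seg]

    # Phase 1, reduce-scatter: at each step worker r sends one segment
    # to worker r+1, which adds it to its own copy of that segment.
    for step in range(n - 1):
        # Snapshot all messages first so the sends are simultaneous.
        msgs = [((r - step) % n, segment(r, (r - step) % n)) for r in range(n)]
        for r, (idx, payload) in enumerate(msgs):
            dst = (r + 1) % n
            for k, v in enumerate(payload):
                data[dst][idx * seg + k] += v

    # Now worker r holds the fully summed segment (r + 1) % n.
    # Phase 2, all-gather: pass the finished segments around the ring.
    for step in range(n - 1):
        msgs = [((r + 1 - step) % n, segment(r, (r + 1 - step) % n))
                for r in range(n)]
        for r, (idx, payload) in enumerate(msgs):
            dst = (r + 1) % n
            data[dst][idx * seg:(idx + 1) * seg] = payload
    return data

inputs = [[rank * 10 + i for i in range(8)] for rank in range(4)]
result = ring_all_reduce(inputs)
expected = [sum(col) for col in zip(*inputs)]
```

Each worker sends and receives only one segment per step, so the per-worker traffic stays roughly constant as workers are added, which is the usual argument for the ring layout.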

You can introduce more complicated aggregation schemes by making use of the “rank” environment variable, or by running the aggregation on a selected group of processes.

The HPC community has been refining these primitives for a while, and there are others, such as scatter, gather and all-to-all.
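As an illustration of group-restricted aggregation, the sketch below sums values only among ranks that share a group. The variable name `RANK`, the contiguous grouping rule, and the helper names are assumptions; launchers and frameworks differ in how they expose rank and define groups.

```python
import os

def my_rank(default=0):
    # "RANK" is an assumed variable name; check what your launcher sets.
    return int(os.environ.get("RANK", default))

def group_of(rank, group_size):
    # Hypothetical grouping rule: consecutive ranks share a group.
    return rank // group_size

def grouped_sum(values_by_rank, group_size):
    # Sum within each group only; every rank receives its group's total.
    totals = {}
    for rank, value in enumerate(values_by_rank):
        g = group_of(rank, group_size)
        totals[g] = totals.get(g, 0) + value
    return [totals[group_of(rank, group_size)]
            for rank in range(len(values_by_rank))]

per_rank = grouped_sum([1, 2, 3, 4, 5, 6], group_size=2)
```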
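The semantics of these three collectives can be stated in a few lines. This is a single-process sketch in which list indices play the role of ranks; the function names mirror the collective names but are not any library's API.

```python
def scatter(root_items, num_workers):
    # Root holds one item per worker; worker r receives root_items[r].
    assert len(root_items) == num_workers
    return [root_items[r] for r in range(num_workers)]

def gather(per_worker_item):
    # Root ends up with every worker's item, ordered by rank.
    return list(per_worker_item)

def all_to_all(send_buffers):
    # send_buffers[src][dst] is what src sends to dst. Afterwards
    # recv[dst][src] holds it: a transpose across workers.
    n = len(send_buffers)
    return [[send_buffers[src][dst] for src in range(n)] for dst in range(n)]

received = all_to_all([["a0", "a1"], ["b0", "b1"]])
```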


Task-based

Examples: Ray, Dask

Execute a Python function (a Task) somewhere else and get its return value lazily.

This may be the most natural approach for Python programmers: existing Python functions are automagically executed on remote machines and the results are available locally when requested.

If you want to retain remote state between invocations, you can use similar syntax on objects (Actors). One use is localSGD: run SGD independently on multiple machines and periodically average their parameters.
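The submit-now, fetch-later pattern can be tried locally with the standard library. Here `ThreadPoolExecutor` is only a local stand-in for a remote scheduler; in Ray the analogous steps are decorating a function with `@ray.remote`, calling `f.remote(...)` to get a future, and calling `ray.get(...)` to fetch the value.

```python
from concurrent.futures import ThreadPoolExecutor

def square(x):
    return x * x

with ThreadPoolExecutor(max_workers=4) as pool:
    # submit() returns a future immediately; the work runs elsewhere.
    futures = [pool.submit(square, i) for i in range(5)]
    # result() blocks only when the value is actually requested.
    results = [f.result() for f in futures]
```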

The example uses a “resource” tag to control where the computation actually runs. I’m using the machine=1 pattern to ensure that only one actor gets instantiated per machine. For peak performance you’d probably use a multi-GPU implementation optimized for a single machine, and Ray to parallelize across machines.
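A rough sketch of the localSGD pattern follows, using plain in-process objects in place of actors. The `Worker` class, the toy objective, and all hyperparameters are assumptions for illustration; with Ray each `Worker` would be an actor and its method calls would return futures, and the resource-based placement described above is not modeled here.

```python
import random

class Worker:
    """Stand-in for an actor: keeps its parameters between calls."""

    def __init__(self, params, seed):
        self.params = list(params)
        self.rng = random.Random(seed)

    def local_steps(self, num_steps, lr):
        for _ in range(num_steps):
            # Noisy gradient of the toy objective 0.5 * w^2.
            grads = [w + self.rng.gauss(0.0, 0.1) for w in self.params]
            self.params = [w - lr * g for w, g in zip(self.params, grads)]
        return list(self.params)

    def set_params(self, params):
        self.params = list(params)

def local_sgd(num_workers=4, rounds=5, steps_per_round=10, lr=0.1):
    workers = [Worker([1.0, -2.0], seed=i) for i in range(num_workers)]
    for _ in range(rounds):
        # Each worker trains independently for a while...
        results = [w.local_steps(steps_per_round, lr) for w in workers]
        # ...then the parameters are averaged and pushed back.
        average = [sum(col) / num_workers for col in zip(*results)]
        for w in workers:
            w.set_params(average)
    return workers

workers = local_sgd()
```

Unlike the all-reduce loop, workers here communicate once per round rather than once per step, trading synchronization traffic for some drift between averaging points.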

Resources:

  • Ray documentation
  • Paper by the Ray team and me: Flexible Primitives for Distributed Deep Learning
  • Implementing Parameter Server with Ray tutorial from Robert

Computation graph

Examples: TensorFlow

Express your result as a computation graph and partition it across machines manually.

To go further, one might automate this process and break computation across machines automatically. This is an active research area; one example is Mesh TensorFlow.
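A toy version of manual partitioning: every node in the graph carries a device label chosen by the user, and a small evaluator records each value that has to cross machines. The graph format, device names, and the `run` helper are invented for this sketch; in TensorFlow, placement is expressed with `tf.device` scopes instead.

```python
# node name -> (device, op, input names); devices are assigned by hand.
graph = {
    "x":   ("machine0", lambda: 3.0, []),
    "w":   ("machine1", lambda: 2.0, []),
    "mul": ("machine1", lambda a, b: a * b, ["x", "w"]),
    "out": ("machine0", lambda a: a + 1.0, ["mul"]),
}

def run(graph, target):
    values, transfers = {}, []

    def evaluate(name):
        if name in values:
            return values[name]
        device, op, inputs = graph[name]
        args = []
        for inp in inputs:
            args.append(evaluate(inp))
            if graph[inp][0] != device:
                # Input lives on another machine: record the transfer.
                transfers.append((inp, graph[inp][0], device))
        values[name] = op(*args)
        return values[name]

    return evaluate(target), transfers

result, transfers = run(graph, "out")
```

Moving "w" or "mul" to a different machine changes only the transfer list, not the result, which is what makes placement a tuning decision separate from the model definition.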

Typically the user partitions the computation graph while the runtime schedules operations to be executed in an intelligent order. Theoretically this encompasses all forms of execution. However, the most common kind of parallelism for deep learning, data parallelism, turned out to be hard to make practical in this form. Early in TensorFlow development it was discovered that graphs were sometimes too big. Take a common use case: asynchronous gradient descent with a parameter server and 50 replicas. Combine that with an inefficient representation of the model, and you end up communicating a 100GB graph description to your workers.

As a result, an optimization was introduced for this use case: run a client for each replica worker and have each client construct a slightly different computation graph containing only the ops of its own replica.

This makes the use case practical but makes the framework harder to understand. Generally, when optimizing distributed TensorFlow programs you have to be aware of which machines are hosting your clients, master and workers to avoid unexpected delays.
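The effect on graph size can be seen by counting ops in a toy model. The op names, the 1000 ops per replica, and the two shared parameter-server ops are made-up numbers for illustration, not measurements of TensorFlow.

```python
def replica_ops(replica, ops_per_replica):
    return [f"replica{replica}/op{i}" for i in range(ops_per_replica)]

def single_client_graph(num_replicas, ops_per_replica, shared_ops):
    # One client describes every replica, so the graph description
    # grows linearly with the number of replicas.
    graph = list(shared_ops)
    for r in range(num_replicas):
        graph += replica_ops(r, ops_per_replica)
    return graph

def per_client_graph(replica, ops_per_replica, shared_ops):
    # Each client describes the shared ops plus only its own replica.
    return list(shared_ops) + replica_ops(replica, ops_per_replica)

shared = ["ps/weights", "ps/apply_update"]
big = single_client_graph(50, 1000, shared)
small = per_client_graph(7, 1000, shared)
```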


Discussion

Overall, one can think of the MPI approach as the most compact and providing the least control, and Ray as the opposite. For MPI, communication logic is specified in the main compute body: each worker knows how to communicate with its clones. The scheduler is invisible. Allowing work to continue when one worker pauses or dies is next to impossible.

The Ray approach provides the most control at the cost of verbosity: you could have different actors exchanging messages and implement any kind of aggregation logic in pure Python. In particular, aggregation methods like HogWild, localSGD and Asynchronous Decentralized Parallel Stochastic Gradient Descent can be implemented naturally using the primitives. A few examples are shown in our paper “Flexible Primitives for Distributed Deep Learning.”

The TensorFlow computation graph is somewhere in between. Theoretically you can program an arbitrary program scheduler as a TensorFlow graph, as was done in Loom, but it’s unintuitive and will probably never be repeated. Typically you are limited to standard model-parallelism and data-parallelism patterns and hope that the scheduler will do the right thing. Running multiple clients adds some flexibility in communication strategies. For instance, you can implement an asynchronous gradient descent system that will continue training when one of the worker nodes dies. But you can’t instruct it to continue working if one of the parameter server nodes dies. An example of implementing such a strategy using Ray is in our “Flexible Primitives for Distributed Deep Learning” paper.

Fault Tolerance

Running things on the cloud means that connections get dropped or machines get pre-empted altogether.

Generally frameworks leave fault tolerance to the user. With the TensorFlow API you would wrap operations such as session creation and run calls in “while not succeeded:” loops that catch Exception and retry.

PyTorch has a similar story: if anything goes wrong, the user is in charge of restarting the training.
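A generic version of such a retry loop might look like the sketch below. The helper `run_with_retry`, its attempt cap, and the `FlakyOp` test double are assumptions for illustration; in practice `operation` would be something like session creation or a run call.

```python
import time

def run_with_retry(operation, max_attempts=5, delay_sec=0.0):
    attempt = 0
    succeeded = False
    while not succeeded:
        attempt += 1
        try:
            result = operation()
            succeeded = True
        except Exception:
            # Give up after max_attempts so a permanent failure surfaces.
            if attempt >= max_attempts:
                raise
            time.sleep(delay_sec)
    return result

class FlakyOp:
    """Test double that fails a fixed number of times, then succeeds."""

    def __init__(self, failures):
        self.failures = failures
        self.calls = 0

    def __call__(self):
        self.calls += 1
        if self.calls <= self.failures:
            raise ConnectionError("connection dropped")
        return "ok"

flaky = FlakyOp(failures=2)
outcome = run_with_retry(flaky)
```

Catching the broad `Exception` mirrors the pattern described above; a production loop would usually narrow it to the transient errors it expects.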

Performance on AWS

Below is a basic microbenchmark that tries out Ray, TensorFlow and PyTorch on communication between two machines.

- Reserve 2 c5.18xlarge machines (25Gbps bandwidth)

- Send 100MB tensors as fast as possible
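For a sense of scale, the setup above implies a lower bound on transfer time that is easy to compute. This is back-of-the-envelope arithmetic, assuming 1 MB = 10^6 bytes, the full 25 Gbps line rate, and no protocol overhead; it is not a measured result.

```python
def ideal_transfer_ms(num_bytes, link_gbps):
    # Time to push num_bytes through a link at full line rate.
    bits = num_bytes * 8
    seconds = bits / (link_gbps * 1e9)
    return seconds * 1e3

# 100 MB over a 25 Gbps link: 800 Mbit / 25 Gbps = 0.032 s
lower_bound_ms = ideal_transfer_ms(100 * 10**6, 25)
```

Any timing well above roughly 32 ms per tensor therefore reflects software overhead or per-connection limits rather than the raw link.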

Here are the timings in milliseconds communicating 100MB Tensors. Lower is better.

For point-to-point communication across the network, Ray seems to be the most performant. It has optimizations like using multiple TCP connections for a single object transfer, which speeds things up in the cloud since both AWS and GCP have per-connection limits that are lower than per-machine limits. PyTorch has efficient decoding that manages to outperform Ray locally, but it’s slower than Ray over the network because it relies on a single TCP connection. TensorFlow uses gRPC to send information and the decoding seems inefficient: it is bottlenecked by the CPU. Breaking up the 100MB tensor into smaller chunks brings it closer to PyTorch performance over the network.
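The chunking idea amounts to splitting one large buffer into pieces that can be sent as separate messages (or over separate connections) and reassembled on the other side. The sketch below shows only the split and reassembly; the helper names and the chunk size are assumptions, and no networking is involved.

```python
def split_into_chunks(payload, chunk_bytes):
    # memoryview slices avoid copying the buffer while splitting.
    view = memoryview(payload)
    return [view[i:i + chunk_bytes] for i in range(0, len(view), chunk_bytes)]

def reassemble(chunks):
    # The receiver concatenates chunks in their original order.
    return b"".join(bytes(c) for c in chunks)

payload = bytes(range(256)) * 4           # 1024 bytes of sample data
chunks = split_into_chunks(payload, 300)  # 300 + 300 + 300 + 124 bytes
restored = reassemble(chunks)
```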

References

  • Benchmark scripts
  • TensorFlow gRPC slowness issue — points to logic of gRPC decoding loop

Thanks to Robert Nishihara for suggestions on the original draft.
