Parameter Server for Distributed Machine Learning

Ameya
Published in Coinmonks · 10 min read · Oct 27, 2018

Introduction

Many machine learning problems rely on large amounts of data for training and then for inference. Internet-scale companies train on terabytes or petabytes of data and build models from it. Such models consist of weights that are tuned to minimize prediction error in most cases. The number of weights (parameters) runs into the billions to trillions. For models this large, neither training nor inference is possible on a single machine, so it is useful to have a framework for distributed learning as well as inference. Parameters must be shared across many nodes, which use them to carry out and refine their computations and then update them, so their sheer number can become a bottleneck. Sharing is expensive in terms of network bandwidth, the synchronization that sequential ML algorithms require, and fault tolerance on commodity machines whose failure rates can be as high as 10%. Parameter Server proposes a new framework for addressing these challenges and building distributed machine learning algorithms.

Main design ideas

To address the challenges mentioned in the last section, Parameter Server proposes the following design requirements:

  1. Efficient communication: An asynchronous task model and API that reduce the overall network bandwidth used by ML algorithms.
  2. Flexible consistency models: Relaxed consistency reduces the cost of synchronization. It also lets developers trade off algorithmic convergence against system performance.
  3. Elasticity for adding resources: Capacity can be added without restarting the whole computation.
  4. Efficient fault tolerance: Given the high failure rate and large amounts of data, tasks should recover quickly, within a second or so, as long as the machine failures are not catastrophic.
  5. Ease of use: The API is structured around ML constructs such as sparse vectors, matrices, and tensors.

Example of a distributed machine learning algorithm

A classical supervised ML problem consists of optimizing a cost function given a training set of labeled data. The model's weights are adjusted over many samples so as to minimize the prediction error. To decide how to adjust them, partial derivatives (gradients) of the cost function are computed. These gradients indicate the direction in which to move the weights to reduce the error.

For a d-dimensional feature vector x, the model predicts the outcome for a previously unseen x as the weighted sum ∑ x_i * w_i over i = 1 to d. To make sure the model generalizes well (i.e. it doesn't perform well only on the training data), a regularization term is added to the function being minimized, which becomes loss + λ * Norm(w), where the loss measures the error of the predictions ∑ x_i * w_i on the training data. Here λ controls how strongly the weights learned from the training data are penalized. The added term de-emphasizes the learned weights a little, which helps avoid overfitting and improves generalization to previously unseen data. For the purposes of this paper, the exact functions used for cost minimization or regularization are less interesting; the paper focuses on the systems aspects of the framework.
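The prediction and the regularized objective described above can be sketched in a few lines. This is only an illustration: the function names are hypothetical, and the choice of squared error for the loss and squared L2 norm for Norm(w) is an assumption, since the article does not fix either.

```python
def predict(x, w):
    """Linear prediction: the sum of x_i * w_i over the d features."""
    return sum(xi * wi for xi, wi in zip(x, w))


def norm(w):
    """Assumed regularizer: the squared L2 norm of the weights."""
    return sum(wi * wi for wi in w)


def regularized_objective(samples, w, lam):
    """Assumed squared-error loss over (x, y) samples, plus lam * Norm(w)."""
    loss = sum((predict(x, w) - y) ** 2 for x, y in samples)
    return loss + lam * norm(w)


# Two labeled samples that the weights below fit exactly, so the loss is zero
# and only the regularization term contributes to the objective.
samples = [([1.0, 2.0], 5.0), ([3.0, 0.0], 3.0)]
w = [1.0, 2.0]
print(predict([1.0, 2.0], w))
print(regularized_objective(samples, w, 0.1))
```

A larger `lam` makes large weights more costly, which is the de-emphasizing effect the paragraph describes.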

Let’s see how distributed stochastic gradient descent would solve the prediction problem described above. The following diagram depicts the high-level process of parallelizing work for an iterative algorithm:

[Figure: Distributed training algorithm]

The system consists of some server nodes and worker nodes. Each worker node loads a subset of the data, with different workers loading different samples. Each worker computes gradients on its local data for optimizing the loss function and then sends those partial gradients to the server node. The server node aggregates the gradients received from the worker nodes. Once the server node is done, the worker nodes can pull the new set of weights from it and perform the gradient computation again.

Most of the time is spent calculating the gradients g1, g2, …, gm on the workers. These are calculated using transpose(X) * w. If w has billions to trillions of entries, this computation would be infeasible on any single node. A useful side effect of each node working on only a subset of the data is that it needs only the weights corresponding to that data. For example, if one is trying to predict how likely a user is to click on an advertisement, then words such as “regularizers” would be of little interest, and most workers won’t update the weights for them at all. As the figure above shows, a given worker node needs to receive only the weights (w) whose features are present in its x and therefore take part in the dot product (see the columns of x on each worker node and the corresponding entries of the sparse weight vector w).

At a high level, the algorithm looks like the following on each worker:

  1. Compute the gradient (partial derivatives) on a subset of the data
  2. Push this partial gradient to the server
  3. Pull the new set of weights from the server when the server is ready

On each server:

  1. Aggregate the gradients from all m workers, i.e. g = ∑ gi
  2. Update the weights: new_weights = old_weights - learning_rate * (g + λ * ∂Norm(old_weights)), where ∂Norm is the gradient of the regularizer
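The worker and server steps above can be simulated in a single process. The sketch below is not the paper's implementation: the `Server`, `Worker`, and `train` names are hypothetical, the loss is assumed to be mean squared error, and Norm(w) is assumed to be 0.5 * ||w||^2 so that its gradient is simply w. Weights and gradients are sparse dictionaries keyed by feature id, so each worker pulls only the keys its local data touches.

```python
from collections import defaultdict


class Server:
    """Holds the sparse weight vector: feature id -> weight."""

    def __init__(self, learning_rate, lam):
        self.w = defaultdict(float)
        self.learning_rate = learning_rate
        self.lam = lam
        self.pending = []  # gradients pushed during the current iteration

    def push(self, grad):
        self.pending.append(grad)

    def pull(self, keys):
        # A worker asks only for the weights its local features need.
        return {k: self.w[k] for k in keys}

    def update(self):
        # Aggregate: g = sum of g_i over all workers.
        g = defaultdict(float)
        for grad in self.pending:
            for k, v in grad.items():
                g[k] += v
        self.pending.clear()
        # Assumption: the gradient of Norm(w) = 0.5 * ||w||^2 is w itself.
        for k in set(g) | set(self.w):
            self.w[k] -= self.learning_rate * (g[k] + self.lam * self.w[k])


class Worker:
    def __init__(self, samples):
        # Each sample is (sparse_x, y), with sparse_x: feature id -> value.
        self.samples = samples
        self.keys = sorted({k for x, _ in samples for k in x})

    def gradient(self, w):
        # Gradient of the assumed mean squared error on the local data.
        g = defaultdict(float)
        for x, y in self.samples:
            err = sum(v * w[k] for k, v in x.items()) - y
            for k, v in x.items():
                g[k] += err * v / len(self.samples)
        return dict(g)


def train(server, workers, iterations):
    for _ in range(iterations):
        for worker in workers:
            w = server.pull(worker.keys)        # pull the needed weights
            server.push(worker.gradient(w))     # push the partial gradient
        server.update()                         # aggregate and step


# Toy data generated by the weights {0: 1, 1: 2, 2: 3}; feature 1 is shared.
workers = [
    Worker([({0: 1.0}, 1.0), ({1: 1.0}, 2.0)]),
    Worker([({1: 1.0}, 2.0), ({2: 1.0}, 3.0)]),
]
server = Server(learning_rate=0.5, lam=0.0)
train(server, workers, iterations=100)
print({k: round(v, 3) for k, v in sorted(server.w.items())})
```

In this toy run the first worker never pulls the weight for feature 2 and the second never pulls feature 0, which is the sparsity benefit described earlier.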

Architecture

[Figure: High level architecture]

Parameter Server consists of server groups, which make it possible to run multiple algorithms in the system. Each server node in a server group is responsible for a partition of the keyspace (data). Servers can communicate with each other to migrate or replicate data for scalability and availability. A server manager is responsible for maintaining a consistent view of the server groups. It performs liveness checks and assigns ownership of keyspaces to each server node.

A worker group is typically assigned to an application. A worker group consists of multiple worker nodes, which communicate with the server groups to pull parameters and push gradients as described in the last section. Worker groups don’t need to communicate with each other. A scheduler looks at each worker group and assigns tasks to it. Generally the same worker node leverages locally stored data by running iterative algorithms on the same dataset. Parameter namespaces can be used to parallelize work further among multiple worker groups. In addition, the same parameter namespace can be shared among multiple groups: a typical example is one group serving real-time inference while other worker groups support development of the model and update the shared parameters.

Let’s look at some of the primitives needed to build this architecture.

Key-Value API

When this paper was written, existing systems used key-value pairs for communicating the shared parameters, for example a feature ID and its weight. Traditionally this was implemented using memcached or some other key-value store. The important insight is that the values are mostly linear algebra primitives such as vectors or matrices, and it is useful to be able to optimize operations on these constructs. Typical operations are dot products, matrix multiplication, L2 norms, etc. So keeping the key-value semantics while treating values as vectors and matrices is useful for optimizing the most common ML operations.

Range based push and pull

As mentioned in the algorithm earlier, weights are pulled from the server nodes and gradients are pushed to them. Supporting range-based push and pull optimizes network bandwidth usage. Hence the system supports w.push(R, destination) for pushing data and w.pull(R, destination) for pulling it. In both cases, the values corresponding to the keys in range R are pushed to or pulled from the destination node. Setting R to a single key gives simple key-value read-write semantics. Since the gradients g share the same keys as w, w.push(R, g, destination) can be used for pushing local gradients to the destination.
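To make the range semantics concrete, here is a small in-memory sketch of a store that supports range-based push and pull. It is not the paper's API: the `RangeStore` class is hypothetical, the `destination` argument is dropped because everything lives in one process, and treating R as a half-open interval [lo, hi) is an assumption.

```python
import bisect


class RangeStore:
    """Sorted key -> value store, standing in for one server node."""

    def __init__(self):
        self.keys = []    # feature ids, kept sorted for range lookups
        self.values = {}  # feature id -> weight

    def push(self, key_range, updates):
        """Add each update whose key lies in [lo, hi) to the stored value."""
        lo, hi = key_range
        for key, delta in updates.items():
            if lo <= key < hi:
                if key not in self.values:
                    bisect.insort(self.keys, key)
                self.values[key] = self.values.get(key, 0.0) + delta

    def pull(self, key_range):
        """Return every stored (key, value) pair with lo <= key < hi."""
        lo, hi = key_range
        start = bisect.bisect_left(self.keys, lo)
        end = bisect.bisect_left(self.keys, hi)
        return {k: self.values[k] for k in self.keys[start:end]}


store = RangeStore()
store.push((0, 100), {3: 0.5, 7: -1.0, 250: 9.0})  # key 250 is outside R
store.push((0, 100), {3: 0.25})
print(store.pull((0, 10)))
print(store.pull((7, 8)))  # a one-key range gives plain key-value reads
```

Because the keys are kept sorted, a pull touches only the slice of keys inside the range instead of scanning the whole store.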

Asynchronous tasks and dependency

Tasks can be thought of as RPCs. Any push or pull request can be a task, and so can a remote function that is executing. Tasks are generally asynchronous, so programs/applications can continue executing after issuing a task. A task can be marked as completed once its response, in (key, value) pairs, is received, and only when all subtasks that it depends on have returned. Task dependencies help with implementing the overall control flow of the application.
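One way to picture asynchronous tasks with dependencies is with futures from Python's standard `concurrent.futures` module. This is only an analogy under stated assumptions: the `pull`, `compute_gradient`, and `push` functions are hypothetical stand-ins for remote calls, and a thread pool replaces the network.

```python
from concurrent.futures import ThreadPoolExecutor

weights = {"w": 1.0}  # stands in for parameters held on a server


def pull():
    """Task 1: return the current (key, value) pairs."""
    return dict(weights)


def compute_gradient(pull_task):
    """Task 2: depends on the pull task, so wait for its response first."""
    w = pull_task.result()
    return {k: 2.0 * v for k, v in w.items()}  # gradient of w^2


def push(grad_task):
    """Task 3: depends on the gradient task; applies a step of size 0.1."""
    for k, g in grad_task.result().items():
        weights[k] -= 0.1 * g
    return True


executor = ThreadPoolExecutor(max_workers=4)
pull_task = executor.submit(pull)
grad_task = executor.submit(compute_gradient, pull_task)
push_task = executor.submit(push, grad_task)

# submit() returns immediately, so the caller is free to do other work here.

push_task.result()  # completes only after everything it depends on
executor.shutdown()
print(weights)
```

Each `submit` call returns at once, and the chain of `result()` calls inside the tasks enforces the order pull, then gradient, then push.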

Flexible consistency models

As can be seen from the model above, tasks run in parallel and most often on remote nodes. Because of this, when there is a data dependency between tasks, a task may end up pulling an old version of the data. In machine learning, it is sometimes not too detrimental to pick up slightly stale weights instead of the most recent ones. Parameter Server lets implementors select the consistency model they are after. Three consistency models are supported, as described in the diagram below. In sequential consistency, all tasks are executed one after another. In eventual consistency, all tasks start in parallel and eventually converge. In bounded delay, a new task may start only once every task started more than “t” steps earlier has completed; figure (c) below shows a bounded delay of 1.

[Figure: Consistency models]
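The three models can be expressed as one rule with a single knob. The sketch below assumes tasks are numbered in the order they are issued; the `can_start` function and its use of `None` for "no bound" are illustrative choices, not the paper's interface.

```python
def can_start(task_index, finished, max_delay):
    """Decide whether the task numbered task_index may start.

    finished  -- set of indices of tasks that have completed
    max_delay -- 0 for sequential consistency, None for eventual
                 consistency, any other t for bounded delay
    """
    if max_delay is None:
        return True  # eventual: never wait for earlier tasks
    # Every task issued more than max_delay steps earlier must be finished.
    cutoff = max(0, task_index - max_delay)
    return all(j in finished for j in range(cutoff))


# Bounded delay of 1: task 2 must wait for task 0 but may overlap with task 1.
print(can_start(2, {0}, 1))
print(can_start(2, set(), 1))

# Sequential: task 2 needs both task 0 and task 1 to be finished.
print(can_start(2, {0}, 0))

# Eventual: nothing blocks.
print(can_start(5, set(), None))
```

Setting the delay to 0 or to "no bound" recovers the two extremes, so bounded delay sits between sequential and eventual consistency.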

Implementation details

Vector clocks: For fault tolerance and recovery, some timestamping is needed in the system. Parameter Server uses vector clocks to establish an order of events in the system. Vector clocks can be expensive: with m nodes and p parameters, a clock per key costs O(m * p), which is very large given the number of parameters in the system. Since most operations are done on ranges, each range can be assigned a vector clock instead of every key getting one. If there are r unique ranges in the system, this reduces the cost to O(m * r). The system initially starts with only m clocks, so the entire keyspace belonging to a node has one vector clock. This can slow down the recovery process. Hence, as more ranges get created in the system, newer vector clocks are assigned to these range partitions.
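A rough sketch of per-range vector clocks follows. It assumes half-open key ranges and a plain list of segments; the `RangeVectorClock` class and its methods are hypothetical. The point is that the structure starts with one clock for the whole keyspace and splits a segment only when an update covers part of it.

```python
class RangeVectorClock:
    """Vector clocks stored per key range instead of per key."""

    def __init__(self, lo, hi, node_ids):
        # Initially a single segment covers the whole keyspace [lo, hi).
        self.segments = [(lo, hi, {n: 0 for n in node_ids})]

    def update(self, lo, hi, node_id, timestamp):
        """Set node_id's timestamp for keys in [lo, hi), splitting any
        segment that only partly overlaps the range."""
        result = []
        for s_lo, s_hi, clock in self.segments:
            if s_hi <= lo or hi <= s_lo:      # no overlap: keep unchanged
                result.append((s_lo, s_hi, clock))
                continue
            if s_lo < lo:                     # part left of the range
                result.append((s_lo, lo, dict(clock)))
            updated = dict(clock)
            updated[node_id] = timestamp
            result.append((max(s_lo, lo), min(s_hi, hi), updated))
            if hi < s_hi:                     # part right of the range
                result.append((hi, s_hi, dict(clock)))
        self.segments = result

    def lookup(self, key, node_id):
        """Return node_id's timestamp for the segment containing key."""
        for s_lo, s_hi, clock in self.segments:
            if s_lo <= key < s_hi:
                return clock[node_id]
        raise KeyError(key)


vc = RangeVectorClock(0, 100, ["a", "b"])
vc.update(20, 40, "a", 5)   # splits [0, 100) into three segments
vc.update(30, 60, "b", 7)   # splits two of those segments again
print(len(vc.segments))
print(vc.lookup(35, "a"), vc.lookup(35, "b"))
```

The number of clocks grows with the number of distinct ranges that have been updated, not with the number of keys.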

Messages: Messages in the system are represented as (VectorClock(R), all keys and values in R). Since communication can be heavy in data-intensive ML applications, bandwidth can be reduced via caching. The same keys are communicated many times in iterative algorithms, so nodes can cache keys. The values may also contain many entries that are unchanged between iterations and can thus be compressed effectively. Parameter Server uses Snappy compression to compress the many zeros effectively.
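The effect of compressing mostly-zero values is easy to demonstrate. The sketch below uses `zlib` from the Python standard library as a stand-in for Snappy, which is a deliberate substitution and not what Parameter Server uses. The helper names are hypothetical, and sending a digest in place of a cached key list is shown as one possible way to exploit key caching, not as a description of the actual wire format.

```python
import hashlib
import struct
import zlib


def encode_values(values):
    """Pack floats as 8-byte doubles, then compress.
    zlib stands in for Snappy here."""
    raw = struct.pack(f"<{len(values)}d", *values)
    return zlib.compress(raw)


def decode_values(blob):
    """Inverse of encode_values."""
    raw = zlib.decompress(blob)
    return list(struct.unpack(f"<{len(raw) // 8}d", raw))


def key_list_digest(keys):
    """Fingerprint of an integer key list. A sender could transmit this
    instead of the keys when the receiver has the list cached (assumption)."""
    raw = struct.pack(f"<{len(keys)}q", *keys)
    return hashlib.sha1(raw).hexdigest()


# A sparse update: 10,000 values of which only three are non-zero.
values = [0.0] * 10_000
values[17], values[4_200], values[9_999] = 0.5, -1.25, 3.0

blob = encode_values(values)
print(len(values) * 8, "bytes raw,", len(blob), "bytes compressed")

keys = list(range(10_000))
print(len(key_list_digest(keys)), "hex characters instead of the key list")
```

The long runs of zero bytes compress to a small fraction of the raw size, which is the property the paragraph relies on.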

Consistent Hashing: Consistent hashing is used for easy addition and removal of nodes in the system. Every server node on the hash ring is responsible for some keyspace. The partitioning and ownership of the keyspace is managed by the server manager.
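A minimal hash ring shows why adding or removing a node is cheap. This is a generic consistent-hashing sketch, not the Parameter Server code: the `HashRing` class is hypothetical, MD5 is an arbitrary choice of hash, and each server gets a single point on the ring.

```python
import bisect
import hashlib


def ring_position(name):
    """Map a string to a point on the ring (a 64-bit integer)."""
    digest = hashlib.md5(name.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big")


class HashRing:
    def __init__(self):
        self.points = []  # sorted ring positions of the servers
        self.owner = {}   # ring position -> server name

    def add(self, server):
        p = ring_position(server)
        bisect.insort(self.points, p)
        self.owner[p] = server

    def remove(self, server):
        p = ring_position(server)
        self.points.remove(p)
        del self.owner[p]

    def lookup(self, key):
        """Return the first server at or after the key's position,
        wrapping around the end of the ring."""
        i = bisect.bisect_left(self.points, ring_position(str(key)))
        return self.owner[self.points[i % len(self.points)]]


ring = HashRing()
for name in ["s1", "s2", "s3"]:
    ring.add(name)
before = {k: ring.lookup(k) for k in range(1000)}

ring.add("s4")
after = {k: ring.lookup(k) for k in range(1000)}
moved = [k for k in before if before[k] != after[k]]
print(len(moved), "of 1000 keys moved, all of them to the new server")
```

Only the keys that fall in the new server's arc change owner; every other key stays where it was, so existing servers never exchange data among themselves.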

Replication: Replication is done by neighboring nodes. Each node replicates the keyspaces of its k neighbor nodes. The master responsible for a keyspace coordinates, via synchronous communication, with the neighbors that keep the replicas. Whenever the master pulls key ranges, they are replicated to its neighbors. When a worker pushes data to a server, the task is not acknowledged as completed until the data is replicated to the slave. Obviously this can be very chatty if done for every push and pull. Hence the system also allows replication after aggregating a certain amount of data. This aggregation is shown in the figure below. Only one replication message is exchanged between S1 and S2: both x and y are pushed to S1, then the function computation task runs on S1, and then comes the single replication message, after which the acknowledgments (4, 5a, 5b) flow back to worker1 and worker2 to complete the task.

[Figure: Replication after aggregation]

Server Node Management: It is useful to be able to scale by adding new server nodes to the system. When this happens, the server manager assigns a keyspace to the new node. This keyspace can come from a node that was terminated earlier or from splitting the keyspace of a heavily loaded server node. The new node then pulls the keyspace it is responsible for and also, as a slave, the replicas from its k neighbors. Generally a two-phase fetch might be needed to pull data that is being overwritten while the new node is fetching it. The server manager then broadcasts this ownership assignment to the other nodes on the ring, and these server nodes can shrink their keyspace usage accordingly. For a departing node, the server manager assigns its keyspace to some new incoming node. The server manager tracks node health via heartbeats.

Worker nodes: Adding a new worker node is simpler. The task scheduler assigns a data range to the worker node. The new node pulls data from NFS or from other workers. The scheduler then broadcasts this assignment to the other workers so that they can reclaim some space by giving up some of their training data. When a worker node departs, it is left to the owner of the algorithm to decide whether to recover the data, depending on its size.

Conclusion

This paper lays out some important concepts of distributed machine learning. From a systems perspective, it puts techniques like consistent hashing to good use. Range-based communication and support for native ML constructs in message passing seem like good insights for building an efficient ML framework.
