Understanding Distributed TensorFlow

One of the biggest and best updates to TensorFlow so far is Distributed TensorFlow, which lets you scale your training across multiple machines. The official tutorial at https://www.tensorflow.org/deploy/distributed is great for people who already know the ins and outs of how TensorFlow works, but it doesn’t explain much of the terminology. This post documents my own struggle to understand it.

Before I get started, I want to describe the scenario in which I turned to TensorFlow for distributed computing. I needed to train 10 neural nets with different weights in parallel, and I also needed to update the weights of all these networks together (Hint: Evolutionary Strategies).

Some keywords:

  • Parameter Servers: a parameter server is a task in the cluster just like a worker; it runs the same kind of TensorFlow server, only under a different job name. Typically it runs on a CPU and stores the variables the workers need. In my case this is where I defined the weight variables for my networks.
  • Workers: this is where most of the computation-intensive work happens.
  • In the forward pass, the workers fetch the variables from the parameter servers and compute with them.
  • In the backward pass, the workers send their results (for example, gradients) back to the parameter servers, which run an update operation and give us new weights to try out.
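The forward/backward cycle described in these bullets can be sketched in plain Python, without TensorFlow or any networking. This is a toy single-process simulation under stated assumptions: updates are synchronous, the parameter server averages the workers' gradients and applies plain SGD, and the model is a one-weight linear fit to y = 2x. `ParameterServer`, `worker_gradient` and `train` are hypothetical names for illustration, not TensorFlow APIs.

```python
from typing import Dict, List, Tuple


class ParameterServer:
    """Hypothetical toy parameter server: holds the shared weights in memory."""

    def __init__(self, weights: Dict[str, float], lr: float) -> None:
        self.weights = dict(weights)
        self.lr = lr

    def pull(self) -> Dict[str, float]:
        # Forward pass: a worker fetches a copy of the current weights.
        return dict(self.weights)

    def push(self, grads: List[Dict[str, float]]) -> None:
        # Backward pass: average the workers' gradients and apply one SGD step.
        for name in self.weights:
            avg = sum(g[name] for g in grads) / len(grads)
            self.weights[name] -= self.lr * avg


def worker_gradient(weights: Dict[str, float],
                    shard: List[Tuple[float, float]]) -> Dict[str, float]:
    # Gradient of the mean squared error of y_hat = w * x on this worker's data shard.
    w = weights["w"]
    grad = sum(2 * (w * x - y) * x for x, y in shard) / len(shard)
    return {"w": grad}


def train(steps: int = 50) -> float:
    # Two workers, each owning its own shard of points on the line y = 2x.
    shards = [[(1.0, 2.0), (2.0, 4.0)], [(3.0, 6.0), (4.0, 8.0)]]
    ps = ParameterServer({"w": 0.0}, lr=0.05)
    for _ in range(steps):
        grads = [worker_gradient(ps.pull(), shard) for shard in shards]
        ps.push(grads)
    return ps.weights["w"]


if __name__ == "__main__":
    print(round(train(), 4))
```

Each `pull` stands in for the forward-pass fetch and each `push` for the backward-pass update; in Distributed TensorFlow those two steps happen over the network between tasks.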

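Two important pieces of distributed TensorFlow:

  • tf.train.ClusterSpec: this is where you define the parameter servers and workers you are going to need
  • tf.train.Server: an in-process server for one task of the cluster; you create one per task, and sessions connect to it through its target

A ClusterSpec maps each job name to a list of addresses, for example: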
tf.train.ClusterSpec({
    "worker": [
        "localhost:2222",
        "localhost:2223"
    ],
    "ps": [
        "ps0.example.com:2224"
    ]})

These translate to the device names /job:worker/task:0, /job:worker/task:1 and /job:ps/task:0. Task indices start at 0 and follow the order of the addresses in each job's list, so the first worker is task 0 and the second worker is task 1. A task index is only an identifier; in our case both worker tasks do similar work.

Here we have one parameter server and two workers. We want to run two separate networks, one on each worker, both using weights from ps0.
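The naming rule can be written down as a small plain-Python helper. `device_names` is a hypothetical function for illustration and does not call TensorFlow; it only reproduces the `/job:<name>/task:<index>` pattern, with 0-based indices following the order of each job's address list.

```python
from typing import Dict, List


def device_names(cluster: Dict[str, List[str]]) -> List[str]:
    """Hypothetical helper: the device-name prefix of every task in a cluster dict."""
    names = []
    for job, addresses in cluster.items():
        # Task indices are 0-based and follow the order of the addresses.
        for task_index, _address in enumerate(addresses):
            names.append(f"/job:{job}/task:{task_index}")
    return names


# Same layout as the ClusterSpec example above.
cluster = {
    "worker": ["localhost:2222", "localhost:2223"],
    "ps": ["ps0.example.com:2224"],
}

for name in device_names(cluster):
    print(name)
```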


Let’s define a cluster

import tensorflow as tf  # TensorFlow 1.x API (tf.Session, tf.train.Server)

cluster = tf.train.ClusterSpec({"ps": ["localhost:65062"], "worker": ["localhost:65063", "localhost:65064"]})

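Next, define the servers. You need one server for each task, that is, for every parameter server and every worker. Here all three run in one Python process for convenience; in a real deployment each task usually runs as its own process, often on its own machine.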

ps = tf.train.Server(cluster, job_name="ps", task_index=0)
worker0 = tf.train.Server(cluster, job_name="worker", task_index=0)
worker1 = tf.train.Server(cluster, job_name="worker", task_index=1)
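When each task runs on its own machine, a common pattern is to launch the same script once per task and tell each copy which task it is. The sketch below assumes hypothetical `--job_name` and `--task_index` flags and only does the bookkeeping with the standard library's argparse; in a real script you would call `parse_task()` with no argument (so it reads `sys.argv`), pass the job name and index to `tf.train.Server(cluster, job_name=..., task_index=...)`, and have a ps task block on `server.join()`.

```python
import argparse
from typing import Dict, List, Optional, Tuple

# Same layout as the cluster defined above.
CLUSTER: Dict[str, List[str]] = {
    "ps": ["localhost:65062"],
    "worker": ["localhost:65063", "localhost:65064"],
}


def parse_task(argv: Optional[List[str]] = None) -> Tuple[str, int, str]:
    """Return (job_name, task_index, address) for the task this process should run.

    The --job_name/--task_index flags are a hypothetical convention for this sketch.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument("--job_name", choices=sorted(CLUSTER), required=True)
    parser.add_argument("--task_index", type=int, default=0)
    args = parser.parse_args(argv)

    addresses = CLUSTER[args.job_name]
    if not 0 <= args.task_index < len(addresses):
        # parser.error prints a usage message and raises SystemExit.
        parser.error(f"task_index must be in [0, {len(addresses) - 1}] for job {args.job_name!r}")
    return args.job_name, args.task_index, addresses[args.task_index]


# Example: what the second worker's process would resolve to.
job, index, address = parse_task(["--job_name", "worker", "--task_index", "1"])
print(f"/job:{job}/task:{index} at {address}")
```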

Let’s place two values on the parameter server ps0. Strictly speaking these are constants, not variables (tf.Variable), which matters later:

with tf.device("/job:ps/task:0"):
    a = tf.constant(3.0, dtype=tf.float32)
    b = tf.constant(4.0)

Now let’s get worker0 and worker1 to add the two constants; worker1 will use a different value for one of them.

On worker0

with tf.Session(worker0.target) as sess:
    init = tf.global_variables_initializer()
    add_node = tf.add(a, b)
    sess.run(init)
    print(sess.run(add_node))

On worker1, but change the value of ‘a’

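with tf.Session(worker1.target) as sess:
    init = tf.global_variables_initializer()
    # A new constant that shadows the Python name `a`; it is not created under
    # tf.device("/job:ps/task:0"), so it is not pinned to the parameter server
    a = tf.constant(9.0, dtype=tf.float32)
    add_node = tf.add(a, b)
    sess.run(init)
    # Rebinds the Python name `a` to the add op; nothing on ps is modified
    a = add_node
    print(sess.run(add_node))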

If you run this file now, worker0 should print 7.0 (3.0 + 4.0) and worker1 should print 13.0 (9.0 + 4.0).

In the worker1 block it looks as if we updated the parameter a that lives on the parameter server (the line a = add_node). Let’s check by running a session on the parameter server:

with tf.Session(ps.target) as sess:
    init = tf.global_variables_initializer()
    sess.run(init)
    print(sess.run(a))
    print(sess.run(b))

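This prints 13.0 for ‘a’ and 4.0 for ‘b’. However, 13.0 does not mean the parameter server was updated. After the worker1 block, the Python name a refers to add_node, so sess.run(a) simply recomputes 9.0 + 4.0; the constant placed on ps still holds 3.0. Constants cannot change, and rebinding a Python name never changes anything in the graph. To really share updated weights, they need to be tf.Variable objects created under tf.device("/job:ps/task:0") and changed with an explicit op such as tf.assign (or an optimizer’s update op). A variable updated that way on the parameter server is what every worker reads on its next step, so editing it effectively updates the weights across all your workers.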
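The difference between rebinding a Python name and updating shared state can be shown without TensorFlow. In this plain-Python analogy (hypothetical, not TensorFlow code), the `ps_store` dict plays the role of the values held on `/job:ps/task:0`: rebinding a local name leaves the store untouched, while an explicit write, the analogue of running `tf.assign` on a `tf.Variable` placed on the parameter server, changes what every later reader sees.

```python
from typing import Dict

# Plays the role of the values held on /job:ps/task:0.
ps_store: Dict[str, float] = {"a": 3.0, "b": 4.0}


def worker1_rebind() -> float:
    a = ps_store["a"]          # read the shared value
    a = 9.0                    # like `a = tf.constant(9.0)`: a new local object
    add_node = a + ps_store["b"]
    a = add_node               # rebinding: only this local name changes
    return add_node


def worker1_assign() -> float:
    new_a = 9.0 + ps_store["b"]
    ps_store["a"] = new_a      # explicit write: the analogue of tf.assign on a ps variable
    return new_a


worker1_rebind()
print(ps_store["a"])  # prints 3.0: the shared value is unchanged
worker1_assign()
print(ps_store["a"])  # prints 13.0: every later reader sees the new value
```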

Now it is time to pass these values back to ps and update them, because that backward pass is an essential step in training a model in parallel.
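For the Evolutionary Strategies scenario from the beginning of the post, the backward pass can be much lighter than backpropagation: each worker only has to report one score for its noisy copy of the shared weights, and the parameter server turns those scores into an update. Below is a single-process, plain-Python sketch of one common ES update (Gaussian perturbations, normalised scores, a step along the score-weighted noise). The toy `fitness` function, `es_step`, and all hyperparameters are assumptions for illustration, not the author’s implementation; in a distributed setup, evaluating one perturbed copy is the part each worker would run.

```python
import random
from typing import List

SIGMA = 0.1        # std-dev of the Gaussian perturbations (assumed)
LR = 0.02          # step size applied on the parameter server (assumed)
N_WORKERS = 10     # one perturbed network per worker, as in the scenario above


def fitness(weights: List[float]) -> float:
    # Toy objective (an assumption for this sketch): higher is better, best at [1.0, -2.0].
    target = [1.0, -2.0]
    return -sum((w - t) ** 2 for w, t in zip(weights, target))


def es_step(weights: List[float], rng: random.Random) -> List[float]:
    # "Forward pass": each worker evaluates its own noisy copy of the shared weights.
    noises = [[rng.gauss(0.0, 1.0) for _ in weights] for _ in range(N_WORKERS)]
    scores = [fitness([w + SIGMA * e for w, e in zip(weights, eps)]) for eps in noises]

    # Normalise the scores so the step size does not depend on the fitness scale.
    mean = sum(scores) / len(scores)
    std = (sum((s - mean) ** 2 for s in scores) / len(scores)) ** 0.5 or 1.0
    advantages = [(s - mean) / std for s in scores]

    # "Backward pass" on the parameter server: move along the score-weighted noise.
    return [
        w + LR / (N_WORKERS * SIGMA) * sum(a * eps[i] for a, eps in zip(advantages, noises))
        for i, w in enumerate(weights)
    ]


rng = random.Random(0)
weights = [0.0, 0.0]
for _ in range(300):
    weights = es_step(weights, rng)
print([round(w, 2) for w in weights], round(fitness(weights), 4))
```

Normalising the scores keeps the update size roughly independent of how the fitness is scaled, so LR does not need retuning whenever the objective changes.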