TensorFlow Threading and Queues [ref]

Queues are a powerful mechanism for asynchronous computation using TensorFlow.

Like everything in TensorFlow, a queue is a node in a TensorFlow graph. It's a stateful node, like a variable: other nodes can modify its content. In particular, nodes can enqueue new items into the queue, or dequeue existing items from the queue.

To get a feel for queues, let's consider a simple example (shown as a GIF in the original article). We will create a "first in, first out" queue (FIFOQueue) and fill it with zeros. Then we'll construct a graph that takes an item off the queue, adds one to that item, and puts it back on the end of the queue. Slowly, the numbers on the queue increase.

Enqueue, EnqueueMany, and Dequeue are special nodes. They take a pointer to the queue instead of a normal value, allowing them to change it. We recommend that you think of these as being like methods of the queue. In fact, in the Python API, they are methods of the queue object (e.g. q.enqueue(...)).

N.B. Queue methods (such as q.enqueue(...)) must run on the same device as the queue. Incompatible device placement directives will be ignored when creating these operations.

Now that you have a bit of a feel for queues, let's dive into the details…

Queue usage overview

Queues, such as tf.FIFOQueue and tf.RandomShuffleQueue, are important TensorFlow objects for computing tensors asynchronously in a graph.

For example, a typical input architecture is to use a RandomShuffleQueue to prepare inputs for training a model:

  • Multiple threads prepare training examples and push them into the queue.
  • A training thread executes a training op that dequeues mini-batches from the queue.

This architecture has many benefits, as highlighted in the Reading data how-to, which also gives an overview of functions that simplify the construction of input pipelines.

The TensorFlow Session object is multithreaded, so multiple threads can easily use the same session and run ops in parallel. However, it is not always easy to implement a Python program that drives threads as described above. All threads must be able to stop together, exceptions must be caught and reported, and queues must be properly closed when stopping.

TensorFlow provides two classes to help: tf.train.Coordinator and tf.train.QueueRunner. These two classes are designed to be used together. The Coordinator class helps multiple threads stop together and report exceptions to a program that waits for them to stop. The QueueRunner class is used to create a number of threads cooperating to enqueue tensors in the same queue.

Coordinator

The Coordinator class helps multiple threads stop together.

Its key methods are:

  • should_stop(): returns True if the threads should stop.
  • request_stop(): requests that the threads should stop.
  • join(): waits until the specified threads have stopped.

You first create a Coordinator object, and then create a number of threads that use the coordinator. The threads typically run loops that stop when should_stop() returns True.

Any thread can decide that the computation should stop. It only has to call request_stop(), and the other threads will stop as should_stop() will then return True.

Sample code:
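Here is a minimal runnable sketch of the pattern. The shared counter, the stop threshold of 100, and the four threads are illustrative choices, not part of the API; the essential shape is the should_stop()/request_stop()/join() protocol.

```python
import threading
import tensorflow as tf  # tf.train.Coordinator exists in both TF1 and TF2

# Shared state the worker threads operate on (illustrative only).
counter = {"n": 0}
lock = threading.Lock()

def my_loop(coord):
    # Loop until any thread has requested a stop.
    while not coord.should_stop():
        with lock:
            counter["n"] += 1
            if counter["n"] >= 100:
                # Any thread may request a stop; should_stop() then
                # returns True in every other thread as well.
                coord.request_stop()

# Main code: create a coordinator and threads that use it.
coord = tf.train.Coordinator()
threads = [threading.Thread(target=my_loop, args=(coord,)) for _ in range(4)]
for t in threads:
    t.start()
coord.join(threads)  # wait for all of them to stop
print(counter["n"])
```

Each in-flight thread may perform at most one more increment after the stop is requested, so the final count lands between 100 and 103.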

Obviously, the coordinator can manage threads doing very different things. They don't all have to be doing the same thing as in the example above. The coordinator also has support to capture and report exceptions. See the tf.train.Coordinator documentation for more details.

QueueRunner

The QueueRunner class creates a number of threads that repeatedly run an enqueue op. These threads can use a coordinator to stop together. In addition, a queue runner runs a closer thread that automatically closes the queue if an exception is reported to the coordinator.

You can use a queue runner to implement the architecture described above. First build a graph that uses a TensorFlow queue (e.g. a tf.RandomShuffleQueue) for input examples. Add ops that process examples and enqueue them in the queue. Add training ops that start by dequeueing from the queue.

In outline: prepare the queue, the enqueue_op, the input batch, and the train_op.
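These four pieces can be sketched as follows, again using `tf.compat.v1`. The random-scalar "example" op, the queue capacity, the batch size of 8, and the reduce_mean "train_op" are placeholders standing in for a real input pipeline and training step.

```python
import tensorflow.compat.v1 as tf  # TF1-style graph API (assumes TF >= 1.14)

tf.disable_eager_execution()

# Placeholder "example" op: one random scalar standing in for a real,
# preprocessed training example.
example = tf.random.uniform([])

# A queue, and an op that enqueues examples one at a time.
queue = tf.RandomShuffleQueue(capacity=100, min_after_dequeue=10,
                              dtypes=[tf.float32], shapes=[[]])
enqueue_op = queue.enqueue(example)

# The training graph starts by dequeuing a mini-batch.
inputs = queue.dequeue_many(8)
train_op = tf.reduce_mean(inputs)  # stand-in for a real training op

# Four threads will repeatedly run enqueue_op to keep the queue filled.
qr = tf.train.QueueRunner(queue, [enqueue_op] * 4)

results = []
with tf.Session() as sess:
    coord = tf.train.Coordinator()
    # create_threads starts the enqueue threads (and a closer thread).
    enqueue_threads = qr.create_threads(sess, coord=coord, start=True)
    for step in range(5):
        if coord.should_stop():
            break
        results.append(sess.run(train_op))
    # When done, ask the threads to stop and wait for them.
    coord.request_stop()
    coord.join(enqueue_threads)

print(len(results))  # number of mini-batch "training" steps that ran
```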

When does the enqueue op actually get called? It is qr.create_threads [ref] that starts the enqueue threads: once that line runs, items start being pushed into the queue.

Handling exceptions

Threads started by queue runners do more than just run the enqueue ops. They also catch and handle exceptions generated by queues, including the tf.errors.OutOfRangeError exception, which is used to report that a queue was closed.

A training program that uses a coordinator must similarly catch and report exceptions in its main loop.

Here is an improved version of the training loop above.

During training, if an exception occurs, request that the threads stop; then, in a finally block, wait until all the threads have stopped.
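The pattern can be sketched with a dummy worker and a simulated failure (the RuntimeError at step 3 and the two idle worker threads are fabricated for illustration); the essential shape is the try/except/finally around the training loop.

```python
import threading
import time
import tensorflow as tf  # tf.train.Coordinator exists in both TF1 and TF2

coord = tf.train.Coordinator()

def worker(coord):
    # Placeholder worker thread: idle until a stop is requested.
    while not coord.should_stop():
        time.sleep(0.01)

threads = [threading.Thread(target=worker, args=(coord,)) for _ in range(2)]
for t in threads:
    t.start()

caught = None
try:
    for step in range(10):
        if coord.should_stop():
            break
        if step == 3:
            raise RuntimeError("training failed at step 3")  # simulated failure
except Exception as e:
    # Report the exception to the coordinator: should_stop() becomes True
    # everywhere, and coord.join() will re-raise it in the main thread.
    coord.request_stop(e)
finally:
    coord.request_stop()  # ask all threads to stop...
    try:
        coord.join(threads)  # ...and wait; re-raises the reported exception
    except RuntimeError as e:
        caught = e

print(caught)
```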