tf2.2 estimator的使用範例

任書瑋
Data Scientists Playground
13 min read · Jul 25, 2020

分成3個階段介紹
Dataset, Model Function 和 Train and Evaluate

Dataset

因為需要在訓練同時做Data augmentation, 因此不使用 TFRecords 的方式

Dataset.from_tensor_slices

首先得到 fileNames list 和 labels list
fileNames = [fileName_1, fileName_2, …, fileName_n], fileName_n 為字串
labels = [label_1, label_2, …, label_n], label_n 為 int

# Build a dataset whose elements are (file name, label) pairs taken from the
# two parallel lists.
dataset = tf.data.Dataset.from_tensor_slices((fileNames ,labels))

shuffle

所有檔名和 label 都在 CPU 記憶體了, 因此可以很簡單的 shuffle, 這裡的 bufferSize 就是檔案數量

# Shuffle the (file name, label) pairs; bufferSize is the shuffle buffer size.
dataset = dataset.shuffle(bufferSize)

map function

使用 map function 將檔名和 label 實際轉換為訓練資料(讀檔與 Data augmentation)

# Apply _parse_function to every element; AUTOTUNE lets tf.data choose the
# number of parallel calls.
dataset = dataset.map(_parse_function, num_parallel_calls=tf.data.experimental.AUTOTUNE)

依據需求將 _parse_function 做任意更動

def _parse_function(filename, label):
    """Turn one (file name, label) element into the dicts fed to the model.

    Args:
        filename: string tensor naming the sample to load.
        label: label tensor paired with ``filename``; passed through as is.

    Returns:
        A ``(features, labels)`` tuple of dicts keyed by ``"feature"`` and
        ``"label"`` respectively.
    """

    def fileName2feature(fileNameString):
        # tf.numpy_function hands the string tensor over as bytes, so decode
        # it before passing it to plain-Python code.
        fileNameString = str(fileNameString, encoding="utf-8")
        # getFeatureFunction is defined elsewhere (file reading and data
        # augmentation go here); its result must be convertible to float32
        # to match the output dtype declared below.
        feature = getFeatureFunction(fileNameString)
        return [feature]

    feature = tf.numpy_function(fileName2feature, [filename], tf.float32)
    return ({"feature": feature}, {"label": label})

padded batch

決定 batch 大小, 並將 model 的 input size 固定

# Batch the elements, padding every "feature" to a fixed [98, 40] shape with
# 0.0; "label" is a scalar (shape []), so its padding value 1 is only a
# placeholder required by the API.
dataset = dataset.padded_batch(
    batchSize,
    padded_shapes=({"feature": [98, 40]}, {"label": []}),
    padding_values=({"feature": 0.0}, {"label": 1}),
)

prefetch

預先準備資料

# Prepare upcoming elements ahead of time; AUTOTUNE lets tf.data choose the
# prefetch buffer size.
dataset = dataset.prefetch(tf.data.experimental.AUTOTUNE)

repeat

training dataset 如何終止會在最外層的 Estimator max_steps 設定

if training:
    # Repeat indefinitely; the training run is bounded elsewhere (by the
    # max_steps given to the Estimator's TrainSpec).
    dataset = dataset.repeat()
else:
    # A single pass over the data for evaluation.
    dataset = dataset.repeat(1)

Model Function

Learning Rate Custom Schedule

class LearningRateCustomSchedule(tf.keras.optimizers.schedules.LearningRateSchedule):
    """Learning-rate schedule that decays polynomially from ``init_lr`` to 1e-8.

    Args:
        init_lr: learning rate at step 0.
        warmup_steps: stored for callers, but not used by ``__call__``.
        decay_steps: number of steps over which the rate decays.
    """

    def __init__(self, init_lr, warmup_steps=1000, decay_steps=10000):
        super(LearningRateCustomSchedule, self).__init__()
        # Keep the constructor arguments: __call__ reads them on every step.
        self.init_lr = init_lr
        self.warmup_steps = warmup_steps
        self.decay_steps = decay_steps

    def __call__(self, step):
        """Return the learning rate for ``step``."""
        learning_rate = tf.compat.v1.train.polynomial_decay(
            self.init_lr,
            step,
            self.decay_steps,
            end_learning_rate=1e-8,
        )
        return learning_rate

    def get_config(self):
        """Return the constructor arguments so the schedule can be re-created."""
        return {
            "init_lr": self.init_lr,
            "warmup_steps": self.warmup_steps,
            "decay_steps": self.decay_steps,
        }

Model

class MyModel(tf.keras.Model):
    """Small classifier: flatten -> dropout (training only) -> Dense(64) -> Dense(20).

    Both dense layers use no activation, so ``call`` returns raw logits.
    """

    def __init__(self):
        super(MyModel, self).__init__()
        self.flatten = tf.keras.layers.Flatten(data_format="channels_last")
        # The dropout rate is a placeholder named "dropout" that defaults to 0.2.
        self.dropout = tf.keras.layers.Dropout(
            tf.compat.v1.placeholder_with_default(0.2, shape=[], name="dropout")
        )
        self.dense1 = tf.keras.layers.Dense(64, activation=None)
        self.dense2 = tf.keras.layers.Dense(20, activation=None)

    def call(self, inputs, training):
        """Compute logits for ``inputs``.

        Args:
            inputs: batch of input features.
            training: Python bool; dropout is applied only when it is true.

        Returns:
            The output of the last dense layer (20 units).
        """
        x = self.flatten(inputs)
        if training:
            x = self.dropout(x)
        x = self.dense1(x)
        outputs = self.dense2(x)
        return outputs

tf.estimator.Estimator 所需要的 model_fn

def model_fn(features, labels, mode, params):
    """Model function for ``tf.estimator.Estimator``.

    Args:
        features: dict holding the input batch under ``"feature"``.
        labels: dict holding the targets under ``"label"``; only read in
            TRAIN and EVAL modes.
        mode: one of ``tf.estimator.ModeKeys``.
        params: dict read for ``"lr"`` (initial learning rate),
            ``"totalStep"`` (decay steps), ``"displayStep"`` (logging and
            summary interval) and ``"log"`` (summary output directory).

    Returns:
        The ``tf.estimator.EstimatorSpec`` matching ``mode``.
    """
    # Without INFO verbosity the LoggingTensorHook output is not shown.
    tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.INFO)

    x = tf.identity(features["feature"], "x")
    training = mode == tf.estimator.ModeKeys.TRAIN
    with tf.compat.v1.variable_scope("ExampleNet", reuse=None):
        model = MyModel()
        outputs = model(x, training)
    outputs = tf.identity(outputs, "y")
    argmax = tf.argmax(outputs, -1, output_type=tf.int32)

    if mode in (tf.estimator.ModeKeys.TRAIN, tf.estimator.ModeKeys.EVAL):
        labels = labels["label"]
        loss = tf.reduce_mean(
            tf.nn.sparse_softmax_cross_entropy_with_logits(labels=labels, logits=outputs),
            name="loss",
        )
        learningRate = LearningRateCustomSchedule(
            params["lr"], decay_steps=params["totalStep"], warmup_steps=-1
        )
        globalStep = tf.compat.v1.train.get_or_create_global_step()
        logging_hook = tf.estimator.LoggingTensorHook(
            {"loss": loss, "global_step": globalStep},
            every_n_iter=params["displayStep"],
        )

    if mode == tf.estimator.ModeKeys.TRAIN:
        # Build the learning-rate tensor once and share it between the
        # optimizer and the summary.
        currentLearningRate = learningRate(globalStep)
        train_op = tf.compat.v1.train.AdamOptimizer(currentLearningRate).minimize(
            loss, global_step=globalStep
        )
        tf.compat.v1.summary.scalar("loss", loss)
        tf.compat.v1.summary.scalar("learning_rate", currentLearningRate)
        summary_hook = tf.estimator.SummarySaverHook(
            params["displayStep"],
            output_dir=params["log"],
            summary_op=tf.compat.v1.summary.merge_all(),
        )
        return tf.estimator.EstimatorSpec(
            mode=mode,
            loss=loss,
            train_op=train_op,
            training_hooks=[logging_hook, summary_hook],
        )
    elif mode == tf.estimator.ModeKeys.EVAL:
        eval_metric_ops = {
            "accuracy": tf.compat.v1.metrics.accuracy(labels=labels, predictions=argmax)
        }
        return tf.estimator.EstimatorSpec(
            mode=mode,
            loss=loss,
            evaluation_hooks=[logging_hook],
            eval_metric_ops=eval_metric_ops,
        )
    elif mode == tf.estimator.ModeKeys.PREDICT:
        # Only the logits are exported: this model defines no separate
        # embedding tensor, so an "embedding" entry cannot be provided here.
        predictions = {"logits": outputs}
        return tf.estimator.EstimatorSpec(mode, predictions=predictions)

以下為特別須注意的地方

tf.estimator.LoggingTensorHook 訓練的過程中顯示資訊於 cmd 視窗, 記得要加這條程式碼 tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.INFO)

tf.estimator.SummarySaverHook 訓練的資訊寫入tensorboard 檔

mode == tf.estimator.ModeKeys.TRAIN 下需要定義 optimizer

mode == tf.estimator.ModeKeys.EVAL 的 eval_metric_ops 會自動將資訊寫入 EVAL 單獨的 tensorboard 檔, 並用於 Estimator early stop 中

如果需要 tf.estimator.BestExporter, 要定義 mode == tf.estimator.ModeKeys.PREDICT 的回傳

Train and Evaluate

configure

Estimator 本身支援中斷後恢復(cfg.modelDir不變, cfg.pretrainModel=None)

# The same strategy, restricted to the configured device, is used for both
# training and evaluation.
mirrored_strategy = tf.distribute.MirroredStrategy(devices=[cfg.device])
run_config = tf.estimator.RunConfig(
    keep_checkpoint_max=cfg.NNparameter["keep_checkpoint_max"],
    model_dir=cfg.modelDir,
    save_checkpoints_steps=cfg.NNparameter["save_checkpoints_steps"],
    train_distribute=mirrored_strategy,
    eval_distribute=mirrored_strategy,
)

# Warm-start only when a pretrained checkpoint is configured; the regex
# restricts it to variables whose name contains "ExampleNet".
warm_start_from = None
if cfg.pretrainModel is not None:
    warm_start_from = tf.estimator.WarmStartSettings(
        ckpt_to_initialize_from=cfg.pretrainModel,
        vars_to_warm_start=".*ExampleNet.*",
    )

estimator = tf.estimator.Estimator(
    model_fn=model_fn,
    params=cfg.NNparameter,
    config=run_config,
    warm_start_from=warm_start_from,
)

tf.estimator.BestExporter

def serving_input_receiver_fn():
    """Describe the serving input: a float32 ``"feature"`` batch of shape [None, 98, 40].

    Returns:
        A ``ServingInputReceiver`` that uses the same placeholder dict as both
        the model features and the receiver tensors.
    """
    inputs = {
        "feature": tf.compat.v1.placeholder(shape=[None, 98, 40], dtype=tf.float32)
    }
    return tf.estimator.export.ServingInputReceiver(inputs, inputs)


# Keep at most the 5 best exported models.
bestExporter = tf.estimator.BestExporter(
    serving_input_receiver_fn=serving_input_receiver_fn,
    exports_to_keep=5,
)

early stop hook

metric_name 為 mode == tf.estimator.ModeKeys.EVAL 的eval_metric_ops 定義, min_steps 為最少要訓練多少 step, run_every_steps 相當於多少 step eval 一次

# Accuracy is a higher-is-better metric, so training must stop when it fails
# to INCREASE; a no-decrease hook on accuracy would stop training while the
# model is still improving.
earlyStopHook = tf.estimator.experimental.stop_if_no_increase_hook(
    estimator,
    metric_name="accuracy",
    max_steps_without_increase=3 * cfg.NNparameter["save_checkpoints_steps"],
    run_every_steps=cfg.NNparameter["save_checkpoints_steps"],
    run_every_secs=None,
    min_steps=cfg.NNparameter["min_steps"],
)

train_spec 與 eval_spec

dataset 需使用 lambda function 輸入, train_spec 使用 early stop hook, eval_spec 使用 best exporter, eval_spec steps=None 代表把所有的 evaluation data 跑完

# input_fn must be a callable, hence the lambdas around the dataset objects.
train_spec = tf.estimator.TrainSpec(
    input_fn=lambda: Data(cfg.trainData).dataset,
    max_steps=cfg.NNparameter["totalStep"],
    hooks=[earlyStopHook],
)
# steps=None evaluates until the evaluation dataset is exhausted.
eval_spec = tf.estimator.EvalSpec(
    input_fn=lambda: Data(cfg.evalData).dataset,
    steps=None,
    start_delay_secs=2,
    throttle_secs=5,
    exporters=bestExporter,
)

train_and_evaluate

# Run training and evaluation as described by train_spec and eval_spec.
tf.estimator.train_and_evaluate(estimator, train_spec, eval_spec)

--

--