Training Multiple TensorFlow Models Using Dataflow

Hayato Yoshikawa
4 min read · Mar 7, 2017


Cover photo: https://unsplash.com/collections/582860/devices?photo=6g0KJWnBhxg

Deep learning is one of the most trending topics these days, and thanks to great machine learning libraries like TensorFlow, we can step into this scientific world without deep expertise.

Once you have chosen a basic model to train on your dataset, you will want to change its parameters or try other models, won't you? Then you will notice how much coffee you need while waiting for the training to finish.

Meanwhile, Google Cloud Dataflow can compute many things in parallel and is fully managed, which means you don't need to struggle with parallel coding or resource management. Why not use it to reduce training time?

Option Settings

It's easy to drive Dataflow from a Jupyter Notebook running on your local machine. Here are the settings that should go in the first cell. Replace PROJECTID and BUCKET_NAME as you like.

import apache_beam as beam
import apache_beam.transforms.window as window

options = beam.utils.pipeline_options.PipelineOptions()

# GCP project, job name, and GCS locations for staged binaries and temp files
google_cloud_options = options.view_as(beam.utils.pipeline_options.GoogleCloudOptions)
google_cloud_options.project = '{PROJECTID}'
google_cloud_options.job_name = 'tensorflow-gs'
google_cloud_options.staging_location = 'gs://{BUCKET_NAME}/binaries'
google_cloud_options.temp_location = 'gs://{BUCKET_NAME}/temp'

# Worker pool: start 6 workers and cap autoscaling at 6; raise machine_type for heavier models
worker_options = options.view_as(beam.utils.pipeline_options.WorkerOptions)
worker_options.max_num_workers = 6
worker_options.num_workers = 6
worker_options.disk_size_gb = 20
# worker_options.machine_type = 'n1-standard-16'

# Switch to 'DirectRunner' to test the pipeline locally before submitting to Dataflow
# options.view_as(beam.utils.pipeline_options.StandardOptions).runner = 'DirectRunner'
options.view_as(beam.utils.pipeline_options.StandardOptions).runner = 'DataflowRunner'

p = beam.Pipeline(options=options)
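
Note: the import path above matches the Beam SDK that was current when this article was written. On newer Apache Beam releases the same option classes live in apache_beam.options.pipeline_options, so a roughly equivalent setup (just a sketch, reusing the same PROJECTID and BUCKET_NAME placeholders) looks like this:

from apache_beam.options.pipeline_options import (
    PipelineOptions, GoogleCloudOptions, WorkerOptions, StandardOptions)

options = PipelineOptions()
options.view_as(GoogleCloudOptions).project = '{PROJECTID}'
options.view_as(GoogleCloudOptions).staging_location = 'gs://{BUCKET_NAME}/binaries'
options.view_as(GoogleCloudOptions).temp_location = 'gs://{BUCKET_NAME}/temp'
options.view_as(WorkerOptions).num_workers = 6
options.view_as(StandardOptions).runner = 'DataflowRunner'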

Making a parameter grid

Having trouble writing out enormous numbers of parameter combinations? Here is some code to save you time. We only need to define the candidate values for each parameter, and this code generates all distinct combinations.

import itertools

param_grid = {'hidden_units': [[10, 20, 10],
                               [20, 40, 20],
                               [100, 200, 100]],
              'dropout': [0.1, 0.2, 0.5, 0.8],
              'steps': [20000, 50000, 100000]}

def dict_product(param):
    # Yield one dict per element of the Cartesian product of all candidate values
    return (dict(zip(param, x)) for x in itertools.product(*param.values()))

params = list(dict_product(param_grid))

Running the code above gives us the following parameter combinations.

[{'dropout': 0.1, 'hidden_units': [10, 20, 10], 'steps': 20000},
{'dropout': 0.1, 'hidden_units': [20, 40, 20], 'steps': 20000},
{'dropout': 0.1, 'hidden_units': [100, 200, 100], 'steps': 20000},
{'dropout': 0.2, 'hidden_units': [10, 20, 10], 'steps': 20000},
{'dropout': 0.2, 'hidden_units': [20, 40, 20], 'steps': 20000},
# ...
{'dropout': 0.5, 'hidden_units': [20, 40, 20], 'steps': 100000},
{'dropout': 0.5, 'hidden_units': [100, 200, 100], 'steps': 100000},
{'dropout': 0.8, 'hidden_units': [10, 20, 10], 'steps': 100000},
{'dropout': 0.8, 'hidden_units': [20, 40, 20], 'steps': 100000},
{'dropout': 0.8, 'hidden_units': [100, 200, 100], 'steps': 100000}]

Note: this grid includes some wasteful patterns; for example, dropout really only makes sense when the step count is high.
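
If you want to prune those patterns before launching the jobs, a simple list comprehension is enough. A minimal sketch (the 0.5 dropout and 50000-step thresholds are arbitrary values chosen only for illustration):

# Drop combinations that pair heavy dropout with a small number of steps
# (the thresholds below are illustrative, not part of the original grid design)
params = [p for p in params
          if not (p['dropout'] >= 0.5 and p['steps'] < 50000)]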

Training each model

This function is called once for each set of parameters. We made 36 combinations this time, so it is called 36 times in parallel. Trained models are saved to Google Cloud Storage under the path given in model_dir. We assign a unique model_id to each model so that you can match a model with its BigQuery entry later.

def train(param):
    # Imports live inside the function so they are available on each Dataflow worker
    import uuid
    import json
    import tensorflow as tf
    from sklearn import cross_validation

    model_id = str(uuid.uuid4())

    # Load the 'Iris' dataset and hold out 20% for evaluation
    iris = tf.contrib.learn.datasets.base.load_iris()
    train_x, test_x, train_y, test_y = cross_validation.train_test_split(
        iris.data, iris.target, test_size=0.2, random_state=0
    )

    # https://www.tensorflow.org/get_started/tflearn
    feature_columns = [tf.contrib.layers.real_valued_column("", dimension=4)]
    classifier = tf.contrib.learn.DNNClassifier(feature_columns=feature_columns,
                                                hidden_units=param['hidden_units'],
                                                dropout=param['dropout'],
                                                n_classes=3,
                                                model_dir='gs://{BUCKET_NAME}/models/%s' % model_id)
    classifier.fit(x=train_x,
                   y=train_y,
                   steps=param['steps'],
                   batch_size=50)
    result = classifier.evaluate(x=test_x, y=test_y)

    # Return a flat dict so it maps directly onto the BigQuery schema used later
    ret = {'accuracy': float(result['accuracy']),
           'loss': float(result['loss']),
           'model_id': model_id,
           'param': json.dumps(param)}

    return ret
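
Before submitting 36 jobs to Dataflow, it can be worth calling the function once on your local machine with a single parameter set, just to confirm it runs end to end. This assumes TensorFlow, scikit-learn, and GCS credentials are available locally, and that {BUCKET_NAME} is replaced with a bucket you can write to:

# Quick local smoke test with the first parameter set
print(train(params[0]))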

Running a pipeline

Now we run the pipeline, which has three simple operations. The first creates an in-memory collection from the grid parameters we defined. The second runs the training. The third writes the training results to BigQuery. Very simple, isn't it?

(p | 'init' >> beam.Create(params)
   | 'train' >> beam.Map(train)
   | 'output' >> beam.Write(beam.io.BigQuerySink(
       'project:dataset.table',
       schema="accuracy:FLOAT, loss:FLOAT, model_id:STRING, param:STRING",
       write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
       create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED))
)

p.run()
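
With the DataflowRunner, p.run() submits the job and returns without waiting for it. If you prefer the notebook cell to block until the job completes, you can keep the result and wait on it (assuming a Beam SDK version that provides wait_until_finish):

result = p.run()
result.wait_until_finish()  # blocks until the Dataflow job finishes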

Monitoring jobs

We can monitor jobs in the Cloud Console. In this example the run took 20 minutes with 6 workers and 10 minutes with 18 workers (about 5 minutes of that is spent on staging and other overhead).

Saving results into BigQuery

The pipeline saves training results into BigQuery.
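
Once the rows are there, finding the best model is just a query. Here is a sketch using the google-cloud-bigquery client library; the project, dataset, and table names are placeholders that should match whatever you configured in the BigQuery sink above:

from google.cloud import bigquery

# Placeholders: use the project/dataset/table you wrote to in the pipeline
client = bigquery.Client(project='{PROJECTID}')
query = """
SELECT model_id, accuracy, loss, param
FROM `{PROJECTID}.{DATASET}.{TABLE}`
ORDER BY accuracy DESC
LIMIT 5
"""
for row in client.query(query).result():
    print(row.model_id, row.accuracy)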

How to use TensorBoard with the saved models?

Now we want to look into the details of the trained models. Go to the Cloud Console, launch Cloud Shell, and type the command below.

tensorboard --logdir=gs://{BUCKET_NAME}/models/{model_id} --port=8080

Here is the whole code. Happy coding!


Hayato Yoshikawa

Customer Engineer, Google Cloud @Google. Views are my own.