Scaling up running Keras models on ML Engine

Léopold Boudard
Meilleurs Agents Engineering
10 min read · Dec 17, 2018

Keras is a great library for getting started with deep learning models. It features state-of-the-art models that are ready to be trained and a friendly programming interface for implementing and customizing layers, with a choice of backends (TensorFlow, Theano, CNTK). TensorFlow also ships its own implementation of the Keras interface, tf.keras, which offers the same capabilities from a separate codebase (maintained by Google) and is likely to get tighter integration with TensorFlow in the future.

Once models are trained for a given task, they can easily be deployed and versioned for predictions and online learning using ML Engine. Based on TensorFlow Serving, it offers a fully integrated way to scale up machine learning models. It supports TensorFlow, scikit-learn and XGBoost models and provides a scalable batch and online prediction API, so you don’t have to worry about infrastructure, deployment or resource usage. This considerably shortens the time from modeling to production and speeds up iteration over ML models.

There is already a large amount of documentation and articles on these topics, but it can be dense and hard to dig into, as is often the case with trending topics that have a vast audience and quickly changing libraries.

This article walks you through the process we’ve implemented to train, convert, test, export and use a Keras model on ML Engine.

Why ML engine

In our use case, we have close to a billion images to process (mostly for classification tasks), and hundreds of new images to process every day.

We started training and testing our models in Keras on a small Google Compute Engine instance with a K80 GPU. That was fine for processing thousands to a million images, for both training and prediction, in a time short enough to iterate over our models.

However, we needed to scale up to apply the models to our whole historical dataset and to predict on incoming images (with latency constraints).

Having dynamic, auto-scaling, on-demand resources (GPU, CPU and storage) was therefore a must for our application, so we opted for ML Engine for prediction and Dataflow for data preparation.

Training Keras model

Keras offers a number of state-of-the-art models to pick from, with pre-trained weights that you can reuse for your own applications (either through transfer learning, retraining models from pre-trained weights, or by training models from scratch):

https://github.com/keras-team/keras/tree/master/keras/applications

Starting from an existing model, you’ll generally want to adapt:

  • input and output layers, shapes, and intermediate meta (non-image) features to inject into the dense layers
from keras.applications import InceptionV3
from keras.layers import Dense, GlobalAveragePooling2D, Flatten, concatenate, Input
from keras.models import Model
base_model = InceptionV3(weights='imagenet',input_shape=input_shape, include_top=False)
inputs = [base_model.input]
x = base_model.output
x = GlobalAveragePooling2D()(x)
# meta features layers can be added here
# features = Input(shape=(len(model_options.features),), name='Features')
# features = Dense(20, activation='relu')(features)
# x = concatenate([x, features], axis=1)
x = Dense(1024, activation='relu')(x)
predictions = Dense(model_options.nb_classes, activation='softmax')(x)
model = Model(inputs=inputs, outputs=predictions)
  • training image preprocessing and augmentations to apply during the training phase
from imgaug import augmenters as iaa
from keras.preprocessing.image import ImageDataGenerator

aug1 = iaa.GaussianBlur(sigma=(0, 2.0))  # adds random blur to training images

def additional_augmentation(image):
    image = aug1.augment_image(image)
    return image

generator_options = {
    # featurewise_* options rely on dataset statistics:
    # call generator.fit(sample_images) before training
    'featurewise_center': True,
    'featurewise_std_normalization': True,
    'rescale': 1. / 255,  # rescales 0-255 RGB values to the 0-1 range
    'horizontal_flip': True,
    'vertical_flip': True,
    'zoom_range': 0.2,
    'rotation_range': 90.,
    'preprocessing_function': additional_augmentation
}
generator = ImageDataGenerator(**generator_options)

flow_options = {
    'target_size': (input_size, input_size),
    'batch_size': batch_size,
    'class_mode': 'categorical',
    'shuffle': True,
    'seed': seed,
    'classes': class_names
}
image_generator = generator.flow_from_directory(
    image_folder_dir, **flow_options
)
  • training parameters:
  1. the loss function, which mostly depends on what you’re trying to achieve (classification, regression, etc.) and on the activation function used in the last layer of the CNN model
  2. batch_size and the number of epochs, which mostly affect convergence speed; a higher batch size decreases the number of steps per epoch, at the cost of more memory
  3. the optimizer, which also mainly affects convergence, depending on your application

Below are example parameters for a simple image classification:

import keras

loss_function = 'categorical_crossentropy'
steps_per_epoch = int(nb_training_samples // batch_size)
optimizer = keras.optimizers.Adam(lr=init_lr)
metrics = ['accuracy']
# only used when validation data is also passed to fit_generator
# (validation_data=..., validation_steps=validation_steps)
validation_steps = int(nb_validation_samples // batch_size)

model.compile(optimizer, loss_function, metrics=metrics)
model.fit_generator(
    image_generator,
    steps_per_epoch=steps_per_epoch,
    epochs=nb_epochs,
    callbacks=callbacks,
    verbose=2,
    workers=1,
    use_multiprocessing=False
)
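To make the batch size trade-off concrete, here is a minimal sketch of the arithmetic behind steps_per_epoch. The sample count is a made-up figure, and drop_remainder is a hypothetical flag of this helper, not a Keras option: integer division (as used above) skips the last partial batch, while rounding up covers every sample.

```python
import math


def steps_per_epoch(nb_samples, batch_size, drop_remainder=True):
    """Number of batches drawn per epoch for a given batch size."""
    if drop_remainder:
        # same computation as nb_training_samples // batch_size above
        return nb_samples // batch_size
    # rounding up includes the last, smaller batch
    return math.ceil(nb_samples / batch_size)


nb_training_samples = 100000  # hypothetical dataset size
for batch_size in (32, 64, 128):
    print(batch_size, steps_per_epoch(nb_training_samples, batch_size))
```

Doubling the batch size roughly halves the number of steps, but each step then has to hold twice as many images in memory.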

Exporting Keras model to Tensorflow inference graph

If you went with the native Keras implementation, you’ll first need to convert your Keras model to a TensorFlow inference graph, with its weights, saved as a binary protobuf file.

Since Keras uses the TensorFlow backend under the hood, the conversion is rather easy: the loaded model can be found within the main TensorFlow session:

import keras
import tensorflow as tf
from keras import backend as K

model = keras.models.load_model(input_model_path)
session = K.get_session()
# as_text=False writes a binary protobuf (.pb); as_text=True would write a text file
tf.train.write_graph(session.graph.as_graph_def(), logdir, path_to_model_file_pb, as_text=False)

If you want to freeze your graph for optimization (many operations only consist of loading and saving variables), you can convert variable nodes to constants:

from tensorflow.python.framework import graph_util

graph = graph_util.convert_variables_to_constants(
    session, session.graph.as_graph_def(), [n.op.name for n in model.outputs])

Customize inputs/outputs to add metadata for serving API

There are two main reasons why you’ll want to customize the inputs/outputs served by your model:

  • model training generally involves some preprocessing steps that you’ll want to embed into the main graph (decode images, resize, rescale…)
  • you might want to add some metadata to your API inputs/outputs (labels, identifiers, characteristics, or specific features from predicted images…)

An example of a preprocessing step including JPEG image decoding and resizing:

def build_serving_inputs():
    def decode_and_resize(image_str_tensor):
        """Decodes a JPEG string, resizes it and returns a float32 tensor."""
        image = tf.image.decode_jpeg(image_str_tensor, channels=3)
        image = tf.expand_dims(image, 0)
        image = tf.image.resize_bilinear(
            image, [height, width], align_corners=False)
        image = tf.squeeze(image, squeeze_dims=[0])
        image = tf.cast(image, dtype=tf.float32)
        return image

    # variable-size input tensor that takes a batch of JPEG-encoded images
    # (sent as base64 strings through the JSON API)
    inputs = tf.placeholder(tf.string, name="input_image", shape=[None])
    # dtype is required because the mapped function returns float32, not string
    decoded_images = tf.map_fn(
        decode_and_resize, inputs, back_prop=False, dtype=tf.float32)
    output = tf.identity(decoded_images, name="output")
    return inputs, output

To inject preprocessing into the main graph, you’ll need to add the preprocessing steps ahead of the model and connect their output to the model’s input placeholders using input_map:

import os

with tf.gfile.GFile(existing_model_path_pb, "rb") as f:
    graph_def = tf.GraphDef()
    graph_def.ParseFromString(f.read())

with tf.Graph().as_default() as graph:
    # the name argument prefixes every op/node of the imported graph
    tf.import_graph_def(graph_def, name="my-model-name")
    gdef_main_graph = graph.as_graph_def()

with tf.Graph().as_default() as preprocessing_graph:
    preprocessing_input, preprocessing_output = build_serving_inputs()
    gdef_preprocessing = preprocessing_graph.as_graph_def()

# prepare builder to save serving model with preprocessing steps
builder = tf.saved_model.builder.SavedModelBuilder(os.path.join(EXPORT_DIR, 'serve'))

# merge preprocessing graph and main model graph into a single graph
with tf.Graph().as_default() as graph_with_preprocessing:
    image_input = tf.placeholder(tf.string, name="image_as_jpeg", shape=[None])
    input_metadata = tf.placeholder(tf.string, name="object_id", shape=[None])
    metadata = tf.identity(input_metadata, name="metadata_out")
    # input_map keys and return_elements are names inside the imported graph_def
    prepared_input, = tf.import_graph_def(
        gdef_preprocessing,
        input_map={"input_image:0": image_input},
        return_elements=["output:0"])
    # feed preprocessed inputs into main model graph
    output, = tf.import_graph_def(
        gdef_main_graph,
        input_map={"my-model-name/input_1:0": prepared_input},
        return_elements=["my-model-name/dense_2/Softmax:0"],
        name="full_graph")
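Keeping track of tensor names across successive imports is the error-prone part here. The sketch below is plain Python with no TensorFlow dependency, and both helper names are hypothetical. It shows how each import scope gets prepended to a name, and how a tensor name splits into an op name and an output index.

```python
def scoped_name(tensor_name, import_scopes):
    """Prefix a tensor name with each import scope, in the order the imports happened."""
    name = tensor_name
    for scope in import_scopes:
        if scope:  # importing with name='' adds no prefix
            name = "{}/{}".format(scope, name)
    return name


def split_tensor_name(tensor_name):
    """Split 'op_name:index' into the op name and the output index."""
    op_name, _, index = tensor_name.rpartition(":")
    return op_name, int(index)


# The Keras output op is first imported under "my-model-name", then under "full_graph".
final_name = scoped_name("dense_2/Softmax:0", ["my-model-name", "full_graph"])
print(final_name)
print(split_tensor_name(final_name))
```

Printing the node names of a graph_def (for example [n.name for n in graph_def.node]) remains the most reliable way to confirm the names that actually exist in your graph.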

Saved model signatures

A SavedModel defines a way to serialize a TensorFlow graph into a meta graph that also embeds variables, assets and signatures (the inputs and outputs exposed by the serving model), using a protobuf schema.

There are signature helpers that provide convenience methods for building signatures for saved models.

image_info = tf.saved_model.utils.build_tensor_info(image_input)
in_metadata_info = tf.saved_model.utils.build_tensor_info(input_metadata)
metadata_info = tf.saved_model.utils.build_tensor_info(metadata)
output_info = tf.saved_model.utils.build_tensor_info(output)

# build the serving model signature with default tf labels
signature = tf.saved_model.signature_def_utils.build_signature_def(
    inputs={'input': image_info, 'metadata': in_metadata_info},
    outputs={
        tf.saved_model.signature_constants.CLASSIFY_OUTPUT_CLASSES: output_info,
        'metadata': metadata_info},
    method_name=tf.saved_model.signature_constants.CLASSIFY_METHOD_NAME)

# add the serving tag and serving signature, then write the saved model
with tf.Session(graph=graph_with_preprocessing) as session:
    builder.add_meta_graph_and_variables(
        session,
        [tf.saved_model.tag_constants.SERVING],
        signature_def_map={
            tf.saved_model.signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY: signature})
    builder.save()

A more complete example of feature preprocessing can be found in Google’s excellent cloudml-samples repository.

Test serving model locally

Once you have exported your saved model, it’s time to test it locally.

There are multiple options to test your model, using either the gcloud client or saved_model_cli.

First you can check that you have a proper model signature using saved_model_cli.

saved_model_cli show --dir . --tag_set serve --all

MetaGraphDef with tag-set: 'serve' contains the following SignatureDefs:
signature_def[serving_default]:
  The given SavedModel SignatureDef contains the following input(s):
    inputs['input'] tensor_info:
        dtype: DT_STRING
        shape: (-1)
        name: image_as_jpeg:0
    inputs['metadata'] tensor_info:
        dtype: DT_STRING
        shape: (-1)
        name: object_id:0
  The given SavedModel SignatureDef contains the following output(s):
    outputs['classes'] tensor_info:
        dtype: DT_FLOAT
        shape: (-1, 2)
        name: import_1/inception/dense_2/Softmax:0
    outputs['metadata'] tensor_info:
        dtype: DT_STRING
        shape: (-1)
        name: metadata_out:0
  Method name is: tensorflow/serving/classify
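Since every instance sent to the model has to match these signature inputs, a small check before serializing anything can save a round trip. This is only a sketch: the SIGNATURE_INPUTS dict is copied by hand from the output above, and check_instance is a hypothetical helper, not part of any Google or TensorFlow client.

```python
# Input keys and dtypes copied by hand from the saved_model_cli output above.
SIGNATURE_INPUTS = {"input": "DT_STRING", "metadata": "DT_STRING"}


def check_instance(instance, signature_inputs=SIGNATURE_INPUTS):
    """Return a list of problems found in one prediction instance (empty if none)."""
    problems = []
    missing = sorted(set(signature_inputs) - set(instance))
    unexpected = sorted(set(instance) - set(signature_inputs))
    if missing:
        problems.append("missing keys: " + ", ".join(missing))
    if unexpected:
        problems.append("unexpected keys: " + ", ".join(unexpected))
    return problems


good = {"input": {"b64": "AAAA"}, "metadata": "images/1.jpg"}
bad = {"input": {"b64": "AAAA"}, "label": "kitchen"}
print(check_instance(good))
print(check_instance(bad))
```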

Then, you’ll need to implement data serialization to feed the served model. Again, multiple options are available:

For gcloud ml-engine client:

  • as TFRecords, a format based on protocol buffers that can and should be used for batch predictions (compressed TFRecords are also supported)
  • as newline-delimited JSON files, which can be used for API calls as well as batch predictions

For saved_model_cli:

  • as serialized numpy array
  • as json line serialized file

Newline separated json file

This is the most straightforward option to start testing the serving part. You need to provide a file in which each line is an input dict that matches your meta graph signature:

import base64
import json
from io import BytesIO

from PIL import Image

def convert_image_to_bytes(image_uri, format='JPEG'):
    im = Image.open(image_uri)
    jpeg_im = BytesIO()
    im.save(jpeg_im, format=format)
    return jpeg_im

def convert_to_input_dict(image_uri):
    # binary values are sent base64-encoded under a "b64" key
    return {
        'metadata': image_uri,
        'input': {"b64": base64.b64encode(convert_image_to_bytes(image_uri).getvalue()).decode()}}

def convert_to_json_file(image_uris, filename):
    with open(filename, 'w+') as of:
        for image_uri in image_uris:
            of.write(json.dumps(convert_to_input_dict(image_uri)) + '\n')

%%bash
gcloud ml-engine local predict --model-dir=models/inception-15/test/serve/ --json-instances data/json/test.json
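Before sending such a file to the model, it can be worth checking that every line decodes back to the original bytes. The sketch below does that round trip with the standard library only. The payload is a stand-in for real JPEG bytes, and read_json_instances is a hypothetical helper that assumes the same 'metadata' / 'input' layout as above.

```python
import base64
import json


def read_json_instances(lines):
    """Yield (metadata, image_bytes) for each non-empty JSON line."""
    for line in lines:
        line = line.strip()
        if not line:
            continue
        instance = json.loads(line)
        yield instance["metadata"], base64.b64decode(instance["input"]["b64"])


# Stand-in payload: arbitrary bytes instead of a real JPEG file.
fake_jpeg = b"\xff\xd8\xff\xe0 fake image \xff\xd9"
line = json.dumps({
    "metadata": "images/1.jpg",
    "input": {"b64": base64.b64encode(fake_jpeg).decode()},
})
records = list(read_json_instances([line + "\n", "\n"]))
print(records[0][0], len(records[0][1]))
```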

Numpy arrays

Numpy arrays can be generated in much the same way as JSON, except that you need to export every input as its own numpy array, which is not very convenient:

import pandas as pd
import numpy as np

def convert_to_numpy_array(image_uris, filename):
    images_records = [{'input': convert_image_to_bytes(image_uri).getvalue()} for image_uri in image_uris]
    images_df = pd.DataFrame(images_records).values.flatten().astype(bytes)
    meta_records = [{'metadata': image_uri} for image_uri in image_uris]
    meta_df = pd.DataFrame(meta_records).values.flatten().astype(bytes)
    # with filename='inputs.npy', this writes inputs.npy and meta_inputs.npy
    np.save(filename, images_df)
    np.save('meta_{}'.format(filename), meta_df)

%%bash
saved_model_cli run --dir . --tag_set serve --signature_def serving_default --inputs 'input=inputs.npy;metadata=meta_inputs.npy'

TFRecords

This one is a bit trickier and less documented.

TFRecords are based on protobuf (a binary storage format) and are well integrated with the TensorFlow data API, which provides convenient methods for batching, iterating and resampling based on parallel, filename-fed queues. TensorFlow’s core example protos (tf.train.Example, tf.train.Features, tf.train.Feature) give you a set of reusable feature schemas to build your own schemas on.

However, since it involves protobuf schemas, some decoding steps need to be injected into the TensorFlow graph ahead of preprocessing. As a consequence, we’ll need to alter the serving signature beforehand.

First, we’ll define our schema (which basically matches the signature defined above) along with convenience methods for encoding and decoding.

import tensorflow as tf

def _bytes_feature(value):
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))

def encode(image_uri, filename):
    jpeg_im = convert_image_to_bytes(image_uri)
    example = tf.train.Example(features=tf.train.Features(feature={
        'input': _bytes_feature(jpeg_im.getvalue()),
        'metadata': _bytes_feature(image_uri.encode())}))
    # the context manager closes the writer so the record is flushed to disk
    with tf.python_io.TFRecordWriter(filename) as writer:
        writer.write(example.SerializeToString())

def decode(serialized_example):
    features = tf.parse_single_example(
        serialized_example,
        features={
            'input': tf.FixedLenFeature([], tf.string),
            'metadata': tf.FixedLenFeature([], tf.string),
        })
    return features['input'], features['metadata']
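For intuition about what the writer puts on disk, here is a dependency-free sketch of the record framing used by TFRecord files, as we understand it from the TensorFlow documentation: a little-endian 64-bit length, a masked CRC32C of that length, the payload, then a masked CRC32C of the payload. The function names are ours, and the payloads are arbitrary bytes where a real file would hold serialized tf.train.Example messages.

```python
import struct

_CRC32C_POLY = 0x82F63B78  # reflected Castagnoli polynomial


def _build_table():
    table = []
    for i in range(256):
        crc = i
        for _ in range(8):
            crc = (crc >> 1) ^ _CRC32C_POLY if crc & 1 else crc >> 1
        table.append(crc)
    return table


_TABLE = _build_table()


def crc32c(data):
    """Table-driven CRC32C (Castagnoli) of a bytes object."""
    crc = 0xFFFFFFFF
    for byte in data:
        crc = _TABLE[(crc ^ byte) & 0xFF] ^ (crc >> 8)
    return crc ^ 0xFFFFFFFF


def masked_crc(data):
    """CRC32C rotated right by 15 bits plus a constant, kept on 32 bits."""
    crc = crc32c(data)
    return (((crc >> 15) | (crc << 17)) + 0xA282EAD8) & 0xFFFFFFFF


def frame_record(payload):
    """Wrap one payload with its length and the two checksums."""
    header = struct.pack("<Q", len(payload))
    return (header + struct.pack("<I", masked_crc(header))
            + payload + struct.pack("<I", masked_crc(payload)))


def read_records(buffer):
    """Yield the payloads of a framed buffer, verifying both checksums."""
    offset = 0
    while offset < len(buffer):
        header = buffer[offset:offset + 8]
        (length,) = struct.unpack("<Q", header)
        (length_crc,) = struct.unpack("<I", buffer[offset + 8:offset + 12])
        payload = buffer[offset + 12:offset + 12 + length]
        (payload_crc,) = struct.unpack(
            "<I", buffer[offset + 12 + length:offset + 16 + length])
        if length_crc != masked_crc(header) or payload_crc != masked_crc(payload):
            raise ValueError("corrupted record at offset {}".format(offset))
        yield payload
        offset += 16 + length


data = frame_record(b"first") + frame_record(b"second example")
records = list(read_records(data))
print(records)
```

In practice you would keep using TFRecordWriter and tf.data.TFRecordDataset. The sketch only illustrates why a TFRecord file cannot be read line by line like a JSON file.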

Then we’ll redefine decoding steps within our main graph:

builder = tf.saved_model.builder.SavedModelBuilder(os.path.join(EXPORT_DIR, 'serve-tf-records'))

with tf.Graph().as_default() as graph_with_tf_decode:
    serialized_tf_records = tf.placeholder(tf.string, name="tf_records", shape=[None])
    inputs, metadata = tf.map_fn(
        decode, serialized_tf_records, back_prop=False, dtype=(tf.string, tf.string))
    # import_graph_def expects a GraphDef; the input_map keys must match the
    # placeholder names defined in the graph with preprocessing
    outputs, metadata = tf.import_graph_def(
        graph_with_preprocessing.as_graph_def(),
        input_map={"image_as_jpeg:0": inputs, "object_id:0": metadata},
        return_elements=["full_graph/my-model-name/dense_2/Softmax:0", "metadata_out:0"],
        name='')

    records_info = tf.saved_model.utils.build_tensor_info(serialized_tf_records)
    metadata_info = tf.saved_model.utils.build_tensor_info(metadata)
    output_info = tf.saved_model.utils.build_tensor_info(outputs)
    signature = tf.saved_model.signature_def_utils.build_signature_def(
        inputs={'input': records_info},
        outputs={
            tf.saved_model.signature_constants.CLASSIFY_OUTPUT_CLASSES: output_info,
            'metadata': metadata_info},
        method_name=tf.saved_model.signature_constants.CLASSIFY_METHOD_NAME)

with tf.Session(graph=graph_with_tf_decode) as sess:
    builder.add_meta_graph_and_variables(
        sess,
        [tf.saved_model.tag_constants.SERVING],
        signature_def_map={
            tf.saved_model.signature_constants.DEFAULT_SERVING_SIGNATURE_DEF_KEY: signature})
    builder.save()

Finally, you can test it by feeding a TFRecord dataset to this graph:

with tf.Session(graph=graph_with_tf_decode) as session:
    filenames = tf.placeholder(tf.string, shape=[None])
    dataset = tf.data.TFRecordDataset(filenames)
    dataset = dataset.batch(1)
    iterator = dataset.make_initializable_iterator()
    session.run(iterator.initializer, feed_dict={filenames: [test_tf_file]})
    records = session.run(iterator.get_next())
    inputs = graph_with_tf_decode.get_tensor_by_name('tf_records:0')
    model = graph_with_tf_decode.get_tensor_by_name('full_graph/my-model-name/dense_2/Softmax:0')
    res = session.run(model, feed_dict={inputs: records})

There’s no local mode that can be fed with TFRecords, but you’ll be able to test it with a batch prediction job once your model is pushed to ML Engine:

gcloud ml-engine jobs submit prediction test --version v2 --data-format tf-record --input-paths gs://my-bucket/data/tf_records/tf-records-file --output-path gs://my-bucket/predictions/model/out --model test --region europe-west1

Here you’ll find another example using TFRecords for prediction on ML Engine:

https://cloud.google.com/blog/products/gcp/performing-prediction-with-tensorflow-object-detection-models-on-google-cloud-machine-learning-engine

Export model to ML engine and call from API

Once you’ve tested your model locally, it’s quite straightforward to export it to Google Cloud Storage and then serve it from ML Engine.

You’ll first need to create an ml-engine model, then add a new model version, either from the Cloud Console interface or with the gcloud ml-engine client:

gsutil cp -r local_saved_model_path gs://my-bucket/model/serve
gcloud ml-engine versions create v2 --model test --origin gs://my-bucket/model/serve --python-version 3.5 --runtime-version 1.9 --framework tensorflow

Using batch predictions on tf records

From there, you can use Dataflow or Spark to prepare your TFRecord datasets for training or prediction. You’ll find complete examples of TFRecord preparation in the cloudml-samples repository.

Once the data is prepared, a prediction job can be submitted with the client as shown above, or through the API (in our use case, we scheduled ML Engine jobs from Airflow).
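When submitting through the API rather than gcloud, the job is described by a JSON body. The sketch below builds such a body in plain Python, mirroring the gcloud command shown earlier. The field names follow our reading of the ML Engine v1 REST reference (jobId, predictionInput, versionName, dataFormat, inputPaths, outputPath, region) and should be checked against the current documentation. The helper name and the project id are hypothetical.

```python
def build_prediction_job_body(job_id, project, model, version,
                              input_paths, output_path, region,
                              data_format="TF_RECORD"):
    """Build the request body describing a batch prediction job."""
    version_name = "projects/{}/models/{}/versions/{}".format(
        project, model, version)
    return {
        "jobId": job_id,
        "predictionInput": {
            "versionName": version_name,
            "dataFormat": data_format,
            "inputPaths": list(input_paths),
            "outputPath": output_path,
            "region": region,
        },
    }


body = build_prediction_job_body(
    job_id="test",
    project="my-project",  # hypothetical project id
    model="test",
    version="v2",
    input_paths=["gs://my-bucket/data/tf_records/tf-records-file"],
    output_path="gs://my-bucket/predictions/model/out",
    region="europe-west1",
)
print(body)
```

With the Google API Python client, a body like this would typically be passed to service.projects().jobs().create(parent='projects/my-project', body=body).execute(), which is also a convenient shape for a scheduler task.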

Google has recently announced a Dataflow integration within the TensorFlow API, so we can hope for a more consistent and unified API combining data preparation and modeling in the future.

Calling from API

When you need online learning and/or prediction, you can also call the API directly to predict records, passing inputs as JSON-serialized objects:

from googleapiclient.discovery import build

def predict_json(project, model, instances, credentials, version=None):
    """Send JSON data to a deployed model for prediction.

    Args:
        project (str): project where the Cloud ML Engine model is deployed.
        model (str): model name.
        instances (dict): request body, formatted as {'instances': [...]}.
        credentials: credentials used to build the API client.
        version (str): version of the model to target.
    Returns:
        list: prediction results, as defined by the model signature.
    """
    service = build('ml', 'v1', credentials=credentials)
    name = 'projects/{}/models/{}'.format(project, model)
    if version is not None:
        name += '/versions/{}'.format(version)
    response = service.projects().predict(
        name=name,
        body=instances
    ).execute()
    if 'error' in response:
        raise RuntimeError(response['error'])
    return response['predictions']

resp = predict_json(
    project, 'model',
    {'instances': [convert_to_input_dict(image_uri) for image_uri in image_uris]},
    gcs_credentials
)
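The returned predictions still need to be mapped back to labels. Below is a minimal sketch, assuming each prediction is a dict with the 'classes' and 'metadata' keys declared in the signature outputs. The class names, the sample response and the top_class helper are all hypothetical.

```python
def top_class(prediction, class_names):
    """Pick the highest-scoring class of one prediction."""
    scores = prediction["classes"]
    best = max(range(len(scores)), key=scores.__getitem__)
    return {
        "metadata": prediction.get("metadata"),
        "label": class_names[best],
        "score": scores[best],
    }


class_names = ["class_a", "class_b"]  # hypothetical, same order as in training
sample_predictions = [  # made-up response content
    {"classes": [0.12, 0.88], "metadata": "images/1.jpg"},
    {"classes": [0.97, 0.03], "metadata": "images/2.jpg"},
]
results = [top_class(p, class_names) for p in sample_predictions]
print(results)
```

The order of class_names has to match the 'classes' option passed to flow_from_directory during training, since the softmax output only carries positions, not labels.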

Summary

TensorFlow has grown very fast. While its core API is vast, changes quickly and has concepts that can be difficult to grasp at first, it has integrated more accessible interfaces such as Keras, has been enriched with data preprocessing tools and aims at a more consistent API overall. Integration with fast dedicated hardware, adoption in academic research and industry, and scalable model serving make TensorFlow a good candidate to be the leading framework for many kinds of machine learning projects and applications in the future, if it isn’t already.

ML Engine and Dataflow simplify and speed up data workflows, model iteration, separation, versioning and publishing to production with auto-scaling resources, and hence reduce the need for the ops and data engineering resources that used to be required to build machine-learning-powered applications.

If you want to get started with ML Engine, the Google cloudml-samples repository is a good place to start with TensorFlow models and data preparation in Dataflow.
