End-to-End ML in Tensorflow and Tensorflow Extended — 3

jagesh maharjan · Published in Delvify · Nov 12, 2019

In previous posts I introduced some simple deployments using TensorFlow. This post focuses on TensorFlow Transform and Apache Beam. In the TensorFlow Transform GitHub repository, I posted some issues (and resolutions) related to the census_example. Since you can refer to that link for the details, I will not reproduce all of the code here, just the parts needed for explanation.

This example uses census data to predict whether a particular person is likely to earn more than 50K or at most 50K a year. The input features are grouped into three categories (illustrated in the sketch right after this list):

CATEGORICAL_FEATURE_KEYS,
NUMERIC_FEATURE_KEYS,
OPTIONAL_NUMERIC_FEATURE_KEYS.
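
To make the split concrete, the key lists in the upstream census_example look roughly like this. Treat the exact membership as an assumption and check the upstream script for the authoritative lists:

# Roughly how census_example groups the census columns (assumed, for illustration).
CATEGORICAL_FEATURE_KEYS = [
    'workclass', 'education', 'marital-status', 'occupation',
    'relationship', 'race', 'sex', 'native-country',
]
NUMERIC_FEATURE_KEYS = ['age', 'capital-gain', 'capital-loss', 'hours-per-week']
OPTIONAL_NUMERIC_FEATURE_KEYS = ['education-num']
LABEL_KEY = 'label'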

We need to know these categories because TensorFlow Transform has to parse each group of features into its respective datatype:

RAW_DATA_FEATURE_SPEC = dict(
    [(name, tf.io.FixedLenFeature([], tf.string))
     for name in CATEGORICAL_FEATURE_KEYS] +
    [(name, tf.io.FixedLenFeature([], tf.float32))
     for name in NUMERIC_FEATURE_KEYS] +
    [(name, tf.io.VarLenFeature(tf.float32))
     for name in OPTIONAL_NUMERIC_FEATURE_KEYS] +
    [(LABEL_KEY, tf.io.FixedLenFeature([], tf.string))])

RAW_DATA_METADATA = dataset_metadata.DatasetMetadata(
    schema_utils.schema_from_feature_spec(RAW_DATA_FEATURE_SPEC))


Based on this feature specification, tensorflow_transform's tf_metadata utilities (schema_utils.schema_from_feature_spec) create a schema describing the input feature types. With the raw feature spec and schema in place, we define the pre-processing function:

def preprocessing_fn(inputs):
  """Preprocess raw input columns into transformed features."""
  outputs = inputs.copy()
  # Scale numeric features to the range [0, 1].
  for key in NUMERIC_FEATURE_KEYS:
    outputs[key] = tft.scale_to_0_1(outputs[key])
  # Optional numeric features arrive as SparseTensors; densify, then scale.
  for key in OPTIONAL_NUMERIC_FEATURE_KEYS:
    dense = tf.compat.v1.sparse_to_dense(
        outputs[key].indices, [outputs[key].dense_shape[0], 1],
        outputs[key].values,
        default_value=0.)
    dense = tf.squeeze(dense, axis=1)
    outputs[key] = tft.scale_to_0_1(dense)
  # Build a vocabulary file for each categorical feature.
  for key in CATEGORICAL_FEATURE_KEYS:
    tft.vocabulary(inputs[key], vocab_filename=key)
  # Map the string label ('>50K' / '<=50K') to an integer index.
  table_keys = ['>50K', '<=50K']
  initializer = tf.lookup.KeyValueTensorInitializer(
      keys=table_keys,
      values=tf.cast(tf.range(len(table_keys)), tf.int64),
      key_dtype=tf.string,
      value_dtype=tf.int64)
  table = tf.lookup.StaticHashTable(initializer, default_value=-1)
  outputs[LABEL_KEY] = table.lookup(outputs[LABEL_KEY])
  return outputs

This data transformation, often called the pre-processing function, is the most crucial step before training a model on this data. We will pass it as a wrapper function to the transform pipeline below.

Once done, we use Apache Beam to convert the training and evaluation data into TFRecords. Apache Beam is a high-level API for batch and streaming data processing (I will not go into details here; that is a topic for another post):

with beam.Pipeline() as pipeline:
  # Use a fresh temporary directory for tf.Transform's intermediate outputs.
  with tft_beam.Context(temp_dir=tempfile.mkdtemp()):
    ordered_columns = [
        'age', 'workclass', 'fnlwgt', 'education', 'education-num',
        'marital-status', 'occupation', 'relationship', 'race', 'sex',
        'capital-gain', 'capital-loss', 'hours-per-week', 'native-country',
        'label'
    ]
    converter = tft.coders.CsvCoder(ordered_columns, RAW_DATA_METADATA.schema)

    raw_data = (
        pipeline
        | "ReadTrainData" >> beam.io.ReadFromText(train_data_file)
        | "FixCommasTrainData" >> beam.Map(lambda line: line.replace(', ', ','))
        | "DecodeTrainData" >> MapAndFilterErrors(converter.decode))

    raw_dataset = (raw_data, RAW_DATA_METADATA)
    transformed_dataset, transform_fn = (
        raw_dataset | tft_beam.AnalyzeAndTransformDataset(preprocessing_fn))
    transformed_data, transformed_metadata = transformed_dataset

    transformed_data_coder = tft.coders.ExampleProtoCoder(
        transformed_metadata.schema)

    _ = (
        transformed_data
        | "EncodeTrainData" >> beam.Map(transformed_data_coder.encode)
        | "WriteTrainData" >> beam.io.WriteToTFRecord(
            os.path.join(working_dir, TRANSFORMED_TRAIN_DATA_FILEBASE)))

Apache Beam's ReadFromText reads the text file; the pipeline then fixes the commas in every row (a record) and uses the MapAndFilterErrors class to decode the CSV-coded data. We can then very simply create a dataset from the raw data and the schema shown above. Next, we perform the transformation with AnalyzeAndTransformDataset, using our pre-processing function as a wrapper. Finally, we write the transformed data to disk as a TFRecord file. We do the same for the evaluation dataset.
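
One piece worth calling out: the transform_fn returned by AnalyzeAndTransformDataset should also be written to disk, so that training and serving can reuse exactly the same transformation graph. Inside the same pipeline and Context block, this is roughly (a sketch, following the upstream example):

# Persist the transform graph and metadata alongside the transformed data,
# so tft.TFTransformOutput(working_dir) can load it later (sketch).
_ = (
    transform_fn
    | "WriteTransformFn" >> tft_beam.WriteTransformFn(working_dir))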

Here we create a training input function (a wrapper) that reads the transformed examples:

def _make_training_input_fn(tf_transform_output, transformed_examples,
                            batch_size):
  def input_fn():
    # Read the transformed TFRecords using the transformed feature spec.
    dataset = tf.data.experimental.make_batched_features_dataset(
        file_pattern=transformed_examples,
        batch_size=batch_size,
        features=tf_transform_output.transformed_feature_spec(),
        reader=tf.data.TFRecordDataset,
        shuffle=True)
    transformed_features = tf.compat.v1.data.make_one_shot_iterator(
        dataset).get_next()
    # Split the label off from the features.
    transformed_labels = transformed_features.pop(LABEL_KEY)
    return transformed_features, transformed_labels
  return input_fn

We can also create a wrapper for a serving input function, used during evaluation as well as during later serving. The difference is that it does not include the label feature, since the label is exactly what the model is supposed to predict.
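
A minimal sketch of such a serving input function, based on the upstream census_example: it parses raw tf.Example protos and applies the saved transform graph (treat the helper name and exact structure as illustrative):

def _make_serving_input_fn(tf_transform_output):
  # Serve on raw (untransformed) features, minus the label.
  raw_feature_spec = RAW_DATA_FEATURE_SPEC.copy()
  raw_feature_spec.pop(LABEL_KEY)

  def serving_input_fn():
    raw_input_fn = tf.estimator.export.build_parsing_serving_input_receiver_fn(
        raw_feature_spec, default_batch_size=None)
    serving_input_receiver = raw_input_fn()
    # Apply the same tf.Transform graph that was used at training time.
    transformed_features = tf_transform_output.transform_raw_features(
        serving_input_receiver.features)
    return tf.estimator.export.ServingInputReceiver(
        transformed_features, serving_input_receiver.receiver_tensors)

  return serving_input_fn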

For training we use the Estimator API with a LinearClassifier, but feel free to use another estimator such as DNNClassifier:

estimator = tf.estimator.LinearClassifier(
    feature_columns=get_feature_columns(tf_transformed_output),
    config=run_config,
    loss_reduction=tf.losses.Reduction.SUM)

train_input_fn = _make_training_input_fn(
    tf_transformed_output,
    os.path.join(working_dir, TRANSFORMED_TRAIN_DATA_FILEBASE + '*'),
    batch_size=TRAIN_BATCH_SIZE)

estimator.train(
    input_fn=train_input_fn,
    max_steps=TRAIN_NUM_EPOCHS * num_train_instances / TRAIN_BATCH_SIZE)
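
The get_feature_columns helper referenced above is not shown in the snippet; a minimal sketch, assuming the numeric features scaled by preprocessing_fn and the vocabularies written by tft.vocabulary with the feature name as vocab_filename, could look like this:

def get_feature_columns(tf_transform_output):
  # Numeric columns: already scaled to [0, 1] by preprocessing_fn.
  real_valued_columns = [
      tf.feature_column.numeric_column(key, shape=())
      for key in NUMERIC_FEATURE_KEYS + OPTIONAL_NUMERIC_FEATURE_KEYS
  ]
  # Categorical columns: backed by the vocab files written by tft.vocabulary.
  one_hot_columns = [
      tf.feature_column.categorical_column_with_vocabulary_file(
          key=key,
          vocabulary_file=tf_transform_output.vocabulary_file_by_name(
              vocab_filename=key))
      for key in CATEGORICAL_FEATURE_KEYS
  ]
  return real_valued_columns + one_hot_columns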

Once training is complete, we have a model exported under working_dir, and we can inspect the signature definition of the saved model with TensorFlow's saved_model_cli tool.

figure: signature definition of the saved model
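
The command itself looks something like this; the export path below is an assumption, so adjust it to wherever the estimator wrote its SavedModel:

$ saved_model_cli show \
    --dir ${working_dir}/exported_model_dir/<timestamp> \
    --tag_set serve --signature_def serving_default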

Now we are ready to serve the model trained on the census dataset using TensorFlow Serving.

figure: serving the model using Tensorflow model serving
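
If you run the stock model server binary, the invocation is along these lines; the model name matches the curl call below, while the base path is an assumption:

$ tensorflow_model_server --rest_api_port=8501 \
    --model_name=census \
    --model_base_path=${working_dir}/exported_model_dir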

Of course, it's good to create a client for inference. In the previous posts I used Python to build one; here we can simply use curl, which is the easiest way to call the REST endpoint:

$ curl -d '{"examples": [{"age": 30.0, "workclass": "Self-emp-not-inc", "education": "Bachelors", "education-num": 17.0, "marital-status": "Married-civ-spouse", "occupation": "Exec-managerial", "relationship": "Husband", "race": "White", "sex": "Male", "capital-gain": 0.0, "capital-loss": 0.0, "hours-per-week": 40.0, "native-country": "United-States"}]}' \
    -X POST http://localhost:8501/v1/models/census:classify
{
    "results": [[["0", 0.498906225], ["1", 0.501093805]]]
}
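
If you prefer a Python client, as in the previous posts, the same request can be made with the requests library. This is a minimal sketch of the same call, assuming the server above is running locally:

# Minimal Python client for the classify endpoint shown above (sketch).
import requests

example = {
    "age": 30.0, "workclass": "Self-emp-not-inc", "education": "Bachelors",
    "education-num": 17.0, "marital-status": "Married-civ-spouse",
    "occupation": "Exec-managerial", "relationship": "Husband",
    "race": "White", "sex": "Male", "capital-gain": 0.0, "capital-loss": 0.0,
    "hours-per-week": 40.0, "native-country": "United-States",
}
response = requests.post(
    "http://localhost:8501/v1/models/census:classify",
    json={"examples": [example]})
print(response.json())  # e.g. {"results": [[["0", 0.49...], ["1", 0.50...]]]}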

I hope you have enjoyed these posts.

TensorFlow and TensorFlow Extended are powerful tools. In this series of posts I have tried to show some simple implementations to get you started with them.

Please send comments, corrections, and further discussion to us at hello@delivfy.io

Ready To Learn More?

Get in touch with our team of experts to find out how we can help you build predictive AI solutions for your business.

www.delvify.io
