How to build a TensorFlow Extended (TFX) pipeline and deploy it with Docker and TensorFlow Serving ✨

Axel Ducamp
10 min read · Jan 20, 2023


TensorFlow Extended (TFX) is a very useful tool for building machine learning pipelines. With TFX, your data is guaranteed to be processed the same way at inference time as at training time. You can also check the data for anomalies and monitor your model with TensorBoard (another very useful tool).

A TensorFlow Extended pipeline is composed of multiple components. In this graph, you can see all of them.

Today we will work with the Titanic dataset. We will build a complete TFX pipeline that predicts whether a passenger survived, so our target variable is “Survived”.

Colab notebook is available here : https://colab.research.google.com/drive/1TEBo44WHlap41unLvXoJS4rADztxSbdE?usp=sharing

The first step is to install TensorFlow Extended and import the libraries we need.

Run this command in your terminal (it can take a few minutes):

pip install tfx

After that, we are ready to begin our exploration ! ✅

Import Useful Libraries

import tensorflow as tf
import tfx.v1 as tfx
from tfx.v1 import proto
from tfx.proto import example_gen_pb2
from tfx.orchestration.experimental.interactive.interactive_context import InteractiveContext
import os

context = InteractiveContext()

ExampleGen

The ExampleGen TFX Pipeline component ingests data into TFX pipelines. It consumes external files/services to generate Examples which will be read by other TFX components. It also provides consistent and configurable partition, and shuffles the dataset for ML best practice.

  • Consumes: Data from external data sources such as CSV, TFRecord, Avro, Parquet and BigQuery.
  • Emits: tf.Example records, tf.SequenceExample records, or proto format, depending on the payload format.

# Input has a single split 'input_dir/*'.
# Output 2 splits: train:eval=3:1.
output = proto.Output(
    split_config=example_gen_pb2.SplitConfig(splits=[
        proto.SplitConfig.Split(name='train', hash_buckets=3),
        proto.SplitConfig.Split(name='eval', hash_buckets=1)
    ]))


# My CSV file is located in the DATAFOLDER directory.
input_dir = "/content/DATAFOLDER"

example_gen = tfx.components.CsvExampleGen(input_base=input_dir, output_config=output)

context.run(example_gen)
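
To make the `hash_buckets=3` / `hash_buckets=1` configuration more concrete, here is a minimal pure-Python sketch of hash-based splitting. It is only an illustration: the `assign_split` helper and the use of SHA-256 are assumptions of this sketch, not the hashing that ExampleGen performs internally. The idea is that each record is hashed into one of 3 + 1 = 4 slots, so about three quarters of the records land in the train split.

```python
import hashlib

def assign_split(record: bytes, buckets=(("train", 3), ("eval", 1))) -> str:
    """Map a record to a split by hashing it into one of sum(buckets) slots."""
    total = sum(n for _, n in buckets)
    # A stable hash (unlike Python's salted hash()) keeps the split reproducible.
    slot = int.from_bytes(hashlib.sha256(record).digest()[:8], "big") % total
    upper = 0
    for name, n in buckets:
        upper += n
        if slot < upper:
            return name
    raise AssertionError("unreachable: slot is always below total")

# Hypothetical records standing in for CSV rows.
rows = [f"passenger-{i}".encode() for i in range(1000)]
counts = {"train": 0, "eval": 0}
for row in rows:
    counts[assign_split(row)] += 1
print(counts)  # close to a 3:1 ratio, but not exactly
```

Because the assignment depends only on the record content, the same record always ends up in the same split, run after run.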

You can check all artifacts generated by the ExampleGen and verify them!

data_link_output_folder = example_gen.outputs["examples"].get()[0].uri
print("data_link_output_folder : ", data_link_output_folder)
split_folder = os.listdir(data_link_output_folder)
print("split_folder : ", split_folder)
data_link_output_folder_split_train = os.path.join(data_link_output_folder, split_folder[0])
print("data_link_output_folder_split_train : ", data_link_output_folder_split_train)
examples_output = os.listdir(data_link_output_folder_split_train)[0]
print("examples_output : ", examples_output)
examples = os.path.join(data_link_output_folder_split_train, examples_output)
print("examples link : ", examples)

## outputs
## data_link_output_folder : /tmp/tfx-interactive-2023-01-20T04_59_52.581550-6t7iwpip/CsvExampleGen/examples/23
## split_folder : ['Split-train', 'Split-eval']
## data_link_output_folder_split_train : /tmp/tfx-interactive-2023-01-20T04_59_52.581550-6t7iwpip/CsvExampleGen/examples/23/Split-train
## examples_output : data_tfrecord-00000-of-00001.gz
## examples link : /tmp/tfx-interactive-2023-01-20T04_59_52.581550-6t7iwpip/CsvExampleGen/examples/23/Split-train/data_tfrecord-00000-of-00001.gz

verif_examples = tf.data.TFRecordDataset(examples, compression_type="GZIP")

# Parse the first serialized record back into a tf.train.Example and print it.
for i in verif_examples.take(1):
    tmp = i.numpy()
    ex = tf.train.Example()
    ex.ParseFromString(tmp)
    print(ex)

## outputs
# features {
# feature {
# key: "Age"
# value {
# float_list {
# value: 38.0
# }
# }
# }
# feature {
# key: "Cabin"
# value {
# bytes_list {
# value: "C85"
# }
# }
# }
# feature {
# key: "Embarked"
# value {
# bytes_list {
# value: "C"
# }
# }
# }
# feature {
# key: "Fare"
# value {
# float_list {
# value: 71.2833023071289
# }
# }
# }
# feature {
# key: "Name"
# value {
# ...

StatisticsGen

The StatisticsGen TFX pipeline component generates feature statistics over both training and serving data, which can be used by other pipeline components. StatisticsGen uses Beam to scale to large datasets.

  • Consumes: datasets created by an ExampleGen pipeline component.
  • Emits: Dataset statistics.

stat_gen = tfx.components.StatisticsGen(
    examples = example_gen.outputs["examples"]
)

context.run(stat_gen)
context.show(stat_gen.outputs["statistics"])
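
To get a feel for the kind of numbers StatisticsGen produces, here is a small pure-Python sketch that computes a few per-feature statistics on a handful of rows. The rows and the `feature_stats` helper are made up for illustration; the real component computes many more statistics and runs on Beam.

```python
# Hypothetical sample rows; None marks a missing value.
rows = [
    {"Age": 22.0, "Fare": 7.25, "Sex": "male"},
    {"Age": 38.0, "Fare": 71.2833, "Sex": "female"},
    {"Age": None, "Fare": 8.05, "Sex": "male"},
    {"Age": 35.0, "Fare": 53.1, "Sex": "female"},
]

def feature_stats(rows, name):
    """Count present/missing values, then add numeric or categorical details."""
    values = [r[name] for r in rows if r[name] is not None]
    stats = {"count": len(values), "missing": len(rows) - len(values)}
    if values and all(isinstance(v, (int, float)) for v in values):
        stats.update(min=min(values), max=max(values),
                     mean=sum(values) / len(values))
    else:
        stats["unique"] = len(set(values))
    return stats

stats = {name: feature_stats(rows, name) for name in rows[0]}
for name, s in stats.items():
    print(name, s)
```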

SchemaGen

Some TFX components use a description of your input data called a schema. The schema is an instance of schema.proto. It can specify data types for feature values, whether a feature has to be present in all examples, allowed value ranges, and other properties. A SchemaGen pipeline component will automatically generate a schema by inferring types, categories, and ranges from the training data.

  • Consumes: statistics from a StatisticsGen component
  • Emits: Data schema proto

schema_gen = tfx.components.SchemaGen(
    statistics = stat_gen.outputs["statistics"],
    infer_feature_shape=True
)

context.run(schema_gen)
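
As a rough illustration of what "inferring types, categories, and ranges" means, the sketch below derives a tiny schema from a few rows. The `infer_schema` helper, its output layout and the type names are assumptions of this sketch; SchemaGen itself works from the statistics artifact and emits a schema proto.

```python
# Hypothetical sample rows; None marks a missing value.
rows = [
    {"Pclass": 3, "Sex": "male", "Age": 22.0},
    {"Pclass": 1, "Sex": "female", "Age": 38.0},
    {"Pclass": 3, "Sex": "female", "Age": None},
]

def infer_schema(rows):
    """Infer a type, a 'required' flag and (for strings) a value domain."""
    schema = {}
    for name in rows[0]:
        values = [r[name] for r in rows if r[name] is not None]
        if all(isinstance(v, str) for v in values):
            entry = {"type": "BYTES", "domain": sorted(set(values))}
        elif all(isinstance(v, int) for v in values):
            entry = {"type": "INT"}
        else:
            entry = {"type": "FLOAT"}
        # A feature is required only if no row is missing it.
        entry["required"] = len(values) == len(rows)
        schema[name] = entry
    return schema

schema = infer_schema(rows)
print(schema)
```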


ExampleValidator

The ExampleValidator pipeline component identifies anomalies in training and serving data. It can detect different classes of anomalies in the data. For example, it can:

  • perform validity checks by comparing data statistics against a schema that codifies the expectations of the user.
  • detect training-serving skew by comparing training and serving data.
  • detect data drift by looking at a series of data.
  • perform custom validations using a SQL-based configuration.

The ExampleValidator pipeline component identifies any anomalies in the example data by comparing data statistics computed by the StatisticsGen pipeline component against a schema. The inferred schema codifies properties which the input data is expected to satisfy, and can be modified by the developer.

  • Consumes: A schema from a SchemaGen component, and statistics from a StatisticsGen component.
  • Emits: Validation results

validator = tfx.components.ExampleValidator(
    statistics = stat_gen.outputs["statistics"],
    schema = schema_gen.outputs["schema"]
)

context.run(validator)
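
The sketch below shows, in plain Python, the idea of checking data against a schema. It is a simplification under stated assumptions: the `SCHEMA` dictionary and the `find_anomalies` helper are hypothetical, and the check is done row by row, whereas ExampleValidator compares the statistics artifact with the schema.

```python
# Hypothetical schema: expected type, whether the feature is required, allowed values.
SCHEMA = {
    "Pclass": {"type": int, "required": True},
    "Sex": {"type": str, "required": True, "domain": {"female", "male"}},
    "Age": {"type": float, "required": False},
}

def find_anomalies(rows, schema):
    """Return {feature name: set of anomaly descriptions} for the given rows."""
    anomalies = {}
    for row in rows:
        for name, spec in schema.items():
            value = row.get(name)
            if value is None:
                if spec["required"]:
                    anomalies.setdefault(name, set()).add("missing value")
                continue
            if not isinstance(value, spec["type"]):
                anomalies.setdefault(name, set()).add("unexpected type")
            elif "domain" in spec and value not in spec["domain"]:
                anomalies.setdefault(name, set()).add("value outside domain")
    return anomalies

new_rows = [
    {"Pclass": 3, "Sex": "male", "Age": 34.5},
    {"Pclass": None, "Sex": "unknown", "Age": None},
    {"Pclass": 1, "Sex": "female", "Age": "forty"},
]
found = find_anomalies(new_rows, SCHEMA)
print(found)
```

An empty result means the rows satisfy every expectation recorded in the schema.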

Transform

The Transform TFX pipeline component performs feature engineering on tf.Examples emitted from an ExampleGen component, using a data schema created by a SchemaGen component, and emits both a SavedModel as well as statistics on both pre-transform and post-transform data. When executed, the SavedModel will accept tf.Examples emitted from an ExampleGen component and emit the transformed feature data.

  • Consumes: tf.Examples from an ExampleGen component, and a data schema from a SchemaGen component.
  • Emits: A SavedModel to a Trainer component, pre-transform and post-transform statistics.

For this demo, we will use only four distinct variables.

  • Two numerical variables : Age & Fare
  • Two categorical variables : Pclass & Sex

%%writefile params.py

NUM = ["Age", "Fare"]
CAT = ["Pclass", "Sex"]
TARGET = "Survived"

_transform = "transform.py"

%%writefile {_transform}

import tensorflow as tf
import tensorflow_transform as tft
import tfx.v1 as tfx

import params

NUM = params.NUM
CAT = params.CAT
TARGET = params.TARGET

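def fillna(x):
    # Dense tensors have no missing values to fill.
    if not isinstance(x, tf.sparse.SparseTensor):
        return x

    # Missing strings become "", missing numbers become 0.
    default_val = "" if x.dtype == tf.string else 0
    return tf.squeeze(
        tf.sparse.to_dense(
            tf.SparseTensor(
                x.indices, x.values, [x.dense_shape[0], 1]
            ), default_val
        ), axis = 1
    )

def preprocessing_fn(inputs):

    outputs = {}

    # Categorical features: map each value to an integer vocabulary index.
    for cat in CAT:
        outputs[cat] = tft.compute_and_apply_vocabulary(
            fillna(
                inputs[cat]
            )
        )

    # Numerical features: rescale to the [0, 1] range.
    for num in NUM:
        outputs[num] = tft.scale_by_min_max(
            fillna(
                inputs[num]
            )
        )

    # The target is passed through unchanged.
    outputs[TARGET] = inputs[TARGET]

    return outputs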

transform = tfx.components.Transform(
    examples = example_gen.outputs["examples"],
    schema = schema_gen.outputs["schema"],
    module_file = os.path.abspath(_transform)
)

context.run(transform)
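
To show what the two analyzers used in `preprocessing_fn` compute, here is a pure-Python sketch. It is an approximation under assumptions: the sample values are invented, `fit_vocabulary` and `transform_row` are hypothetical helpers, and ties in the vocabulary are broken alphabetically here, which is a choice of this sketch. The key point is that the minimum, maximum and vocabulary are learned once from the training data and then reused unchanged for every new example.

```python
from collections import Counter

def fit_vocabulary(values):
    """Index values by descending frequency (ties broken alphabetically here)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], str(v)))
    return {value: index for index, value in enumerate(ordered)}

# Hypothetical training columns; None marks a missing value.
train_age = [22.0, 38.0, None, 35.0]
train_sex = ["male", "female", "male", "male"]

# "Analyze" step: fill missing values with 0 first (as fillna does), then learn the constants.
filled_age = [0.0 if a is None else a for a in train_age]
age_min, age_max = min(filled_age), max(filled_age)
sex_vocab = fit_vocabulary(train_sex)

def transform_row(age, sex):
    """'Apply' step: reuse the learned constants on any example, train or serving."""
    age = 0.0 if age is None else age
    return {
        "Age": (age - age_min) / (age_max - age_min),  # assumes age_max > age_min
        "Sex": sex_vocab.get(sex, -1),                 # -1 for values never seen in training
    }

print(sex_vocab)
print(transform_row(19.0, "female"))
print(transform_row(None, "other"))
```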

We can verify the transformed examples produced by the TFX Transform component:

data_link_output_folder = transform.outputs["transformed_examples"].get()[0].uri
print("data_link_output_folder : ", data_link_output_folder)
split_folder = os.listdir(data_link_output_folder)
print("split_folder : ", split_folder)
data_link_output_folder_split_train = os.path.join(data_link_output_folder, split_folder[0])
print("data_link_output_folder_split_train : ", data_link_output_folder_split_train)
examples_output = os.listdir(data_link_output_folder_split_train)[0]
print("examples_output : ", examples_output)
examples = os.path.join(data_link_output_folder_split_train, examples_output)
print("examples link : ", examples)

## outputs
## data_link_output_folder : /tmp/tfx-interactive-2023-01-20T04_59_52.581550-6t7iwpip/Transform/transformed_examples/27
## split_folder : ['Split-train', 'Split-eval']
## data_link_output_folder_split_train : /tmp/tfx-interactive-2023-01-20T04_59_52.581550-6t7iwpip/Transform/transformed_examples/27/Split-train
## examples_output : transformed_examples-00000-of-00001.gz
## examples link : /tmp/tfx-interactive-2023-01-20T04_59_52.581550-6t7iwpip/Transform/transformed_examples/27/Split-train/transformed_examples-00000-of-00001.gz

verif_examples = tf.data.TFRecordDataset(examples, compression_type="GZIP")

# Parse the first transformed record and print it.
for i in verif_examples.take(1):
    tmp = i.numpy()
    ex = tf.train.Example()
    ex.ParseFromString(tmp)
    print(ex)

## outputs
# features {
# feature {
# key: "Age"
# value {
# float_list {
# value: 0.46889227628707886
# }
# }
# }
# feature {
# key: "Fare"
# value {
# float_list {
# value: 0.13913573324680328
# }
# }
# }
# feature {
# key: "Pclass"
# value {
# int64_list {
# value: 0
# }
# }
# }
# feature {
# key: "Sex"
# value {
# int64_list {
# value: 1
# }
# }
# }
# feature {
# key: "Survived"
# value {
# int64_list {
# value: 1
# }
# }
# }
# }

Trainer

The Trainer TFX pipeline component trains a TensorFlow model.

As with the Transform component, we have to write a file that contains all our code.

_trainer = "trainer.py"

%%writefile {_trainer}

from absl import logging
from typing import List, Text

import tensorflow as tf
import tfx.v1 as tfx
import tensorflow_transform as tft
from tfx_bsl.public import tfxio

import params

CAT = params.CAT
NUM = params.NUM
TARGET = params.TARGET

def _get_serving_tf_examples_signature(model, tf_transform_output : tft.TFTransformOutput):
    # Attach the transform layer to the model so it is saved with it.
    model.tft_layer_inference = tf_transform_output.transform_features_layer()

    @tf.function(
        input_signature = [
            tf.TensorSpec(shape=[None], dtype=tf.string, name="examples") # This name is important for the serving part !
        ]
    )
    def serving_tf_examples(tf_examples):
        # Serving requests do not contain the label, so drop it from the spec.
        raw_feature_spec = tf_transform_output.raw_feature_spec()
        raw_feature_spec.pop(TARGET)
        raw_feature = tf.io.parse_example(tf_examples, raw_feature_spec)
        transformed_features = model.tft_layer_inference(raw_feature)
        logging.info("Serving_tf_examples %s", transformed_features)

        outputs = model(transformed_features)

        return {"outputs" : outputs}
    return serving_tf_examples

def _get_serving_tf_transform_signature(model, tf_transform_output : tft.TFTransformOutput):
    model.tft_eval_inference = tf_transform_output.transform_features_layer()

    @tf.function(
        input_signature = [
            tf.TensorSpec(shape=[None], dtype=tf.string, name="transform")
        ]
    )
    def serving_tf_transform(tf_examples):
        # Only applies the preprocessing; the label is kept in the spec here.
        raw_feature_spec = tf_transform_output.raw_feature_spec()
        raw_feature = tf.io.parse_example(tf_examples, raw_feature_spec)
        transformed_features = model.tft_eval_inference(raw_feature)
        logging.info("Serving_tf_examples %s", transformed_features)

        return transformed_features
    return serving_tf_transform

def _inputs(file_pattern : List[Text],
            data_accessor : tfx.components.DataAccessor,
            tf_transform_output : tft.TFTransformOutput,
            batch_size : int) -> tf.data.Dataset:

    # Build a batched dataset of (features, label) from the transformed examples.
    return data_accessor.tf_dataset_factory(
        file_pattern, tfxio.TensorFlowDatasetOptions(
            batch_size, label_key = TARGET
        ), tf_transform_output.transformed_metadata.schema
    )

def replace_num_nan(x):
    """
    We will replace nan by 0. here.
    This can be not a good idea but we will do that only for
    the example. It is not a real Machine Learning Project.
    """
    x = tf.cast(x, tf.float32)
    x = tf.where(tf.math.is_nan(x), 0., x)

    return x

def build_model():

    cat_var = [
        tf.feature_column.categorical_column_with_identity(
            cat, num_buckets = 10
        ) for cat in CAT
    ]

    # One-hot encode the categorical columns.
    cat_var = [
        tf.feature_column.indicator_column(
            cat
        ) for cat in cat_var
    ]

    num_var = [
        tf.feature_column.numeric_column(
            num, normalizer_fn= lambda x: replace_num_nan(x)
        ) for num in NUM
    ]

    # num_var will be composed by num variables +
    # cat encoded variables
    for var in cat_var:
        num_var.append(var)

    dense_feature = tf.keras.layers.DenseFeatures(num_var)

    inp = {
        num : tf.keras.layers.Input(shape=(), dtype=tf.float32)
        for num in NUM
    }

    inp.update(
        {
            cat : tf.keras.layers.Input(shape=(1,1), dtype=tf.int32)
            for cat in CAT
        }
    )

    layers = tf.keras.layers

    x = dense_feature(inp)
    x = layers.Dense(16, activation = "relu")(x)
    x = layers.Dense(8, activation="relu")(x)
    out = layers.Dense(1, activation="sigmoid")(x)

    model = tf.keras.models.Model(
        inputs = inp,
        outputs = out
    )

    model.compile(
        optimizer="adam",
        loss = "binary_crossentropy",
        metrics = ["acc"]
    )

    model.summary()

    return model

def run_fn(fnargs : tfx.components.FnArgs):

    tf_transform_output = tft.TFTransformOutput(fnargs.transform_output)

    train = _inputs(
        fnargs.train_files,
        fnargs.data_accessor,
        tf_transform_output,
        batch_size = 8
    )

    eval = _inputs(
        fnargs.eval_files,
        fnargs.data_accessor,
        tf_transform_output,
        batch_size = 8
    )

    model = build_model()

    tensorboard = tf.keras.callbacks.TensorBoard(
        log_dir = fnargs.model_run_dir
    )

    model.fit(
        train,
        validation_data = eval,
        steps_per_epoch=10,
        validation_steps = 10,
        epochs = 10,
        callbacks = [tensorboard]
    )


    ## IMPORTANT : Do not change the "serving_default" name !
    signatures = {
        "serving_default" : _get_serving_tf_examples_signature(model, tf_transform_output),
        "transform_features" : _get_serving_tf_transform_signature(model, tf_transform_output)
    }

    model.save(fnargs.serving_model_dir, save_format='tf', signatures=signatures)

trainer = tfx.components.Trainer(
    examples = transform.outputs["transformed_examples"],
    transform_graph = transform.outputs["transform_graph"],
    module_file = os.path.abspath(_trainer),
)

context.run(trainer)

You can run TensorBoard with the following commands:

tensorboard_path = trainer.outputs["model_run"].get()[0].uri

%load_ext tensorboard
%tensorboard --logdir {tensorboard_path}

Pusher

Now that we have a trained model, we can push it.

The Pusher component is used to push a validated model to a deployment target during model training or re-training. Before the deployment, Pusher relies on one or more blessings from other validation components to decide whether to push the model or not.

  • Evaluator blesses the model if the new trained model is “good enough” to be pushed to production.
  • (Optional but recommended) InfraValidator blesses the model if the model is mechanically servable in a production environment.

A Pusher component consumes a trained model in SavedModel format, and produces the same SavedModel, along with versioning metadata.

serving_model_dir = "PUSHER_MODEL_1"

pusher = tfx.components.Pusher(
    model=trainer.outputs['model'],
    push_destination=tfx.proto.PushDestination(
        filesystem=tfx.proto.PushDestination.Filesystem(
            base_directory=serving_model_dir)
    )
)

context.run(pusher)
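
The gating logic described above can be sketched in a few lines of plain Python. This is only an illustration of the idea, not the Pusher implementation: the `push_if_blessed` helper, the blessing dictionary and the use of a Unix timestamp as the version folder name are assumptions of this sketch. Note that with no blessing at all (as in this article, where no Evaluator is wired in), the sketch pushes unconditionally.

```python
import shutil
import tempfile
import time
from pathlib import Path
from typing import Dict, Optional

def push_if_blessed(model_dir: Path, base_dir: Path,
                    blessings: Dict[str, bool]) -> Optional[Path]:
    """Copy model_dir into base_dir/<version> only if every blessing is True."""
    if not all(blessings.values()):
        return None  # at least one validator rejected the model
    # Assumption of this sketch: a numeric, timestamp-based version folder.
    destination = base_dir / str(int(time.time()))
    shutil.copytree(model_dir, destination)
    return destination

with tempfile.TemporaryDirectory() as tmp:
    # Placeholder standing in for a real SavedModel directory.
    model_dir = Path(tmp) / "trained_model"
    model_dir.mkdir()
    (model_dir / "saved_model.pb").write_bytes(b"placeholder")

    base_dir = Path(tmp) / "PUSHER_MODEL_1"
    skipped = push_if_blessed(model_dir, base_dir, {"evaluator": False})
    pushed = push_if_blessed(model_dir, base_dir,
                             {"evaluator": True, "infra_validator": True})
    pushed_files = sorted(p.name for p in pushed.iterdir())
    version_name = pushed.name

print(skipped, version_name, pushed_files)
```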

Serving with Docker and running inference on new data 🚀

To serve the model, you will have to pull the Tensorflow Serving image from Docker Hub with this command : docker pull tensorflow/serving

When it is done, make sure that the folder that contains the model files is named 1 (for version 1). TensorFlow Serving expects each model version to live in a numerically named subfolder of the model base path.

The next step is to run the Tensorflow Serving image with this command :

docker run -it -p 8080:8080 -v E:\TFX_SERVING\PUSHER_MODEL_1\content:/models/ --entrypoint /bin/bash tensorflow/serving

The -v option mounts the host folder that contains the model files as a volume in the container, so all the model files are available there. The folder’s name in the container is /models.

When it is done, you will be able to serve the model with the following command line : tensorflow_model_server --rest_api_port=8080 --model_name=model --model_base_path=/models

And … GOOD JOB your model is ready to ingest some new data and return a prediction value!

Inference

To send new data to the model, you will need to format your data like this :

import base64

feature_spec = {
    "Pclass": tf.train.Feature(int64_list=tf.train.Int64List(value=[int(3)])),
    "Sex": tf.train.Feature(bytes_list=tf.train.BytesList(value=["male".encode()])),
    "Age": tf.train.Feature(float_list=tf.train.FloatList(value=[float(34.5)])),
    "Fare": tf.train.Feature(float_list=tf.train.FloatList(value=[float(3.0)])),
    "Name" : tf.train.Feature(bytes_list=tf.train.BytesList(value=["Kelly, Mr. James".encode()])),
    "Parch" : tf.train.Feature(int64_list=tf.train.Int64List(value=[int(0)])),
    "Cabin" : tf.train.Feature(bytes_list=tf.train.BytesList(value=["E".encode()])),
    "Embarked" : tf.train.Feature(bytes_list=tf.train.BytesList(value=["C".encode()])),
    "PassengerId" : tf.train.Feature(int64_list=tf.train.Int64List(value=[int(3)])),
    "SibSp" : tf.train.Feature(int64_list=tf.train.Int64List(value=[int(3)])),
    "Ticket" : tf.train.Feature(bytes_list=tf.train.BytesList(value=["C".encode()]))
}

# Serialize the tf.train.Example to bytes.
example = tf.train.Example(
    features=tf.train.Features(feature=feature_spec)).SerializeToString()

# Encode your serialized Example using base64 so it can be added into your
# JSON payload.
b64_example = base64.b64encode(example).decode()
result = [{'examples': {'b64': b64_example}}]
result

## outputs
# [{'examples': {'b64': 'Cs0BCg4KBVBhcmNoEgUaAwoBAAoQCgRGYXJlEggSBgoEAABAQAoPCgZUaWNrZXQSBQoDCgFDCg8KBlBjbGFzcxIFGgMKAQMKFAoLUGFzc2VuZ2VySWQSBRoDCgEDCg4KBUNhYmluEgUKAwoBRQocCgROYW1lEhQKEgoQS2VsbHksIE1yLiBKYW1lcwoPCgNTZXgSCAoGCgRtYWxlCg8KA0FnZRIIEgYKBAAACkIKDgoFU2liU3ASBRoDCgEDChEKCEVtYmFya2VkEgUKAwoBQw=='}}]
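
The TensorFlow Serving REST API accepts two request layouts: a row format under the "instances" key and a columnar format under the "inputs" key. The sketch below builds both for one or more serialized examples, using only the standard library. The helper names and the placeholder bytes are assumptions of this sketch; in practice the bytes would come from `SerializeToString()` as shown above.

```python
import base64
import json

def row_format(serialized_examples):
    """Row layout: one object per example under the 'instances' key."""
    return {"instances": [
        {"examples": {"b64": base64.b64encode(s).decode()}}
        for s in serialized_examples
    ]}

def columnar_format(serialized_examples):
    """Columnar layout: one list per named input under the 'inputs' key."""
    return {"inputs": {"examples": [
        {"b64": base64.b64encode(s).decode()}
        for s in serialized_examples
    ]}}

# Placeholder bytes standing in for a serialized tf.train.Example.
fake_example = b"\x0a\x02hi"

body = json.dumps(row_format([fake_example]))
# Raw bytes cannot be put in JSON directly; base64 makes them text and is reversible.
decoded = base64.b64decode(json.loads(body)["instances"][0]["examples"]["b64"])
print(body)
print(json.dumps(columnar_format([fake_example])))
```

The "examples" key matches the name given to the input tensor in the serving signature, which is why that name matters.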

At the end, you just have to send your data to your model (which is running in a Docker container).

The final command line (enter it as a single line):

curl -d '{"inputs": {"examples": {"b64": "Cs0BChAKBEZhcmUSCBIGCgQAAEBACg4KBVNpYlNwEgUaAwoBAwoRCghFbWJhcmtlZBIFCgMKAUMKDwoDQWdlEggSBgoEAAAKQgoUCgtQYXNzZW5nZXJJZBIFGgMKAQMKDwoGUGNsYXNzEgUaAwoBAwoOCgVQYXJjaBIFGgMKAQAKHAoETmFtZRIUChIKEEtlbGx5LCBNci4gSmFtZXMKDgoFQ2FiaW4SBQoDCgFFCg8KA1NleBIICgYKBG1hbGUKDwoGVGlja2V0EgUKAwoBQw=="}}}'
-X POST http://localhost:8080/v1/models/model:predict
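
If you prefer to stay in Python, the same request can be sent with the standard library. This is a minimal sketch under assumptions: the helper names are hypothetical, the response is assumed to look like {"outputs": [[probability]]} because the serving signature returns a single "outputs" tensor from a one-unit sigmoid layer, and the 0.5 threshold is an arbitrary choice.

```python
import json
import urllib.request

PREDICT_URL = "http://localhost:8080/v1/models/model:predict"

def build_request(b64_example: str, url: str = PREDICT_URL) -> urllib.request.Request:
    """Build the same POST request as the curl command."""
    body = json.dumps({"inputs": {"examples": {"b64": b64_example}}}).encode()
    return urllib.request.Request(
        url, data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )

def parse_survival(response_text: str, threshold: float = 0.5):
    """Turn the assumed {"outputs": [[p], ...]} response into booleans."""
    outputs = json.loads(response_text)["outputs"]
    return [row[0] >= threshold for row in outputs]

def predict(b64_example: str):
    """Send the request; requires the Docker container to be running."""
    with urllib.request.urlopen(build_request(b64_example), timeout=10) as resp:
        return parse_survival(resp.read().decode())

# Offline check of the parsing step with a made-up response.
print(parse_survival('{"outputs": [[0.8], [0.1]]}'))
```

Call `predict(b64_example)` with the base64 string built earlier once the server is up.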

Conclusion

Congrats !! ✅

I really want to thank you for reading this article! Do not hesitate to contact me if you have any questions. I know that building a TFX pipeline is not an easy task at the beginning. But remember, keep learning, it is super important. 😃
