ML Pipelines (Vertex) Part 2

Jesus
11 min read · Sep 20, 2022

In my previous post I explained the basics behind creating a pipeline on Vertex with a simple (non-machine-learning) example; now it’s time to do something real. This time I’ll walk through every single detail of the data science and ML engineering work.

As mentioned, there are different ways to create pipeline components:

  • Custom Lightweight Function. (Easiest)
  • Custom Python Training and Custom Container Training Job. (Rich; we can use Vertex Managed Datasets and do data splitting)
  • Custom Training Job. (Faster, easier to debug)

Because I value my time I’ll spend it creating something with the last one, but stay tuned for new publications about the other ways.

Overview

The use case: we’ll predict “Miles Per Gallon” from a car’s technical specs. The prediction is a numeric scalar value [18, 17, 14, 16, etc.].

The dataset comes from http://archive.ics.uci.edu/ml/machine-learning-databases/auto-mpg/auto-mpg.data

First off we apply EDA (Exploratory Data Analysis) to understand the nature of the data before training, and finally we orchestrate the steps through a pipeline:

  1. Getting the dataset and splitting features vs. labels (what we predict).
  2. Creating dummies; a 1/0 representation for categorical information (a quick toy sketch of this and the next step follows this list).
  3. Data standardization; when some feature values are much larger than others, they can bias the model against the smaller ones.
  4. Building a neural network for training; 2 hidden layers of 64 neurons with the ReLU activation function (to learn from non-linear data/complex patterns), and 1 neuron in the output layer (regression/numeric/linear).
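To make steps 2 and 3 concrete before we reach the real pipeline code, here is a tiny, hypothetical sketch (the values and column names are made up; the actual preprocessing code comes later in the post):

import pandas as pd

# A made-up frame with one numeric and one categorical column.
df = pd.DataFrame({
    'Horsepower': [130.0, 165.0, 150.0],
    'Origin': ['USA', 'Europe', 'Japan'],
})

# Step 2: dummies -- one 0/1 column per category.
df = pd.get_dummies(df, columns=['Origin'], prefix='', prefix_sep='')

# Step 3: standardization -- subtract the mean, divide by the standard deviation.
df['Horsepower'] = (df['Horsepower'] - df['Horsepower'].mean()) / df['Horsepower'].std()
print(df)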

Note: use JupyterLab/Workbench or any notebook (it’s easy). The following instructions assume you’re using one of those.

Exploratory Data Analysis

Features are the independent variables that we use to predict something; the “something” to be predicted is called the dependent variable or target class.

target class

Data distribution is explained by its skewness, which can either show an imbalance or a normal distribution. The latter is also called a Gaussian distribution and follows a bell curve like this:

Having this shape is recommended for many ML frameworks; it helps training algorithms converge faster and avoids bias from large values.

There are 2 common methods to transform data in ML: normalization, which is used to rescale features to a similar range, and standardization, which is useful when the data already follows a Gaussian distribution and we want to avoid bias from large values.
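As a quick illustration of the difference, here is a minimal sketch of both transformations on a single pandas Series (the values are just examples):

import pandas as pd

weight = pd.Series([3504.0, 3693.0, 3436.0, 2234.0])  # example values

# Normalization (min-max): rescales values into the [0, 1] range.
normalized = (weight - weight.min()) / (weight.max() - weight.min())

# Standardization (z-score): mean 0, standard deviation 1.
standardized = (weight - weight.mean()) / weight.std()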

To keep this post compact I will skip the code for the chart below, but you can find it here.

From the chart we can see that the data is close to a normal distribution, so we’ll use standardization as the transformation method.

Another important task is removing any outliers that could affect our training.

For the sake of simplicity I have only taken the “Model Year” feature, but the normalization/standardization should be done across the entire set of independent variables (features).
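The outlier code also lives in the notebook, but for reference, one common minimal approach is the IQR rule; here is a sketch on the “Model Year” column, assuming the raw file has already been loaded into a DataFrame called dataset:

# IQR rule: drop rows far outside the interquartile range (a common heuristic,
# not the only way to detect outliers).
q1 = dataset['Model Year'].quantile(0.25)
q3 = dataset['Model Year'].quantile(0.75)
iqr = q3 - q1

mask = dataset['Model Year'].between(q1 - 1.5 * iqr, q3 + 1.5 * iqr)
dataset = dataset[mask]  # keep only rows inside the whiskers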

We observe from the heatmap that there is a strong correlation between cylinders, displacement and horsepower with weight, which makes total sense since those are physical features and their values affect the car’s weight. The origin and model year of the car are slightly correlated with miles per gallon, so those will be weighted features for the training.
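Since the full EDA code is in the linked notebook, here is only a minimal sketch of how the heatmap and the summary statistics could be produced, assuming the raw file is loaded into a DataFrame called dataset and that seaborn/matplotlib are installed:

import matplotlib.pyplot as plt
import seaborn as sns

# Correlation between the numeric features (MPG, Cylinders, Displacement, ...).
corr = dataset.select_dtypes('number').corr()
sns.heatmap(corr, annot=True, cmap='coolwarm')
plt.show()

# Summary statistics -- the mean row is where Weight's larger scale shows up.
print(dataset.describe().transpose())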

Looking at the mean of the independent variables we observe that weight has a much higher value than the others, which could potentially bias our features, so we need to transform the data with the method previously mentioned. Now let’s quickly explain the ML method we’re going to use in our experiment.

How Do Neural Networks Work?

Neural Network in Action

The learning process in neural networks is composed of 2 phases: feed forward and back propagation.

Feed forward is a matrix multiplication (linear algebra) between the data (independent variables only) and parameters called “weights”. At the same time, data can be non-linear, and to help the algorithm learn from those kinds of shapes we apply something called an activation function to each layer (yellow in the image above); in this case we’ll be using ReLU.

The result is then compared with the real values (target class) to get an error, and we can use different methods to change the weights and reduce that error. One of them is called gradient descent, which applies derivatives to each phase of the feed forward in reverse; that reverse pass is called back propagation. We continuously repeat this process until the error gets close to 0 (that’s learning).
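If it helps, here is a tiny numpy sketch of one feed-forward pass through a 2-layer network with a single output neuron (purely illustrative, with made-up shapes; the real training is handled by Keras later in this post):

import numpy as np

rng = np.random.default_rng(0)

X = rng.normal(size=(5, 7))    # 5 rows of data, 7 features
W1 = rng.normal(size=(7, 64))  # weights of hidden layer 1
W2 = rng.normal(size=(64, 64)) # weights of hidden layer 2
W3 = rng.normal(size=(64, 1))  # weights of the output layer

relu = lambda z: np.maximum(z, 0)  # activation applied after each hidden layer

# Feed forward: matrix multiplications plus activations.
hidden1 = relu(X @ W1)
hidden2 = relu(hidden1 @ W2)
predictions = hidden2 @ W3  # one linear output per row (regression)

# The error the network tries to shrink (mean squared error against the labels).
y = rng.normal(size=(5, 1))
mse = np.mean((predictions - y) ** 2)

# Back propagation computes the gradient of mse with respect to each W and nudges
# the weights in the opposite direction (gradient descent) -- Keras does this for us.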

What?

You probably didn’t get anything from the last 2 paragraphs, so let me tell you this: the method is complex and rocky, and that’s why there are many frameworks out there to make our lives easier. In this post I’ll be using TensorFlow + Keras as the backpack for Vertex AI, but feel free to experiment with others.

Let’s do CODE!

If you’re using Vertex Workbench, believe me, your life will be much easier since you don’t have to deal with authentication to do this.

Variables as ALWAYS

PROJECT_ID='jchavezar-demo' # Change it
REGION='us-central1'
TRAIN_IMAGE_URI=f'gcr.io/{PROJECT_ID}/custom_train:v1'
PREDICT_IMAGE_URI=f'gcr.io/{PROJECT_ID}/custom_predict:v1'
PIPELINE_ROOT_PATH='gs://vtx-root-path' # Change it
AIP_STORAGE_URI='gs://vtx-artifacts'

Create Folder Structure

.
├── ..
├── custom_train_job # Main folder
│ ├── train # Training Folder
│ ├── prediction # Prediction/Inference folder
└── ...
My boring folder structure

!rm -fr custom_train_job
!mkdir custom_train_job
!mkdir custom_train_job/train
!mkdir custom_train_job/prediction
!touch custom_train_job/train/__init__.py
!touch custom_train_job/prediction/__init__.py

Training Model Code

During the data exploration and analysis we found that standardization is required. This is exactly what people mean when they talk about preprocessing, and it’s beneficial for keeping the transformations consistent between training and serving data.

The main steps for the following code are:

  1. train_pre_process(); this function cleans the data, maps strings to numbers and gets dummies (ML needs numeric values to do the magic).
  2. The next step gets statistics from the data (mean and standard deviation) that will be used later during inference/prediction.
  3. norm(); we standardize the data (z-score) so every feature ends up with mean 0 and standard deviation 1, which avoids bias from large values.
  4. pred_data_process(); we transform the prediction/inference data using the stats stored during training. Remember to use the same mean and standard deviation used in the previous steps.
%%writefile custom_train_job/train/preprocess.py
import os
import sys
import pandas as pd

data_uri = os.environ['AIP_STORAGE_URI']

## Data cleaning and normalization, exporting statistics.
def train_pre_process(dataset):
    # Cleaning data and doing transformations
    dataset = dataset.dropna()
    dataset['Origin'] = dataset['Origin'].map({1: 'USA', 2: 'Europe', 3: 'Japan'})
    dataset = pd.get_dummies(dataset, prefix='', prefix_sep='')

    train_dataset = dataset.sample(frac=0.8, random_state=0)
    test_dataset = dataset.drop(train_dataset.index)

    train_stats = train_dataset.describe()
    train_stats.pop('MPG')
    train_stats = train_stats.transpose()
    # Storing stats for transformations in Google Cloud Storage
    train_stats.to_csv(f'{data_uri}/mpg/stats.csv')

    train_labels = train_dataset.pop('MPG')
    test_labels = test_dataset.pop('MPG')

    # Standardization (Z-Score Normalization)
    def norm(x):
        return (x - train_stats['mean']) / train_stats['std']

    normed_train_data = norm(train_dataset)
    normed_test_data = norm(test_dataset)
    return normed_train_data, train_labels, normed_test_data, test_labels

## Using the training statistics to apply the same standardization at prediction time.
def pred_data_process(data: list):
    column_names = ['Cylinders', 'Displacement', 'Horsepower', 'Weight', 'Acceleration', 'Model Year', 'Origin']
    region_list = ['USA', 'Europe', 'Japan']

    dataset = pd.DataFrame([data], columns=column_names)
    dataset = dataset.dropna()
    # One 0/1 column per region, mirroring the dummies created during training.
    for region in region_list:
        if dataset['Origin'][0] == region:
            dataset[region] = 1
        else:
            dataset[region] = 0

    dataset = dataset.drop(columns=['Origin'])

    ## Train stats
    train_stats = pd.read_csv(f'{data_uri}/mpg/stats.csv', index_col=[0])

    def norm(x):
        return (x - train_stats['mean']) / train_stats['std']

    return norm(dataset)

Then we copy the file into the prediction folder because this function will be used by 2 components: training and prediction.

!cp custom_train_job/train/preprocess.py custom_train_job/prediction/preprocess.py

Now Let’s Create the Training File

The important part is how we build our neural network; the function to look at is build_model. The code is very straightforward: 2 hidden layers of 64 neurons each using ReLU as the activation function, and 1 output layer; the optimizer used during back propagation is RMSprop, and the error metric that tells us whether the model is behaving well is “mse” (mean squared error).

%%writefile custom_train_job/train/train.py
import os
import sys
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

model_uri = os.environ['AIP_STORAGE_URI']

def build_model(train_data):
    model = keras.Sequential([
        layers.Dense(64, activation='relu', input_shape=[len(train_data.keys())]),
        layers.Dense(64, activation='relu'),
        layers.Dense(1)
    ])
    optimizer = tf.keras.optimizers.RMSprop(0.001)

    model.compile(loss='mse',
                  optimizer=optimizer,
                  metrics=['mae', 'mse'])

    return model

def train_model(train_data, train_labels, epochs: int = 1000):

    print('[INFO] ------ Building Model Layers', file=sys.stderr)
    model = build_model(train_data)

    # The patience parameter is the number of epochs to check for improvement
    early_stop = keras.callbacks.EarlyStopping(monitor='val_loss', patience=10)

    print('[INFO] ------ Iterations / Training', file=sys.stderr)
    early_history = model.fit(train_data, train_labels,
                              epochs=epochs, validation_split=0.2,
                              callbacks=[early_stop])

    print('[INFO] ------ Saving Model', file=sys.stderr)
    model.save(f'{model_uri}/mpg/model')
    return model

The code input is the dataset from here, and the outputs are the stats (mean and standard deviation) and the model, so we create a main file called main.py:

%%writefile custom_train_job/train/main.py
import sys
import preprocess
import train
import pandas as pd
from tensorflow import keras

dataset_path = keras.utils.get_file("auto-mpg.data", "http://archive.ics.uci.edu/ml/machine-learning-databases/auto-mpg/auto-mpg.data")
column_names = ['MPG', 'Cylinders', 'Displacement', 'Horsepower', 'Weight', 'Acceleration', 'Model Year', 'Origin']
dataset = pd.read_csv(dataset_path, names=column_names, na_values="?", comment='\t', sep=" ", skipinitialspace=True)

## Clean, Normalize and Split Data
print('[INFO] ------ Preparing Data', file=sys.stderr)
train_data, train_labels, test_data, test_labels = preprocess.train_pre_process(dataset)

## Train model and save it in Google Cloud Storage
print('[INFO] ------ Training Model', file=sys.stderr)
train.train_model(train_data, train_labels)

That was easy!

Every step mentioned here is wrapped in its own file, centralized in main.py, and packed into a container like this:

Training Files
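In other words, by this point the train folder contains:

custom_train_job/train
├── __init__.py
├── preprocess.py
├── train.py
├── main.py
└── Dockerfile   # created in the next step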

Now let’s create our lightweight container that will support the training

Docker File for Training

%%writefile custom_train_job/train/Dockerfile
FROM python:latest
RUN python -m pip install --upgrade pip
RUN pip install pandas gcsfs tensorflow
COPY / /trainer
CMD ["python", "trainer/main.py"]

Tag it and Build it!

Thank god we have Google Cloud Build.

!gcloud builds submit -t $TRAIN_IMAGE_URI custom_train_job/train/.

Serving; Web Stateless Container

Now let’s build the container used for prediction, inference, serving, or whatever you like to call it. Basically it is a web server container that handles your HTTP calls with the data to predict, and it uses the pre-processing module because, remember, the data needs to be treated the same way as it was during training.

Here is the file. I used FastAPI to handle the requests; the only requirements are a /health_check route (returning 200 OK) and a /predict route where we preprocess the data and load the ML model, but feel free to use your own. By the way, if you’re using sklearn, tensorflow or xgboost and don’t need to pre-process your data, you can use a pre-built container for prediction; here is another example which I personally enjoy because “codelabs” is a straightforward way to learn.

Enough of this, let’s create the file with the handlers.

%%writefile custom_train_job/prediction/main.py
from fastapi import Request, FastAPI
import tensorflow as tf
import json
import os
import preprocess
import sys

app = FastAPI()

model_uri = os.environ['AIP_STORAGE_URI']
print(f'[INFO] ------ {model_uri}', file=sys.stderr)
model = tf.keras.models.load_model(f'{model_uri}/mpg/model')

@app.get('/')
def get_root():
    return {'message': 'Welcome mpg API: miles per gallon prediction'}

@app.get('/health_check')
def health():
    return 200

if os.environ.get('AIP_PREDICT_ROUTE') is not None:
    method = os.environ['AIP_PREDICT_ROUTE']
else:
    method = '/predict'

@app.post(method)
async def predict(request: Request):
    print("----------------- PREDICTING -----------------")
    body = await request.json()
    instances = body["instances"]
    norm_data = preprocess.pred_data_process(instances)
    outputs = model.predict(norm_data)
    response = outputs.tolist()
    print("----------------- OUTPUTS -----------------")
    return {"predictions": response}

Docker File for Serving

The next step is to create the Dockerfile that encapsulates the code mentioned above.

%%writefile custom_train_job/prediction/Dockerfile
FROM tiangolo/uvicorn-gunicorn-fastapi:python3.7
COPY / /app
WORKDIR /app
RUN python -m pip install --upgrade pip
RUN pip install pandas gcsfs tensorflow
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8080"]
EXPOSE 8080

Tag it and Build it!

Thank god we have Google Cloud Build.

!gcloud builds submit -t $PREDICT_IMAGE_URI custom_train_job/prediction/.

Time to Create the Pipeline

You made it! The files and containers required for your pipeline have been built. Now let’s create the pipeline itself (if you need to know what a pipeline is, you can always go back to my previous post).

Create Pipeline

The first step is to specify the size of the worker node for the training and the container image previously created.

## Training Worker Specs
worker_pool_specs = [
    {
        "machine_spec": {
            "machine_type": "n1-standard-4"
        },
        "replica_count": "1",
        "container_spec": {
            "image_uri": TRAIN_IMAGE_URI,
            "env": [
                {
                    "name": "AIP_STORAGE_URI",
                    "value": AIP_STORAGE_URI
                },
            ]
        }
    }
]

The second step is to create the pipeline, which has the following steps:

  1. CustomTrainingJobOp; a Vertex operation module meant for running the training in a very fashionable way (amazing!).
  2. import_unmanaged_model_op; because we’re storing the model and stats on Google Cloud Storage, we have to use a special kfp (Kubeflow Pipelines) component called importer, which declares the artifacts to be imported into the Vertex world.
  3. custom_model_upload_job; once the artifacts are declared on Vertex, we upload them into the Vertex Model Registry, a repository for models that handles versioning (awesome, isn’t it?). From here you could do batch prediction for almost nothing (cents for 1 node x 2 hours).
  4. endpoint_create_job; since I built this doc I wanted to do online predictions, so if you only need batch you can stop reading now. If you do need online predictions, follow the next steps to create an endpoint that handles requests behind model versions.
  5. custom_model_deploy_job; and the final secret sauce is to deploy the model to the endpoint so it’s publicly accessible.
from kfp.v2.dsl import pipeline
from kfp.v2.components import importer_node
from google_cloud_pipeline_components import aiplatform as gcc
from google_cloud_pipeline_components.types import artifact_types
from google_cloud_pipeline_components.v1.custom_job import CustomTrainingJobOp

@pipeline(name='custom-train')
def pipeline(
    project_id: str
):
    train_job = CustomTrainingJobOp(
        project=project_id,
        display_name='custom_train',
        worker_pool_specs=worker_pool_specs
    )

    import_unmanaged_model_op = importer_node.importer(
        artifact_uri=AIP_STORAGE_URI,
        artifact_class=artifact_types.UnmanagedContainerModel,
        metadata={
            "containerSpec": {
                "imageUri": PREDICT_IMAGE_URI,
                "env": [
                    {
                        "name": "PROJECT_ID",
                        "value": PROJECT_ID
                    },
                ],
                "predictRoute": "/predict",
                "healthRoute": "/health_check",
                "ports": [
                    {
                        "containerPort": 8080
                    }
                ]
            },
        },
    ).after(train_job)

    custom_model_upload_job = gcc.ModelUploadOp(
        project=PROJECT_ID,
        display_name="customjob-model",
        unmanaged_container_model=import_unmanaged_model_op.outputs["artifact"],
    ).after(import_unmanaged_model_op)

    endpoint_create_job = gcc.EndpointCreateOp(
        project=PROJECT_ID,
        display_name="cutomjob-endpoint",
    )

    custom_model_deploy_job = (gcc.ModelDeployOp(
        model=custom_model_upload_job.outputs["model"],
        endpoint=endpoint_create_job.outputs["endpoint"],
        deployed_model_display_name="cutomjob-deploy",
        traffic_split={"0": "100"},
        dedicated_resources_machine_type="n1-standard-2",
        dedicated_resources_min_replica_count=1,
        dedicated_resources_max_replica_count=1
    )).set_caching_options(False)

Compile the Pipeline

from kfp.v2 import compiler
import warnings

warnings.filterwarnings('ignore')

compiler.Compiler().compile(pipeline_func=pipeline,
                            package_path='custom_train.json')

Submit the Job

import google.cloud.aiplatform as aip

job = aip.PipelineJob(
    display_name="custom_train",
    template_path="custom_train.json",
    pipeline_root=PIPELINE_ROOT_PATH,
    parameter_values={
        "project_id": PROJECT_ID
    },
)

job.submit()

And that’s it! By now you should have something like this:

Use the predict endpoint to test it out.

Go to the Google Cloud console > left hamburger menu > Vertex AI > Endpoints > choose your endpoint > click your model.

In the JSON request field, type a list of numbers like this:

{"instances": [8.0, 350.0, 165.0, 3693.0, 11.5, 70.0, 1.0]}
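If you prefer to test from code instead of the console, here is a minimal sketch using the Vertex AI Python SDK; it assumes the pipeline finished and that the endpoint display name used in the pipeline above ("cutomjob-endpoint") is unique in your project:

from google.cloud import aiplatform as aip

aip.init(project=PROJECT_ID, location=REGION)

# Look up the endpoint by the display name used in EndpointCreateOp above.
endpoint = aip.Endpoint.list(filter='display_name="cutomjob-endpoint"')[0]

# Same flat list of values as the console example above.
response = endpoint.predict(instances=[8.0, 350.0, 165.0, 3693.0, 11.5, 70.0, 1.0])
print(response.predictions)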

Grab a napkin, you just got served.

Notebook github repo
