Two Towers Model: A Custom Pipeline in Vertex AI Using Kubeflow

Rubens Zimbres
Google Developer Experts
9 min readMar 2, 2023

MLOps is composed by Continuous Integration (CI — code, unit testing, remerge code), Continuous Delivery (CD — build, test, release) and Continuous Training (CT — train, monitor, measure, retrain, serve). Consider the following situation: you develop a solution where you will offer product search for users. There are new users every minute and new products every day. In this situation we will have an index of embeddings containing all the products, and users query will be submitted as numerical vectors to this index, to check for the best results. This index is deployed in a container inside Vertex AI endpoints.

As new products appear, this index of the word embeddings needs to be updated, retrained and redeployed. That’s when pipelines enter. Pipelines are well defined processes that start with raw data (product description in text). This data is extracted, analyzed, prepared and submitted to a Machine Learning model in different frameworks (PyTorch, Sklearn, Tensorflow).

The model is then trained, validated and evaluated against a test set to check for its generalization properties and accuracy. Then, the model is deployed into an endpoint to serve predictions. During this whole process, we will monitor performance and also look for changes in distribution of data or underlying concept that may affect the prediction. If you retrain and redeploy a model every day, you will have a bigger cost, but a more accurate model. If you retrain and redeploy each month, you will save money, but your model performance will degrade.

So we have two main issues in CI/CD: changes in Python code itself, and the need to retrain your model periodically. Regarding code, as Vertex AI prediction service runs in containers, you may attach your GitHub repository into Google Cloud Build or Google Cloud Source Repositories. When you do that you need to configure a trigger in Cloud Build that will start to update the container when you push altered code into the repo or when you update a tag. This trigger will start a Docker process to generate/update a Container Registry image, that will be available for Vertex AI, by reference.

Now you need to retrain your model periodically. How? By using Pipelines. There are three main ways to develop pipelines: TFX (for Tensorflow models), Kubeflow (PyTorch, Tensorflow, Sklearn, etc) and Cloud Composer (any framework). In this article I will approach Kubeflow.

Here, my intention is not to provide a comprehensive tutorial, but the most important parts of code that will allow you to develop your own pipeline.

Kubeflow has a Python SDK where each step of the pipeline is defined as a @component. Vertex AI works with ‘ops’, like mlengine_train_op, evaluate_model_op, as seen on documentation. Kubeflow has pre-built components, lightweight Python components and custom components. Here I will show custom components in a Jupyter notebook.

First, create a new User-Managed Notebook with Tensorflow 2.9 in Vertex AI and Open JupyterLab.

Install additional libraries and restart the kernel:

! pip3 install --upgrade google-cloud-aiplatform google-cloud-pipeline-components --user -q
! pip install kfp --upgrade --user

Set the project and variables of the notebook:

! gcloud config set project project-id
PROJECT_ID="project-id"
REGION = "us-central1"
BUCKET_URI = "gs://train-pipeline-artifacts" # @param {type:"string"}
PIPELINE_ROOT = "gs://train-pipeline-artifacts/"

You have to take care of two things:

  • allow your user to have the role aiplatform.tensorboardWebAppUser in the IAM (Identity and Access Management) for Tensorboard. Create a custom role in IAM and attach to the user.
  • create a Tensorboard instance in Experiments/Tensorboard Instances. You may use the Console.

Then create a new .ipynb file and import necessary libraries:

import kfp
from typing import NamedTuple
from kfp.v2.dsl import pipeline
from kfp.v2.dsl import component
from kfp.v2.dsl import OutputPath
from kfp.v2.dsl import InputPath
from typing import NamedTuple
from kfp.v2 import dsl
from kfp.v2.dsl import (Artifact,
Dataset,
Input,
Model,
Output,
Metrics,
ClassificationMetrics,
component,
Markdown)
from kfp.v2 import compiler
from kfp.v2.google.client import AIPlatformClient
from kfp.v2.components.executor import Executor
from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs
from google_cloud_pipeline_components import aiplatform as gcc_aip
from google_cloud_pipeline_components.v1.model import ModelUploadOp

Our training code reads from a file located inside a specific bucket. So, it’s only necessary to update this file daily and schedule the pipeline to run in order to generate a new training. Each @component starts with the necessary packages to be installed, and a function that defines the Training. The function has parameters that will show in Google Cloud Console (epochs, embed_length and maxsplit_). Also, you have smetrics, that will also show in Google Cloud Console, like training and evaluation performance. The output of this Kubeflow component is Output[Model], that is, the index that we will upload and then deploy in the endpoint.

TRAINING component:

@component(
packages_to_install = [
"pandas==1.3.4",
"numpy==1.20.3",
"unidecode",
"nltk==3.6.5",
"tensorflow-recommenders==0.7.0",
"tensorflow==2.9.1",
"gcsfs==2023.1.0",
"google-cloud-aiplatform==1.22.1",
"tensorboard==2.9.0"

],
)

def train_model(epochs_: int,
embed_length: int,
maxsplit_: int,
smetrics: Output[Metrics],
model_artifact: Output[Model]) -> NamedTuple("Outputs", [("model_artifact", Model)]):

Then you import the necessary libraries, create your dataframe and do feature engineering. Latest pandas version is able to read directly from GCP bucket. The path variable is where the model artifacts will be saved, like weights, checkpoints, etc. Notice that there are two Keras callbacks, one for the model checkpoint and the other for Tensorboard. Then we train the model, what is trivial for Keras. The code for the Two Towers model can be found here. Then we grab the scores for train and test set with smetrics. These scores will show up in Google Cloud Console, in the details of the experiment, as we will will see ahead.

import datetime
import numpy as np
import tensorflow as tf
import tensorflow_recommenders as tfrs
###.... and others

now=datetime.datetime.now().strftime('%Y%m%d%H%M%S')

df=pd.read_csv('gs://bucket-train/training_data.csv',sep=',',header=0)


###.....Analyze data, prepare features, training and test set


path = os.path.join(model_artifact.path, "/model-{}/".format(now))

cp_callback = tf.keras.callbacks.ModelCheckpoint(
filepath=path,
verbose=1,
save_weights_only=True,
save_freq=5)

tensorboard_callback = tf.keras.callbacks.TensorBoard(
log_dir="gs://"+str(model_artifact.path)[5:]+"/tensorboard",
histogram_freq=0,
write_graph=True)

history=model.fit(cached_train, callbacks=[cp_callback,tensorboard_callback],
epochs=epochs_)
smetrics.log_metric("Train score TOP 1", float(history.history['factorized_top_k/top_1_categorical_accuracy'][-1]))

score=model.evaluate(cached_test, return_dict=True)
smetrics.log_metric("Evaluation score TOP 1", float(score['factorized_top_k/top_1_categorical_accuracy']))

After that we generate our index for deployment, and save it. The model_artifact.path will contain two folders, Variables and Assets, and also a Tensorflow .pb file.

index = tfrs.layers.factorized_top_k.BruteForce(model.user_model)
index.index_from_dataset(tf.data.Dataset.zip((movies.batch(100), movies.batch(100).map(model.movie_model))))

path1=model_artifact.path

# Save the index.
tf.saved_model.save(index, path1)

In the same @component, we will upload the Tensorboard history file (events) via tb-gcp-uploader.

TENSORBOARD_NAME = "tensorboard-train"  # @param {type:"string"}
EXPERIMENT_NAME="train"

import os
os.system("tb-gcp-uploader --tensorboard_resource_name projects/project-id/locations/us-central1/tensorboards/tensorboard-number \
--logdir={} --experiment_name={} --one_shot=True".format(

"gs://"+str(model_artifact.path)[5:]+"/tensorboard",
"train-pipeline-with-deployment-{}".format(now)))

Now that the first component is ready, let’s take care of Deployment.

DEPLOYMENT component:

@component(packages_to_install=["google-cloud-aiplatform==1.22.1",
"tensorboard==2.9.0",
"tensorflow==2.9.1"])

def deploy_model(
model: Input[Model],
model_artifact: Output[Model],
project: str,
region: str,
):

from google.cloud import aiplatform
aiplatform.init(project=project, location=region,
staging_bucket="gs://train-staging-bucket")

import logging
import os

path = model.uri

import datetime
now=datetime.datetime.now().strftime('%Y%m%d%H%M%S')

import os
logging.basicConfig(level=logging.DEBUG)
logging.debug(model)

print(model)
print("***********", model.uri,"*************") #### TO HELP US IN LOGS

Regarding this component, I need to thank Sascha Heyer for the following code that helped me very much in the deployment part. His notebook and article are available here.

The code below (continuation of the deployment component above) uploads our Tensorflow model index to Vertex AI via Python code, defines the instance that will run it, creates an endpoint and then will deploy this model in this endpoint. Note that we add some print() statements, so that we will be able to check the pipeline logs for any problems, as well to see the paths used.

    container="us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-9:latest"

DISPLAY_NAME = "train"

MODEL_TYPE = "query"
MODEL_NAME = f"{MODEL_TYPE}_model" # Used by the deployment container.

models = aiplatform.Model.list(filter=("display_name={}").format(DISPLAY_NAME))

if len(models) == 0:
model_uploaded = aiplatform.Model.upload(
display_name = DISPLAY_NAME,
artifact_uri=model.uri,
serving_container_image_uri = container,
)
model_uploaded.wait()
print(model_uploaded.display_name)
print(model_uploaded.resource_name)

else:
parent_model = models[0].resource_name

model_uploaded = aiplatform.Model.upload(
parent_model = parent_model,
display_name = DISPLAY_NAME,
artifact_uri=model.uri,
serving_container_image_uri = container,
)
model_uploaded.wait()
print(model_uploaded.display_name)
print(model_uploaded.resource_name)

endpoint_display_name = f"train-endpoint"
endpoint = aiplatform.Endpoint.create(display_name=endpoint_display_name)

traffic_percentage = 100
machine_type = "n1-standard-8"
sync = True

endpoint = model_uploaded.deploy(
endpoint=endpoint,
deployed_model_display_name=DISPLAY_NAME,
machine_type=machine_type,
traffic_percentage=traffic_percentage,
sync=sync,
)

for model in endpoint.list_models():
print(model)
if model.id not in endpoint.traffic_split:
endpoint.undeploy(deployed_model_id = model.id)

Now we develop the component PIPELINE:

@dsl.pipeline(
pipeline_root=PIPELINE_ROOT + "train-pipeline",
name="train-pipeline-with-deployment",
)


def pipeline(epochs_:int,
embed_length:int,
maxsplit_:int):

training_op = train_model(epochs_,
embed_length,
maxsplit_).set_cpu_limit('16').set_memory_limit('32G').set_caching_options(False)

deploy_op = deploy_model(training_op.outputs["model_artifact"] ,"project-id","us-central1").set_cpu_limit('8').set_memory_limit('16G').set_caching_options(False)

Now, COMPILE the pipeline into a JSON file:

compiler.Compiler().compile(
pipeline_func=pipeline,
package_path='train_pipeline.json')

Finally, RUN the job, specifying the training parameters:

job = pipeline_jobs.PipelineJob(
display_name="train_pipeline",
template_path="train_pipeline.json",
parameter_values={
'epochs_': 95,
'embed_length':768,
'maxsplit_' : 140
}
)

job.submit(experiment="train")

To schedule the job to run periodically, you must use the AI Platform Client:

from kfp.v2.google.client import AIPlatformClient
api_client = AIPlatformClient(project_id=PROJECT_ID, region='us-central1')

api_client.create_schedule_from_job_spec(
job_spec_path='train_pipeline.json',
schedule='0 1 * * *' ### every day == CRON JOB
)

After you run this last block of code, you will follow the process in the Google Cloud Console interface. The first thing that shows up is the new pipeline — Running. Go to Vertex AI/Pipelines.

If you click the pipeline run, you will see the pipeline graph with the training part running.

If you click the bottom of the page, you will see the logs for that component, and at the right, the input parameters.

You can also see the logs in Google Cloud Logging, with this filter:

resource.type="ml_job"
resource.labels.job_id="123456789"
severity=ERROR

After this component finished its job, the pipeline will flow to the next component, Deployment. The model artifacts were saved, as well as the index, and metrics were recorded:

At this point, the logs print() gave you the Tensorboard link, but you can also wait for the job to finish to see the training performance in Tensorboard, at Experiments’ link:

If you go to Experiments, you will see the performance for each Pipeline Run:

When the job finishes, you will have an active endpoint with the sample request and also a new model in Model Registry.

If everything went well, you can now prepare data and call the endpoint to make inference.

from google.cloud import aiplatform

os.environ["GOOGLE_APPLICATION_CREDENTIALS"]="/home/jupyter/.config/gcloud/application_default_credentials.json"

aip_endpoint_name = (
f"projects/project-number/locations/us-central1/endpoints/123456789"
)

endpoint = aiplatform.Endpoint(aip_endpoint_name)

instances_list = [{"input_1": ["disenador grafico con experiencia en montaje de planchas".split()]}]

Notice that “input_1” corresponds exactly to the signature of the model saved, that can be obtained locally this way:

!saved_model_cli show - dir /home/jupyter/treino_/model/ - all

And it’s something like this:

Then we run the inference. As we are using the Two Towers Model:

TOP_N=10
results = endpoint.predict(instances=instances_list)
df[df.code.map(lambda x: x.decode("utf-8")).isin(neighbors[0][0])].Vacancy_Name

Note that this whole solution is very similar to Vertex AI Matching Engine, as we are deploying a ScaNN (Scalable Nearest Neighbors) index in Vertex AI. This solution is useful when you have less than 10 million endpoint calls during a month. The inference time is about 500 miliseconds. If you need a solution that scales to millions of qps (queries per second), use Vertex AI Matchine Engine. The only thing that changes is the deployment mechanism of the index.

If you are just testing, it’s a safe procedure to undeploy the model, delete the model, delete the endpoint and clean the bucket to save costs.

!gsutil rm -r gs://train-pipeline-artifacts/*

To undeploy the model you need both the model number and the endpoint number:

!gcloud ai models list --region=us-central1 --project=project-id
!(echo Y) | gcloud ai endpoints list --region=us-central1 --project=project-id

Undeploy the model:

from google.cloud import aiplatform
from google.cloud import aiplatform_v1

def undeploy_model_in_endpoint(
end_point: str,
project: str,
model_id: str,
location: str = "us-central1",
api_endpoint: str = "us-central1-aiplatform.googleapis.com",
timeout: int = 7200,
):
# The AI Platform services require regional API endpoints.

client_options = {"api_endpoint": api_endpoint}

# Initialize client that will be used to create and send requests.
# This client only needs to be created once, and can be reused for multiple requests.

client = aiplatform.gapic.EndpointServiceClient(client_options=client_options)
client_model = aiplatform_v1.services.model_service.ModelServiceClient(client_options=client_options)

# Get deployed_model_id

model_name = f'projects/{project}/locations/{location}/models/{model_id}'
model_request = aiplatform_v1.types.GetModelRequest(name=model_name)
model_info = client_model.get_model(request=model_request)
deployed_models_info = model_info.deployed_models
deployed_model_id=model_info.deployed_models[0].deployed_model_id

name=f'projects/{project}/locations/{location}/endpoints/{end_point}'

undeploy_request = aiplatform_v1.types.UndeployModelRequest(endpoint=name,deployed_model_id=deployed_model_id)
client.undeploy_model(request=undeploy_request)

undeploy_model_in_endpoint(end_point='123456789',project='project-id',model_id='123456789')

Delete the model and the endpoint:

!(echo Y) | gcloud ai models delete 123456789 --region us-central1
!(echo Y) | gcloud ai endpoints delete 123456789 --region us-central1

--

--

Rubens Zimbres
Google Developer Experts

I’m a Senior Data Scientist and Google Developer Expert in ML and GCP. I love studying NLP algos and Cloud Infra. CompTIA Security +. PhD. www.rubenszimbres.phd