Train and Deploy Google Cloud’s Two Towers Recommender

Rubens Zimbres
10 min read · Aug 5, 2022


In a business environment, you often need to make suggestions to your clients based on whatever information you have about their preferences and tastes. Sometimes you have feedback from them, but sometimes you face a new user with no history at all. That is the cold start problem, as when a user visits your website for the first time.

In this article I will cover recommendation algorithms for the case where you already have a history of interactions or other data about your customer. In this setting, these are the main types of recommendation algorithms:

Content-based Recommendation: here, you recommend products/services to a customer based on his/her previous purchases. There are two ways to find out user preferences. The first is explicit feedback, like a thumbs up for a Netflix movie or a star rating for a product on Amazon. The second, implicit feedback, is feedback collected indirectly through users’ behavior, like pages visited, time spent on pages, number of clicks, amount of time spent watching an ad, etc. Implicit feedback is much more readily available.

So, if a user watched spy movies like Salt and Spy Game, you may suggest the Jason Bourne trilogy, given that his/her preferences probably include adventure, intelligence and guns. On the other hand, if he/she watched and liked romantic movies like The Lake House, Titanic and Purple Hearts, you can recommend movies like Casablanca and Shakespeare in Love.

Collaborative Filtering: this type of recommender is used when you have different customers (A and B) with similar profiles and you want to suggest to user B something that user A liked. Given that the users are similar, it’s reasonable to suppose that if user A (a university teacher) likes technical books, user B (also a university teacher) likes them too.

Knowledge-based Recommendation: this recommender is based on explicit knowledge about the users, items and their recommendation criteria. In this case there is no cold start.

Usually the data comes in the form of a large, sparse user-item matrix, and one of the simplest techniques to approach the problem is Matrix Factorization, where you generate two dense matrices from one sparse matrix.

The factorization splits the sparse matrix into row and column factors that are essentially the user and item embeddings.

If your sparse matrix A is 4 (rows) x 5 (columns), you will have two factor matrices, 4 x 2 and 2 x 5. We know that an n x m matrix times an m x p matrix gives an n x p matrix, that is, a 4 x 5 matrix again. We want to minimize the squared discrepancy between A and U·Vᵀ, and we may apply algorithms like Stochastic Gradient Descent (parallel, handles unobserved interaction pairs, but slower), Alternating Least Squares (ALS; parallel, faster, handles unobserved interaction pairs) and Weighted ALS (WALS).
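As a minimal illustration (not from the original article), here is a tiny NumPy sketch that factorizes a 4 x 5 ratings matrix into 4 x 2 and 2 x 5 factors with plain gradient descent on the observed entries only:

import numpy as np

# Toy 4 x 5 user-item matrix; 0 marks an unobserved interaction.
A = np.array([[5, 3, 0, 1, 0],
              [4, 0, 0, 1, 1],
              [1, 1, 0, 5, 4],
              [0, 1, 5, 4, 0]], dtype=float)
mask = A > 0                             # fit only the observed entries

k = 2                                    # latent dimension
rng = np.random.default_rng(42)
U = rng.normal(scale=0.1, size=(4, k))   # user factors (4 x 2)
V = rng.normal(scale=0.1, size=(5, k))   # item factors (5 x 2), so V.T is 2 x 5

lr = 0.01
for _ in range(5000):
    E = (A - U @ V.T) * mask             # error on observed entries only
    U += lr * E @ V                      # gradient step for user factors
    V += lr * E.T @ U                    # gradient step for item factors

print(np.round(U @ V.T, 1))              # dense reconstruction: predicted ratings

The dense product U·Vᵀ fills in the unobserved cells, which is exactly what the recommender uses to predict missing ratings.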

Movies have their own characteristics, like cast, setting, genre, target age group and release year. If we generate embeddings of these characteristics, we can find similarities between movies in a multidimensional space, using cosine distance, nearest neighbors or any other similarity measure. As users also have their own characteristics, like age, sex, profession and social status, we can generate embeddings for them as well. This way, we can compare embeddings to find item-item, user-user and user-item similarities.
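As a quick illustration (again, not part of the original code), cosine similarity between two embedding vectors can be computed like this; the 4-dimensional movie embeddings are purely hypothetical:

import numpy as np

def cosine_similarity(a: np.ndarray, b: np.ndarray) -> float:
    """Cosine similarity between two embedding vectors (1.0 = same direction)."""
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

# Hypothetical movie embeddings, just for illustration.
salt = np.array([0.9, 0.1, 0.8, 0.0])
jason_bourne = np.array([0.8, 0.2, 0.9, 0.1])
titanic = np.array([0.1, 0.9, 0.0, 0.8])

print(cosine_similarity(salt, jason_bourne))  # high: similar spy/action profile
print(cosine_similarity(salt, titanic))       # low: different genres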

Google has two products that help operationalize recommenders. One is TensorFlow Recommenders (TFRS), a library built on Keras and used for retrieval and ranking of candidates; the Two Towers algorithm is one of its models. The other is Vertex AI Matching Engine, which compares embeddings and looks for the Top K similarities. The Matching Engine, shown below, compares the embeddings from the two towers of the recommender to provide the nearest neighbors.

Source: Google Cloud

Recommender systems are often composed of three stages:

  1. The retrieval stage is responsible for selecting an initial set of hundreds of candidates from all possible candidates.
  2. The ranking stage takes the outputs of the retrieval model and fine-tunes them to select the best possible handful of recommendations, which is what I called the Top K.
  3. The filtering stage filters the Top K results so that you only get relevant results.

The Two Towers model has two sub-models: a query model and a candidate model, as you will see ahead. So, let’s put it into practice with Python. First, install the necessary libraries. If you are using Vertex AI, choose a TensorFlow 2.8 without GPU image in Workbench, zone us-central1.

pip install tensorflow==2.9.1
pip install tensorflow-text
pip install scann
pip3 install numpy --ignore-installed
pip install tensorflow-recommenders
pip install -q --upgrade tensorflow-datasets
pip install -q scann

… and load them:

import unidecode
import re
import pandas as pd
import os
import pprint
import tempfile
from typing import Dict, Text
import numpy as np
import tensorflow as tf
import tensorflow_datasets as tfds
import tensorflow_recommenders as tfrs
from google.cloud import bigquery
from google.oauth2 import service_account

You don’t need BigQuery to run this tutorial, but if you have suitable data in BQ, you may use it. To load the default tutorial data, run:

ratings = tfds.load("movielens/100k-ratings", split="train")
movies = tfds.load("movielens/100k-movies", split="train")
for x in ratings.take(1).as_numpy_iterator():
    pprint.pprint(x)
{'bucketized_user_age': 45.0,
'movie_genres': array([7]),
'movie_id': b'357',
'movie_title': b"One Flew Over the Cuckoo's Nest (1975)",
'raw_user_age': 46.0,
'timestamp': 879024327,
'user_gender': True,
'user_id': b'138',
'user_occupation_label': 4,
'user_occupation_text': b'doctor',
'user_rating': 4.0,
'user_zip_code': b'53211'}
# same for movies:
{'movie_genres': array([4]),
'movie_id': b'1681',
'movie_title': b'You So Crazy (1994)'}

If you have custom data, let’s access BigQuery:

os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '/home/user/Downloads/project-811-777.json'
credentials = service_account.Credentials.from_service_account_file('/home/user/Downloads/project-811-777.json')
bqclient = bigquery.Client(credentials=credentials)

query_string = """
SELECT Key.id_vacante, titulo_oe, nivel_estudio, observaciones, codigo_postal, requisito, experiencia
FROM `project-811-777.dataset.table` AS Key
LEFT JOIN (
    SELECT id_vacante, competencia_limpia
    FROM `project-811-777.dataset.table`
) AS Value
ON Key.id_vacante = Value.id_vacante
"""

dataframe = (
    bqclient.query(query_string)
    .result()
    .to_dataframe(create_bqstorage_client=True)
)
df = dataframe.dropna()

At this point, if you are using custom data, you may need unidecode and regular expressions to clean your data and build the dataset. If you are working with text and descriptions, you may also need to pad the strings to the same length so that every row produces tensors of the same shape; a minimal sketch of this cleaning step follows.
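The sketch below is only illustrative and not part of the original notebook; the column names come from the BigQuery query above, and the choice of 46 tokens matches the maxsplit=45 used later:

import re
import unidecode

MAX_TOKENS = 46  # must agree with maxsplit=45 used when splitting requisito

def clean_text(text: str) -> str:
    """Strip accents, lowercase and keep only letters, digits and spaces."""
    text = unidecode.unidecode(str(text)).lower()
    text = re.sub(r"[^a-z0-9 ]+", " ", text)
    return re.sub(r"\s+", " ", text).strip()

def pad_tokens(text: str, size: int = MAX_TOKENS) -> str:
    """Pad (or truncate) to a fixed number of tokens so every row has the same shape."""
    tokens = text.split()[:size]
    tokens += ["<pad>"] * (size - len(tokens))
    return " ".join(tokens)

df["requisito"] = df["requisito"].apply(clean_text).apply(pad_tokens)

Now we create a dictionary of the dataframe to load as a TensorFlow dataset: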

my_dict = dict(df)
requisitos = tf.data.Dataset.from_tensor_slices(my_dict).map(
    lambda x: {"id_vacante": x["id_vacante"],
               "requisito": tf.strings.split(x["requisito"], maxsplit=45)})

Same for the vagas dataset:

vagas = tf.data.Dataset.from_tensor_slices(dict(df)).map(
    lambda x: {"id_vacante": x["id_vacante"],
               "titulo_oe": x["titulo_oe"]})

Now we set a random seed, shuffle the dataset and split it into train and test, batch the vagas titles and extract the requisito feature as user_req.

tf.random.set_seed(42)
shuffled = requisitos.shuffle(128, seed=42, reshuffle_each_iteration=False)

train = shuffled.take(80_000)
test = shuffled.skip(80_000).take(20_000)

# Batch only the title strings, since they are the candidate identifiers.
vagas_titles = vagas.map(lambda x: x["titulo_oe"]).batch(128)
user_req = requisitos.map(lambda x: x["requisito"])

At this point, we need to build the vocabularies used by the Keras StringLookup layers, and batch user_req as well.

unique_vagas_titles = np.unique(np.concatenate(list(vagas_titles)))
unique_user_req = np.unique(np.hstack(list(user_req.as_numpy_iterator())))

user_req = user_req.batch(128)

Let’s see how StringLookup behaves (note that this layer is just a test, not part of the final model):

layer = tf.keras.layers.StringLookup(vocabulary=unique_user_req)

for x in requisitos.take(1).as_numpy_iterator():
    pprint.pprint(np.array(layer(x['requisito'])))
array([273, 262, 341, 85, 26, 7, 191, 34, 7, 190, 90, 151, 235, 96, 7, 342, 342, 342, 342, 342, 342, 342, 342, 342, 342, 342,342, 342, 342, 342, 342, 342, 342, 342, 342, 342, 342, 342, 342,342, 342, 342, 342, 342, 342, 349])

I like to use a larger number of embedding dimensions, since accuracy tends to increase. You may need regularization, though; one way to add it is sketched below.

embedding_dimension = 768
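If overfitting shows up at this width, one possibility (a sketch, not part of the original code; the 1e-5 factor is just an illustrative value) is to add an L2 penalty directly on the embedding weights:

regularized_embedding = tf.keras.layers.Embedding(
    input_dim=len(unique_user_req) + 1,
    output_dim=embedding_dimension,
    # Illustrative L2 penalty on the embedding weights; tune on validation data.
    embeddings_regularizer=tf.keras.regularizers.l2(1e-5))

Any such regularization loss is collected in self.losses and added to the total loss in the custom training loop shown further below.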

Then we define the user model (query tower) and the vagas_model (candidate tower), the Two Towers, adding an additional embedding to account for unknown tokens:

user_model = tf.keras.Sequential([
    tf.keras.layers.StringLookup(vocabulary=unique_user_req, mask_token=None),
    tf.keras.layers.Embedding(len(unique_user_req) + 1, embedding_dimension)])

vagas_model = tf.keras.Sequential([
    tf.keras.layers.StringLookup(vocabulary=unique_vagas_titles, mask_token=None),
    tf.keras.layers.Embedding(len(unique_vagas_titles) + 1, embedding_dimension)])

Note that the model is simple, but that does not prevent you from adding a transformer with attention to it and getting semantic embeddings; a possible sketch of that idea comes next.
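Here is a hedged sketch of that idea, not part of the original code: a query tower built with the Keras functional API that runs self-attention over the token embeddings and averages them into a single query vector.

# Sketch only: a query tower with self-attention over the requisito tokens.
inputs = tf.keras.Input(shape=(None,), dtype=tf.string)
tokens = tf.keras.layers.StringLookup(
    vocabulary=unique_user_req, mask_token=None)(inputs)
embedded = tf.keras.layers.Embedding(
    len(unique_user_req) + 1, embedding_dimension)(tokens)
attended = tf.keras.layers.MultiHeadAttention(
    num_heads=4, key_dim=64)(embedded, embedded)             # self-attention
pooled = tf.keras.layers.GlobalAveragePooling1D()(attended)  # one vector per query
attention_user_model = tf.keras.Model(inputs, pooled)

Back to the simple towers. Now it’s time to define a metric that compares the affinity score the model computes for a (query, candidate) pair against the scores of all the other possible candidates.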

metrics = tfrs.metrics.FactorizedTopK(
    candidates=vagas.map(lambda x: x["titulo_oe"]).batch(128).map(vagas_model))

The loss is defined as a Retrieval task:

task = tfrs.tasks.Retrieval(
    metrics=metrics)

The task is a Keras layer that takes the query and candidate embeddings as arguments and returns the computed loss for backpropagation. Since in my case user_embeddings has an extra token dimension (one embedding per token of requisito), we sum over that dimension with reduce_sum(axis=1) to get one vector per query.

class MovielensModel(tfrs.Model):
    def __init__(self, user_model, vagas_model):
        super().__init__()
        self.vagas_model: tf.keras.Model = vagas_model
        self.user_model: tf.keras.Model = user_model
        self.task: tf.keras.layers.Layer = task

    def compute_loss(self, features: Dict[Text, tf.Tensor], training=False) -> tf.Tensor:
        user_embeddings = self.user_model(features["requisito"])
        positive_movie_embeddings = self.vagas_model(features["titulo_oe"])
        # Sum the per-token embeddings into a single query vector (axis=1).
        return self.task(tf.reduce_sum(user_embeddings, axis=1), positive_movie_embeddings)

The model itself is a Keras model:

class NoBaseClassMovielensModel(tf.keras.Model):

    def __init__(self, user_model, vagas_model):
        super().__init__()
        self.vagas_model: tf.keras.Model = vagas_model
        self.user_model: tf.keras.Model = user_model
        self.task: tf.keras.layers.Layer = task

    def train_step(self, features: Dict[Text, tf.Tensor]) -> tf.Tensor:

        # Set up a gradient tape to record gradients.
        with tf.GradientTape() as tape:

            # Loss computation.
            user_embeddings = self.user_model(features["requisito"])
            positive_movie_embeddings = self.vagas_model(features["titulo_oe"])
            # Sum per-token embeddings, as in compute_loss above.
            loss = self.task(tf.reduce_sum(user_embeddings, axis=1), positive_movie_embeddings)

            # Handle regularization losses as well.
            regularization_loss = sum(self.losses)

            total_loss = loss + regularization_loss

        gradients = tape.gradient(total_loss, self.trainable_variables)
        self.optimizer.apply_gradients(zip(gradients, self.trainable_variables))

        metrics = {metric.name: metric.result() for metric in self.metrics}
        metrics["loss"] = loss
        metrics["regularization_loss"] = regularization_loss
        metrics["total_loss"] = total_loss

        return metrics

    def test_step(self, features: Dict[Text, tf.Tensor]) -> tf.Tensor:

        # Loss computation.
        user_embeddings = self.user_model(features["requisito"])
        positive_movie_embeddings = self.vagas_model(features["titulo_oe"])
        loss = self.task(tf.reduce_sum(user_embeddings, axis=1), positive_movie_embeddings)

        # Handle regularization losses as well.
        regularization_loss = sum(self.losses)

        total_loss = loss + regularization_loss

        metrics = {metric.name: metric.result() for metric in self.metrics}
        metrics["loss"] = loss
        metrics["regularization_loss"] = regularization_loss
        metrics["total_loss"] = total_loss

        return metrics

Now we instantiate and compile the model:

model = MovielensModel(user_model, vagas_model)
model.compile(optimizer=tf.keras.optimizers.Adagrad(learning_rate=0.1))
cached_train = train.shuffle(100_000).batch(1024).cache()
cached_test = test.batch(1024).cache()
logdir = '/home/user/recommenders'
tensorboard_callback = tf.keras.callbacks.TensorBoard(logdir, histogram_freq=1)

Now train the model:

model.fit(cached_train, epochs=10,callbacks=[tensorboard_callback])

You can follow the training in TensorBoard. Then evaluate the model on the test set:

model.evaluate(cached_test, return_dict=True)
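For reference, with the default FactorizedTopK settings the returned dictionary should include the factorized top-K categorical accuracies (top-1, top-5, top-10, top-50 and top-100), plus the loss, regularization loss and total loss.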

The whole notebook, so far, can be found on my website, in a Kaggle Notebook:

Two Towers Recommender in Tensorflow/Keras — Training and Deployment

To make predictions, we have two options:

  • Option 1: use tfrs.layers.factorized_top_k.BruteForce, or
  • Option 2: use Vertex AI Matching Engine

Both compare embeddings looking for similarity and return the Top K candidates. Vertex AI Matching Engine requires a long setup and takes at least 3 hours to build and deploy an endpoint for prediction, so I will present tfrs.layers.factorized_top_k.BruteForce here.

We will create a model that takes in raw query features, and recommends vagas out of the entire vagas dataset.

index = tfrs.layers.factorized_top_k.BruteForce(model.user_model)
index.index_from_dataset(
    tf.data.Dataset.zip((
        vagas.map(lambda x: x["titulo_oe"]).batch(100),                            # candidate identifiers (titles)
        vagas.map(lambda x: x["titulo_oe"]).batch(100).map(model.vagas_model))))   # candidate embeddings

# Get recommendations.
_, titles = index(tf.constant(["cocina"]))
print(f"Recommendations for user : {titles[0, :3]}")
Recommendations for user : ['COCINA', 'AYUDANTES GENERALES', 'PARRILLEROS']
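Since scann was installed at the beginning, a possible alternative (a sketch only, not used in the rest of this article) is to swap BruteForce for the approximate ScaNN layer, which scales much better to large candidate sets:

# Sketch: approximate nearest-neighbor retrieval with ScaNN instead of BruteForce.
scann_index = tfrs.layers.factorized_top_k.ScaNN(model.user_model)
scann_index.index_from_dataset(
    tf.data.Dataset.zip((
        vagas.map(lambda x: x["titulo_oe"]).batch(100),
        vagas.map(lambda x: x["titulo_oe"]).batch(100).map(model.vagas_model))))

_, titles = scann_index(tf.constant(["cocina"]))

Note that saving a ScaNN index with tf.saved_model.save requires tf.saved_model.SaveOptions(namespace_whitelist=["Scann"]); I will stick with the BruteForce index below.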

Then you may save the model for deployment to a Vertex AI endpoint:

path = os.path.join("/home/user/recommenders/", "model")
tf.saved_model.save(index, path)

Now let’s deploy the model. At this point, you will have a folder called model containing two folders, assets and variables, plus the saved_model.pb file. On Vertex AI, run this command to see the inputs the model requires:

!saved_model_cli show --dir /home/jupyter/model --all

...
signature_def['serving_default']:
  The given SavedModel SignatureDef contains the following input(s):
    inputs['input_1'] tensor_info:
        dtype: DT_STRING
        shape: (-1)
        name: serving_default_input_1:0
...

Then, copy this model folder with the saved model to Google Cloud Storage.

! gsutil cp -r ./model gs://your-bucket-model

Now, upload the model to Vertex AI:

!gcloud ai models upload \
--region=us-west1 \
--project=project-811-777 \
--display-name=model-test-1 \
--container-image-uri=us-docker.pkg.dev/vertex-ai/prediction/tf2-cpu.2-8:latest \
--artifact-uri=gs://your-bucket-model/model

Then, create an endpoint:

!gcloud ai endpoints create \
--project=project-811-777 \
--region=us-west1 \
--display-name=recommender-endpoint-1

Finally, deploy the model to this endpoint:

!gcloud ai endpoints deploy-model 1217657523875 \
--project=project-811-777 \
--region=us-west1 \
--model=669638908070799 \
--display-name=recommender-endpoint-1 \
--traffic-split=0=100

Once the model is deployed to the endpoint, we can call it. For prediction, import the necessary libraries:

import tensorflow as tf
import base64
from google.protobuf import json_format
from google.protobuf.struct_pb2 import Value
import numpy as np
import json
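One detail the prediction code below relies on is an endpoint object, which is not constructed explicitly here. A hedged sketch using the google-cloud-aiplatform SDK might look like this (the resource name reuses the project and endpoint ID from the gcloud commands above; replace them with your own):

from google.cloud import aiplatform

# Illustrative only: use your own project and the endpoint ID created above.
endpoint = aiplatform.Endpoint(
    endpoint_name="projects/project-811-777/locations/us-west1/endpoints/1217657523875")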

Encode the input (note that images and strings are base64-encoded differently when sent to the endpoint):

def encode_64(input):
    message = input
    message_bytes = message.encode('utf-8')
    base64_bytes = base64.b64encode(message_bytes)
    base64_message = base64_bytes.decode('utf-8')
    return base64_message
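For example, encode_64("doctor") returns 'ZG9jdG9y', which is exactly what goes into the b64 field below.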

Make the inference and get the Top 3 candidates:

instances_list = [{"input_1": {"b64": encode_64("doctor")}}]
instances = [json_format.ParseDict(s, Value()) for s in instances_list]
results = endpoint.predict(instances=instances)

print(results.predictions[0]["output_2"][:3])

Recommendations for user : ['MEDICO EVALUADOR', 'ASISTENTE', 'LABORATORISTA CLINICO']

That’s it, you have successfully deployed a recommender on GCP!

Now, if you want to delete everything so as not to incur unnecessary costs, check your available models and endpoints:

!gcloud ai models list --region=us-west1 --project=project-811-777
!gcloud ai endpoints list --region=us-west1 --project=project-811-777

And delete them:

!(echo Y) | gcloud ai models delete 22973636337 --region us-west1
!(echo Y) | gcloud ai endpoints delete 898687575757 --region us-west1

