Decoding the Matrix — A Journey Into the Algorithms of Recommender Systems — Part 5: Matrix Factorization in PySpark
This part is mostly code, with comments and annotations, since we already explored the math of matrix factorization in the previous part!
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
PySpark ships with a whole ML library, and recommendation is a large part of it. We’ll import alternating least squares (ALS), MatrixFactorizationModel, and the Rating object.
from pyspark import SparkContext
sc = SparkContext("local", "your database here <3")
#load in the data
data = sc.textFile('content/drive/MyDrive/movielens-20m-dataset/small_rating.csv')
#filter out the header
header = data.first()
data = data.filter(lambda row: row != header)
ratings = data.map(
lambda l: l.split(',')
).map(
lambda l: Rating(int(l[0]), int(l[1]), float(l[2]))
)
train, test = ratings.randomSplit([.8,.2])
There was no cluster to connect to, so I ran this locally. I removed the header and split the dataset into train and test sets.
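As an aside, randomSplit assigns each row independently according to the weights, so the 80/20 split is approximate rather than exact. The same behavior in a plain-Python sketch (the row data here is made up):

```python
import random

random.seed(0)
rows = list(range(10_000))  # stand-in for the ratings RDD

# each row lands in train with probability 0.8, independently --
# this mirrors how randomSplit([.8, .2]) partitions an RDD
train, test = [], []
for r in rows:
    (train if random.random() < 0.8 else test).append(r)

print(len(train) / len(rows))  # close to 0.8, but not exactly 0.8
```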
Now we’ll start training.
#first step is to define the latent dimensionality
latent_dimensionality = 10
#the number of training epochs
epochs = 10
#and then run alternating least squares
model = ALS.train(train, latent_dimensionality, epochs)
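As a refresher on what ALS.train is doing under the hood, here is a minimal alternating least squares loop in plain NumPy on a hypothetical dense toy matrix (real ALS only fits the observed entries, and these sizes and the regularization strength are made up):

```python
import numpy as np

rng = np.random.default_rng(0)
n_users, n_movies, k = 20, 15, 3
R = rng.random((n_users, n_movies)) * 4 + 1  # toy dense ratings in [1, 5)

W = rng.random((n_users, k))    # user factors
U = rng.random((n_movies, k))   # movie factors
lam = 0.1                       # L2 regularization strength

def mse(R, W, U):
    return float(np.mean((R - W @ U.T) ** 2))

before = mse(R, W, U)
for _ in range(10):
    # solve for user factors with movie factors held fixed
    W = R @ U @ np.linalg.inv(U.T @ U + lam * np.eye(k))
    # then solve for movie factors with user factors held fixed
    U = R.T @ W @ np.linalg.inv(W.T @ W + lam * np.eye(k))
after = mse(R, W, U)  # each alternating sweep drives the error down
```

Each half-step is an ordinary regularized least-squares solve, which is why the objective decreases at every sweep.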
Now it’s time to evaluate the model on both sets. We’ll start with the train set.
#we need an x to use as our prediction
#i only want the user and movie ids
x = train.map(lambda p: (p[0], p[1]))
#use x to get predicted ratings
predictions = model.predictAll(x).map(lambda r: ((r[0], r[1]), r[2]))
#join on the (user_id, movie_id) pair (the first item)
#the end result is ((user_id, movie_id), (rating, prediction))
ratings_with_predictions = train.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
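To make the shape of that join concrete, here is the same keyed join on a tiny hand-made example (the ids and ratings are invented):

```python
# (user_id, movie_id) -> actual rating
actual = {(1, 10): 4.0, (1, 20): 3.5}
# (user_id, movie_id) -> predicted rating
predicted = {(1, 10): 3.8, (1, 20): 3.6}

# joining on the shared key pairs up actual and predicted values,
# giving ((user_id, movie_id), (rating, prediction)) entries
joined = {k: (actual[k], predicted[k]) for k in actual.keys() & predicted.keys()}
```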
#getting the mse is a piece of cake now
mse = ratings_with_predictions.map(lambda r: (r[1][0] - r[1][1])**2).mean()
print("train_mse:", mse)
#output: train_mse: 0.5214896261853352
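Since MSE is in squared stars, it can be easier to read as RMSE; taking the square root puts the error back on the rating scale:

```python
import math

train_mse = 0.5214896261853352  # the value printed above
train_rmse = math.sqrt(train_mse)  # roughly 0.72 stars of error per rating
```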
Just replace train with test.
#we need an x to use as our prediction
#i only want the user and movie ids
x = test.map(lambda p: (p[0], p[1]))
#use x to get predicted ratings
predictions = model.predictAll(x).map(lambda r: ((r[0], r[1]), r[2]))
#join on the (user_id, movie_id) pair (the first item)
#the end result is ((user_id, movie_id), (rating, prediction))
ratings_with_predictions = test.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
#getting the mse is a piece of cake now
mse = ratings_with_predictions.map(lambda r: (r[1][0] - r[1][1])**2).mean()
print("test_mse:", mse)
#output: 0.5498697888986561
We get roughly what we would expect: very close to what we got implementing it ourselves in NumPy!
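Once trained, producing top-N recommendations is just a dot product between a user’s factor vector and every movie’s factor vector. A NumPy sketch with hypothetical factor matrices (in PySpark itself, model.recommendProducts(user_id, n) does this for you):

```python
import numpy as np

rng = np.random.default_rng(1)
W = rng.random((5, 3))  # hypothetical user factors (5 users, k=3)
U = rng.random((8, 3))  # hypothetical movie factors (8 movies, k=3)

def top_n(user_id, n=3):
    scores = W[user_id] @ U.T  # predicted rating for every movie
    # indices of the n highest-scoring movies, best first
    return np.argsort(scores)[::-1][:n].tolist()

recs = top_n(0)
```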