Decoding the Matrix — A Journey Into the Algorithms of Recommender Systems — Part 5: Matrix Factorization in PySpark

Abelardo Riojas
2 min read · Jun 23, 2023


Neo with his cluster of machines needed to work with big data.

This one’s mostly code with comments and annotations, since we already explored matrix factorization in the previous part!

from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating

PySpark has a whole ML library, and recommendations are a large part of it. We’ll import alternating least squares (ALS), the MatrixFactorizationModel class, and the Rating object.

from pyspark import SparkContext

sc = SparkContext("local", "your database here <3")

#load in the data
data = sc.textFile('content/drive/MyDrive/movielens-20m-dataset/small_rating.csv')

#filter out the header

header = data.first()
data = data.filter(lambda row: row != header)

ratings = data.map(
    lambda l: l.split(',')
).map(
    lambda l: Rating(int(l[0]), int(l[1]), float(l[2]))
)

train, test = ratings.randomSplit([.8,.2])

There’s no Spark session or cluster to connect to here, so I ran this locally. I filtered out the header and split the dataset into train and test sets.

Now we’ll start training.

#first step is to define the latent dimensionality
latent_dimensionality = 10

#the number of training epochs
epochs = 10

#and then run alternating least squares
model = ALS.train(train, latent_dimensionality, epochs)
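As a refresher on what ALS does under the hood (mirroring the NumPy approach from the previous part; the toy matrix, regularization value, and variable names below are mine, not from this dataset), each half-step solves a regularized least-squares problem for one factor matrix while holding the other fixed:

```python
import numpy as np

np.random.seed(0)
K = 2                       # latent dimensionality
reg = 0.1                   # regularization strength (made-up value)
R = np.array([[5.0, 3.0],   # toy dense ratings: 3 users x 2 movies
              [4.0, 1.0],
              [1.0, 5.0]])
U = np.random.randn(3, K)   # user factors
V = np.random.randn(2, K)   # movie factors

for _ in range(20):
    # fix V, solve the ridge system for U: U = R V (V^T V + reg*I)^-1
    U = np.linalg.solve(V.T @ V + reg * np.eye(K), V.T @ R.T).T
    # fix U, solve the ridge system for V: V = R^T U (U^T U + reg*I)^-1
    V = np.linalg.solve(U.T @ U + reg * np.eye(K), U.T @ R).T

mse = np.mean((R - U @ V.T) ** 2)
print(mse)  # small: the toy matrix has rank <= 2, so a rank-2 model fits it well
```

PySpark’s ALS.train runs this same alternation on a distributed, sparse ratings RDD; the rank argument plays the role of K here, and there is also a lambda_ argument for the regularization strength.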

Now it’s time to evaluate the model on both sets. We’ll start with the train set.

#we need an x to use for our predictions

#i only want the user and movie ids
x = train.map(lambda p: (p[0], p[1]))

#use x to get predicted ratings
predictions = model.predictAll(x).map(lambda r: ((r[0], r[1]), r[2]))

#have to join on the (user_id, movie_id) pair (the first item)
#the end result is ((user_id, movie_id), (rating, prediction))
ratings_with_predictions = train.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)


#getting the mse is a piece of cake now
mse = ratings_with_predictions.map(lambda r: (r[1][0] - r[1][1])**2).mean()

print("train_mse:", mse)
#output: train_mse: 0.5214896261853352
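To make the join-then-average step concrete, here’s the same bookkeeping in plain Python (the ids and ratings below are made up for illustration): both datasets are keyed by the (user_id, movie_id) pair, so joining them pairs each true rating with its prediction.

```python
# made-up (user_id, movie_id) -> value maps standing in for the two RDDs
true_ratings = {(1, 10): 4.0, (1, 20): 3.0, (2, 10): 5.0}
predictions  = {(1, 10): 3.5, (1, 20): 3.0, (2, 10): 4.5}

# the join yields ((user_id, movie_id), (rating, prediction)) pairs
joined = {k: (true_ratings[k], predictions[k]) for k in true_ratings}

# mse: square the error (rating - prediction), then average
mse = sum((r - p) ** 2 for r, p in joined.values()) / len(joined)
print(mse)  # 0.16666666666666666
```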

Just replace train with test.

#we need an x to use for our predictions

#i only want the user and movie ids
x = test.map(lambda p: (p[0], p[1]))

#use x to get predicted ratings
predictions = model.predictAll(x).map(lambda r: ((r[0], r[1]), r[2]))

#have to join on the (user_id, movie_id) pair (the first item)
#the end result is ((user_id, movie_id), (rating, prediction))
ratings_with_predictions = test.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)

#getting the mse is a piece of cake now
mse = ratings_with_predictions.map(lambda r: (r[1][0] - r[1][1])**2).mean()

print("test_mse:", mse)
#output: 0.5498697888986561

The test error is about what we would expect: very close to what we got doing it ourselves in NumPy!
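As a final note on what predictAll is computing: once training finishes, a predicted rating is just the dot product of a user’s and a movie’s latent factor vectors. A minimal sketch with made-up factor values:

```python
import numpy as np

user_factors  = np.array([0.8, -0.2, 1.1])  # one user's latent vector (made up)
movie_factors = np.array([1.0,  0.5, 0.9])  # one movie's latent vector (made up)

# predicted rating = dot(user vector, movie vector)
predicted_rating = float(user_factors @ movie_factors)
print(predicted_rating)  # approximately 1.69
```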


Abelardo Riojas

Musical-data addict and Master of Data Science candidate at The University of Texas at Austin!