Scala Machine Learning Projects: Recommendation Systems

When you talk about a movie recommendation system, you can’t help but think about Netflix. Netflix is an American entertainment company that uses a model-based collaborative filtering approach for real-time movie recommendation for its subscribers.

This article delves into a model-based movie recommendation engine built with Spark that recommends movies for new users. You will see how ALS and matrix factorisation (MF) work together in the movie recommendation engine, using the MovieLens dataset for the project.

Model-based recommendation with Spark

To predict a preference for any user, collaborative filtering uses the preferences of other users with similar interests and predicts movies that you do not know about but are likely to match your interests. Spark MLlib uses Alternating Least Squares (ALS) to make recommendations. Here is a glimpse of the collaborative filtering method used with the ALS algorithm:

Table 1 — User-movie matrix

In the preceding table, user ratings of movies are represented as a user-item matrix, where each cell holds a particular user's rating for a particular movie. The cell with ? represents a movie that user U4 is not aware of or hasn't seen. Based on U4's current preferences, that cell can be filled in with an approximate rating derived from users who share U4's interests. ALS does not fill in the cell directly from the raw ratings; instead, it learns latent factors (LFs) for users and movies, and those LFs are then used to predict the missing entries.
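The idea behind those latent factors can be sketched without Spark: ALS learns a small factor vector for every user and every movie, and a missing cell is predicted as the dot product of the corresponding pair of vectors. The rank-2 factor values below are invented purely for illustration; with real data, ALS would learn them:

```scala
// Toy latent-factor (LF) prediction, rank 2.
// The factor values are made up for illustration; ALS would learn them.
object LatentFactorSketch {
  val userFactors = Map("U4" -> Array(1.2, 0.8))
  val movieFactors = Map("M3" -> Array(0.9, 1.5))

  // Predicted rating = dot product of the user and movie factor vectors
  def predict(user: String, movie: String): Double =
    userFactors(user).zip(movieFactors(movie)).map { case (u, m) => u * m }.sum

  def main(args: Array[String]): Unit =
    println(f"Predicted rating for U4 on M3: ${predict("U4", "M3")}%.2f")
}
```

With real data, the factors are chosen so that the dot products reproduce the known cells of the user-item matrix as closely as possible; the same dot products then fill in the ? cells.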

The Spark API provides the implementation of the ALS algorithm, which is used to learn these LFs based on the following six parameters:

  • numBlocks: This is the number of blocks used to parallelise computation (set to -1 to auto-configure).
  • rank: This is the number of LFs in the model.
  • iterations: This is the number of iterations of ALS to run. ALS typically converges to a reasonable solution in 20 iterations or less.
  • lambda: This specifies the regularisation parameter in ALS.
  • implicitPrefs: This specifies whether to use the explicit feedback ALS variant or the variant adapted for implicit feedback data.
  • alpha: This is a parameter applicable to the implicit feedback variant of ALS that governs the baseline confidence in preference observations.

Note that you can construct an ALS instance with the default parameters and then set the values based on your requirements. The default values are as follows: numBlocks: -1, rank: 10, iterations: 10, lambda: 0.01, implicitPrefs: false, and alpha: 1.0.
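To make alpha concrete: in the implicit-feedback formulation that MLlib's ALS follows (Hu, Koren, and Volinsky), an observation count r is not treated as a rating but is converted into a confidence c = 1 + alpha * r. A minimal sketch, with invented observation counts:

```scala
// Confidence weighting used by implicit-feedback ALS: c = 1 + alpha * r.
// The observation counts below are invented for illustration.
object ImplicitConfidence {
  def confidence(r: Double, alpha: Double): Double = 1.0 + alpha * r

  def main(args: Array[String]): Unit =
    // An unobserved pair keeps the baseline confidence of 1.0;
    // frequently observed pairs are weighted far more heavily.
    Seq(0.0, 1.0, 10.0).foreach { r =>
      println(s"r = $r -> confidence = ${confidence(r, alpha = 1.0)}")
    }
}
```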


Data exploration

The movie and the corresponding rating dataset were downloaded from the MovieLens website (https://movielens.org). According to the data description on the MovieLens website, all the ratings are described in the ratings.csv file. Each row of this file, after the header, represents one rating of one movie by one user.

The CSV dataset has the following columns: userId, movieId, rating, and timestamp. The rows are ordered first by userId and then, within each user, by movieId. Ratings are made on a five-star scale with half-star increments (0.5 stars up to 5.0 stars). The timestamps represent seconds since midnight in Coordinated Universal Time (UTC) on January 1, 1970. You have 105,339 ratings from 668 users on 10,325 movies.
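Because the timestamp column holds plain epoch seconds, it can be turned into a readable UTC date with java.time alone; the timestamp value below is illustrative:

```scala
import java.time.{Instant, ZoneOffset}
import java.time.format.DateTimeFormatter

// Convert a MovieLens-style epoch-seconds timestamp into an ISO date-time in UTC
object TimestampDemo {
  def toUtcString(epochSeconds: Long): String =
    DateTimeFormatter.ISO_LOCAL_DATE_TIME
      .format(Instant.ofEpochSecond(epochSeconds).atZone(ZoneOffset.UTC))

  def main(args: Array[String]): Unit =
    println(toUtcString(1260759144L)) // an illustrative timestamp value
}
```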

On the other hand, movie information is contained in the movies.csv file. Each row, apart from the header, represents one movie and contains these columns: movieId, title, and genres. Movie titles are either created and inserted manually or imported from the website of the movie database at https://www.themoviedb.org/; the release year is shown in brackets.

Since movie titles are inserted manually, some errors or inconsistencies may exist in these titles. Readers are therefore recommended to check the IMDb database (https://www.imdb.com/) to make sure that there are no inconsistent or incorrect titles or release years.

Genres appear in a pipe-separated list and are selected from the following genre categories:

  • Action, Adventure, Animation, Children’s, Comedy, and Crime
  • Documentary, Drama, Fantasy, Film-Noir, Horror, and Musical
  • Mystery, Romance, Sci-Fi, Thriller, Western, and War
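Since all of a movie's genres are packed into a single pipe-separated field, a small parsing step is usually needed before any genre-level analysis. A sketch, with the special "(no genres listed)" marker handled explicitly:

```scala
// Split a MovieLens genres field into individual genre names.
object GenreSplit {
  def parseGenres(field: String): Seq[String] =
    if (field == "(no genres listed)") Seq.empty
    else field.split("\\|").toSeq

  def main(args: Array[String]): Unit =
    println(parseGenres("Adventure|Animation|Comedy")) // illustrative field value
}
```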

Movie recommendation using ALS

In this subsection, you will see how to recommend movies to other users through a systematic example, from data collection to movie recommendation.

Step 1 — Import packages, load, parse, and explore the movie and rating dataset

You will load, parse, and do some exploratory analysis. However, before that, let’s import the necessary packages and libraries:

package com.packt.ScalaML.MovieRecommendation

import org.apache.spark.sql.SparkSession
import org.apache.spark.mllib.recommendation.ALS
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel
import org.apache.spark.mllib.recommendation.Rating
import org.apache.spark.rdd.RDD

// Create the SparkSession used throughout the rest of the example
val spark = SparkSession.builder()
  .appName("MovieRecommendation")
  .master("local[*]")
  .getOrCreate()

This code segment should return the DataFrame of ratings:

val ratingsFile = "data/ratings.csv"
val df1 = spark.read.format("com.databricks.spark.csv").option("header", "true").load(ratingsFile)
val ratingsDF = df1.select(df1.col("userId"), df1.col("movieId"), df1.col("rating"), df1.col("timestamp"))
ratingsDF.show(false)

The following code segment shows you the DataFrame of the movies:

val moviesFile = "data/movies.csv"
val df2 = spark.read.format("com.databricks.spark.csv").option("header", "true").load(moviesFile)
val moviesDF = df2.select(df2.col("movieId"), df2.col("title"), df2.col("genres"))

Step 2 — Register both DataFrames as temp tables to make querying easier

To register both datasets, you can use the following code:

ratingsDF.createOrReplaceTempView("ratings")
moviesDF.createOrReplaceTempView("movies")

This will help make in-memory querying faster by creating a temporary view as a table in memory. The lifetime of a temporary view created with the createOrReplaceTempView() method is tied to the SparkSession that was used to create the DataFrame.

Step 3 — Explore and query for related statistics

Let’s check the ratings-related statistics. Just use the following code lines:

val numRatings = ratingsDF.count()
val numUsers = ratingsDF.select(ratingsDF.col("userId")).distinct().count()
val numMovies = ratingsDF.select(ratingsDF.col("movieId")).distinct().count()
println("Got " + numRatings + " ratings from " + numUsers + " users on " + numMovies + " movies.")

>>>

Got 105339 ratings from 668 users on 10325 movies.

You should find 105,339 ratings from 668 users on 10,325 movies. Now, let's get the maximum and minimum ratings along with the count of users who have rated each movie. To do so, you need to perform an SQL query on the ratings table you created in memory in the previous step. Making a query here is simple, and similar to querying a MySQL database or another RDBMS.

However, if you are not familiar with SQL-based queries, you are advised to look at the SQL query specification to find out how to perform a selection using SELECT from a particular table, how to perform ordering using ORDER, and how to perform a joining operation using the JOIN keyword.

Well, if you know SQL, you can get a new dataset using a more complex SQL query, as follows:

// Get the max, min ratings along with the count of users who have rated a movie
val results = spark.sql("select movies.title, movierates.maxr, movierates.minr, movierates.cntu "
  + "from(SELECT ratings.movieId, max(ratings.rating) as maxr, "
  + "min(ratings.rating) as minr, count(distinct userId) as cntu "
  + "FROM ratings group by ratings.movieId) movierates "
  + "join movies on movierates.movieId=movies.movieId "
  + "order by movierates.cntu desc")
results.show(false)

Output:

To get some insight, you need to know more about the users and their ratings. Now let’s find the 10 most active users and how many times they rated a movie:

val mostActiveUsersSchemaRDD = spark.sql("SELECT ratings.userId, count(*) as ct from ratings "
  + "group by ratings.userId order by ct desc limit 10")
mostActiveUsersSchemaRDD.show(false)

>>>

Figure: Top 10 active users and how many times they rated a movie

Let’s have a look at a particular user and find the movies that, say, user 668 rated higher than 4:

val results2 = spark.sql(
  "SELECT ratings.userId, ratings.movieId, "
  + "ratings.rating, movies.title FROM ratings JOIN movies "
  + "ON movies.movieId=ratings.movieId "
  + "where ratings.userId=668 and ratings.rating > 4")
results2.show(false)

>>>

Step 4 — Prepare training and test rating data and check the counts

The following code splits the ratings into a training data RDD (75%) and a test data RDD (25%). The seed here is optional, but is needed for reproducibility:

// Split the ratings into training data (75%) & test data (25%)
val splits = ratingsDF.randomSplit(Array(0.75, 0.25), seed = 12345L)
val (trainingData, testData) = (splits(0), splits(1))
val numTraining = trainingData.count()
val numTest = testData.count()
println("Training: " + numTraining + " test: " + numTest)

You should notice that there are 78,792 ratings in training and 26,547 ratings in the test DataFrame.

Step 5 — Prepare the data for building the recommendation model using ALS

The ALS algorithm takes an RDD of Rating objects for training. The following code prepares it:

val ratingsRDD = trainingData.rdd.map(row => {
  val userId = row.getString(0)
  val movieId = row.getString(1)
  val ratings = row.getString(2)
  Rating(userId.toInt, movieId.toInt, ratings.toDouble)
})

The ratingsRDD is an RDD of ratings that contains userId, movieId, and the corresponding ratings from the training dataset you prepared in the previous step. On the other hand, a test RDD is also required for evaluating the model. The following test RDD also contains the same information coming from the test DataFrame you prepared in the previous step:

val testRDD = testData.rdd.map(row => {
  val userId = row.getString(0)
  val movieId = row.getString(1)
  val ratings = row.getString(2)
  Rating(userId.toInt, movieId.toInt, ratings.toDouble)
})

Step 6 — Build an ALS user product matrix

Build an ALS user matrix model based on ratingsRDD by specifying the maximal number of iterations, the number of blocks, alpha, rank, lambda, seed, and implicitPrefs. Essentially, this technique predicts missing ratings for specific users and specific movies based on ratings for those movies from other users who gave similar ratings to other movies:

val rank = 20
val numIterations = 15
val lambda = 0.10
val alpha = 1.00
val block = -1
val seed = 12345L
val implicitPrefs = false

val model = new ALS()
  .setIterations(numIterations)
  .setBlocks(block)
  .setAlpha(alpha)
  .setLambda(lambda)
  .setRank(rank)
  .setSeed(seed)
  .setImplicitPrefs(implicitPrefs)
  .run(ratingsRDD)

Finally, the model is trained for 15 iterations. With this setting, we got good prediction accuracy. Readers are advised to apply hyperparameter tuning to find the optimal values for these parameters. Furthermore, setting the number of user blocks and product blocks to -1 parallelises the computation with an auto-configured number of blocks.

Step 7 — Making predictions

Let’s get the top six movie predictions for user 668. The following source code can be used to make the predictions:

// Making predictions. Get the top 6 movie predictions for user 668
println("Rating:(UserID, MovieID, Rating)")
println("----------------------------------")
val topRecsForUser = model.recommendProducts(668, 6)
for (rating <- topRecsForUser) {
  println(rating.toString())
}
println("----------------------------------")

>>>

Step 8 — Evaluating the model

In order to verify the quality of the model, the Root Mean Squared Error (RMSE) is used to measure the difference between the values predicted by the model and the values actually observed. The smaller the calculated error, the better the model. To test the quality of the model, the test data (which was split off in step 4) is used.

According to many machine learning practitioners, RMSE is a good measure of accuracy, but only for comparing the forecasting errors of different models on a particular variable; it is not suitable for comparing across variables, as it is scale-dependent. The following line of code calculates the RMSE for the model trained on the training set:
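The metric itself is simple: RMSE is the square root of the mean squared difference between predicted and observed ratings. A self-contained sketch over invented (prediction, rating) pairs:

```scala
// RMSE = sqrt(mean((prediction - rating)^2)); the pairs below are invented.
object RmseSketch {
  def rmse(pairs: Seq[(Double, Double)]): Double =
    math.sqrt(pairs.map { case (p, r) => (p - r) * (p - r) }.sum / pairs.size)

  def main(args: Array[String]): Unit =
    println(f"Toy RMSE: ${rmse(Seq((4.5, 5.0), (3.0, 2.5), (4.0, 4.0)))}%.4f")
}
```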

val rmseTest = computeRmse(model, testRDD, true)
println("Test RMSE: = " + rmseTest) // Less is better

For this setting, you will get this output:

Test RMSE: = 0.9019872589764073

This method computes the RMSE to evaluate the model; the lower the RMSE, the better the model and its prediction capability. Note that computeRmse() is a user-defined function that goes as follows:

def computeRmse(model: MatrixFactorizationModel, data: RDD[Rating], implicitPrefs: Boolean): Double = {
  val predictions: RDD[Rating] = model.predict(data.map(x => (x.user, x.product)))
  val predictionsAndRatings = predictions.map { x => ((x.user, x.product), x.rating) }
    .join(data.map(x => ((x.user, x.product), x.rating))).values
  if (implicitPrefs) {
    println("(Prediction, Rating)")
    println(predictionsAndRatings.take(5).mkString("\n"))
  }
  math.sqrt(predictionsAndRatings.map(x => (x._1 - x._2) * (x._1 - x._2)).mean())
}

>>>

Finally, let’s provide some movie recommendation for a specific user. Let’s get the top six movie predictions for user 668:

println("Recommendations: (MovieId => Rating)")
println("----------------------------------")
val recommendationsUser = model.recommendProducts(668, 6)
recommendationsUser.map(rating => (rating.product, rating.rating)).foreach(println)
println("----------------------------------")

>>>

The performance of the preceding model could be improved further. However, to our knowledge, there is no built-in model tuning facility for the MLlib-based ALS algorithm.
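A manual grid search is still possible: train one model per (rank, lambda) combination, score each on the held-out RMSE, and keep the minimum. The selection step boils down to the following sketch, where the RMSE scores are invented placeholders for what a real run would produce:

```scala
// Pick the hyperparameter pair with the lowest held-out RMSE.
// In a real run, each score comes from training ALS with that (rank, lambda)
// and evaluating on the test RDD; the values below are invented.
object GridSearchSketch {
  def bestParams(scores: Map[(Int, Double), Double]): ((Int, Double), Double) =
    scores.minBy(_._2)

  def main(args: Array[String]): Unit = {
    val scores = Map(
      (10, 0.01) -> 0.95,
      (10, 0.10) -> 0.92,
      (20, 0.01) -> 0.93,
      (20, 0.10) -> 0.90)
    val ((rank, lambda), rmse) = bestParams(scores)
    println(s"Best rank=$rank, lambda=$lambda, RMSE=$rmse")
  }
}
```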

If you are interested to learn more about machine learning concepts, check out Md. Rezaul Karim’s Scala Machine Learning Projects for developing powerful smart applications using deep learning algorithms to dominate numerical computing, deep learning, and functional programming.


For more updates, you can follow me on Twitter: @NavRudraSambyal

Thanks for reading, please share it if you found it useful