Building a simple recommender system with Spark and MemSQL

--

Github repo

This story is organized into two parts. The first one is about building a movie recommender system based on collaborative filtering using Apache Spark. The recommendation model we’ll build is almost identical to the example from the Apache Spark ALS documentation, so we won’t go into too much depth here.

The second is about serving the results of the recommendation model. Instead of pre-calculating the recommendations for each user, or creating a service that can make predictions for a given user, we’ll store everything in a MemSQL relational database and use some built-in ML functions to make user predictions in SQL. Since we’ll use SQL to make predictions, it’s very straightforward to join the predictions with other data to enrich or filter the results.

Prerequisites

You’ll need Apache Spark, Docker (to run a MemSQL database), and Python in order to follow along. If you don’t have these installed right now, not a problem, you can just read along.

Let’s dive in!

Getting the data

First, let’s get some data. Like in many other stories, we’ll use the 1M ratings dataset by the MovieLens group (released in 2003). This dataset contains about one million five-star ratings from about 6,000 users on about 4,000 different movies.

curl http://files.grouplens.org/datasets/movielens/ml-1m.zip --output ml-1m.zip
unzip -o ml-1m.zip

We’ll use pandas to load the movies and users data in a Jupyter Notebook.

import pandas as pd

movies = pd.read_csv('ml-1m/movies.dat', sep="::", header=None, names=['MovieID','Title','Genres'], engine='python')
users = pd.read_csv('ml-1m/users.dat', sep="::", header=None, names=['UserID','Gender','Age','Occupation','Zip-code'], engine='python')
# Extracting the year of release from the title
movies['year'] = movies['Title'].str.extract(r'\(([0-9]{4})\)', expand=False).str.strip()

We’ll load the ratings into a Spark DataFrame.

from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from pyspark import SparkConf, SparkContext, sql

spark = SparkSession.builder.appName('ops').getOrCreate()

lines = spark.read.text('ml-1m/ratings.dat').rdd
parts = lines.map(lambda row: row.value.split('::'))

ratingsRDD = parts.map(lambda p: Row(userID=int(p[0]),movieID=int(p[1]),rating=int(p[2])))
ratings = spark.createDataFrame(ratingsRDD)

(training, test) = ratings.randomSplit([0.8, 0.2])

With the data loaded, we can start building an item (movie) to user recommendation model.

There are two major recommendation system paradigms: collaborative and content-based methods (or a combination of both, hybrids). Collaborative methods use the past interactions recorded between users and items to produce new recommendations, while content-based methods are based on the attributes of the users and/or items. A great introduction to recommendation systems can be found here. In this story we’ll consider a collaborative method.

Collaborative filtering / Matrix factorization

Collaborative filtering is based on the assumption that users who shared interests in the past are more likely to share interests in the future than randomly chosen users. Hence, if two users both liked the same movies in the past, their current likes would be better recommendations to each other than random movie suggestions. In general, we look for overlap or co-occurrence to make a recommendation, by constructing a user-item association matrix. Since there are many users and many movies, and most users will only have rated a limited set of movies, the resulting user-item association matrix is often very sparse. That is, the majority of the user-item interactions are unknown at this moment.
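To get a feel for just how sparse, we can pivot the ratings into a user-by-movie table. This is a quick sketch (the column names match the Spark DataFrame we built above); for the 1M dataset only about 4.5% of the cells are filled:

# Pivot ratings into a (user x movie) matrix; unrated cells become NaN
ratings_pd = ratings.toPandas()
matrix = ratings_pd.pivot_table(index='userID', columns='movieID', values='rating')
print(f"filled cells: {matrix.notna().to_numpy().mean():.1%}")  # roughly 4.5%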

A Matrix Factorization approach focuses on finding latent or hidden features. Mathematically speaking we reduce the dimensionality of our original “all users by all items” matrix into something much smaller.

Matrix factorization algorithms work by decomposing the user-item interaction matrix into the product of two lower dimensionality matrices.

Various matrix factorization techniques exist; we’ll use the Alternating Least Squares (ALS) algorithm. ALS calculates the optimal user and item matrices, which, when multiplied, give the full association matrix (with the blanks filled in).
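As a mental model, here’s a toy NumPy sketch (illustrative only, with random numbers standing in for learned factors, not the ALS algorithm itself): the big ratings matrix is approximated by the product of two skinny factor matrices, and a single predicted rating is just the dot product of two length-k vectors.

import numpy as np

k = 10                       # rank: the number of latent features
U = np.random.rand(6040, k)  # user factors, one row per user
V = np.random.rand(3706, k)  # item factors, one row per movie
R_hat = U @ V.T              # the reconstructed user-item association matrix
print(np.isclose(R_hat[0, 0], U[0] @ V[0]))  # True: a predicted rating is a dot product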

Alternating Least Squares in Apache Spark (link)

The ALS algorithm is provided in the Apache Spark MLlib package and is ridiculously easy to use!

als = ALS(maxIter=5, regParam=0.01, userCol="userID", itemCol="movieID", ratingCol="rating",
          coldStartStrategy="drop")
model = als.fit(training)

We won’t spend time here tuning hyperparameters or finding the (ALS) rank that gives the best results on the validation set. (The resulting recommender is most likely not the best one we could make, but that’s not important for this story.)
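If you do want a quick sanity check, the held-out test split from earlier can be scored with Spark’s RegressionEvaluator, along these lines:

from pyspark.ml.evaluation import RegressionEvaluator

# RMSE on the 20% test split; coldStartStrategy="drop" already removed NaN predictions
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
print("RMSE:", evaluator.evaluate(predictions))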

From this model, we can now obtain the latent factors for both users and movies. These are plain floating-point vectors, representing the data in a reduced dimension.

moviefactors = model.itemFactors  # already a DataFrame of (id, features)
userfactors = model.userFactors

Recommendations for a specific user are obtained by taking the dot product between the latent factors of that user and the latent factors of each movie. The highest scores yield the top recommendations for the selected user.
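We can sanity-check this in a few lines of Python before moving to the database (a sketch; Spark’s model.recommendForAllUsers would also do the job, but here we want to see the raw dot products we’ll later reproduce in SQL):

import numpy as np

# Score every movie for user 1 directly from the latent factors
user_vec = np.array(userfactors.filter(userfactors.id == 1).first()['features'])
movie_pd = moviefactors.toPandas()
movie_pd['score'] = movie_pd['features'].apply(lambda f: float(np.dot(user_vec, f)))
print(movie_pd.nlargest(10, 'score')[['id', 'score']])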

We want to use the results of the training somewhere outside the notebook. We could pre-calculate all results for all users, or we could save the model and build a service that makes ad-hoc recommendations when we need them. We’ll go for a slightly different option here! We’re going to save the user and item factors in a relational database and use SQL to make user predictions!

MemSQL

MemSQL is a distributed, relational SQL database. It describes itself as a no-limits database. The reason we’re going to deploy the results of our recommender model into this database is that it comes with built-in ML functions that allow us to run predictions from the database, using SQL, without needing a separate service to calculate user recommendations.

DOT_PRODUCT(vector_expression, vector_expression)

The MemSQL documentation mentions that the vector_expression must be encoded as a blob containing packed single-precision floating-point numbers in little-endian byte order. A vector can be of any length, but both input vectors must be the same length and the blob lengths must be divisible by 4 bytes.
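To make that format concrete, here is what two floats look like when packed this way (purely illustrative):

import struct

# 1.0 and 2.0 as little-endian single-precision floats: 8 bytes, 16 hex characters
print(struct.pack('<2f', 1.0, 2.0).hex())  # 0000803f00000040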

So let’s do that for our factor vectors. Let’s create a user-defined function (UDF) that can be applied to a Spark DataFrame and converts the floating-point latent factor vectors into hex-encoded blobs of packed single-precision floating-point numbers.

import binascii
import struct

def vector2hex(vector):
    vectorStr = b"".join([struct.pack('<f', elem) for elem in vector])  # little-endian float32
    return binascii.hexlify(vectorStr).decode('ascii')

udf_vector2hex = udf(vector2hex, StringType())
moviefactors_with_hex = moviefactors.withColumn("factors", udf_vector2hex("features"))
userfactors_with_hex = userfactors.withColumn("factors", udf_vector2hex("features"))
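A quick round-trip check on the first row (just a sketch) confirms that decoding the hex string gives back the factors, up to float32 precision:

import binascii
import struct

row = moviefactors_with_hex.first()
raw = binascii.unhexlify(row['factors'])
print(struct.unpack('<%df' % (len(raw) // 4), raw))  # matches row['features']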

We’ll convert these results to pandas DataFrames to easily merge them with the movie and user metadata.

movie_frame = moviefactors_with_hex.select('id','factors').toPandas().rename(columns={"id": "movie_id", "factors": "factors"})
users_frame = userfactors_with_hex.select('id','factors').toPandas().rename(columns={"id": "user_id", "factors": "factors"})
db_users = users.merge(users_frame, left_on='UserID', right_on='user_id').drop(columns=['user_id'])
db_movies = movies.merge(movie_frame, left_on='MovieID', right_on='movie_id').drop(columns=['movie_id'])

We’re now ready to load this data into the database. First we need to start a MemSQL database.

MemSQL provides a great blog post on how to get started with MemSQL and docker, find the post here.

docker pull memsql/quickstart
docker run --rm --net=host memsql/quickstart check-system
docker run -d -p 3306:3306 -p 9000:9000 --name=memsql memsql/quickstart
docker run --rm -it --link=memsql:memsql memsql/quickstart simple-benchmark
Container logs in DockStation

This will perform a system check, spin up a single-node MemSQL cluster on your local machine, and run a simple benchmark against it. We’ll create a database called data.

CREATE DATABASE `data`;

and, in data, create two tables: users and movies.

# Dump of table movies
# ------------------------------------------------------------
DROP TABLE IF EXISTS `movies`;

CREATE TABLE `movies` (
  `id` bigint(11) NOT NULL AUTO_INCREMENT,
  `title` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
  `year` int(7) DEFAULT NULL,
  `genres` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
  `factors` binary(80) DEFAULT NULL,
  PRIMARY KEY (`id`)
);
# Dump of table users
# ------------------------------------------------------------
DROP TABLE IF EXISTS `users`;

CREATE TABLE `users` (
  `id` bigint(11) NOT NULL AUTO_INCREMENT,
  `gender` varchar(4) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
  `age` int(7) DEFAULT NULL,
  `occupation` int(7) DEFAULT NULL,
  `zip_code` varchar(8) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL,
  `factors` binary(80) DEFAULT NULL,
  PRIMARY KEY (`id`)
);

Let’s give the columns in the pandas DataFrames better names and load them into the database. We’ll use SQLAlchemy and pandas’ to_sql to do this job for us.

from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base

db_users.rename(
    columns={'UserID':'id','Gender':'gender','Age':'age','Occupation':'occupation','Zip-code':'zip_code'}, inplace=True)
db_movies.rename(
    columns={'MovieID':'id','Title':'title','Genres':'genres'}, inplace=True)

Base = declarative_base()
engine = create_engine('mysql://root@127.0.0.1')
engine.execute('USE data')
Base.metadata.create_all(engine)
db_movies.to_sql('movies', con=engine, if_exists='append', index=False)
db_users.to_sql('users', con=engine, if_exists='append', index=False)
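A quick count confirms the load (using engine.execute in the SQLAlchemy 1.x style from above; only rows that survived the merge are present):

print(engine.execute('SELECT COUNT(*) FROM movies').scalar())  # roughly 3,700 movies with factors
print(engine.execute('SELECT COUNT(*) FROM users').scalar())   # roughly 6,040 users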

Let’s make a movie recommendation for the user with id = 1:

SELECT movies.title,
movies.genres,
DOT_PRODUCT(UNHEX(users.factors), UNHEX(movies.factors)) AS score
FROM users
JOIN movies
WHERE users.id = 1
ORDER BY score DESC
LIMIT 10;
Making predictions via SQL in Sequel Pro
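The same query also works from Python, straight into a pandas DataFrame (a sketch reusing the engine from earlier):

import pandas as pd

query = """
SELECT movies.title, movies.genres,
       DOT_PRODUCT(UNHEX(users.factors), UNHEX(movies.factors)) AS score
FROM users JOIN movies
WHERE users.id = 1
ORDER BY score DESC
LIMIT 10
"""
recommendations = pd.read_sql(query, con=engine)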

Or, since some other metadata about the movies is available in the database, we can also recommend sci-fi movies from the ’80s for this specific user, without having to filter the results of the model afterwards.

SELECT movies.title,
movies.genres,
DOT_PRODUCT(UNHEX(users.factors), UNHEX(movies.factors)) AS score
FROM users
JOIN movies
WHERE users.id = 1 AND movies.year BETWEEN 1980 AND 1989 AND movies.genres LIKE '%sci-fi%'
ORDER BY score DESC
LIMIT 10;
Making predictions via SQL in Sequel Pro

If a new user were to join (without any ratings given so far), the predicted scores of similar users can be combined to suggest a movie; e.g. let’s recommend some movies to a new male (‘M’) user in age group 18.

SELECT movies.title,
movies.genres,
AVG(DOT_PRODUCT(UNHEX(users.factors), UNHEX(movies.factors))) AS avg_score,
count(1) as c
FROM users
JOIN movies
WHERE users.gender = 'M' AND users.age IN (18) AND DOT_PRODUCT(UNHEX(users.factors), UNHEX(movies.factors)) > 5
GROUP BY 1,2
ORDER BY avg_score DESC
LIMIT 10;
Making predictions via SQL in Sequel Pro
