Google Cloud: Recommendation Systems with Spark on DataProc

Samadhan Kadam
Published in Petabytz · 6 min read · Jun 24, 2019

In this tutorial, you’ll learn how to create a movie recommendation system with Spark, using PySpark.

The tutorial focuses more on deployment than on code. We’ll deploy the project on Google’s cloud infrastructure using:

· Google Cloud DataProc

· Google Cloud Storage

· Google Cloud SQL

Table of Contents:

1. Getting the Data

2. Storing the Data

3. Training the Model

4. Deploying to Cloud DataProc

Getting the data:

You’ll first need to download the dataset we’ll be working with. You can get the small version of the MovieLens dataset here (1MB). After verifying your work, you can test it with the full dataset here (224MB).

Inside this dataset, you’ll be using the ratings.csv and movies.csv files. Each file lists its column headers on the first line. You’ll have to remove this header line before loading the data into CloudSQL.
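One way to remove the header lines is a short Python script such as the sketch below. The function name strip_header and its keep_columns parameter are my own and not part of the dataset or of any library. The optional column trimming is included because the MovieLens ratings.csv carries a fourth timestamp column that the RATINGS table used in this tutorial does not define; whether it has to be dropped depends on how the file is imported.

```python
import csv


def strip_header(src_path, dst_path, keep_columns=None):
    """Copy a CSV file without its first (header) line.

    keep_columns is an optional number of leading columns to keep per row
    (a hypothetical convenience, e.g. 3 to drop a trailing timestamp).
    Returns the number of data rows written.
    """
    rows_written = 0
    with open(src_path, newline="", encoding="utf-8") as src, \
            open(dst_path, "w", newline="", encoding="utf-8") as dst:
        reader = csv.reader(src)
        # Quoted fields (titles containing commas) are re-quoted on output.
        writer = csv.writer(dst, lineterminator="\n")
        next(reader, None)  # skip the header line; no error on an empty file
        for row in reader:
            writer.writerow(row[:keep_columns] if keep_columns else row)
            rows_written += 1
    return rows_written
```

For example, strip_header("ratings.csv", "ratings_noheader.csv", keep_columns=3) would write the ratings without the header and without the timestamp column; the output file name is a placeholder.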

Storing the data:

Google Cloud SQL:

You’ll need to create a few SQL scripts to create the db and tables.

CREATE DATABASE TEST;

CREATE TABLE MOVIES (
    id varchar(255),
    title varchar(255),
    genre varchar(255),
    PRIMARY KEY (id)
);

CREATE TABLE RATINGS (
    userId varchar(255),
    movieId varchar(255),
    rating float,
    PRIMARY KEY (userId, movieId),
    FOREIGN KEY (movieId) REFERENCES MOVIES(id)
);

CREATE TABLE RECOMMENDATIONS (
    userId varchar(255),
    movieId varchar(255),
    prediction float,
    PRIMARY KEY (userId, movieId),
    FOREIGN KEY (userId, movieId) REFERENCES RATINGS(userId, movieId)
);

Load SQL scripts and data into Cloud Storage:

Create a bucket and load the scripts into Google Cloud Storage. Bucket names in Cloud Storage are globally unique, so you’ll need to replace the name below with one of your choosing. Using the Google Cloud SDK from a terminal:

$ gsutil mb gs://movie-rec-tutorial
$ gsutil cp *.sql gs://movie-rec-tutorial
$ gsutil cp movies.csv ratings.csv gs://movie-rec-tutorial

(If you haven’t set up the Google Cloud SDK yet, install it with Homebrew and initialize it:)

$ brew install --cask google-cloud-sdk
$ gcloud init

After this step, you can look in Cloud Storage to confirm the files were uploaded successfully.

Configure CloudSQL:

Next, you’ll create your Cloud SQL instance. Select the second-generation option. (I’ve disabled backups because this is a demo project and they aren’t necessary; you may choose otherwise.)

After initializing the db, set a new root password (instance -> access control -> users).

Take the scripts previously loaded into Cloud Storage and import them into the SQL instance (Import -> nameOfYourStorage -> Scripts). This creates the db and tables. Because of the foreign key constraints, run the scripts in this order: create_db, movies, ratings, recommendations.

When loading the table scripts, you’ll need to specify the database in the advanced options. The create_db file creates a DB with name “TEST”, referred to here.
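The import order for the table scripts follows from the foreign keys: a table has to exist before another table can reference it. As a minimal sketch of that reasoning, the snippet below derives the order with a topological sort. The dependency map is transcribed by hand from the CREATE TABLE statements, and graphlib requires Python 3.9 or later.

```python
from graphlib import TopologicalSorter

# Each table maps to the set of tables it references through a foreign key.
dependencies = {
    "MOVIES": set(),
    "RATINGS": {"MOVIES"},
    "RECOMMENDATIONS": {"RATINGS"},
}

# static_order() yields every table after the tables it depends on.
import_order = list(TopologicalSorter(dependencies).static_order())
print(import_order)  # ['MOVIES', 'RATINGS', 'RECOMMENDATIONS']
```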

Load the data:

Import the CSV files (movies.csv, ratings.csv) into the MOVIES and RATINGS tables, respectively.

Open the Network:

To connect to the instance from the cluster, you’ll need to authorize the IP addresses of the cluster nodes. For simplicity, open the instance to all traffic by authorizing the network 0.0.0.0/0.
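To see what opening traffic to the world means in terms of an allow-list, the sketch below uses Python’s standard ipaddress module to check whether an address falls inside a CIDR range. The helper name is_authorized and the sample addresses are illustrative assumptions; this mirrors the idea of authorized networks, not Cloud SQL’s actual implementation.

```python
import ipaddress


def is_authorized(client_ip, authorized_networks):
    """Return True if client_ip falls inside any of the given CIDR ranges."""
    address = ipaddress.ip_address(client_ip)
    return any(
        address in ipaddress.ip_network(network)
        for network in authorized_networks
    )


# 0.0.0.0/0 covers the whole IPv4 space, so every IPv4 client is allowed.
print(is_authorized("203.0.113.7", ["0.0.0.0/0"]))        # True
# A narrower range only admits addresses inside it.
print(is_authorized("203.0.113.7", ["198.51.100.0/24"]))  # False
```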

Confirmation:

If you would like to confirm that everything up to this point has been executed correctly, connect to your instance with your favorite MySQL client. Sequel Pro is my preferred client.

You’ll need the IPv4 address of your SQL instance, which is listed in the instance details.

Training the model

You’ve loaded the data into Cloud SQL; it’s time to begin training the model. With the code below, you’ll connect to your CloudSQL instance, read in the tables, train the model, generate predictions, and write them to the recommendations table.

import logging
import sys

from pyspark import SparkConf, SparkContext, SQLContext
from pyspark.mllib.recommendation import ALS
from pyspark.sql.types import StructType, StructField, StringType, FloatType

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Connection details are passed as positional command-line arguments.
CLOUDSQL_INSTANCE_IP = sys.argv[1]
CLOUDSQL_DB_NAME = sys.argv[2]
CLOUDSQL_USER = sys.argv[3]
CLOUDSQL_PWD = sys.argv[4]

# Set the app name and the memory limits for the driver and executors.
conf = SparkConf().setAppName('Movie Recommender') \
    .set('spark.driver.memory', '6G') \
    .set('spark.executor.memory', '4G') \
    .set('spark.python.worker.memory', '4G')
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)

TABLE_RATINGS = 'RATINGS'
TABLE_MOVIES = 'MOVIES'
TABLE_RECOMMENDATIONS = 'RECOMMENDATIONS'

jdbcDriver = 'com.mysql.jdbc.Driver'
jdbcUrl = 'jdbc:mysql://%s:3306/%s?user=%s&password=%s' % (
    CLOUDSQL_INSTANCE_IP, CLOUDSQL_DB_NAME, CLOUDSQL_USER, CLOUDSQL_PWD)

# Read the ratings and movies tables from Cloud SQL over JDBC.
logger.info("Loading Datasets from MySQL...")
dfRates = sqlContext.read.format('jdbc') \
    .option('useSSL', False) \
    .option('url', jdbcUrl) \
    .option('dbtable', TABLE_RATINGS) \
    .option('driver', jdbcDriver) \
    .load()
dfMovies = sqlContext.read.format('jdbc') \
    .option('useSSL', False) \
    .option('url', jdbcUrl) \
    .option('dbtable', TABLE_MOVIES) \
    .option('driver', jdbcDriver) \
    .load()

# Register the ratings as a temporary view and cache it.
dfRates.createOrReplaceTempView('Rates')
sqlContext.cacheTable('Rates')
logger.info("Datasets Loaded...")

# ALS hyperparameters.
rank = 8
seed = 5
iterations = 10
regularization_parameter = 0.1

# ALS.train expects (user, product, rating) tuples with integer ids.
logger.info("Training the ALS model...")
ratings = dfRates.rdd.map(lambda r: (int(r[0]), int(r[1]), r[2])).cache()
model = ALS.train(ratings, rank=rank, seed=seed,
                  iterations=iterations, lambda_=regularization_parameter)
logger.info("ALS model built!")

# Calculate the top 10 predictions for every user, one row per prediction.
predictions = model.recommendProductsForUsers(10) \
    .flatMap(lambda pair: pair[1]) \
    .map(lambda rating: (str(rating.user), str(rating.product), float(rating.rating)))

schema = StructType([StructField("userId", StringType(), True),
                     StructField("movieId", StringType(), True),
                     StructField("prediction", FloatType(), True)])
dfToSave = sqlContext.createDataFrame(predictions, schema)

# Write the predictions back to Cloud SQL, replacing the existing table.
dfToSave.write.jdbc(url=jdbcUrl, table=TABLE_RECOMMENDATIONS, mode='overwrite',
                    properties={'driver': jdbcDriver})

Walking through the code:

The first few lines configure Spark, setting the app name and the memory limits for the driver and executors. Information regarding additional properties can be found at Spark Conf Docs.

After configuring Spark, the code builds a JDBC URL for the database connection from the values supplied in sys.argv. This URL is used to read the tables from the CloudSQL instance.
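As a sketch of that URL construction: the version below has the same shape as the string built in the script, but it percent-encodes the user name and password, which the script’s plain string formatting does not do. That only matters when a credential contains characters such as & or a space, and it assumes the JDBC driver decodes percent-encoded values. The function name build_jdbc_url and the sample values are hypothetical.

```python
from urllib.parse import quote


def build_jdbc_url(host, database, user, password, port=3306):
    """Build a MySQL JDBC URL, percent-encoding the credentials."""
    return "jdbc:mysql://%s:%d/%s?user=%s&password=%s" % (
        host, port, database,
        quote(user, safe=""), quote(password, safe=""))


# Placeholder values; the real ones come from the command line.
print(build_jdbc_url("10.0.0.5", "TEST", "root", "p&ss word"))
```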

Next come the hyperparameters supplied to the ALS algorithm. I have supplied default parameters that perform well for this dataset. More information can be found at Spark ALS Docs.
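The tutorial does not show how these values were chosen. One common way to compare candidate settings is to hold out some ratings and measure the root-mean-square error (RMSE) of the model’s predictions on them, where a lower value is better. A minimal pure-Python sketch of the metric, with made-up numbers and a hypothetical rmse helper:

```python
import math


def rmse(predicted, actual):
    """Root-mean-square error over the (user, movie) pairs present in both dicts."""
    shared = predicted.keys() & actual.keys()
    if not shared:
        raise ValueError("no overlapping (user, movie) pairs to compare")
    squared_error = sum((predicted[k] - actual[k]) ** 2 for k in shared)
    return math.sqrt(squared_error / len(shared))


# Held-out ratings and model predictions, keyed by (userId, movieId).
actual = {(1, 10): 4.0, (1, 20): 3.0, (2, 10): 5.0}
predicted = {(1, 10): 3.0, (1, 20): 3.0, (2, 10): 4.0, (2, 30): 2.5}

# Only the three shared pairs count; (2, 30) has no held-out rating.
print(rmse(predicted, actual))
```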

Finally, the model is trained on the ratings dataframe. After training, the top 10 predictions for each user are generated and written back to the CloudSQL instance.
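To make the shape of that last step concrete: recommendProductsForUsers(10) yields one entry per user holding that user’s top recommendations, and the flatMap/map pair turns them into one (user, product, rating) row each. The plain-Python sketch below imitates that flattening on made-up data; the Rating namedtuple here is a local stand-in, not the PySpark class.

```python
from collections import namedtuple

# Local stand-in with the fields the script reads: user, product, rating.
Rating = namedtuple("Rating", ["user", "product", "rating"])

# Shape of the per-user output: (user, sequence of Rating).
per_user = [
    (1, (Rating(1, 50, 4.8), Rating(1, 7, 4.1))),
    (2, (Rating(2, 7, 4.9),)),
]

# Plain-Python equivalent of
#   .flatMap(lambda pair: pair[1]).map(lambda r: (r.user, r.product, r.rating))
rows = [(r.user, r.product, r.rating) for _, ratings in per_user for r in ratings]
print(rows)  # [(1, 50, 4.8), (1, 7, 4.1), (2, 7, 4.9)]
```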

Copy this file to your Google Cloud Storage bucket:

$ gsutil cp engine.py gs://movie-rec-tutorial

Deploying to Google Cloud DataProc:

DataProc is a managed Hadoop and Spark service; here it runs the engine.py file across a cluster of Compute Engine nodes.

You’ll need to enable the DataProc API in the API Manager.

After configuring the cluster and waiting for each node to initialize, create a job to execute engine.py. You’ll need to provide your CloudSQL instance IP, database name, username, and password as command-line arguments, in that order.
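Because engine.py reads its connection details positionally from sys.argv, a missing argument only surfaces as an IndexError when the job runs. A small guard like the sketch below could replace the four assignments at the top of the script. The parse_args helper and the CloudSqlArgs tuple are hypothetical names, and the sample values are placeholders.

```python
from collections import namedtuple

CloudSqlArgs = namedtuple(
    "CloudSqlArgs", ["instance_ip", "db_name", "user", "password"])


def parse_args(argv):
    """Validate and name the four positional arguments engine.py expects.

    argv follows the sys.argv convention: argv[0] is the script name.
    """
    if len(argv) != 5:
        raise SystemExit(
            "usage: engine.py <instance_ip> <db_name> <user> <password>")
    return CloudSqlArgs(*argv[1:5])


# In the script this would be parse_args(sys.argv); placeholders shown here.
args = parse_args(
    ["engine.py", "203.0.113.10", "TEST", "root", "example-password"])
print(args.instance_ip, args.db_name, args.user)
```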

That’s it! If you’d like to check out the written recommendations, connect with your preferred MySQL client.

Conclusion

In this tutorial, you created a db & tables within CloudSQL, trained a model with Spark on Google Cloud’s DataProc service, and wrote predictions back into a CloudSQL db.
