Yelp Reviews Sentiment Prediction via PySpark, MongoDB, AWS EMR

Published in Quick Code · 7 min read · Feb 1, 2018

By: Nicha Ruchirawat, Tina Peng, Maise Ly

Sentiment analysis, or opinion mining, is a common Natural Language Processing problem: determining whether a text is positive or negative. In a world where customers increasingly voice their opinions online, it is crucial for businesses to understand their online reputation.

We will explore a simple approach using Apache Spark’s machine learning library on the Yelp Dataset to predict sentiment from review text. We will also analyze which terms contribute most to a positive or a negative restaurant review.

We will predict whether a review is positive or negative using a linear Support Vector Machine and Logistic Regression, chosen because their feature coefficients are easy to interpret. Both are effective classifiers for NLP applications.

1) Set up MongoDB and Zeppelin Notebook on an AWS EMR cluster

We set up an AWS EMR cluster with 1 master and 3 workers (each an m3.xlarge with 15 GB of memory and 8 cores). This worked well with a subset of 1.5 million reviews; more reviews can be processed using nodes with more memory or clusters with more nodes. The original dataset was stored in an Amazon S3 bucket. MongoDB was set up on the master node, with Yelp’s tables imported from S3, and from there we loaded the data into a Zeppelin notebook.

To initiate the connection between PySpark and MongoDB:

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("yelp").getOrCreate()
# connect to the 'database' called yelp, 'collection' called review
review = spark.read.format("com.mongodb.spark.sql.DefaultSource")\
    .option("uri", "mongodb://127.0.0.1:27017/yelp.review").load()

2) Pre-processing the reviews data

First, we clean the review text by replacing punctuation, digits, and line breaks with spaces, using the following function.

import string
import re
def remove_punct(text):
    regex = re.compile('[' + re.escape(string.punctuation) + '0-9\\r\\t\\n]')
    nopunct = regex.sub(" ", text)
    return nopunct
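
As a quick check of what this function does, the plain-Python sketch below applies it to a made-up review string (the sample text is an assumption for illustration, not taken from the Yelp data). Every punctuation mark, digit, and line break becomes a single space, so the words survive but may be separated by runs of spaces.

```python
import re
import string

def remove_punct(text):
    # same pattern as above: punctuation, digits, and \r \t \n become spaces
    regex = re.compile('[' + re.escape(string.punctuation) + '0-9\\r\\t\\n]')
    return regex.sub(" ", text)

sample = "Great food!!! 10/10,\nwould go again."  # hypothetical review text
cleaned = remove_punct(sample)

# split() with no arguments collapses the runs of spaces left behind
tokens = cleaned.split()
print(tokens)
```

Because each matched character is swapped for exactly one space, the cleaned string keeps the length of the original.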

We also need to convert the star ratings to a binary class, with 1 representing a positive review and 0 representing a negative review. A review is positive if it has a rating of 4 or more stars and negative if it has fewer than 4 stars. We chose the 4-star threshold because people tend to overrate restaurants (e.g. people do not typically view a 3-star restaurant as a good one).

def convert_rating(rating):
    if rating >= 4:
        return 1
    else:
        return 0

We will convert these functions to PySpark user-defined functions (UDFs) and apply them to the text and rating columns when querying the ‘review’ collection.

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
punct_remover = udf(lambda x: remove_punct(x))
# return an integer so that the label is numeric for the classifiers
rating_convert = udf(lambda x: convert_rating(x), IntegerType())
# select 1.5 million reviews with punctuation removed and ratings converted
resultDF = review.select('review_id', punct_remover('text'), rating_convert('stars')).limit(1500000)
# UDFs change the column names, so we rename them: 'text' gets its original name back
# and the binary rating becomes 'label', the name used in the later steps
resultDF = resultDF.withColumnRenamed('<lambda>(text)', 'text')
resultDF = resultDF.withColumnRenamed('<lambda>(stars)', 'label')

We also need to tokenize the text (break it up into words) and remove stop words (common English terms such as ‘and’, ‘then’, ‘therefore’). Both steps use the existing Spark ML feature transformers, chained in a Pipeline.

from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, StopWordsRemover, NGram, CountVectorizer, IDF
# tokenizer
tok = Tokenizer(inputCol="text", outputCol="words")
# stop word remover
stopwordrm = StopWordsRemover(inputCol='words', outputCol='words_nsw')
# build the pipeline
pipeline = Pipeline(stages=[tok, stopwordrm])
# fit the pipeline and transform the data
review_tokenized = pipeline.fit(resultDF).transform(resultDF).cache()

We cache the cleaned data so that the data frame does not need to be recomputed each time it is called.
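
To make the two pipeline stages concrete, here is a rough plain-Python sketch of what tokenizing and stop-word removal do to a single review. The stop-word set and the sample sentence are hypothetical; Spark's StopWordsRemover uses a much longer default English list, and this simplified tokenizer collapses repeated whitespace, which Spark's Tokenizer may not do.

```python
# A tiny, hypothetical stop-word set for illustration only.
STOP_WORDS = {"the", "and", "was", "a", "then", "therefore"}

def tokenize(text):
    # lowercase the text, then split it on whitespace
    return text.lower().split()

def remove_stop_words(words):
    # keep only the tokens that are not in the stop-word set
    return [w for w in words if w not in STOP_WORDS]

words = tokenize("The food was great and the staff was friendly")
words_nsw = remove_stop_words(words)
print(words)
print(words_nsw)
```

The names `words` and `words_nsw` mirror the output columns of the pipeline above.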

3) Feature selection: choosing and integrating the most important n-grams (optional)

To improve the accuracy of the machine learning model, we tried adding bigrams and trigrams. These are built from the tokenized text column before stop words are removed.

To select the most important n-grams, we fed a TF-IDF matrix of bigrams or trigrams alone into a Support Vector Machine model and picked the n-grams with the largest coefficient weights (both positive and negative). Linear SVM weights are a common basis for feature selection, since the magnitude of a weight indicates how strongly that feature pushes a prediction toward one class.

We concatenate these ngrams back into the reviews so that they are taken into the model as one word and not 2 or 3 separate words (e.g. ‘great customer service’ becomes ‘great_customer_service’).
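
For readers unfamiliar with n-grams, the sketch below shows the sliding-window idea in plain Python. The helper name `ngrams` and the sample tokens are assumptions for illustration; in the pipeline itself this job is done by Spark's NGram transformer.

```python
def ngrams(words, n):
    # slide a window of n consecutive tokens over the list
    # and join each window into one space-separated string
    return [" ".join(words[i:i + n]) for i in range(len(words) - n + 1)]

words = ["great", "customer", "service", "every", "time"]  # hypothetical tokens
bigrams = ngrams(words, 2)
trigrams = ngrams(words, 3)
print(bigrams)
print(trigrams)
```

A list with fewer than n tokens yields no n-grams at all, so very short reviews contribute nothing to the trigram features.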

The best model consisted of unigrams and trigrams. Below is an example of how we chose trigrams:

import pandas as pd
from pyspark.mllib.classification import SVMWithSGD
from pyspark.mllib.linalg import Vectors as MLLibVectors
from pyspark.mllib.regression import LabeledPoint
# add ngram column
n = 3
ngram = NGram(inputCol='words', outputCol='ngram', n=n)
add_ngram = ngram.transform(review_tokenized)
# count vectorizer
cv_ngram = CountVectorizer(inputCol='ngram', outputCol='tf_ngram')
cvModel_ngram = cv_ngram.fit(add_ngram)
cv_df_ngram = cvModel_ngram.transform(add_ngram)
# create TF-IDF matrix
idf_ngram = IDF().setInputCol('tf_ngram').setOutputCol('tfidf_ngram')
tfidfModel_ngram = idf_ngram.fit(cv_df_ngram)
tfidf_df_ngram = tfidfModel_ngram.transform(cv_df_ngram)
# split into training & testing set
splits_ngram = tfidf_df_ngram.select(['tfidf_ngram', 'label']).randomSplit([0.8, 0.2], seed=100)
train_ngram = splits_ngram[0].cache()
test_ngram = splits_ngram[1].cache()
# convert feature matrix to LabeledPoint vectors
train_lb_ngram = train_ngram.rdd.map(lambda row: LabeledPoint(row[1], MLLibVectors.fromML(row[0])))
test_lb_ngram = test_ngram.rdd.map(lambda row: LabeledPoint(row[1], MLLibVectors.fromML(row[0])))
# fit SVM model of only trigrams
numIterations = 50
regParam = 0.3
svm = SVMWithSGD.train(train_lb_ngram, numIterations, regParam=regParam)
# pair each trigram in the vocabulary with its SVM weight
# (assumed construction: this table is not defined elsewhere in this post)
svm_coeffs_df_ngram = pd.DataFrame({'ngram': cvModel_ngram.vocabulary, 'weight': svm.weights.toArray()})
# extract the 20 most negative and the 20 most positive trigrams based on weights
bottom_ngram = svm_coeffs_df_ngram.sort_values('weight')['ngram'].values[:20]
top_ngram = svm_coeffs_df_ngram.sort_values('weight', ascending=False)['ngram'].values[:20]
ngram_list = list(top_ngram) + list(bottom_ngram)

To concatenate these top trigrams in the original text:

# replace each selected ngram in the text with its underscore-joined form
def ngram_concat(text):
    text1 = text.lower()
    for ngram in ngram_list:
        if ngram in text1:
            new_ngram = ngram.replace(' ', '_')
            text1 = text1.replace(ngram, new_ngram)
    return text1
ngram_concat_udf = udf(lambda x: ngram_concat(x))
ngram_df = review_tokenized.select(ngram_concat_udf('text'), 'label')\
    .withColumnRenamed('<lambda>(text)', 'text')
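
The effect of this function is easiest to see outside Spark. The sketch below runs the same logic on two made-up sentences, with a hypothetical two-entry `ngram_list` standing in for the 40 trigrams selected above.

```python
ngram_list = ["great customer service", "food was ok"]  # hypothetical selected trigrams

def ngram_concat(text):
    # lowercase the text, then join every selected n-gram with underscores
    text1 = text.lower()
    for ngram in ngram_list:
        if ngram in text1:
            text1 = text1.replace(ngram, ngram.replace(" ", "_"))
    return text1

merged = ngram_concat("Great customer service but the food was ok")
partial = ngram_concat("The seafood was okay")
print(merged)
print(partial)
```

Note that the match is a plain substring test, so the second sentence is also rewritten even though it does not contain the trigram as whole words. Matching on token boundaries would avoid this, at the cost of a slightly longer function.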

Once this is implemented, the Pipeline from step 2) needs to be re-run to update the tokenized columns with the concatenated n-grams (instead of applying the Pipeline to resultDF, apply it to ngram_df).

4) Building final ‘bag of words’ TF-IDF feature matrix

To build the final TF-IDF feature matrix:

# count vectorizer
cv = CountVectorizer(inputCol='words_nsw', outputCol='tf')
cvModel = cv.fit(review_tokenized)
count_vectorized = cvModel.transform(review_tokenized)
# create TF-IDF matrix
idf = IDF().setInputCol('tf').setOutputCol('tfidf')
tfidfModel = idf.fit(count_vectorized)
tfidf_df = tfidfModel.transform(count_vectorized)
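
To show what the numbers in the ‘tfidf’ column mean, here is a small plain-Python sketch of the calculation on three made-up tokenized reviews. It assumes raw term counts for TF and the smoothed form log((m + 1) / (df + 1)) for IDF, which is the form documented for Spark's IDF; the documents themselves are hypothetical.

```python
import math
from collections import Counter

docs = [  # hypothetical tokenized reviews
    ["great", "food", "great", "staff"],
    ["bland", "food"],
    ["great", "service"],
]

m = len(docs)
# document frequency: number of reviews that contain each term
doc_freq = Counter(term for doc in docs for term in set(doc))
# smoothed inverse document frequency
idf = {term: math.log((m + 1) / (count + 1)) for term, count in doc_freq.items()}

def tfidf(doc):
    tf = Counter(doc)  # raw term counts, as a count vectorizer would produce
    return {term: count * idf[term] for term, count in tf.items()}

weights = tfidf(docs[0])
print(weights)
```

In the first review, ‘staff’ ends up with a higher weight than ‘food’ even though each occurs once, because ‘staff’ appears in fewer reviews overall.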

5) Support Vector Machine

To test the performance of the model, we split the data into training and test sets:

# split into training and testing set
splits = tfidf_df.select(['tfidf', 'label']).randomSplit([0.8,0.2],seed=100)
train = splits[0].cache()
test = splits[1].cache()

We fit the SVM model to the TF-IDF feature matrix of the training set. One hyper-parameter, regParam, is tuned to control over-fitting. A value of 0.3 yielded the best F1 scores on both the train and test sets.

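# convert feature matrix to LabeledPoint vectors
train_lb = train.rdd.map(lambda row: LabeledPoint(row[1], MLLibVectors.fromML(row[0])))
test_lb = test.rdd.map(lambda row: LabeledPoint(row[1], MLLibVectors.fromML(row[0])))
# fit the SVM model on the training set
numIterations = 50
regParam = 0.3
svm = SVMWithSGD.train(train_lb, numIterations, regParam=regParam)
# predict on the test set and collect (prediction, label) pairs
scoreAndLabels_test = test_lb.map(lambda x: (float(svm.predict(x.features)), x.label))
score_label_test = spark.createDataFrame(scoreAndLabels_test, ["prediction", "label"])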

6) Regularized (Elastic Net) Logistic Regression

Using the same training data, we also applied a regularized logistic regression. SVM focuses on finding the separating hyperplane that maximizes the margin, that is, the distance between the hyperplane and the closest points, whereas Logistic Regression maximizes the likelihood of the data (i.e. the further a point lies from the hyperplane on the correct side, the better). Here we also tune two hyper-parameters, regParam and elasticNetParam, for best results. We fit the model as follows:

from pyspark.ml.classification import LogisticRegression
# Elastic Net Logit
lambda_par = 0.02
alpha_par = 0.3
lr = LogisticRegression().\
    setLabelCol('label').\
    setFeaturesCol('tfidf').\
    setRegParam(lambda_par).\
    setMaxIter(100).\
    setElasticNetParam(alpha_par)
lrModel = lr.fit(train)
lr_pred = lrModel.transform(test)
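
The difference between the two models, and the role of the two regularization parameters, can be sketched in a few lines of plain Python. This is an illustration rather than the Spark implementation: `margin` stands for y * (w · x) with labels coded as -1 and +1, the penalty follows the elastic-net form given in the Spark ML documentation, and the weight vector is made up.

```python
import math

def hinge_loss(margin):
    # SVM: the loss is zero once a point is on the correct side with margin >= 1
    return max(0.0, 1.0 - margin)

def logistic_loss(margin):
    # logistic regression: the loss keeps shrinking as the point moves
    # further from the hyperplane on the correct side, but never reaches zero
    return math.log(1.0 + math.exp(-margin))

def elastic_net_penalty(weights, reg_param, elastic_net_param):
    # reg_param * (alpha * L1 norm + (1 - alpha) / 2 * squared L2 norm)
    l1 = sum(abs(w) for w in weights)
    l2_sq = sum(w * w for w in weights)
    return reg_param * (elastic_net_param * l1 + (1.0 - elastic_net_param) / 2.0 * l2_sq)

for margin in (-1.0, 0.5, 1.0, 3.0):
    print(margin, hinge_loss(margin), round(logistic_loss(margin), 4))

# hypothetical weight vector, with the parameter values used above
penalty = elastic_net_penalty([0.5, -1.0, 0.0], reg_param=0.02, elastic_net_param=0.3)
print(penalty)
```

With elasticNetParam at 0 the penalty is pure L2 (ridge) and at 1 it is pure L1 (lasso); the value 0.3 used here mixes the two, which tends to push some coefficients to exactly zero while shrinking the rest.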

7) Evaluation

Note that the F1 score is chosen as the basis for evaluation because the dataset is imbalanced (more positive reviews than negative reviews). Accuracy alone does not tell the whole story, while F1, as the harmonic mean of precision and recall, reflects both how many of the predicted labels are correct and how many of the truly relevant reviews are found.
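
A small worked example makes the point about imbalance concrete. The counts below are hypothetical: a test set of 900 positive and 100 negative reviews, scored by a useless model that labels everything positive. The weighted average of per-class F1 scores is, to our understanding, how Spark's ‘f1’ metric combines the classes; treat that detail as an assumption.

```python
def f1_for_class(tp, fp, fn):
    # harmonic mean of precision and recall, with 0.0 when either is undefined
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    if precision + recall == 0:
        return 0.0
    return 2 * precision * recall / (precision + recall)

# hypothetical confusion counts, taking "positive" as the reference class
tp, fp, fn, tn = 900, 100, 0, 0

accuracy = (tp + tn) / (tp + fp + fn + tn)
f1_positive = f1_for_class(tp, fp, fn)
# for the negative class the roles swap: tn are its true positives
f1_negative = f1_for_class(tn, fn, fp)
# average the per-class scores, weighted by the number of true examples in each class
weighted_f1 = (900 * f1_positive + 100 * f1_negative) / 1000

print(accuracy, f1_positive, f1_negative, weighted_f1)
```

Accuracy looks respectable at 90%, but the F1 score for the negative class is zero, and the weighted F1 falls below the accuracy as a result.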

To evaluate the SVM model:

from pyspark.ml.evaluation import MulticlassClassificationEvaluator
f1_eval = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")
svm_f1 = f1_eval.evaluate(score_label_test)
print("F1 score: %.4f" % svm_f1)

To evaluate the Logistic Regression model:

f1_eval = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="f1")
lr_f1 = f1_eval.evaluate(lr_pred)
print("F1 score: %.4f" % lr_f1)

The best F1 score came from the SVM model with unigrams and trigrams, at 87.32% on the test set. The Logistic Regression model yielded an F1 score of 84.21% on the test set.

8) Feature Analysis

We can use the coefficient weights from our final SVM model to see which terms contribute the most to a positive review or a negative review. In this case, the terms associated with the most positive coefficients represent those that contribute the most to a positive review. The terms associated with the most negative coefficients represent those that contribute most to a negative review.
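
Ranking terms by weight takes only a few lines. The sketch below uses a made-up vocabulary and made-up coefficients; in the Spark workflow these would presumably come from the count vectorizer's vocabulary and the fitted SVM's weight vector, which share the same index order.

```python
vocabulary = ["delicious", "bland", "minutes", "friendly_staff", "table"]  # hypothetical terms
weights = [0.82, -0.64, -0.41, 0.77, 0.02]  # hypothetical SVM coefficients

# sort (term, weight) pairs from most negative to most positive
ranked = sorted(zip(vocabulary, weights), key=lambda pair: pair[1])

k = 2
most_negative = [term for term, _ in ranked[:k]]
most_positive = [term for term, _ in reversed(ranked[-k:])]
print(most_negative)
print(most_positive)
```

Terms with weights near zero, such as ‘table’ here, carry little signal in either direction and drop out of both lists.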

Below is a word cloud representing the most ‘positive’ terms in blue and the most ‘negative’ terms in red.

We can see here that the most positive terms include ‘friendly staff’, ‘delicious’, ‘great customer service’, and ‘great food’. This indicates that the most important drivers of customer satisfaction are staff, food taste, and service. Further improvements can be made by removing generic terms such as ‘great’, ‘recommend’, etc. so that more meaningful tokens can surface.

The most negative terms include ‘minutes’, ‘over priced’, ‘bland’, ‘food was ok’, and ‘nothing special’. This suggests that negative reviews are driven by long wait times, overpriced food, bland food, and an experience that is not seen as anything special.

This post summarized the steps that yielded the best results. To see all our code for all the steps we tried, check out this GitHub.
