Hubba’s Product Recommendations Data Pipeline

Building a recommendation engine with AWS Data Pipeline, Elastic MapReduce and Spark

From Google’s advertisements to Amazon’s product suggestions, recommendation engines are everywhere. As users of smart internet services, we’ve become accustomed to seeing things we like. This blog post is an overview of how we built a product recommendation engine for Hubba. I’ll start with an explanation of the different types of recommenders and how we chose between them. Then I’ll cover our AWS solution before diving into some implementation details.

What’s a recommender?

There are three main types of recommendation engines used today:

  1. Content-based filtering
  2. Collaborative filtering
  3. A hybrid of content and collaborative (hybrid recommender)

Content-based recommenders use discrete properties of an item, such as its tags. If a user views products tagged “dogs”, “pets”, and “chow”, the recommender may suggest more pet food products. Collaborative filtering recommenders use the past actions of many users to guess which items a given user is likely to prefer. Take two users as an example. The first views items 1 and 3, while the second views items 2 and 3. Because both users viewed item 3, the first user would be recommended item 2, and the second user would be recommended item 1. A hybrid recommender combines both approaches into a single, more sophisticated model.
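To make the two-user example concrete, here is a toy sketch in plain Python. It is not how ALS works and not the code we run in production; the user names and the "share at least one item" rule are assumptions chosen only to mirror the example.

```python
# Toy collaborative filtering on the two-user example.
# "user_a" and "user_b" are hypothetical names.
views = {
    "user_a": {1, 3},
    "user_b": {2, 3},
}

def recommend(user, views):
    """Suggest items viewed by users who share at least one item with `user`."""
    seen = views[user]
    suggestions = set()
    for other, other_seen in views.items():
        if other != user and seen & other_seen:
            # Keep only the items this user has not seen yet.
            suggestions |= other_seen - seen
    return sorted(suggestions)

recommendations = {user: recommend(user, views) for user in views}
print(recommendations)  # {'user_a': [2], 'user_b': [1]}
```

Both users overlap on item 3, so each one is offered the item that only the other has viewed.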

We decided to go with collaborative filtering (CF) due to the quality and quantity of our data. The ability to tag products with keywords was only recently implemented, so we didn’t have enough content to learn from. But we had been tracking user actions for at least a year and knew it to be reliable. Also, we didn’t have to worry about users who had artificially spammed their products with tags.

CF does have its weaknesses, though. CF recommenders suffer from the “cold-start problem”: they can’t recommend an item until at least one user has interacted with it. We were confident, though, that this problem would ease over time as more users interacted with more products.

One way to implement a CF recommender is the alternating least squares (ALS) algorithm. ALS takes a large matrix of user activities and products and factors it into two smaller matrices of latent factors: one for users, and another for products. Together, these factors describe the initial large matrix but with far less data. I won’t go into the math behind ALS, but if you’re interested, this paper here by Hu et al. describes it in detail. If you’re familiar with singular value decomposition, ALS is another approach which accomplishes the same goal. It’s less accurate but faster, especially when powered by Spark.

M here is the matrix of users and products, U is the matrix of users, and V is the matrix of products. With ALS, the product of U and Vᵀ would give an approximation of M. Illustration source: http://www.cs.cmu.edu/~yuxiangw/research.html
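As a rough illustration of what "the product of U and Vᵀ approximates M" means, here is a small plain-Python sketch. The factor values are made up, and the sizes in the second half (100,000 users, 50,000 products) are hypothetical; only the rank of 16 comes from this post.

```python
# Hypothetical latent factors with rank 2: 3 users and 4 products.
U = [[1.0, 0.0],
     [0.0, 1.0],
     [1.0, 1.0]]
V = [[1.0, 0.0],
     [0.0, 1.0],
     [1.0, 1.0],
     [0.5, 0.5]]

def approximate(U, V):
    """Return U times V-transpose: one predicted preference per (user, product)."""
    return [[sum(u_k * v_k for u_k, v_k in zip(u, v)) for v in V] for u in U]

def stored_values(n_users, n_products, rank):
    """Compare the entry counts of the full matrix and of its two factors."""
    full = n_users * n_products
    factors = (n_users + n_products) * rank
    return full, factors

M_hat = approximate(U, V)             # 3 x 4 matrix of predicted preferences
full, factors = stored_values(100_000, 50_000, 16)
print(M_hat[2])                        # [1.0, 1.0, 2.0, 1.0]
print(full, factors)                   # 5000000000 2400000
```

The saving only shows up when the number of users and products is much larger than the rank, which is the usual situation for a recommender.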

Our recommender solution

We had a couple of requirements while planning the architecture for our recommendation engine.

  1. We needed to be able to keep a snapshot of all data that went into generating our models.
  2. Our recommendation engine’s compute solution needed to be robust enough to scale with growing data, whether in compute power, memory capacity, or both.
  3. The compute solution needed to be cost-effective.
  4. The compute solution needed a good monitoring tool for maintaining efficient distribution of load.
  5. The models generated by each run of the pipeline needed to be accessible afterwards.
  6. We needed to be able to store user recommendations somewhere for quick retrieval through an API.

I’ll address the above six points in order.

AWS is good at transferring data between its services, and moving data from Redshift to S3 is no exception. AWS Data Pipeline allowed us to regularly run a SELECT query and save the output as a CSV in S3 with a file name specific to a particular run of the pipeline. We could then import the data into the downstream model generation activities. Data input snapshots: check.

AWS Elastic MapReduce (EMR) provided us with a large catalogue of managed instances for compute. We could choose from memory or storage optimized instances and scale up or down as our needs changed. Then through Data Pipeline, we could fire up clusters when a pipeline started and install monitoring software and Spark on the fly. We could run bootstrapping tasks after launch, and shut down when the jobs finished. We could also bid on spot instances which would decrease operating costs even further. Scalable, cost-effective, and monitored compute were what we needed and what we got with EMR.

EMR can use S3 as a native data store, so it’s easy to transfer files between S3 and EMR. A generated model never touched the local file system in EMR; it was saved directly into S3 for later retrieval. Generated model snapshots: check.

For storage and access to the generated predictions for a particular user, we decided to go with DynamoDB. DynamoDB is AWS’ managed NoSQL database. It’s super easy to use since there are only two knobs that we needed to control: the read and write capacity units. These two knobs control the volume of data that the database can read and write per second. DynamoDB also worked well with AWS API Gateway, which gave us a managed API in front of our DynamoDB data with little to no technical upkeep. Because both our servers and our APIs lived on AWS, we could also lock this API down so that it was only accessible from our production EC2 instances.

The solution relied heavily on AWS, but it came with integration shortcuts that saved us a lot of time. I’ll now walk you through a big-picture overview of how we got to a production-ready recommendation engine.

Start with data

Most of my data preprocessing and model generation happens in an IPython Notebook. This allows me to document my steps as I go, and keep track of why I made certain decisions. It’s also much easier for someone else to read through the process in something close to sentence form than to read through all of the pipeline code.

I preface every notebook with a question that the notebook seeks to answer, followed by a description of my approach and a rationale for why I chose to approach the question that way. You don’t always have to do this, but it helps me focus on the problem at hand. It also re-contextualizes me when I haven’t opened up the notebook in a while.

The motivating question should be concise.

I use the PySpark API to talk to Spark, and Pandas to preprocess tabular data. If you’re curious, I’ve written how you can set up your own IPython/PySpark environment in the appendix at the end of this post.

At Hubba, we use a combination of different user actions to determine which products to recommend to you. For this overview though, I’ll only use product views as a measure of preference for a certain product. The intuition is that users will prefer to see products like those they have seen before. So let’s first import that data.

I’ve extracted a Redshift query out as a CSV to S3, and I’m downloading the file from S3 to my machine for use in IPython.

The main action of interest is ‘Opened Product Page’. This action gets fired every time a user sees a product, telling us who saw the product and what product they were looking at. I’ve changed all ‘Opened Product Page’ actions to the integer value 1, since you can’t factorize a matrix with string data. I then use scikit-learn’s preprocessing module to create an encoder for users and products. Think of an encoder like a key-value store. Each key is an integer from 0 to one less than the number of unique users (or products), and each value is the actual user or product ID. You’ll have two different encoders: one for users, and another for products.

Encoders translate raw labels into numbers for the model generation step. We’ll use encoders to translate our Product Views DataFrame’s user IDs and product IDs. It’s as easy as transform()!
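If the key-value analogy is easier to follow in code, here is a plain-Python stand-in for an encoder. It is not scikit-learn's implementation; I've only borrowed the fit / transform / inverse_transform method names that LabelEncoder exposes. The class name LabelIndex and the sample IDs are hypothetical.

```python
class LabelIndex:
    """Plain-Python stand-in for an encoder: raw label <-> integer in [0, n)."""

    def fit(self, labels):
        # Sort the unique labels so every label gets a stable integer.
        self.classes_ = sorted(set(labels))
        self._index = {label: i for i, label in enumerate(self.classes_)}
        return self

    def transform(self, labels):
        return [self._index[label] for label in labels]

    def inverse_transform(self, codes):
        return [self.classes_[code] for code in codes]

# Hypothetical rows: (user ID, product ID, action)
rows = [
    ("u-9f2", "p-301", "Opened Product Page"),
    ("u-1ab", "p-777", "Opened Product Page"),
    ("u-9f2", "p-777", "Opened Product Page"),
]

user_encoder = LabelIndex().fit([r[0] for r in rows])
product_encoder = LabelIndex().fit([r[1] for r in rows])

# Every product view becomes the integer 1; IDs become small integers.
encoded = list(zip(
    user_encoder.transform([r[0] for r in rows]),
    product_encoder.transform([r[1] for r in rows]),
    [1] * len(rows),
))
print(encoded)  # [(1, 0, 1), (0, 1, 1), (1, 1, 1)]
```

Keeping the fitted encoders around matters, because inverse_transform() is what turns the model's integer output back into real user and product IDs later on.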

Notice how the user and product IDs have all become numerics.

At this point I’ve preprocessed my main data input and I’m ready to generate my model. In each row, I’ve changed all my product views to a 1, and normalized all user and product IDs. The next step is to create an entry point for your Spark code to talk to a Spark cluster. We haven’t written any PySpark yet, but we need to create Contexts before that. I’ll now initialize my SparkContext and SQLContext, naming the app “BuildProductRecommendations”.

(Spark has a guide on how they’ve implemented CF and a practical tutorial here that you should check out)

I want to see how different hyperparameters affect the quality of my model, so I’ll split up the data into training, validation, and test sets. The training set is the data used to train the model. The validation set is the subset of data used for experimenting with different hyperparameters. The test set checks how well your optimized model performs against fresh data. The validation and test sets don’t have the action labels, since I’m blinding the model to the actual results so that we can test how well it performs with only a user ID and product ID pair.

RDD.randomSplit() randomly splits the original normalized product views DataFrame into a 6:2:2 ratio.
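To show what a weighted random split does, here is a plain-Python sketch that runs without a Spark cluster. It is my own approximation of the idea, not Spark's implementation: each row is assigned independently, so the 6:2:2 ratio holds only approximately. The function name random_split, the sample rows and the seed are assumptions.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row independently to one split, with probability
    proportional to its weight. Split sizes are therefore approximate."""
    rng = random.Random(seed)
    total = float(sum(weights))
    bounds, running = [], 0.0
    for weight in weights:
        running += weight / total   # normalise weights into cumulative bounds
        bounds.append(running)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        # First split whose upper bound exceeds the draw; fall back to the last.
        index = next((i for i, b in enumerate(bounds) if draw < b), len(bounds) - 1)
        splits[index].append(row)
    return splits

# Hypothetical normalized product views: (user, product, 1)
rows = [(u, p, 1) for u in range(100) for p in range(10)]
training, validation, test = random_split(rows, [6, 2, 2], seed=42)
print(len(training), len(validation), len(test))
```

Fixing the seed is what makes the split reproducible, which you need if you want to compare hyperparameters against the same validation set.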

If you’re curious, the PySpark ALS algorithm’s documentation is here. I used the trainImplicit() method to train the model, as I was working with implicit preferences as opposed to explicit ratings. Implicit feedback is a user’s indirect opinion given through their site activity. A click or a view would be an implicit form of feedback. Explicit feedback is a user’s direct reaction to an item that they see. For example, a 3-star rating from a user for a product is an explicit indicator of how much a user liked or valued a product. Models trained with explicit feedback attempt to predict the user’s specific rating, and those trained with implicit feedback predict the confidence of a user viewing a product. A subtle but important difference.

There are a couple of hyperparameters to tune, such as the number of iterations, regularization parameter (lambda), and confidence constant (alpha) — PySpark’s ALS documentation explains them well, so I won’t bore you with them here.

Next, I determined that the best rank (the size of the latent factors) would be 16. I found this using a metric called the root mean squared error (RMSE), which describes how far a model’s predictions are from the true values. By experimenting with different ranks, lambdas, and alphas, you can look for the combination that minimizes the RMSE. Remember that we had hidden the true values in the validation and test sets. After the model makes its predictions, I reveal the true values to calculate the RMSE. I selected an alpha of 80 and a lambda of 0.1, but feel free to experiment with different values on your own.
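For reference, RMSE itself is only a few lines. This is a minimal plain-Python sketch, assuming predictions and true values are both keyed by a (user, product) pair; the sample numbers are made up.

```python
import math

def rmse(predicted, actual):
    """Root mean squared error over the (user, product) pairs present in both."""
    shared = predicted.keys() & actual.keys()
    if not shared:
        raise ValueError("no overlapping (user, product) pairs to compare")
    squared_error = sum((predicted[key] - actual[key]) ** 2 for key in shared)
    return math.sqrt(squared_error / len(shared))

# Hypothetical hidden true values and the model's predictions for them.
actual = {(0, 0): 1.0, (0, 1): 1.0, (1, 1): 1.0}
predicted = {(0, 0): 0.9, (0, 1): 0.6, (1, 1): 1.0}

score = rmse(predicted, actual)
print(round(score, 3))  # 0.238
```

A lower score means the predictions sit closer to the hidden values, so you would keep the hyperparameters that produce the smallest RMSE on the validation set.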

Once you’ve determined the hyperparameters that help you reach the lowest RMSE value, retrain the model with the same seed value. Then do a final test on the test_for_predict_rdd before retraining the model with your entire dataset. This will be your final accuracy metric before implementing your model in Data Pipeline.

Push recommendations to DynamoDB

The next step is to push the recommendations for every user to DynamoDB.

Our DynamoDB table will have just two columns — a column for user IDs, and another for a user’s recommendations. The main implementation of this is quite simple. We’ve generated a model that can predict what a user’s preference for a particular product may be, and now we want to store a list of the top recommendations on DynamoDB. This will allow us to shut down our cluster once predictions are generated, then hook up DynamoDB to API Gateway for fast recommendations when we need them.

To generate product recommendations, the model object that PySpark’s mllib.recommendation module returns from training (a MatrixFactorizationModel) has a method called recommendProducts(), which does the heavy lifting for you. It takes two arguments: the normalized user ID, and the number of recommendations you’d like.

predictions = model.recommendProducts(1, 10)

The above code sample will generate 10 product recommendations for normalized user ID number 1. Use the two encoders you’ve generated before to translate the normalized labels in the recommendations into actual user IDs and product IDs. Then, using boto3, upload the data to your DynamoDB table.
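Here is a sketch of the translation step in plain Python. recommendProducts() returns Rating tuples with user, product and rating fields; I'm using a namedtuple stand-in so the example runs without Spark, and plain lists in place of the two fitted encoders. The IDs and scores are made up.

```python
from collections import namedtuple

# Stand-in for the Rating tuples that recommendProducts() returns.
Rating = namedtuple("Rating", ["user", "product", "rating"])

# Hypothetical lookups: list position = normalized ID, value = real ID.
user_ids = ["u-1ab", "u-9f2"]
product_ids = ["p-301", "p-777", "p-902"]

def decode(predictions, user_ids, product_ids):
    """Turn normalized predictions into (real user ID, real product IDs),
    keeping the strongest recommendation first."""
    ranked = sorted(predictions, key=lambda r: r.rating, reverse=True)
    user = user_ids[ranked[0].user]
    return user, [product_ids[r.product] for r in ranked]

predictions = [Rating(1, 2, 0.93), Rating(1, 0, 0.71)]
user, products = decode(predictions, user_ids, product_ids)
print(user, products)  # u-9f2 ['p-902', 'p-301']
```

With real encoders, the two list lookups would be calls to inverse_transform() on the user and product encoders.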

Since user recommendations may change, you’ll update the item in DynamoDB instead of inserting a new record each time. If the specific user ID doesn’t exist, then DynamoDB will create a new record.
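The update can be sketched as a set of keyword arguments for DynamoDB's UpdateItem operation. The table name and the attribute names user_id and recommendations are assumptions, and the boto3 call itself is left as a comment so the snippet runs without AWS credentials.

```python
def build_update(user_id, product_ids):
    """Keyword arguments for an UpdateItem call that overwrites a user's
    recommendation list, creating the item if it does not exist yet."""
    return {
        "Key": {"user_id": user_id},
        # "#recs" is a placeholder, which avoids clashing with reserved words.
        "UpdateExpression": "SET #recs = :recs",
        "ExpressionAttributeNames": {"#recs": "recommendations"},
        "ExpressionAttributeValues": {":recs": list(product_ids)},
    }

kwargs = build_update("u-9f2", ["p-902", "p-301"])
print(kwargs["UpdateExpression"])  # SET #recs = :recs

# With boto3 installed and credentials configured, this would be sent as:
#   import boto3
#   table = boto3.resource("dynamodb").Table("product-recommendations")
#   table.update_item(**kwargs)
```

Because the expression uses SET, a repeat run for the same user replaces the stored list rather than appending to it.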

Configuring Data Pipeline

AWS Data Pipeline doesn’t have the most intuitive user interface, but it excels in interfacing with other AWS services. Logs go directly into a folder of choice in S3, and Data Pipeline comes with a couple of templates to get you started. Data Pipeline has a visual interface called Architect to help you build your pipeline graphically by adding and removing different components to and from a blank canvas.

This is the first page you’ll see when you click on “Create Pipeline”. Make sure you have “Build using Architect” checked and an S3 directory for logs chosen before clicking on the blue button, “Edit in Architect”.

The first thing that you’ll need to do is extract your most recent user activity data from its source, whether it’s in Redshift, S3, DynamoDB, MySQL, or another SQL database. DataNodes are objects that represent a collection of data, which can be extracted from or inserted into a Database object. Activities that query and move data between DataNodes run on Resources, and these resources can be an EC2 instance or an EMR cluster. You can transfer data from a Redshift SELECT query into S3 using the pipeline diagram below. This is where we store snapshots of input data for recommendations.

A Redshift Database provides extracted user activity data in the form of a SqlDataNode, and this activity data is copied into S3 as a CSV by a CopyActivity which runs on an Ec2Resource node.
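If you prefer to read the diagram as a pipeline definition, here is a rough sketch built as a Python dictionary and printed as JSON. The field names follow my reading of the Data Pipeline definition syntax, so please check them against the AWS object reference; every id, path, credential and query below is a hypothetical placeholder.

```python
import json

pipeline_objects = [
    {"id": "RedshiftDb", "name": "RedshiftDb", "type": "RedshiftDatabase",
     "clusterId": "my-redshift-cluster", "databaseName": "analytics",
     "username": "pipeline_user", "*password": "<stored-elsewhere>"},
    {"id": "UserActivity", "name": "UserActivity", "type": "SqlDataNode",
     "database": {"ref": "RedshiftDb"}, "table": "user_actions",
     "selectQuery": "SELECT user_id, product_id, action FROM user_actions"},
    {"id": "ActivitySnapshot", "name": "ActivitySnapshot", "type": "S3DataNode",
     # One file per run, named after the scheduled start time of that run.
     "filePath": "s3://my-bucket-name/input/"
                 "#{format(@scheduledStartTime, 'YYYY-MM-dd-HH-mm-ss')}.csv"},
    {"id": "CopyInstance", "name": "CopyInstance", "type": "Ec2Resource",
     "instanceType": "m3.medium", "terminateAfter": "1 Hour"},
    {"id": "CopyToS3", "name": "CopyToS3", "type": "CopyActivity",
     "input": {"ref": "UserActivity"}, "output": {"ref": "ActivitySnapshot"},
     "runsOn": {"ref": "CopyInstance"}},
]

def refs(value):
    """Yield every id referenced as {"ref": "..."} inside a pipeline object."""
    if isinstance(value, dict):
        if set(value) == {"ref"}:
            yield value["ref"]
        else:
            for child in value.values():
                yield from refs(child)
    elif isinstance(value, list):
        for child in value:
            yield from refs(child)

def dangling_refs(objects):
    """Return references that do not point at any object in the definition."""
    ids = {obj["id"] for obj in objects}
    return [r for obj in objects for r in refs(obj) if r not in ids]

print(dangling_refs(pipeline_objects))  # []
print(json.dumps({"objects": pipeline_objects}, indent=2))
```

The small reference check is just a convenience for catching typos in ids before you paste a definition into Data Pipeline.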

Now that you have the same input data in S3 as the data you experimented with while building your model, you can create an EmrActivity node to generate the model using this data. Remember that every Activity runs on a Resource Node, so your EmrActivity node will run on an EmrCluster Resource.

The model generation activity will depend on the CopyActivity to finish copying user activity data from Redshift to S3. As an extra precaution, we can have an S3KeyExists precondition which checks intermittently to see if the copied user activity data truly exists in S3 before running the model generation step.
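The dependency and the precondition can be sketched like this. The object types come from the steps above, while the ids, the S3 key and the can_start() helper are hypothetical. The helper only simulates the rule for illustration; Data Pipeline does the actual scheduling.

```python
# Most fields are omitted to keep the sketch short, including the activity's step.
objects = [
    {"id": "CopyToS3", "type": "CopyActivity"},
    {"id": "SnapshotExists", "type": "S3KeyExists",
     "s3Key": "s3://my-bucket-name/input/snapshot.csv"},
    {"id": "ModelCluster", "type": "EmrCluster",
     "masterInstanceType": "m3.xlarge", "coreInstanceType": "m3.xlarge",
     "coreInstanceCount": "2"},
    {"id": "BuildModel", "type": "EmrActivity",
     "runsOn": {"ref": "ModelCluster"},
     "dependsOn": {"ref": "CopyToS3"},
     "precondition": {"ref": "SnapshotExists"}},
]
by_id = {obj["id"]: obj for obj in objects}

def can_start(activity_id, finished, existing_keys):
    """True once the upstream activity has finished AND the S3 key exists."""
    activity = by_id[activity_id]
    upstream_done = activity["dependsOn"]["ref"] in finished
    precondition = by_id[activity["precondition"]["ref"]]
    return upstream_done and precondition["s3Key"] in existing_keys

snapshot = "s3://my-bucket-name/input/snapshot.csv"
print(can_start("BuildModel", finished=set(), existing_keys=set()))                # False
print(can_start("BuildModel", finished={"CopyToS3"}, existing_keys={snapshot}))   # True
```

Having both checks is a belt-and-braces choice: the dependency covers ordering, and the precondition covers the case where the copy reports success but the file is not where you expect it.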

EmrActivities will have a step that runs once an EmrCluster boots up. If you have a Python script that generates your model, place your script in S3 and you can reference the script directly from the step in Data Pipeline.

It’s required that you use command-runner.jar to run your own steps.

It’s required that you set the deploy-mode flag to cluster, so that you are able to access S3 files natively. Set the master flag to use YARN as its cluster manager, and provide Spark with 9 GB of executor memory if you’re using an m3.xlarge EMR cluster with one master node and 2 core nodes. For more details on how to configure jobs in Spark with EMR, this article by an AWS consultant will help.
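Putting those flags together, the step field of an EmrActivity is a single comma-separated string. Here is a small sketch that builds it; the helper name spark_step and the script path are hypothetical, and 9g is simply the executor memory mentioned above.

```python
def spark_step(script_uri, executor_memory="9g"):
    """Build a comma-separated EmrActivity step that runs spark-submit
    through command-runner.jar."""
    args = [
        "command-runner.jar",
        "spark-submit",
        "--deploy-mode", "cluster",     # lets the job read the script from S3
        "--master", "yarn",             # YARN as the cluster manager
        "--executor-memory", executor_memory,
        script_uri,
    ]
    return ",".join(args)

step = spark_step("s3://my-bucket-name/scripts/build_recommendations.py")
print(step)
```

Building the string from a list keeps each flag next to its value, which is easier to review than one long comma-separated line.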

Configuring PySpark for Elastic MapReduce (EMR)

When you’re working with PySpark in EMR, it’s highly likely that you’ll require external Python packages to successfully execute some steps of your pipeline. Working with external Python packages means that every package needs to be installed on every machine that you’re adding to your cluster. Sure, it’s possible to do a pip install for every machine when bootstrapping your EMR, but it’s far easier to create a Python virtual environment with virtualenv that you can download onto every machine before running the step.

Since we want to make sure that any OS-dependency issues don’t get in the way of this step, launch the exact same EMR cluster with the same configurations that you’ll be using in Data Pipeline, then ssh into the master instance, and run:

virtualenv -p python3 venv # use python3
source venv/bin/activate # activate your virtualenv

You can now start normally installing your Python packages with pip. Once you’re done, zip the virtualenv, and send it to an S3 bucket for later use.

deactivate
zip -r venv.zip venv
aws s3 cp venv.zip s3://my-bucket-name/venv.zip

EMR allows you to bootstrap the cluster with certain commands after the cluster starts, so you can copy venv.zip into the EMR cluster that launches during your pipeline execution, and specify with EmrConfiguration nodes in Data Pipeline which Python executable to use. I’ve placed the bootstrap and configuration files in the Appendix.

You’ll place your configurations in the EmrConfiguration node.

Conclusion

Hopefully you’ve become a bit more familiar with how AWS Data Pipeline, EMR, and Spark can help you build your own recommendation engine. This guide alone is definitely not enough, though. Be sure to check out the links referenced above; I’ve placed them in a section in the appendices below for your convenience. The only step that is not mentioned in this blog post is how to connect DynamoDB to API Gateway, but AWS has an article for that here.

Thank you for reading!


Hubba is a network for retailers and brands to connect and share product information with each other.

The author of this post, Won June Tai, is a Summer 2016 Hubba Product/Data intern. You can find him on Twitter @wonjunetai, and on Medium here.

Thank you to Johnathan Nightingale and Andrew Munro for help in editing drafts of this blog post :)

Appendix

Setting up IPython and PySpark

Hopefully you’re on a Mac and have Homebrew installed. If not, here’s a guide on how to configure IPython for Windows.

brew update
brew install apache-spark
brew install python

Depending on where you’ve configured Homebrew to install packages, add the below to your profile. I have my brew packages installed in /usr/local/Cellar — your configuration may be slightly different.

# add this line to either ~/.bash_profile or ~/.profile
export SPARK_HOME=/usr/local/Cellar/apache-spark/1.6.1/libexec
# then reload the profile in your current shell
source ~/.bash_profile # or: source ~/.profile

Now that you have the latest version of Python, you should be able to use this to install IPython and the findspark tool to run PySpark locally. Findspark is a handy tool which adds PySpark to the sys.path at runtime.

pip install findspark ipython jupyter

Launch IPython with:

ipython notebook

But before you do any of that, you should start your Spark master node, and a couple of worker nodes with:

cd $SPARK_HOME/sbin
./start-master.sh # starts the master node
./start-slave.sh <master-spark-URL>

You can find your master-spark-URL on the master server’s web UI (http://localhost:8080).

EmrCluster bootstrapping

There’s a special field in the EmrCluster node for “Bootstrap Action”, and you can point it to a shell script file in S3 that runs on every single node of your cluster. This is what I have:

#!/bin/bash
# Bootstraps the EMR cluster with pre-installed pip
# packages for use by PySpark.
set -e
cd /home/hadoop
aws s3 cp s3://<my-bucket-name>/bootstrapping/venv.zip /home/hadoop/venv.zip
unzip /home/hadoop/venv.zip

EmrConfiguration file

In spark-defaults, it’s possible to specify which Python executable an EMR job runs. Because I needed the version of Python that had access to all the pre-installed pip packages, I specified the Python executable present in the bootstrapped virtualenv.

[
  {
    "Classification": "spark-env",
    "Properties": {},
    "Configurations": [
      {
        "Classification": "export",
        "Properties": {
          "PYSPARK_PYTHON": "/usr/bin/python34"
        },
        "Configurations": []
      }
    ]
  },
  {
    "Classification": "spark-defaults",
    "Properties": {
      "spark.yarn.appMasterEnv.PYSPARK_PYTHON": "/home/hadoop/venv/bin/python3.4"
    }
  }
]

Links