From scikit-learn to Spark ML
Taking a machine learning project from Python to Scala
In a previous post, I showed how to take a raw dataset of home sales and apply feature engineering techniques in Python with pandas. This allowed us to produce and improve predictions on home sale prices using scikit-learn machine learning models.
But what happens when you want to take this sort of project to production, and instead of 10,000 data points perhaps there are tens or hundreds of gigabytes of data to train on? In this context, it is worth moving away from Python and scikit-learn toward a framework that can handle Big Data.
Enter Scala and Spark
Scala is a programming language that runs on the Java Virtual Machine (JVM) and embraces functional programming techniques. There is an endless number of very sophisticated (and complicated) features in Scala that you can study, but getting started with basic Scala is not necessarily much harder than writing code in Java or even Python.
Spark, on the other hand, is a distributed data processing framework that grew out of the Hadoop ecosystem and provides far more flexibility and usability than traditional Hadoop MapReduce. It is arguably the best tool for managing and analyzing large datasets, aka Big Data. The Spark ML library lets developers build machine learning models while using Spark for data processing at scale.
Why not just do all of your data exploration and model training in Spark ML in the first place? You certainly could, but the truth is that Python is much easier for open-ended exploration, especially if you are working in a Jupyter notebook. That said, Scala and Spark do not need to be much more complicated than Python, as both pandas and Spark use DataFrame structures for data storage and manipulation. Our goal here is to show this simplicity rather than dwell on the difficulties.
This post will not delve into the complexities of Scala, and it assumes some knowledge of the previous project. We are going to reproduce the results of the Python example, but we will not rehash the reasoning behind each step, since that was explained previously.
First, we load the dataset, exactly the same CSV file as before.
val data_key = "housing_data_raw.csv"

val df = spark.read
  .format("csv")
  .option("header", "true")
  .option("inferSchema", "true")
  .load(s"./$data_key")
Next, we will remove the outliers that we identified in the previous post.
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

def drop_outliers(data: DataFrame) = {
  val drop = List(1618, 3405, 10652, 954, 11136, 5103, 916, 10967, 7383, 1465, 8967, 8300, 4997)
  data.filter(not($"_c0".isin(drop: _*)))
}

val housing = drop_outliers(df)
Now, we transform the lastsolddate field from text to a numeric value, which a regression model can train on.
val housing_dateint = housing.withColumn("lastsolddateint", unix_timestamp($"lastsolddate","MM/dd/yy"))
We also want to drop a number of columns that, for now anyway, we do not want to train on.
def drop_geog(data: DataFrame, keep: List[String] = List()) = {
  val removeList = List("info", "address", "z_address", "longitude", "latitude", "neighborhood",
      "lastsolddate", "zipcode", "zpid", "usecode", "zestimate", "zindexvalue")
    .filter(!keep.contains(_))
  data.drop(removeList: _*)
}

val housing_dropgeo = drop_geog(housing_dateint)
We wrap this operation in a function in case we want to run the same code again later, which we will. Also note that the function has a keep parameter, in case we want to retain one of the columns we would otherwise remove, which we will also do later.
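For example, later on we will call it like this to hold on to the neighborhood column while dropping the rest:
// keeps "neighborhood", drops the other columns in the removal list
drop_geog(housing_dateint, List("neighborhood"))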
Note that most of this syntax is pretty close to Python with two major exceptions. First, Scala is a typed language. Variables actually do not have to be explicitly typed, but function parameters do. Second, you will see a line like:
data.drop(removeList: _*)
This means: drop everything in removeList from the data DataFrame. The : _* is Scala's syntax for expanding a sequence into varargs, necessary here because drop expects a variable number of column-name arguments rather than a single List.
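To see what that varargs expansion does in isolation, here is a tiny, self-contained sketch (the joinAll helper is just an illustration, not part of the project):
// A method that accepts a variable number of String arguments.
def joinAll(parts: String*): String = parts.mkString(", ")

val cols = List("info", "address", "zipcode")

// Passing the List directly would not compile; ": _*" expands it into
// individual arguments, as if we had written joinAll("info", "address", "zipcode").
joinAll(cols: _*)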
Using VectorAssembler in Spark ML
Now we want to split our data into a training and a test set.
import org.apache.spark.ml.feature.VectorAssembler

def train_test_split(data: DataFrame) = {
  val assembler = new VectorAssembler()
    .setInputCols(data.drop("lastsoldprice").columns)
    .setOutputCol("features")

  val Array(train, test) = data.randomSplit(Array(0.8, 0.2), seed = 30)
  (assembler.transform(train), assembler.transform(test))
}

val (train, test) = train_test_split(housing_dropgeo)
This is a bit tricky and requires some explanation.
In scikit-learn, you can take an entire pandas DataFrame and send it to the machine learning algorithm for training. Spark ML also has a DataFrame structure, but model training is a bit pickier. You have to pack all of your features, from every column you want to train on, into a single column, by extracting each row of values and packing them into a Vector. That means Spark ML trains off of only one column of data, a column that happens to be a data structure containing multiple columns' worth of values. Setting this up is not complicated once you have the code (see above), but it is an extra step you have to be conscious of.
Creating one of these Vector columns is done with the VectorAssembler. We create the VectorAssembler, telling it to use every feature column except our label/target column, lastsoldprice, and give the new Vector column a name, conventionally features. We then use the assembler to transform the train and test DataFrames and return both as a tuple.
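To make the packing concrete, here is a small standalone sketch of what VectorAssembler produces, using a made-up two-column DataFrame (illustrative only, not part of the project code):
import org.apache.spark.ml.feature.VectorAssembler

// A toy DataFrame with two numeric feature columns and a label.
val toy = spark.createDataFrame(Seq(
  (1200.0, 2.0, 500000.0),
  (2400.0, 4.0, 950000.0)
)).toDF("finishedsqft", "bedrooms", "lastsoldprice")

// Pack the two feature columns into a single "features" Vector column.
val toyAssembler = new VectorAssembler()
  .setInputCols(Array("finishedsqft", "bedrooms"))
  .setOutputCol("features")

// Each row now carries a Vector such as [1200.0, 2.0] alongside its label,
// which is the single column Spark ML models train on.
toyAssembler.transform(toy).show(false)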
Now, we can do some machine learning. Let’s start with Linear Regression.
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.evaluation.RegressionEvaluator

val lr = new LinearRegression()
  .setLabelCol("lastsoldprice")
  .setFeaturesCol("features")

val lrModel = lr.fit(train)
val predictions = lrModel.transform(test)

val rmse = new RegressionEvaluator()
  .setLabelCol("lastsoldprice")
  .setPredictionCol("prediction")
  .setMetricName("rmse")

val r2 = new RegressionEvaluator()
  .setLabelCol("lastsoldprice")
  .setPredictionCol("prediction")
  .setMetricName("r2")

println("Root Mean Squared Error (RMSE) on test data = " + rmse.evaluate(predictions))
println("R^2 on test data = " + r2.evaluate(predictions))
Our first prediction
This should be pretty self-explanatory. Here is the output of the above code.
Root Mean Squared Error (RMSE) on test data = 857356.2890199891
R^2 on test data = 0.31933500943383086
That's not great, but that's not the point. The real win here is that we now have our data in a format where we can use Spark ML, so it is an easy step from here to trying out other algorithms. But first, we will recreate the above in a function that we can call with different algorithms. While we are at it, we will add cross validation, so that we can test multiple hyperparameter combinations and choose the best resulting model. Note that this step is not strictly necessary (the previous code could simply be refactored for each algorithm), but it is a better practice.
import org.apache.spark.ml.Predictor
import org.apache.spark.ml.PredictionModel
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
import org.apache.spark.ml.param.ParamMap

def train_eval[R <: Predictor[Vector, R, M],
               M <: PredictionModel[Vector, M]](
    predictor: Predictor[Vector, R, M],
    paramMap: Array[ParamMap],
    train: DataFrame,
    test: DataFrame) = {

  val cv = new CrossValidator()
    .setEstimator(predictor
      .setLabelCol("lastsoldprice")
      .setFeaturesCol("features"))
    .setEvaluator(new RegressionEvaluator()
      .setLabelCol("lastsoldprice")
      .setPredictionCol("prediction")
      .setMetricName("rmse"))
    .setEstimatorParamMaps(paramMap)
    .setNumFolds(5)
    .setParallelism(2)

  val cvModel = cv.fit(train)
  val predictions = cvModel.transform(test)

  println("Root Mean Squared Error (RMSE) on test data = " + rmse.evaluate(predictions))
  println("R^2 on test data = " + r2.evaluate(predictions))

  val bestModel = cvModel.bestModel
  println(bestModel.extractParamMap)
  bestModel
}
Most of this should also be pretty self-explanatory — except the function definition, which is virtually impenetrable. That is this part:
def train_eval[R <: Predictor[Vector, R, M],
M <: PredictionModel[Vector, M]](
predictor: Predictor[Vector, R, M],
paramMap: Array[ParamMap],
train: DataFrame,
test: DataFrame) = {
Unfortunately, Spark ML does not seem to have a generic "Model" type where we could, for example, pass in an object that is a RegressionModel and have our function call .fit on it, which would allow us to avoid this mess. Instead, we have to come up with this complex definition just to create a method that accepts a generic predictor, because the type is also dependent on the input format and an implicit evaluator type. Coming up with this sort of definition is tricky; after looking around and some trial and error, I found that taking it directly from the Spark ML source code worked just fine.
The good news is that once you write something like this, callers do not really need to know about it or understand it (and to be honest, it is unlikely that they will). Instead, this complex type definition allows callers to write really clean code like this:
val lr = new LinearRegression()

val lrParamMap = new ParamGridBuilder()
  .addGrid(lr.regParam, Array(10, 1, 0.1, 0.01, 0.001))
  .addGrid(lr.elasticNetParam, Array(0.0, 0.5, 1.0))
  .addGrid(lr.maxIter, Array(10000, 250000))
  .build()

train_eval(lr, lrParamMap, train, test)
Now we are back to something simple: an algorithm (Linear Regression) and a grid of parameters for cross validation. The function call then becomes as simple as it was in Python; it is essentially the same call. Note that I even kept the snake_case naming for train_eval and other variables. Typically in Scala and Java we would use camel case, i.e. trainEval, and that is what I would do when writing production code, but since this tutorial was originally written in Python it was worth maintaining some consistency for comparison.
We will now look at a few other algorithms to see how they do. Note that Lasso and Ridge do not have explicit classes; setting elasticNetParam to 1 gives Lasso (pure L1 regularization), while 0 gives Ridge (pure L2 regularization).
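For reference, a minimal sketch of how you could configure those two variants explicitly (the regParam value here is just illustrative):
// Lasso: pure L1 regularization
val lasso = new LinearRegression()
  .setElasticNetParam(1.0)
  .setRegParam(0.1)

// Ridge: pure L2 regularization
val ridge = new LinearRegression()
  .setElasticNetParam(0.0)
  .setRegParam(0.1)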
Decision Tree:
import org.apache.spark.ml.regression.DecisionTreeRegressor

val decisionTree = new DecisionTreeRegressor()
val dtParamMap = new ParamGridBuilder().build()

train_eval(decisionTree, dtParamMap, train, test)
Results:
Root Mean Squared Error (RMSE) on test data = 759685.8395738212
R^2 on test data = 0.46558480196241925
Random Forest:
import org.apache.spark.ml.regression.RandomForestRegressor

val randomForest = new RandomForestRegressor()

val rfParamMap = new ParamGridBuilder()
  .addGrid(randomForest.maxBins, Array(4, 16, 32, 64))
  .addGrid(randomForest.numTrees, Array(1, 10, 100))
  .addGrid(randomForest.maxDepth, Array(2, 5, 10))
  .build()

train_eval(randomForest, rfParamMap, train, test)
Results:
Root Mean Squared Error (RMSE) on test data = 647133.830611256
R^2 on test data = 0.6122079099308858
Gradient Boost:
import org.apache.spark.ml.regression.GBTRegressor

val gradientBoost = new GBTRegressor()

val gbParamMap = new ParamGridBuilder()
  .addGrid(gradientBoost.maxBins, Array(16, 32))
  // GBTRegressor has no numTrees parameter; maxIter (the number of boosting
  // iterations, i.e. trees) is the closest analogue.
  .addGrid(gradientBoost.maxIter, Array(5, 10, 100))
  .addGrid(gradientBoost.maxDepth, Array(5, 10))
  .addGrid(gradientBoost.minInfoGain, Array(0.0, 0.1, 0.5))
  .build()

train_eval(gradientBoost, gbParamMap, train, test)
Results:
Root Mean Squared Error (RMSE) on test data = 703037.6456894034
R^2 on test data = 0.5423137139558296
While the exact results differ (especially for Linear Regression), we see a similar trend of Random Forest and Gradient Boost working better than the simpler algorithms. Once again, we can improve our results by using better data. We do this by reincorporating the neighborhood data, but in a numerical format that the algorithms can use: we turn categories like Mission and South Beach into columns of ones and zeros using one-hot encoding.
One-hot encoding
First, we rebuild our DataFrame with this data:
val housing_neighborhood = drop_geog(housing_dateint, List("neighborhood"))
Then we transform the data with one-hot encoding using a Pipeline:
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.OneHotEncoderEstimator
import org.apache.spark.ml.feature.StringIndexer

val indexer = new StringIndexer().setInputCol("neighborhood").setOutputCol("neighborhoodIndex")

val encoder = new OneHotEncoderEstimator()
  .setInputCols(Array(indexer.getOutputCol))
  .setOutputCols(Array("neighborhoodVector"))

val pipeline = new Pipeline().setStages(Array(indexer, encoder))

val housingEncoded = pipeline.fit(housing_neighborhood).transform(housing_neighborhood)
  .drop("neighborhoodIndex")
  .drop("neighborhood")
First, we have to use a StringIndexer to convert the neighborhood strings to numeric indices; then the OneHotEncoderEstimator gives us the vector encoding we want. Since these are two steps, we can put them together using a Pipeline of stages. Pipelines can be used for many multi-stage cleaning and modification operations, especially where you need to run the same stages over and over again, e.g. with evolving data sets that require retraining. It is not strictly necessary here, but it is worth showing because it can be so useful, as the sketch below illustrates.
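As a quick sketch of that reuse, the fitted pipeline can be applied to a hypothetical DataFrame of fresh listings (newListings, assumed to have the same schema) without redefining any of the stages:
// Fit the indexer and encoder stages once on the current data...
val pipelineModel = pipeline.fit(housing_neighborhood)

// ...then apply the same fitted transformations to new data later on.
val newEncoded = pipelineModel.transform(newListings)
  .drop("neighborhoodIndex")
  .drop("neighborhood")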
Having updated our data, we can rerun our experiments exactly as before. We just pass in our new data (after creating training and testing sets):
val (train_neighborhood, test_neighborhood) = train_test_split(housingEncoded)
For example, for Linear Regression, we call exactly the same function with these new variables:
train_eval(lr, lrParamMap, train_neighborhood, test_neighborhood)
Results:
Root Mean Squared Error (RMSE) on test data = 754869.9632285038
R^2 on test data = 0.4723389619596349
This still is not great, but it is quite a bit better than previously. Now let’s look at the results for the other algorithms after passing in the transformed DataFrames.
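The remaining runs follow the same pattern, reusing the estimators and parameter grids defined earlier with the new splits:
train_eval(decisionTree, dtParamMap, train_neighborhood, test_neighborhood)
train_eval(randomForest, rfParamMap, train_neighborhood, test_neighborhood)
train_eval(gradientBoost, gbParamMap, train_neighborhood, test_neighborhood)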
Decision Tree:
Root Mean Squared Error (RMSE) on test data = 722171.2606321493
R^2 on test data = 0.5170622654844328
Random Forest:
Root Mean Squared Error (RMSE) on test data = 581188.983582857
R^2 on test data = 0.6872153115815951
Gradient Boost:
Root Mean Squared Error (RMSE) on test data = 636055.9695573623
R^2 on test data = 0.6253709908240936
In each case, we see quite an improvement, and now we have a model and a training pipeline that we can take into production with larger data sets.
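As a closing sketch (not from the original project), this is roughly what persisting that pipeline and model for production could look like; the paths are placeholders, and bestForest is just an illustrative name for the model returned by train_eval:
import org.apache.spark.ml.PipelineModel
import org.apache.spark.ml.util.MLWritable

// Save the fitted feature pipeline so new data can be encoded the same way later.
pipeline.fit(housing_neighborhood).write.overwrite().save("/models/housing_feature_pipeline")

// train_eval returns the best model found by cross validation; fitted Spark ML
// models implement MLWritable, so they can be persisted the same way.
val bestForest = train_eval(randomForest, rfParamMap, train_neighborhood, test_neighborhood)
bestForest.asInstanceOf[MLWritable].write.overwrite().save("/models/housing_best_model")

// Later, in a batch-scoring or serving job:
val reloadedPipeline = PipelineModel.load("/models/housing_feature_pipeline")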