Spark ML Linear regression models and sample performance tuning

Jayasagar
3 min readAug 14, 2018

--

In the last post, I tried to describe the data loading part and extracted mappings to convert the categorical features to binary-encoded features!

In this post, the focus is to cover basic implementations of Spark ML Regression models and their basic performance tuning approach.

A sample implementation can be found at https://github.com/Jayasagar/sparkml-regression-models-movie-revenue-predictions

Basic Loss functions

# Aboslute Error
def abs_error(actual, pred):
return np.abs(pred - actual)
# Mean Squared Error
def squared_error(actual, pred):
return (pred - actual)**2

DecisionTree model

model_dt = DecisionTree.trainRegressor(trainingData_dt, categoricalFeaturesInfo={},
impurity='variance', maxDepth=5, maxBins=3000)
# Evaluate model on test instances and compute test error
predictions_dt = model_dt.predict(testData_dt.map(lambda x: x.features))
labelsAndPredictions_dt = testData_dt.map(lambda lp: lp.label).zip(predictions_dt)

Decision Tree Error result

mse = labelsAndPredictions_dt.map(lambda lp: squared_error(lp[0], lp[1])).mean()
mae = labelsAndPredictions_dt.map(lambda lp: abs_error(lp[0], lp[1])).mean()

Performance tuning

For performance tuning of the model, I just created a basic evaluation function, where you can pass the attributes for tuning of the model behaviour!

def evaluate_dt(trainData, testData, maxDepthValue, maxBinsValue):
modelDT = DecisionTree.trainRegressor(trainData, categoricalFeaturesInfo={},
impurity='variance', maxDepth=maxDepthValue, maxBins=maxBinsValue)
# Evaluate model on test instances and compute test error
predits = modelDT.predict(testData.map(lambda x: x.features))
labelsAndPredicts = testData.map(lambda lp: lp.label).zip(predits)
rmsleDT = np.sqrt(labelsAndPredicts.map(lambda lp: squared_log_error(lp[0], lp[1])).mean())
return rmsleDT

Decision Tree model behavior on Max Bins tuning

constantMaxDepthAndBinsParams = [32, 64, 80, 100, 200, 400]metrics = [evaluate_dt(trainingData_dt, testData_dt, 5, param) # constant Max Depth
for param in constantMaxDepthAndBinsParams]
print(constantMaxDepthAndBinsParams)
print(metrics)

In the above code, called the evaluate_dt function defined above, which runs for various max bin values so that we can observe the behavior of the algorithm.

# Plotting : Decision Tree Max Bins
plot(constantMaxDepthAndBinsParams, metrics)
plt.xlabel('Max Bins')
plt.ylabel('RMSLE')

Same can be repeated for Decision Tree Max Depth.

In the Decision Tree algorithm, I have implemented the concept of Max depth and Max bin. The most important and interesting task is to build a plot to depict the behavior of the model by changing those parameters and looking at the error rate how well is the algorithm model performing.

Looking at the error rate is one of the ways to optimize and see what the best parameters would work for the given specific dataset and the algorithm. Decision Tree Log is the one shows the error rate values of various log functions described in the assignment such as Root Squared Error, Absolute Error and Mean Squared Error.

Gradient boost tree model

model_GBT = GradientBoostedTrees.trainRegressor(trainingData_dt,
categoricalFeaturesInfo={}, numIterations=3, maxDepth=5, maxBins=64)
predictions_GBT = model_GBT.predict(testData_dt.map(lambda x: x.features))
labelsAndPredictions_GBT = testData_dt.map(lambda lp: lp.label).zip(predictions_GBT)
print('Learned regression Gradient boosted tree lmodel:')
print(model_GBT.toDebugString())
mse_gbt = labelsAndPredictions_GBT.map(lambda lp: squared_error(lp[0], lp[1])).mean()
mae_gbt = labelsAndPredictions_GBT.map(lambda lp: abs_error(lp[0], lp[1])).mean()

print("GradientBoostedTrees Model - Mean Squared Error: %2.4f" % mse_gbt)
print("GradientBoostedTrees Model - Mean Absolute Error: %2.4f" % mae_gbt)

Performance tuning Gradient boost tree Max Depth

def evaluate_gbt(trainData, testData, numIterationsValue, maxDepthValue, maxBinsValue):
model_GBT = GradientBoostedTrees.trainRegressor(trainData,
categoricalFeaturesInfo={}, numIterations=numIterationsValue,
maxDepth=maxDepthValue, maxBins=maxBinsValue)
predictions_GBT = model_GBT.predict(testData.map(lambda x: x.features))
labelsAndPredictions_GBT = testData.map(lambda lp: lp.label).zip(predictions_GBT)
rmsleGBT = np.sqrt(labelsAndPredictions_GBT.map(lambda lp: squared_log_error(lp[0], lp[1])).mean())
return rmsleGBT

Below is the actual call to the evaluate_gbt so that we can observe the model behaviour for various inputs of max depth params

maxDepthParams = [4, 5, 6]metrics_gbt_maxDepth = [evaluate_gbt(trainingData_dt, testData_dt, 3, param, 32)
for param in maxDepthParams]
print(maxDepthParams)
print(metrics_gbt_maxDepth)
# Plotting
plot(maxDepthParams, metrics_gbt_maxDepth)
plt.xlabel('Max Depths')
plt.ylabel('RMSLE')

Linear regression model

A sample implementation of the Pyspark linear regression model

model_LR = LinearRegressionWithSGD.train(trainingData_linear, iterations=5, step=0.01)
# Building a Regression Model with Spark
true_vs_predicted_LR = testData_linear.map(lambda p: (p.label, model_LR.predict(p.features)))
print("Linear Model predictions: " + str(true_vs_predicted_LR.take(5)))
predictions_linear = model_LR.predict(testData_linear.map(lambda x: x.features))
labelsAndPredictions_linear = testData_linear.map(lambda lp: lp.label).zip(predictions_linear)

Performance tuning Linear regression Iterations

def evaluate(train, test, iterations, step, regParam, regType, intercept):
model = LinearRegressionWithSGD.train(train, iterations, step,
regParam=regParam, regType=regType, intercept=intercept)
tp = test.map(lambda p: (p.label, model.predict(p.features)))
rmsle = np.sqrt(tp.map(lambda lp: squared_log_error(lp[0], lp[1])).mean())
return rmsle

In the below code, triggered evaluate to find the model behavior for various iteration input values.

params_iterations = [1, 3, 6]
metrics_iterations = [evaluate(trainingData_linear, testData_linear, param, 0.01, 0.0, 'l2', False) for param in params_iterations]
print(params_iterations)
print(metrics_iterations)
# Plotting
plot(params_iterations, metrics_iterations)
plt.xlabel('Iterations')
plt.ylabel('RMSLE')

In all the 3 models, ultimate goal is to implement a evaluate function so that we can use it to test the behaviour of model by changing set of input values of each attribute of an model algorithm

--

--

Jayasagar

Engineering specialist working with companies and individuals to solve exciting problems with robust and powerful technology. https://techatcore.com