Exploring Pyspark.ml for Machine Learning: Custom Hyperparameter Tuning

Sze Zhong LIM
Data And Beyond
Published in
7 min readOct 21, 2023

Introduction: What is hyperparameter tuning?

Hyperparameter tuning is a critical step in the machine learning model development process. Hyperparameters are parameters that govern the learning process itself, influencing the behavior and performance of the model. Unlike model parameters, which are learned from the training data (e.g., weights in neural networks), hyperparameters must be set prior to training and must be optimized to achieve the best model performance.

Hyperparameter tuning involves searching for the optimal combination of hyperparameters that maximize a chosen evaluation metric, such as accuracy, precision, recall, or F1 score. Techniques like grid search, random search, and Bayesian optimization are commonly used to explore the hyperparameter space and find the best configuration for the model.

Hyperparameter Tuning in PySpark.ml

While both PySpark.ml and scikit-learn, a popular Python library for machine learning, offer capabilities for hyperparameter tuning, they differ in their approaches and advantages. In this section, we will explore the strengths of hyperparameter tuning in PySpark.ml over scikit-learn, along with the associated weaknesses.

Strengths of Hyperparameter Tuning in PySpark.ml

From Codeacademy
  1. Distributed Computing Advantage: PySpark.ml leverages the distributed computing capabilities of Apache Spark, allowing for parallelized hyperparameter tuning. This means that multiple combinations of hyperparameters can be evaluated simultaneously across a cluster of machines, drastically reducing the overall tuning time. On the other hand, scikit-learn’s tuning, being single-machine based, might be slower for extensive hyperparameter search, especially with large datasets.
  2. Seamless Integration with Big Data: PySpark.ml seamlessly integrates with other big data tools and libraries within the Hadoop ecosystem. This integration is advantageous when dealing with massive datasets, enabling efficient processing and tuning on large-scale data. scikit-learn, being more suitable for traditional, smaller-scale machine learning tasks, lacks this level of integration with big data tools.
  3. Built for Scalability: PySpark.ml is designed for scalable machine learning. As a result, it is well-suited for handling large volumes of data and can handle the associated hyperparameter tuning at scale. Conversely, scikit-learn’s tuning may become impractical for very large datasets due to its limitations in distributed computing.

Weaknesses of Hyperparameter Tuning in PySpark.ml

  1. Learning Curve and Complexity: Transitioning to PySpark.ml for hyperparameter tuning may be challenging, especially for those familiar with scikit-learn. The distributed nature of PySpark requires users to understand distributed computing concepts, potentially resulting in a steeper learning curve.
  2. Limited Hyperparameter Search Strategies: PySpark.ml primarily supports grid search and random search strategies for hyperparameter tuning. In contrast, scikit-learn offers a broader array of hyperparameter search strategies, including more advanced techniques like Bayesian optimization, which are not readily available within the standard PySpark.ml package.
  3. Lack of Control over Train Validation Split: The cross-validation function automatically splits the dataset into training and validation sets. However, there’s no built-in provision to allow users to define their own training and validation datasets, which could be important in certain experimental setups.
  4. Limited Metric Options: The current implementation of the cross-validation function in PySpark (pyspark.ml.tuning.CrossValidator) accepts an evaluator object for model evaluation during cross-validation. However, it does not allow for customization of evaluation metrics beyond what the evaluator object provides.

Custom Hyperparameter Tuning in PySpark.ml

I am currently using Pyspark 2.4. Although it is slightly outdated, there are many enterprises which are still using it due to legacy system connectivity. I will also assume that you have an existing pyspark.sql.dataframe.

I will create functions that can:

  1. Provide control over Train Validation Split
  2. Provide control over your defined metrics
  3. Record results such that you can analyze it.

We will make a function that fully defines all the possible hyperparameter combinations.

import itertools

def generate_configurations(params):
"""
Generate all possible configurations given a dictionary of hyperparameters.

Parameters:
params (dict): Dictionary containing hyperparameter names and their respective values.

Returns:
list: List of dictionaries, each representing a unique configuration.
"""
keys = params.keys()
values = params.values()
configurations = [dict(zip(keys, config)) for config in itertools.product(*values)]
return configurations

# Sample hyperparameter configurations
hyperparameters = {
'learning_rate': [0.01, 0.1, 0.2],
'num_hidden_units': [32, 64, 128],
'batch_size': [16, 32, 64]
}

# Generate configurations
configurations = generate_configurations(hyperparameters)

# Print the generated configurations
for config in configurations:
print(config)

After that we will provide a function that trains the model. We need a VectorAssembler to assemble all the features into a vector as that is the way the code works. The featurelist will contain the list of features you want the model to train based on. Do also remember to update the target/label column name.

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier

def trainmodel(traindf, featurelist, paramdict=None):
# Vector Assemble all the features into one vector column
a = VectorAssembler(inputCols=featurelist, outputCol='feature_col')
df = a.transform(traindf)

# Create Random Forest Classifier
# 'label' to be replaced with the target y - value column name within traindf.
rf = RandomForestClassifier(featuresCol='feature_col', labelCol='label')

if paramdict == None:
pass
else:
for param_name, param_value in paramdict.items():
try:
rf.set(rf.getParam(param_name), param_value)
# There will be Attribute error if param_name is not present in getParam()
except AttributeError:
print(f"Invalid Parameter: {param_name}")

rf_model = rf.fit(df)

return rf_model

The testmodel is similar but with a few changes. We will also put in a threshold control. Threshold control is available on later version of pyspark but not available for earlier versions. You may modify the code accordingly.

def testmodel(testdf, featurelist, modelused, threshold=0.5):
# Vector Assemble all the features into one vector column
a = VectorAssembler(inputCols=featurelist, outputCol='feature_col')
df = a.transform(testdf)

rf_model = modelused
predicted_df = rf_model.transform(df)

if threshold==0.5:
pass # Most thresholds are set by default at 0.5.
else:
predicted_df = changethreshold(predicted_df, threshold)

return predicted_df

The function for the threshold change is as below. Basically I created two helper columns and deleted them after i have re-set my threshold level. This function is used for binary classification and you might need to tweak around with the settings. Do take note that there might be other more efficient code for the vector_to_array. I am using a udf as this function was not available in pyspark 2.4. However, it is to my understanding that the latest version of pyspark has a similar offering that is much faster.

from pyspark.sql.types import ArrayType, DoubleType
from pyspark.sql import functions as F

def changethreshold(predicted_df, threshold)
# Remove the vector and replace the vector with the standardized column names.
udf_vector_to_array = F.udf(lambda vector: vector.toArray().tolist(), ArrayType(DoubleType()))

df = predicted_df.withColumn('probability_array', udf_vector_to_array('probability'))
df = df.withColumn('probability_1', F.col('probability_array')[1])
df = df.withColumn('prediction', F.when(F.col('probability_1')>=threshold, 1.0).otherwise(0.0))

col_to_drop = ['probability_1', 'probability_array']
df = df.drop(*col_to_drop)

return df

Next i create the evaluator that I want to use. In this particular case, I am concerned with the sensitivity/recall metric. You may modify this evaluator to anything that you see fit and it can even be areaunderROC or areaunderPR. One thing to note is that, this function is more friendly as it allows you to tweak your own settings. In the pre-existing functions, sometimes the 1 and 0 classifications are opposite, leading to the wrong kind of metric.

# df refers to the predicted_df output by the testmodel()
# label refers to the actual target.
# 'prediction' column can be changed to your own predicted column.

def recall(df,label):
trueneg = df.filter((F.col(label)==0) & (F.col('prediction')==0)).count()
truepos = df.filter((F.col(label)==1) & (F.col('prediction')==1)).count()
falsepos = df.filter((F.col(label)==0) & (F.col('prediction')==1)).count()
falseneg = df.filter((F.col(label)==1) & (F.col('prediction')==0)).count()

recall = truepos / (truepos + falseneg)

return recall

The next function will be a function to record the findings into a csv file. We will be putting out findings into a dictionary, and appending it into a list. So this function needs to take a list, split it out into dictionaries as rows and record it. The function does not need to return anything.

import csv

def save_to_csv(data, file_path):
# Assume all dictionaries have the same keys
keys = data[0].keys()

with open(file_path, 'w', newline='') as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=keys)
writer.writeheader()
for item in data:
writer.writerow(item)

The last function is the compiler. It will take in the training dataset to fit a model, and run it on a validation dataset and the test dataset. For the recall function, do change it to the actual label function name.

def compiler(traindf, validdf, testdf, featurelist, paramlist,\
csv_path_name, templistforresult, thresholdused=0.5):

for x in range(len(paramlist)):
train_model = trainmodel(traindf, featurelist, paramdict=paramlist[x])
valid_df = testmodel(validdf, featurelist, train_model, threshold=thresholdused)
test_df = testmodel(testdf, featurelist, train_model, threshold=thresholdused)

# Here you can put in as many metrics as you want.
# Replace label with the actual label column name.
valid_recall = recall(valid_df,label)
test_recall = recall(test_df,label)

# Create a dictionary to store your results.
# You need to modify this part if you modify the above metrics.
tempdict = {'param':paramlist[x],
'valid_recall': valid_recall,
'test_recall': test_recall}

templistforresult.append(tempdict)
save_to_csv(templistforresult, csv_path_name)

return "All Done"

If you noticed, i saved the whole list everytime to the csv file. This is so that at any point of time, you may stop the programme and the csv file shall be fully up to date on your latest iteration.

Assuming you have your train / validation / test datasets up and running on spark, you may run the code below to start the hyperparameter tuning.

# Set the hyperparameters

hyperparameter = {
'maxDepth':[5,10,15,20],
'numTrees':[20,30,40,50],
'impurity':['gini','entropy'],
'maxBins':[32,64],
'seed': [220]}

hplist = generate_configurations(hyperparameter)
print(hplist) # To see how many total models you will be running

listforhpt = [] # Create an empty list to store the results
path_name = 'Random_Forest_HPT.csv'

# You need to fill up the first row of input arguments based on your own code.
compiler(traindf, validdf, testdf, featurelist, paramlist,\
path_name, listforhpt, thresholdused=0.5)

From this, you should be able to get a csv file of the results of the hyperparameter tuning. From there, you may run it into a Pandas Dataframe and analyze to see which parameters suit the best. Despite this method being less convenient than just using the scikit-learn or pyspark.ml package, it does provide more flexibility as to what we want, which most of the time is more important, as a data scientist.

--

--