Leveraging Machine Learning Tasks with PySpark Pandas UDF

Vitor Hugo Medeiros De Luca
Neoway Labs | Tech
6 min readJun 23, 2022

--

Experimenting is the word that best defines the daily life of a Data Scientist. To build a decent machine learning model for a given problem, a Data Scientist needs to train several models. This process includes tasks such as finding optimal hyperparameters to the model, cross-validate models using K-fold, and sometimes even train a model that has several outputs. All of those tasks mentioned before are time-consuming and nonetheless extremely important for the success of the model development. In this blog post, we’re going to show how PySpark Pandas UDF, a framework used to distribute python functions on Spark clusters, can be applied to enhance the Data Scientist’s daily productivity.

How does PySpark implement Pandas UDF (User Defined Function)?

As the name suggests, PySpark Pandas UDF is a way to implement User-Defined Functions (UDFs) in PySpark using Pandas DataFrame. The definition given by the PySpark API documentation is the following:

“Pandas UDFs are user-defined functions that are executed by Spark using Arrow to transfer data and Pandas to work with the data, which allows vectorized operations. A Pandas UDF is defined using the pandas_udf as a decorator or to wrap the function, and no additional configuration is required. A Pandas UDF behaves as a regular PySpark function API in general.”

In this post, we are going to explore PandasUDFType.GROUPED_MAP, or in the latest versions of PySpark also known as pyspark.sql.GroupedData.applyInPandas. The main idea is straightforward, Pandas UDF grouped data allow operations in each group of the dataset. Since grouped operations in spark are computed across the nodes of the cluster, we can manipulate our dataset in a way that allows different models to be computed in different nodes. Yes, my dudes… never underestimate the power of a groupBy.

Setting Up

Before getting into the specifics of applying Pandas UDF, let’s set up the environment with some modules, global variables, and commonly used functions.

The first step is to import all the modules that are going to be used throughout this little experiment.

And set some global variables that are going to be used multiple times.

A common step in every task explored in this notebook is the training and evaluation of a machine learning model. This step is encapsulated in the following function, which trains and evaluates a CatBoost model based on its accuracy score.

To train and test our CatBoost model, we will also need some data. So let’s create our dataset using scikit-learn’s make_multilabel_classification function and build our PySpark DataFrame from it.

Number of rows in the dataset: 10000

Finally, for a more efficient Spark computation, we’re going to enable arrow-based columnar data transfer.

Distributed Grid Search

In machine learning, hyperparameters are parameters whose values are used to control the model’s architecture and its learning process. Oftentimes when training a model you need to optimize these hyperparameters but, despite the ability of ML to find optimal internal parameters and thresholds for decisions, hyperparameters are set manually.

If the search space contains too many possibilities, you’ll need to spend a good amount of time testing to find the best combination of hyperparameters. A way to accelerate this task is to distribute the search process on the nodes of a Spark cluster.

One question that arises with this approach is: “Ok, but I’m using an algorithm that hasn’t been implemented on Spark yet, how can I distribute this process with these limitations?” Don’t worry! That’s a question we are here to answer!

First, we have to define the hyperparameter search space. For that, we are going to create an auxiliary PySpark DataFrame where each row is a unique set of hyperparameters.

Number of different hyperparameter combinations: 24000

For each hyperparameter row, we want to replicate our data so we can later process every hyperparameter set individually.

Number of rows in the replicated dataset: 240000000

The last step is to specify how each Spark node will handle the data. To do that, we define the hyperparameter_search function. It basically extracts the hyperparameters and the data from the input Spark DataFrame, then trains and evaluates the model, returning its results.

We can now group the Spark Dataframe by the replication_id and apply the hyperparameter_search function. This way, every hyperparameter combination will be used to train a different model in a distributed system.

With this distributed approach, we were able to run 24000 combinations of hyperparameters in only 29 minutes.

Distributed K-Fold Cross-Validation

Having an optimal set of hyperparameters, another important task is to perform a K-Fold Cross-Validation of your model to prevent (or minimize) the undesired effects of overfitting. The more folds you add to this experiment the more robust your model will be. However, you’ll have to spend more time training models for each fold. Once again, a way to avoid the time trap is to use Spark and compute each fold in an individual node of your Spark cluster.

We perform this in a very similar manner to how we distribute the grid-search, the difference being that we replicate our dataset according to the number of folds. So if our cross-validation uses 8 folds, our dataset will be replicated 8 times.

Here, our first step is to define the number of folds we want to cross-validate our model. Following this, we define some code to randomly split our dataset according to the number of folds defined above.

After the split, we replicate the dataset K times.

Number of rows in the replicated dataset: 80000

Here we have another difference compared to the grid search approach. In the function below, we define the train and test datasets according to a replication_id and a fold_id. If the replication_id is equal to the fold_id, we set that fold as the test fold while the rest of the folds are used as the training set.

One thing that you might have to take into account with this approach is how to save each trained model since each model is trained in a different node. To do this, depending on your cloud provider, you can use some python library developed to transfer files from the cluster nodes directly to a cloud bucket (like Google Cloud Storage or Amazon S3). However, if you’re only interested in the performance of the cross-validated model, the function above is enough.

In this experiment, we evaluated 8 folds (one in each node of the cluster) in only 35 seconds. And the best fold (number 4) reached an accuracy score of 0.900.

Distributed Multiple Output Model

Following the same philosophy, we can take advantage of PySpark Pandas UDF to distribute the training of multi-output models. For this task, we have a set of features and a set of labels, where we must train a model for each label with the same training data.

Some packages like scikit-learn have already implemented this approach to Random Forest algorithms. CatBoost also has the option of training with multi-outputs. However, these implementations have limited hyperparameter and loss-function options compared to the single output API. Considering this, Pandas UDF is an alternative to automate the training of multiple models at once, using all the options that any other machine learning library usually offers to single output model training.

Since our dataset has multiple label columns, the approach this time is to pivot our data in a way we can replicate the data for each specific label. So we create a column to map each label and append all the labels in one single label column as shown below:

Number of rows in the dataset: 100000

Having defined our spark multi-output dataset we are ready to define the function to perform the model training.

Once everything is set, you can call the groupBy method on the y_group column to distribute the training of each model.

Conclusion

In this post, we showed some examples of how PySpark Pandas UDF can be used to distribute processes involving the training of machine learning models. Some of the approaches showed can be used to save time or to run experiments on a larger scale that would otherwise be too memory-intensive or prohibitively expensive.

We hope this can be useful and that you guys enjoyed the content. Please leave your comments and questions below.

See you in the next post. That’s all, folks.

Authors: Igor Siqueira Cortez, Vitor Hugo Medeiros De Luca, Luiz Felipe Manke

We are always looking for passionate professionals to be part of our team

Be part of one of the best companies to work for in Brazil! Check out our open positions.

--

--