How to write a custom Spark Classifier: Categorical Naive Bayes

Máximo Gurméndez
dataxu.technology
Apr 23, 2018

Last week, we took a look at a few reasons why you would want to write your own Spark Classifier/Estimator. And now that you’ve decided to move forward, let’s take a look at how to write your own Spark Classifier by going over the implementation of a flavor of Naive Bayes for binary classification over categorical features. As opposed to the standard Spark implementation, this version does not need additional stages for feature encoding into numerical vectors, making it much faster to train.

dataxu bids on ads in real-time on behalf of its customers, and trains on past bids to optimize for future bids. Our system trains thousands of advertiser-specific models and has multi-terabyte datasets. Hence, we need to consider both model performance and cost of training. We always choose the model with the highest performance, but for a given performance level, we need to pick the classifier that minimizes training time (and prediction time, but that’s a topic for another blog post).

In our case, the custom implementation we’ll describe, which we’ll call Categorical Naive Bayes, has similar model performance (using ROC AUC as the metric) to the standard Naive Bayes implementation that comes with Spark. However, to achieve that parity with Spark’s Naive Bayes, we had to add additional feature-engineering stages, such as numerical encoding of categorical features and attribute selection. This slows down the trainer, as it implies several more passes through the dataset.

Full disclosure: we didn’t end up using this new implementation in production, and chose mostly tree-based algorithms for model performance reasons. That said, Categorical Naive Bayes is 3X faster than Spark Naive Bayes and 10X faster than Spark Random Forest on our real-time bidding training data, in which most features are categorical.

Recap on Spark Pipeline Concepts:

A Spark ML pipeline consists of a series of stages. Each stage can be a Transformer or an Estimator. Transformers apply a well-defined transformation to a dataset, while Estimators have the added capability of producing Models by traversing the dataset. NaiveBayes and StringIndexer are examples of Estimators, while VectorAssembler and OneHotEncoder are examples of Transformers. Models are in turn Transformers, because they can provide predictions for all elements in a dataset as a transformation.
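To make the distinction concrete, here is a small illustrative snippet (assuming a SparkSession named spark is available, as in spark-shell; the column names are made up):

import org.apache.spark.ml.feature.StringIndexer

// StringIndexer is an Estimator: fit() must scan the data to learn a value-to-index map
val df = spark.createDataFrame(Seq(("a", 1.0), ("b", 0.0), ("a", 1.0)))
  .toDF("category", "label")

val indexer = new StringIndexer()
  .setInputCol("category")
  .setOutputCol("categoryIdx")

val indexerModel = indexer.fit(df)        // Estimator: traverses the dataset to produce a Model
val indexed = indexerModel.transform(df)  // the resulting Model is itself a Transformer
indexed.show()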

Comparing Spark Naive Bayes to Categorical Naive Bayes

Spark Naive Bayes assumes a feature layout mostly used in language models (e.g. multivariate Bernoulli or multinomial models), where the value of a feature indicates the presence or strength of that term (column) within the document (row). Hence, one needs techniques such as one-hot encoding to run a Naive Bayes algorithm on top of categorical features, and some form of feature selection to avoid ending up with too many features. A typical pipeline would be:
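Something along these lines; the dataset, the column names (site, device, label), and the number of selected features are made up for illustration:

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.NaiveBayes
import org.apache.spark.ml.feature.{ChiSqSelector, OneHotEncoder, StringIndexer, VectorAssembler}

// toy training data standing in for a real bid log
val trainingDF = spark.createDataFrame(Seq(
  ("siteA", "mobile", 1.0),
  ("siteB", "desktop", 0.0),
  ("siteA", "desktop", 1.0)
)).toDF("site", "device", "label")

// one StringIndexer per categorical column
val siteIndexer   = new StringIndexer().setInputCol("site").setOutputCol("siteIdx")
val deviceIndexer = new StringIndexer().setInputCol("device").setOutputCol("deviceIdx")

// one-hot encode the indexed columns into sparse vectors
val siteEncoder   = new OneHotEncoder().setInputCol("siteIdx").setOutputCol("siteVec")
val deviceEncoder = new OneHotEncoder().setInputCol("deviceIdx").setOutputCol("deviceVec")

val assembler = new VectorAssembler()
  .setInputCols(Array("siteVec", "deviceVec"))
  .setOutputCol("rawFeatures")

// feature selection; in practice this would keep many more features
val selector = new ChiSqSelector()
  .setFeaturesCol("rawFeatures")
  .setLabelCol("label")
  .setOutputCol("features")
  .setNumTopFeatures(2)

val nb = new NaiveBayes().setFeaturesCol("features").setLabelCol("label")

val pipeline = new Pipeline().setStages(
  Array(siteIndexer, deviceIndexer, siteEncoder, deviceEncoder, assembler, selector, nb))

val model = pipeline.fit(trainingDF)  // several passes over the data: indexers, selector, NB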

Note that we have two string indexers in this sample pipeline. At the time of this writing, Spark is not smart enough to realize that those two stages could be parallelized and, thus, we pay the cost of traversing the data twice before we submit it to the classifier. In order to avoid this, our implementation computes the conditional probabilities for all categories in a single pass. These conditional probabilities are sufficient to make predictions on a dataset.

For each categorical value of a feature, we need to compute the conditional probability by counting the number of positive and negative instances. As you can imagine, features with extremely high cardinality might blow up the memory of the executors. For this reason we use the stream-lib library, which calculates the top-k elements and their frequencies on a stream of data, following the ideas of Metwally, Agrawal, and El Abbadi (Efficient Computation of Frequent and Top-k Elements in Data Streams).
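As a small illustration of the idea, here is how stream-lib’s StreamSummary can be used to track approximate top-k frequencies over a stream of values (the values and the capacity below are arbitrary):

import com.clearspring.analytics.stream.StreamSummary
import scala.collection.JavaConverters._

val capacity = 100                               // max distinct values tracked per feature
val summary  = new StreamSummary[String](capacity)

// feed the stream of observed category values
Seq("mobile", "desktop", "mobile", "tablet", "mobile").foreach(v => summary.offer(v))

// query the two most frequent values and their (approximate) counts
summary.topK(2).asScala.foreach { counter =>
  println(s"${counter.getItem} -> ${counter.getCount}")   // e.g. mobile -> 3
}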

A typical usage of CategoricalNaiveBayes is:
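Roughly like the sketch below. Note that the setter names used here (setFeatureColumns, setMaxTopValues) and the package are illustrative assumptions; the actual API is in the spark-ext repository:

// illustrative only: exact class/setter names may differ from the spark-ext code
val cnb = new CategoricalNaiveBayes()                       // provided by the spark-ext jar
  .setFeatureColumns(Array("site", "device", "browser"))    // categorical columns, used as-is
  .setMaxTopValues(1000)                                    // max category values kept in memory per feature
  .setLabelCol("label")

val model = cnb.fit(trainingDF)        // a single stage, a single pass over the data
val predictions = model.transform(testDF)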

Note that it just needs the feature names as input, as well as the maximum number of category values to keep in memory for each feature. Everything is done in a single stage.

Using top-k here might be a good or a bad idea depending on how the features are distributed in your dataset. We found that it typically works great, but users should do proper feature exploration before applying it. For example, if instances with positive labels occur across a long tail of infrequent values (larger than k), the classifier’s performance may degrade.

Implementation of Categorical Naive Bayes

To implement an Estimator in Spark, you need to extend the Estimator trait and implement the fit() function so that it returns a Model. This model should hold, as data structures, everything needed to compute predictions on a dataset (in our case, the conditional probabilities).

To implement the fit() function, we traverse the dataset once and use Spark Accumulators to compute the conditional probabilities for each category value. These accumulators have a StreamSummary as underlying data structure that counts the frequencies of the top-k elements on a stream. The fit function returns a Categorical Naive Bayes Model which we initialize by providing the smoothed conditional probabilities using the simplest Laplace smoothing.
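As a rough sketch of the accumulator idea (this is not dataxu’s exact code, just an illustration of wrapping a StreamSummary in a Spark AccumulatorV2, with one such accumulator per feature):

import com.clearspring.analytics.stream.StreamSummary
import org.apache.spark.util.AccumulatorV2
import scala.collection.JavaConverters._

// Accumulates the approximate top-k value frequencies for one categorical feature
class TopKAccumulator(capacity: Int)
    extends AccumulatorV2[String, StreamSummary[String]] {

  private var summary = new StreamSummary[String](capacity)
  private var empty = true

  override def isZero: Boolean = empty

  override def copy(): TopKAccumulator = {
    val acc = new TopKAccumulator(capacity)
    summary.topK(capacity).asScala.foreach(c => acc.addWithCount(c.getItem, c.getCount))
    acc
  }

  override def reset(): Unit = {
    summary = new StreamSummary[String](capacity)
    empty = true
  }

  override def add(v: String): Unit = { summary.offer(v); empty = false }

  private def addWithCount(v: String, count: Long): Unit = {
    summary.offer(v, count.toInt); empty = false
  }

  // merging replays the other summary's counters into this one
  override def merge(other: AccumulatorV2[String, StreamSummary[String]]): Unit =
    other.value.topK(capacity).asScala.foreach(c => addWithCount(c.getItem, c.getCount))

  override def value: StreamSummary[String] = summary
}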

To implement the Categorical Naive Bayes Model, we can extend Spark’s ProbabilisticClassificationModel class, which already implements most of the necessary machinery, including the transform() method; we only need to specify how to compute predictions for a single row of data. We do this by overriding the methods predictRaw(Seq[String]) and raw2probabilityInPlace(Vector). The first method is responsible for computing a raw score for a row of data: it returns the unnormalized probability using the standard Naive Bayes calculation and the log-sum-exp trick. The second method, raw2probabilityInPlace, just normalizes the probabilities.
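Sketched here as plain functions rather than the actual overrides (and without the in-place mutation that raw2probabilityInPlace performs), the two computations look roughly like this; logPrior and logConditional stand in for lookups into the model’s stored smoothed probabilities:

import org.apache.spark.ml.linalg.{Vector, Vectors}

// raw score per class: log P(class) + sum over features of log P(value | class)
def predictRaw(features: Seq[String],
               logPrior: Int => Double,
               logConditional: (Int, Int, String) => Double,
               numClasses: Int = 2): Vector = {
  val logScores = Array.tabulate(numClasses) { cls =>
    logPrior(cls) + features.zipWithIndex.map { case (value, i) =>
      logConditional(cls, i, value)
    }.sum
  }
  Vectors.dense(logScores)                        // unnormalized log scores, one per class
}

// normalize the raw log scores into class probabilities
def raw2probability(raw: Vector): Vector = {
  val max  = raw.toArray.max                      // log-sum-exp trick for numerical stability
  val exps = raw.toArray.map(v => math.exp(v - max))
  val total = exps.sum
  Vectors.dense(exps.map(_ / total))
}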

How to use the Categorical Naive Bayes

We provide the source code for Categorical Naive Bayes in our dataxu GitHub repository. To use it in your application, you can just build the jar, attach it to your Spark application, and import the class. To build Categorical Naive Bayes from source, first clone the repository on your computer:

https://github.com/dataxu/spark-ext
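For example, from the command line:

git clone https://github.com/dataxu/spark-ext.git
cd spark-ext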

Then build the sources with Apache Maven by running the console command:

mvn clean install

This creates a jar file in the spark-ext/target directory called spark-ext-0.0.1.jar (or similar).

To get started, you can use the Spark shell to run a toy example. First, download the sample trainer.scala file from the gist.

This trainer file will create a dataset on-the-fly, train a model, and show the predictions. To invoke it, run the spark-shell by providing the location of the jar file and the trainer.scala script file.
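For example (the paths here are illustrative):

spark-shell --jars target/spark-ext-0.0.1.jar -i trainer.scala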

Writing your own Spark classifier

It’s fairly simple to write your own Spark Classifier if you know your way around the Spark ML classes and Scala’s type system. Naive Bayes is one of the simplest algorithms to implement, and we provide the code for exploration, fixes, and enhancements. Those interested in using it for categorical features might also find it handy. Writing your own Spark classifier may be useful when you have a very specific algorithm, or when you can relax some constraints of a generic algorithm to make it more performant by some criterion.

Want to learn more about why you might want to write your own Spark Classifier? Check out the first blog in this series here.

Please post your feedback in the comments: have you written a custom Spark estimator? What was your approach? If you found this post useful, please feel free to applaud and share!

Máximo Gurméndez
Data Science Engineering Lead @dataxu / Founder @montevideolabs