A journey on Scala ML pipeline — part 2 of 3: Custom transformers

Caio Ishizaka Costa
Red Ventures Brasil - Tech
7 min readJun 5, 2020
No, not that kind of transformer. Photo: https://www.syfy.pt/noticas/10-coisas-que-nao-sabias-sobre-a-saga-transformers

Welcome back. If you just landed at this article, you want to take a look at the part 1.

This is also available in scala notebooks in my git, with pretty much all the text explanation as well. Some code may have been omitted from this article (especially imported packages), the notebooks were tested in their entirety.

As promised, let’s make our first custom transformer. This will be very simple. You will pass which column you want to be normalised, the mean and the standard deviation to be considered, and the other params we are used to (handleInvalid and outputCol). We will use standard score, which is the easiest to implement and will achieve what we need in most cases. https://en.wikipedia.org/wiki/Standard_score

This will basically do

Z equals fraction, X minus mu on top, sigma on the bottom

Where µ is the mean and σ is the standard deviation of the population.

Transformer requirements

In order for your transformer to work in a pipeline it need a few things.

  1. transform() method, as it will be called by the pipeline. It has to return a Dataset/Dataframe
  2. transformSchema() method. I don’t quite understand why. My best guess is that it will assess the schema of the result beforehand, to understand if there are any conflicts in the pipeline before starting transforming (and figuring out in the last step it is broken because a column is missing)
  3. It has to extend Transformer. Probably not true, but this is definitely the easiest way to achieve a lot of the needs, extend the class you are mimicking anyway.
  4. It needs an Identifiable. Also not sure why, but it does. The code below should handle this. Also, initialising it with an uid.
def this() = this(Identifiable.randomUID("numNormalizer")) 

5. Copy method. I am just using the default one. Just remember it has to return an object of the same class you are implementing, in this case NumericNormalizer.

override def copy(extra: ParamMap): NumericNormalizer = defaultCopy(extra)

Lots of things, but the bulk of the work will be at (1), a little bit at (2) if necessary. And some work getting params right. Let’s go right to them.

How params work

I would be lying if I told you I know. I just went down a rabbit hole to make NAs work with XGBoost, ended up in the source code of Assemblers and Pipeline, and emerged with a way to do it.

What I can tell is, params are a nice way to store information, cannot be accessed from the outside (unless you have a method for that), and can be easily managed via methods.

What is the difference from an attribute? I honestly don’t know. They are different in nature and properties, but at a fundamental level I couldn’t tell you why use one or the other. I am just going with the flow, those transformers are usually implemented with Params, let’s mimic them.

Now for the real deal. We can extend many classes with those params, such as HasInputCol, HasOutputCol and HasHandleInvalid. This will create the params for us (and some methods as well), so it will be very similar to out of shelf StringIndexer and VectorAssembler, for instance.

We then start our class extending all of those.

class NumericNormalizer(override val uid: String) extends Transformer with HasInputCol with HasOutputCol with HasHandleInvalid

But we can create our own params from scratch.

We will create the params mean and stdDev. I decided to override handleInvalid, basically because it was giving me trouble. Very simple syntax, new Param(parent, name, documentation).

val mean: Param[Double] = new Param(this, "mean", """Mean of the column""")
val stdDev: Param[Double] = new Param(this, "stdDev", """Standard Deviation of the column""")
override val handleInvalid: Param[String] = new Param[String](this, "handleInvalid",
""" How to handle Null and Nan values
keep: will convert Null to NaN
skip: will filter out Null or NaN
""", ParamValidators.inArray(Array("keep","skip")))

After defining our parameters, we can set default values, if the user does not want to set anything in particular, and define the methods to allow users to define the parameters. Those methods have to return the same class you are implementing, and this.type is the most practical way to do it.

  setDefault(mean, 0.0)
setDefault(stdDev, 1.0)

def this() = this(Identifiable.randomUID("numNormalizer"))

override def copy(extra: ParamMap): NumericNormalizer = defaultCopy(extra)

def setOutputCol(value: String): this.type = set(outputCol, value)

def setInputCol(value: String): this.type = set(inputCol, value)

def setHandleInvalid(value: String): this.type = set(handleInvalid, value)

def setMean(value: Double): this.type = set(mean, value)

def setStdDev(value: Double): this.type = set(stdDev, value)

To access a param inside the class you use ${ParamName}. If you want, you may implement a getParam method, if you want to allow the user to check some parameter afterwards, using the get method. All these params methods comes from org.apache.spark.ml interface Params, and you can check all you can do in the documentation

Why don’t we calculate the mean and std dev ourselves

Great question. While 100% possible, it has a fundamental flaw. Mean and Standard Deviation are depending on the data. If your test data has a different distribution than your training data, values will be mapped differently. On the most extreme situation, when you pass a dataset with one element, you will not have any comparison to normalize.

By giving fixed numbers (based on your training set, whole set, whatever you want) you avoid that pitfall.

The transform

Now that we know all the requirements let’s go down to implementation. As you can see, our transform method does 2 things:

  1. Treats null/nan values accordingly to the handleInvalid parameter
  2. Applies the standard score, creating a new column in the process
override def transform(dataset: Dataset[_]): DataFrame = {
transformSchema(dataset.schema, logging = true)

var result = dataset
${handleInvalid} match {
case "keep" => result = result.withColumn(${outputCol}, when(col(${inputCol}).isNull, lit(Double.NaN)).otherwise(col(${inputCol}).cast("Double"))) // This will replace nulls by NaNs
case "skip" => result = result.withColumn(${outputCol},col(${inputCol})).filter(col(${inputCol}).isNotNull).filter(col(${inputCol}) === col(${inputCol})) //This will remove both Nulls and NaN
case _ => throw new SparkException("""Error mode not supported, sorry""")
}

return result.withColumn(${outputCol}, (col(${outputCol}) - ${mean})/ ${stdDev})
}

Not rocket science here. But this is beautiful. All the magic happens in this method, and the sky is the limit. Just make sure you don’t do anything that is not serialisable (crappy vague advice, I know, but I also don’t quite understand what is and what is not serialisable)

Quick note on replacing nulls by NaN on the “keep” method. This is will have great impact in Chapter 3. As of now, just do it.

The transformSchema

Key thing, call it on .transform(). Why? I don’t know, everyone does it, and if you don’t it will fail.

Now, this is super easy. You should be able to know what is going to be the resulting schema based on the dataset schema that is coming in. Just write it on words. In scala language.

override def transformSchema(schema: StructType): StructType = {StructType(schema.fields :+ new StructField(${outputCol}, DoubleType, true))}

Potential issue: If the user chooses the outputCol as an existing column, this is probably going to fail. It just occurred me now, and I’m too lazy to test. Try it yourself, and if necessary, put a check there.

NumericNormalizer in action

Let’s see if this is going to work.

import org.apache.spark.sql.functions.{mean, stddev}val age_mean = data.agg(mean("age")).head().getDouble(0)
val age_stdDev = data.agg(stddev("age")).head().getDouble(0)
val ageNormalizer = new NumericNormalizer()
.setInputCol("age")
.setOutputCol("age_norm")
.setHandleInvalid("keep")
.setMean(age_mean)
.setStdDev(age_stdDev)
display(ageNormalizer.transform(data).select("age","age_norm"))
Table with two columns, age and age_norm. Value pairs (age and age_norm) are like: 39 and 0.03, 50 and 0.83, null and NaN

Looks like success to me. Null being mapped to NaN. Normalized values around 0. Now let’s create normalizer for all numerical data, and put in the pipeline. We will create a map of mean and standard deviations as well, to makes things clearer. Also, I have a feeling if you just make the calculation inside the normalizer setMean and setStdDev params, it will calculate it every time the transformer is called (not confirmed, it is just a guess)

Again, you have an option to use only training data or entire data to calculate the mean and standard deviation. I prefer to use the entire data to avoid any skew/bias the sampling may have caused. Though unlikely, it may happen. Also, I am confident that just knowing the mean and standard deviation of the entire set won’t cause any bias/overfitting on the model itself.

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.feature.{VectorAssembler}
val mean_map = numerical_columns.map(column => column -> data.agg(mean(column)).head().getDouble(0)).toMap
val stdDev_map = numerical_columns.map(column => column -> data.filter(col(column) < 500*mean_map(column)).agg(stddev(column)).head().getDouble(0)).toMap
val normalizerArray = numerical_columns.map(column_name => {
new NumericNormalizer()
.setInputCol(column_name)
.setOutputCol(s"${column_name}_norm")
.setHandleInvalid("keep")
.setMean(mean_map(column_name))
.setStdDev(stdDev_map(column_name))
}).toArray
val normalized_assembler = new VectorAssembler().setInputCols((index_columns.map(name => s"${name}_index") ++ numerical_columns.map(name => s"${name}_norm")).toArray) .setOutputCol("features") .setHandleInvalid("skip")val normalized_stages = indexerArray ++ normalizerArray ++ Array(labelIndexer, normalized_assembler, xgbClassifier, labelConverter)
val normalized_pipeline = new Pipeline().setStages(normalized_stages)
val normalized_model = normalized_pipeline.fit(training)

After testing (check previous article or notebook in repo for the code), we got pretty much the same AUC.

auROC: Double = 0.9259361494824455

But that was not the point. You learned how to create a transformer (not the giant robot cars unfortunately), isn’t that cool? You can use it to anything. Removing outliers, grouping categorical values, making fun feature interaction, applying a function (log, exp, x², sin, …) to a feature/label, making sense of some text. You can do anything, you are a powerful data scientist/engineer with the right tools in hand.

A quick note on normalising. Though close to useless on XGBoost tree model, it is very important on other models, such as linear and logistic regression, specially if you are using regularisation. Regularisation is nothing but keeping the coefficients small to avoid overfitting. If your data is not normalised, your coefficients may vary in magnitude. It is very simple to realize that. Imagine you have a model:

Y equals Beta zero plus Beta one X 1 plus Beta two X 2

It is very easy to see that if you multiply all X_1 by 100, the resulting model is going to be:

Y equals Beta 0 plus Beta 1 X 1 plus Beta two over one hundred X two

Now if you regularise that, guess which coefficient is going to suffer more?Normalising deals with that and greatly improve the efficacy of regularisation methods.

Back to part 1

To part 3

--

--