Handling missing values with Snowpark for Python — Part 2


This is the second post in my two-part series on handling missing values with Snowpark for Python.

In my previous post, I covered what missing values are, how they are represented, how to handle them, and examples of how Snowpark for Python can be used for this.

What I did not cover in that post was how to calculate the replacement values from one dataset and then apply them to another dataset. From a machine learning perspective that is an important requirement, and it is the focus of this post.

Doing this is a two-step process. In the first step we learn from a dataset what the replacement values should be, which is called fit. In the second step we apply those replacement values to a dataset, which is called transform.
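To make the two steps concrete, here is a minimal plain-Python sketch that does not touch Snowflake at all. The function names fit_mean and transform_fill and the sample lists are hypothetical and only used for illustration.

```python
from statistics import mean

def fit_mean(values):
    """Learn the replacement value: the mean of the non-missing values."""
    return mean([v for v in values if v is not None])

def transform_fill(values, replacement):
    """Apply a previously learned replacement value to any dataset."""
    return [replacement if v is None else v for v in values]

train = [1.0, None, 3.0, 5.0]   # dataset we learn from
new_data = [None, 10.0, None]   # dataset we apply the learned value to

replacement = fit_mean(train)   # learned from the training data only
print(transform_fill(new_data, replacement))  # [3.0, 10.0, 3.0]
```

The point of the split is that new_data is filled with a value learned from train, not from new_data itself.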

Scikit-Learn has a module, impute, with transformers that can be used for this. The drawback is that fit and transform need to be done in memory on the client running them, which requires us to pull the data out of Snowflake and convert it to a format that Scikit-Learn can use.

At the time of writing, Snowpark for Python does not have similar impute functions out of the box, but since we are using Python they are fairly easy to build.

Creating an imputer class using Snowpark for Python

I am going to create a Snowpark for Python imputer that works in a similar way to the SimpleImputer in Scikit-Learn.

For this example I will, to a certain extent, follow how estimators/transformers are implemented in Scikit-Learn, focusing on the fit and transform methods.

Designing the imputer class

The SimpleImputer in Scikit-Learn supports replacing missing values with mean, median, most frequent or with a constant value. It uses the same method, called strategy, for all columns in the provided dataset when calling the fit and transform methods.
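As a rough illustration of what the three statistical strategies compute for a single column, here is a plain-Python sketch using the standard library. The column values are made up, and the dictionary keys simply mirror the strategy names used in this post.

```python
from statistics import mean, median, mode

column = [1.0, 2.0, None, 2.0, 7.0]

# Missing values are left out before any statistic is calculated
present = [v for v in column if v is not None]

replacements = {
    "mean": mean(present),
    "median": median(present),
    "most_frequent": mode(present),
}
print(replacements)  # {'mean': 3.0, 'median': 2.0, 'most_frequent': 2.0}
```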

The strategy is defined when creating the object. The object also has a parameter, missing_values, that tells how missing values are represented in the dataset, and if strategy == "constant" then fill_value needs to be set as well.

For this example I will focus on implementing the mean, median and most frequent strategies. That means my imputer will only have the strategy parameter and an input_cols parameter to specify which columns in a DataFrame to use for fit and transform.

The fit and transform methods need to accept a Snowpark DataFrame, since that is what we are working with.

Below is an example of how I would like to be able to use my imputer.

my_si = SimpleImputer(strategy='mean', input_cols=['A', 'B'])
my_si.fit(my_df)
imputed_df = my_si.transform(my_df)

With this I am all set to create the class.

Helper functions

I am going to start by creating a couple of helper functions for various tasks. The first one, _verify_numeric_cols, checks that all provided columns have a numeric data type, which is needed for mean and median.

from snowflake.snowpark import DataFrame
from snowflake.snowpark import functions as F
from snowflake.snowpark import types as T
import json

def _verify_numeric_cols(cols, df_col_types):
    # The data type classes of the selected columns
    col_datatypes = (type(col_type) for col_type in df_col_types.values())
    numeric_types = (T.DecimalType, T.LongType, T.DoubleType, T.FloatType, T.IntegerType)

    # Any type left after removing the numeric ones is not supported
    if len(list(set(col_datatypes).difference(numeric_types))) > 0:
        raise TypeError(
            "Only numeric columns can be used with chosen strategy (mean or median)"
        )
    return True

_exclude_nan is used to make sure that NaN values are not included when using mean, median or mode (most frequent) on a column whose data type is either Float or Double. In those cases the function returns an iff expression that replaces NaN with Null, which is ignored by the mean, median and mode functions.

def _exclude_nan(col, col_type):
    if isinstance(col_type, (T.FloatType, T.DoubleType)):
        return F.iff(F.col(col) == F.lit('NaN'), F.lit(None), F.col(col))
    else:
        return F.col(col)
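To see why NaN has to be turned into a missing value before aggregating, here is a small plain-Python sketch. It only mimics the idea locally: the exclude_nan function is a hypothetical stand-in for the iff expression above and does not call Snowpark.

```python
import math

def exclude_nan(value):
    """Map NaN to None, mirroring what the iff expression does on the Snowflake side."""
    if isinstance(value, float) and math.isnan(value):
        return None
    return value

values = [1.0, float("nan"), 4.0]

# In plain Python arithmetic a single NaN turns the whole result into NaN
naive_mean = sum(values) / len(values)

# After mapping NaN to None and skipping None, only real numbers remain
cleaned = [v for v in map(exclude_nan, values) if v is not None]
clean_mean = sum(cleaned) / len(cleaned)

print(math.isnan(naive_mean))  # True
print(clean_mean)              # 2.5
```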

The last helper function, _check_columns, verifies that the columns provided through input_cols exist in the Snowpark DataFrame. If no columns are provided it will use all columns in the DataFrame. The function also creates a dictionary with the data type of each column, which is used to make sure we do not have data types that are unsupported by the chosen strategy.

def _check_columns(df, cols=None):
    # Get the columns in the df
    df_cols = df.columns
    if cols is None:
        # Get all columns for the dataframe
        cols = df_cols
    else:
        if not isinstance(cols, list):
            cols = [cols]
        # Verify that cols does not have columns that are not in df
        if len(list(set(cols).difference(df_cols))) > 0:
            raise ValueError(
                "input_cols contains columns not available in the provided DataFrame"
            )

    df_col_type_dict = {field.name: field.datatype for field in df.schema.fields if field.name in cols}
    return df_col_type_dict, cols

Creating the imputer class

The imputer is created as a class called SimpleImputer. The __init__ method captures the parameters that are needed: strategy, which tells what type of imputation to do (mean, median or most frequent), and input_cols, which is a list of the column names the strategy is going to be applied to.

class SimpleImputer:
    def __init__(
        self,
        *,
        strategy = "mean",
        input_cols = None,
    ):
        self.strategy = strategy
        self.input_cols = input_cols

The next part is to define the fit method, which calculates the values based on the strategy. It has one parameter, df, which is the Snowpark DataFrame the calculations are done on.

It creates a list, fit_col_logic, that has pairs of column_name and func(column_name), where func depends on the strategy. By using object_construct with the list as input, Snowflake creates an object like the one below.

{
"COLUMN1": value from func call,
"COLUMN2": value from func call,
}

The object is pulled back using collect, then converted into a dict and stored in fitted_values_ so it can be used during transform.
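The shape of that round trip can be sketched locally without Snowflake. In the snippet below, the stats dictionary is a made-up stand-in for the results of func(column), and json.dumps only imitates the single JSON cell that comes back from collect; none of it calls Snowpark.

```python
import json

# Hypothetical fitted statistics, standing in for the func(column) results
stats = {"A": 2.5, "B": 2.0}

# Alternating key, value, key, value ... the same layout as fit_col_logic
flat_args = []
for name, value in stats.items():
    flat_args.extend([name, value])

# Pair the arguments back up and serialise them, imitating the returned JSON cell
json_cell = json.dumps(dict(zip(flat_args[0::2], flat_args[1::2])))

# This is the step the fit method performs on the collected result
fitted_values = json.loads(json_cell)

print(flat_args)      # ['A', 2.5, 'B', 2.0]
print(fitted_values)  # {'A': 2.5, 'B': 2.0}
```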

    def fit(self, df: DataFrame):
        # Check that input columns exist in the provided dataframe,
        # generate a dict with column names and their datatypes
        df_col_type_dict, fit_columns = _check_columns(df, self.input_cols)

        self.input_cols = fit_columns

        strategy = self.strategy

        # most_frequent is the only strategy that can be
        # used on numeric and character columns
        if strategy != "most_frequent":
            # If not most_frequent then verify we only
            # have numeric columns
            _verify_numeric_cols(fit_columns, df_col_type_dict)

        # Set the snowpark function to use based on strategy
        if strategy == "mean":
            func = F.mean
        elif strategy == "median":
            func = F.median
        elif strategy == "most_frequent":
            func = F.mode
        else:
            # Without this branch an unknown strategy would fail
            # later with a NameError on func
            raise ValueError(f"Unsupported strategy: {strategy}")

        # Generate the logic to return a result that has a
        # dictionary with the input columns and the fitted values:
        # {"column1":value, "column2":value}
        fit_col_logic = []
        for col in fit_columns:
            fit_col_logic.extend([F.lit(col), func(_exclude_nan(col, df_col_type_dict[col]))])

        df_fitted = df.select(F.object_construct(*fit_col_logic))
        # Pull back the dictionary with the fitted values
        fitted_values = json.loads(df_fitted.collect()[0][0])
        # Store the fitted values as part of the object
        self.fitted_values_ = fitted_values
        return self

The transform method is where the missing values are replaced with the values gathered in the fit method. Since the fit method creates a dict with column names and replacement values, the fillna method can be used with the dict as its parameter.

    def transform(self, df: DataFrame) -> DataFrame:
        fitted_values = self.fitted_values_
        # Since the fitted_values is
        # {"column1":value, "column2":value} we can
        # use fillna for doing the transform
        trans_df = df.fillna(fitted_values)
        return trans_df
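For intuition, here is a plain-Python sketch of what filling with a per-column dictionary amounts to. The helper names is_missing and fill_missing are hypothetical, rows are modelled as simple dicts, and treating NaN as missing is an assumption chosen to match the example output shown in this post rather than a description of how Snowpark implements fillna.

```python
import math

def is_missing(value):
    """Treat both None and float NaN as missing (an assumption of this sketch)."""
    return value is None or (isinstance(value, float) and math.isnan(value))

def fill_missing(rows, fitted_values):
    """Replace missing entries per column; columns without a fitted value are left alone."""
    return [
        {
            col: fitted_values[col] if col in fitted_values and is_missing(val) else val
            for col, val in row.items()
        }
        for row in rows
    ]

rows = [
    {"A": 1.0, "B": None, "C": "SE"},
    {"A": float("nan"), "B": 2, "C": None},
]
fitted_values = {"A": 2.5, "B": 2}  # no entry for "C", so it stays untouched

filled = fill_missing(rows, fitted_values)
print(filled)
```

Column C keeps its missing value in the second row because no replacement was fitted for it, which is the same reason input_cols controls what transform touches.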

I can now use my new imputer class with Snowpark DataFrames.

>>> df = session.create_dataframe([[1.0, 1, 'SE'], [float('nan'), 2, None], [float('nan'), 3, 'DK'], [4.0, None, 'SE'], [float('nan'), None, None]]).to_df("a", "b", "c")
>>> my_si = SimpleImputer(strategy="most_frequent")
>>> my_si.fit(df)
>>> my_si.transform(df).show()
-------------------
|"A" |"B" |"C" |
-------------------
|1.0 |1 |SE |
|1.0 |2 |SE |
|1.0 |3 |DK |
|4.0 |1 |SE |
|1.0 |1 |SE |
-------------------

Since the imputer object is a Python object, I can for example save it to disk using joblib or pickle and then load it in another environment, for example production, to be used on new data.
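A minimal sketch of that round trip with pickle is shown below. To keep the snippet self-contained it saves only the learned state as a plain dict; the file name and the values are made up. Pickling the whole imputer object works the same way with pickle.dump and pickle.load, provided the SimpleImputer class can be imported in the environment that loads it.

```python
import os
import pickle
import tempfile

# Hypothetical state of a fitted imputer
state = {
    "strategy": "most_frequent",
    "input_cols": ["A", "B", "C"],
    "fitted_values_": {"A": 1.0, "B": 1, "C": "SE"},
}

# A temporary location stands in for wherever the artifact is stored
path = os.path.join(tempfile.mkdtemp(), "imputer_state.pkl")

# Save in one environment ...
with open(path, "wb") as f:
    pickle.dump(state, f)

# ... and load in another
with open(path, "rb") as f:
    restored = pickle.load(f)

print(restored == state)  # True
```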

Conclusion

So, even if Snowpark for Python does not have a module with imputers, it is easy to build that type of functionality using Python and Snowpark for Python.

With this approach it is easy to add functionality that pushes the computation down to Snowflake and leverages its elastic compute engine, making it possible to apply this to huge datasets while also minimising data movement.

If you are interested in seeing how data preprocessing, such as scaling and encoding, can be done using Snowpark for Python, have a look at the sp4py_utilities project.