SkiiDate with Sklearn pipeline and Tensorflow on Snowpark!

Hello everyone, and best wishes for a happy and prosperous new year. I am sure you are getting familiar with Snowpark, and I hope the Snowpark for Data Science playbook helped you get started.

Snowpark supports a plethora of Python libraries, but customers often ask whether it supports TensorFlow and, if so, how it works.

This blog is a step-by-step guide to building and training a Keras (TensorFlow) model within a sklearn pipeline, using the Keras wrappers for scikit-learn. All of this is assembled and trained within Snowpark by pushing the work down to Snowflake.

The training runs on a Snowpark-optimized warehouse, because training a TensorFlow model is generally memory intensive, depending on your model definition and especially on high-volume data.

Now let me walk you through how I built and trained this machine learning model, and ran inference with it, on Snowpark.

1. Import the required Snowpark libraries.

# Import required libraries
from snowflake.snowpark.session import Session
from snowflake.snowpark.functions import avg, sum, col, lit, udf, sproc
from snowflake.snowpark.types import (IntegerType, FloatType, LongType,
    DoubleType, DecimalType, StringType, BooleanType, Variant,
    PandasSeries, PandasDataFrame)
from snowflake.snowpark import functions as fn

import warnings
warnings.filterwarnings('ignore')
import sys
import io
import json
import logging

import joblib
import pandas as pd
import numpy as np

from snowflake.snowpark import version
print(f"snowflake snowpark version is: {version.VERSION}")

2. Establish a connection to Snowflake from the notebook using your credentials and define a session.

# Read the connection parameters from a local JSON file
with open('cred.json') as cfg_file:
    snowflake_connection_cfg = json.load(cfg_file)

# Creating Snowpark Session
sk_tf_session = Session.builder.configs(snowflake_connection_cfg).create()
print('Current Database:', sk_tf_session.get_current_database())
print('Current Schema:', sk_tf_session.get_current_schema())
print('Current Warehouse:', sk_tf_session.get_current_warehouse())
print("Warehouse set up:")
sk_tf_session.sql("show warehouses like 'xsmall_wh'").collect()
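
The layout of cred.json is not shown above, so here is a minimal sketch of a loader that checks the file before it is handed to Session.builder.configs. The list of required keys and the helper name load_connection_config are my assumptions for illustration; adjust them to whatever your own file contains.

```python
import json
import os
import tempfile

# Keys the connection dictionary is assumed to carry (an assumption about cred.json)
REQUIRED_KEYS = ("account", "user", "password", "warehouse", "database", "schema")


def load_connection_config(path):
    """Read connection parameters from a JSON file and check the assumed keys."""
    with open(path) as cfg_file:
        cfg = json.load(cfg_file)
    missing = [key for key in REQUIRED_KEYS if key not in cfg]
    if missing:
        raise KeyError(f"missing connection parameters: {missing}")
    return cfg


# Demo with placeholder values written to a temporary file
placeholder = {key: f"<{key}>" for key in REQUIRED_KEYS}
with tempfile.TemporaryDirectory() as tmp_dir:
    demo_path = os.path.join(tmp_dir, "cred.json")
    with open(demo_path, "w") as out:
        json.dump(placeholder, out)
    demo_cfg = load_connection_config(demo_path)

print(sorted(demo_cfg))
```

Failing early on a missing key tends to give a clearer error than a failed login later on.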

3. Using this session, add the packages. The major ones here are scikit-learn and tensorflow, both of which are available within Snowpark. I also use dill instead of joblib to save the model file, because I ran into integrity errors with joblib that went away when I used dill.

sk_tf_session.clear_packages()
sk_tf_session.add_packages("snowflake-snowpark-python")
sk_tf_session.add_packages("scikit-learn","pandas","numpy","joblib",
"cachetools","tensorflow","dill")
sk_tf_session.clear_imports()

4. Create a stage to hold the model files.

sk_tf_session.sql("CREATE STAGE IF NOT EXISTS sp_tf_models").collect()

5. Create a model output table to persist the results of every training run.

sk_tf_session.sql("""create or replace table tf_model_output (class varchar,
    precision double, recall double, f1score double, support double, model varchar,
    test_size float, random_state int, epochs int, batchsize int)""").collect()

6. Define the input features

features=["X1","X2","X3","X4","X5","X6","X7","X8","X9"]

7. Build the TensorFlow model using a sklearn pipeline. Note that I use sklearn to encode, impute and scale the training data. I then define a function, kerasclassifiermodel, that builds the TensorFlow model, and integrate it into the sklearn pipeline using KerasClassifier from tensorflow.keras.wrappers.scikit_learn. Similarly, if you are building a regression model, you can use KerasRegressor instead of KerasClassifier.

def build_tf_model(p_df: pd.DataFrame):
    from sklearn.pipeline import Pipeline
    from sklearn.impute import SimpleImputer
    from sklearn.preprocessing import StandardScaler, OneHotEncoder
    from sklearn.compose import ColumnTransformer
    import tensorflow as tf
    from tensorflow.keras.models import Sequential
    from tensorflow.keras.layers import (Dense, Conv2D, MaxPool2D, Flatten,
        Dropout, BatchNormalization)
    from tensorflow.keras import datasets, layers, models
    from tensorflow.keras.utils import to_categorical
    from tensorflow.keras.callbacks import ReduceLROnPlateau
    from tensorflow.keras.callbacks import EarlyStopping
    from tensorflow.keras.wrappers.scikit_learn import KerasClassifier

    # Split the input columns by dtype
    numeric_features = p_df.select_dtypes(
        include=['int64', 'float64']).columns.tolist()
    categorical_features = p_df.select_dtypes(
        include=['object']).columns.tolist()

    feature_names = numeric_features + categorical_features

    # Numeric columns: impute with the mean, then standardize
    numeric_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='mean')),
        ('scaler', StandardScaler(with_mean=True, with_std=True))])

    # Categorical columns: fill gaps with 'missing', then one-hot encode
    categorical_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
        ('onehot', OneHotEncoder(handle_unknown='ignore'))])

    preprocessor = ColumnTransformer(
        transformers=[
            ('num', numeric_transformer, numeric_features),
            ('cat', categorical_transformer, categorical_features)])

    # Define the keras classifier that will be called in the
    # model sklearn pipeline using the keras wrapper for
    # scikit-learn. For classification you would use
    # KerasClassifier and for regression you would use KerasRegressor
    def kerasclassifiermodel():
        from tensorflow.keras.models import Sequential
        from tensorflow.keras.layers import (Dense, Conv2D, MaxPool2D,
            Flatten, Dropout, BatchNormalization)

        clf = Sequential()
        clf.add(Dense(units=10, activation='relu'))
        clf.add(Dropout(0.2))
        # Single sigmoid unit for binary classification
        clf.add(Dense(units=1, activation='sigmoid'))
        clf.compile(loss='binary_crossentropy',
                    optimizer='rmsprop',
                    metrics=["accuracy"])
        return clf

    model = Pipeline(steps=[
        ('preprocessor', preprocessor),
        # Call the function defined above
        ('classifier', KerasClassifier(kerasclassifiermodel))
    ])

    return model
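
To see what the preprocessing half of this pipeline produces before any Keras layer gets involved, here is a minimal sketch on a toy DataFrame. The column names and values are made up for illustration, and the column lists are written out by hand rather than derived from dtypes.

```python
import numpy as np
import pandas as pd
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder, StandardScaler

# Toy frame (hypothetical data): two numeric features, one categorical
toy = pd.DataFrame({
    "X1": [1.0, 2.0, np.nan, 4.0],
    "X2": [10.0, 20.0, 30.0, 40.0],
    "COLOR": pd.Series(["red", "blue", "red", "blue"], dtype="object"),
})
numeric_features = ["X1", "X2"]
categorical_features = ["COLOR"]

numeric_transformer = Pipeline(steps=[
    ("imputer", SimpleImputer(strategy="mean")),
    ("scaler", StandardScaler())])
categorical_transformer = Pipeline(steps=[
    ("imputer", SimpleImputer(strategy="constant", fill_value="missing")),
    ("onehot", OneHotEncoder(handle_unknown="ignore"))])
preprocessor = ColumnTransformer(transformers=[
    ("num", numeric_transformer, numeric_features),
    ("cat", categorical_transformer, categorical_features)])

transformed = preprocessor.fit_transform(toy)
# ColumnTransformer may return a sparse matrix; densify for inspection
if hasattr(transformed, "toarray"):
    transformed = transformed.toarray()

# Two scaled numeric columns followed by one column per category
print(transformed.shape)
```

The classifier step then receives this numeric matrix, which is why the Keras model never has to deal with missing values or strings itself.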

8. Define a classification report function to record the metrics, again using sklearn metrics.

def get_classification_report(y_test, y_pred):
    from sklearn import metrics
    # output_dict=True returns a nested dict that converts cleanly to a DataFrame
    report = metrics.classification_report(y_test, y_pred, output_dict=True,
                                           target_names=['0', '1'])
    df_classification_report = pd.DataFrame(report).transpose()
    return df_classification_report
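
Here is a small sketch of what this report looks like as a DataFrame, using made-up labels. It also illustrates why argument order matters: sklearn expects the true labels first, and swapping the two arguments swaps precision and recall for each class.

```python
import pandas as pd
from sklearn import metrics

# Made-up labels purely for illustration
y_true = [0, 0, 0, 0, 1, 1, 1, 1]
y_pred = [0, 0, 0, 1, 1, 1, 0, 0]

report = metrics.classification_report(
    y_true, y_pred, output_dict=True, target_names=['0', '1'])
df_report = pd.DataFrame(report).transpose()

# Same call with the arguments in the wrong order
swapped = metrics.classification_report(
    y_pred, y_true, output_dict=True, target_names=['0', '1'])

print(df_report)
print("precision for class 1:", report['1']['precision'])
print("recall for class 1 when swapped:", swapped['1']['recall'])
```

The index of df_report (the two classes, accuracy, macro avg and weighted avg) is what ends up in the class column of the output table.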

9. Define the model parameters that will be used to train.

def get_model_info(model_name, test_size, random_state, epochs, batchsize):
    data = [[model_name, test_size, random_state, epochs, batchsize]]
    df_model_info = pd.DataFrame(
        data,
        columns=['model', 'test_size', 'random_state', 'epochs', 'batchsize'])
    return df_model_info

10. Define a function to save the model files. Notice that we need to save the pipeline and the Keras model as separate model files. At inference time, the sklearn pipeline .pkl is the main model file and the Keras .h5 file is its dependency, so the .h5 file is imported from the stage as well. Also notice that I use dill to save the pipeline model file, as I ran into issues using joblib. Thanks to Stack Overflow and to Mats Stellwall for suggestions on how to save the models in Snowpark for this scenario.

def save_pipeline_tf_file(session, model, classifier, modeldir, modelh5file,
                          pipelinefile):
    import dill
    import os

    # Save the Keras model separately from the preprocessing pipeline
    model_file = os.path.join('/tmp', modelh5file)
    classifier.model.save(model_file)
    session.file.put(model_file, modeldir, overwrite=True)

    # Upload pipeline to a stage
    pipeline_file = os.path.join('/tmp', pipelinefile)
    # using dill instead of joblib because dill is better at
    # handling any integrity issues that may arise due to
    # keras wrapper in the pipeline.
    with open(pipeline_file, "wb") as pipeline_out:
        dill.dump(model, pipeline_out)
    session.file.put(pipeline_file, modeldir, overwrite=True)
    return (f"successfully uploaded model file: {modelh5file} "
            f"and pipeline file: {pipelinefile}")
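
The dump-then-upload pattern is easy to try locally. The sketch below round-trips a small sklearn pipeline through a file and checks that the restored copy predicts the same labels. Two assumptions to flag: I use the standard library's pickle as a stand-in for dill (both expose dump and load), and LogisticRegression stands in for the Keras classifier.

```python
import os
import pickle
import tempfile

import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

# Tiny made-up training set
X = np.array([[0.0], [1.0], [2.0], [3.0]])
y = np.array([0, 0, 1, 1])

pipeline = Pipeline(steps=[
    ("scaler", StandardScaler()),
    ("classifier", LogisticRegression()),  # stand-in for KerasClassifier
]).fit(X, y)

with tempfile.TemporaryDirectory() as tmp_dir:
    pipeline_file = os.path.join(tmp_dir, "toy_pipeline.pkl")
    # Context managers make sure the file is flushed and closed before reuse
    with open(pipeline_file, "wb") as out:
        pickle.dump(pipeline, out)
    with open(pipeline_file, "rb") as src:
        restored = pickle.load(src)

same_predictions = bool((restored.predict(X) == pipeline.predict(X)).all())
print("restored pipeline predicts the same labels:", same_predictions)
```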

11. Now comes the main function that combines all the ingredients we defined above. Most importantly, since TensorFlow is memory intensive, it is a good idea to define early stopping so that training ends once the model stops improving. Here, training exits after 10 consecutive epochs without an improvement in loss. There are many ways to define early stopping; I am just trying to keep it simple.
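
To make the patience idea concrete, here is a plain-Python sketch of the stopping rule. It is a simplified illustration, not the Keras implementation: the function name epochs_until_stop and the loss values are made up, and options such as min_delta are ignored.

```python
def epochs_until_stop(losses, patience):
    """Return how many epochs run before patience-based stopping ends training."""
    best = float("inf")
    wait = 0
    for epoch, loss in enumerate(losses, start=1):
        if loss < best:
            # Improvement: remember the new best loss and reset the counter
            best = loss
            wait = 0
        else:
            # No improvement: stop once this has happened `patience` times in a row
            wait += 1
            if wait >= patience:
                return epoch
    return len(losses)


# Made-up loss curve: the best value so far is 0.8 at epoch 2
losses = [1.0, 0.8, 0.9, 0.85, 0.81, 0.7]
stopped_at = epochs_until_stop(losses, patience=3)
print(stopped_at)
```

With patience=3 the run stops at epoch 5 and never sees the better loss at epoch 6, which is the trade-off a small patience value makes.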

One aspect to notice is that the classifier can be accessed with model.named_steps['classifier'], and that step is a model by itself. This is how we are able to save the Keras model independently.

One more thing I had to work out because I am using a sklearn pipeline: when you pass parameters to the Keras classifier, you have to prefix them with the step name, so “epochs” becomes “classifier__epochs”, “batch_size” becomes “classifier__batch_size”, and so on. Thanks to Stack Overflow again.
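
The prefix rule comes from sklearn's Pipeline, not from Keras, so it can be shown without TensorFlow. In this sketch, RecordingClassifier is a hypothetical stand-in for KerasClassifier that simply records the keyword arguments its fit method receives.

```python
import numpy as np
from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler


class RecordingClassifier(ClassifierMixin, BaseEstimator):
    """Hypothetical stand-in for KerasClassifier that records its fit kwargs."""

    def fit(self, X, y, **fit_kwargs):
        self.classes_ = np.unique(y)
        self.fit_kwargs_ = dict(fit_kwargs)
        return self

    def predict(self, X):
        # Always predict the first class; enough for this illustration
        return np.full(len(X), self.classes_[0])


X = np.array([[0.0], [1.0], [2.0], [3.0]])
y = np.array([0, 0, 1, 1])

model = Pipeline(steps=[
    ("scaler", StandardScaler()),
    ("classifier", RecordingClassifier()),
])

# "classifier__epochs" is routed to the step named "classifier" as "epochs"
model.fit(X, y, classifier__epochs=5, classifier__batch_size=2)
print(model.named_steps["classifier"].fit_kwargs_)
```

The part before the double underscore must match the step name given in the Pipeline, so renaming the step means renaming the prefixes too.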

def sp_train_tf_model(session: Session, training_table: str,
                      model_name: str, features: list, Y: str, test_size: float,
                      random_state: int, epochs: int, batchsize: int) -> str:
    from sklearn.metrics import (accuracy_score, classification_report,
        precision_score, recall_score, confusion_matrix, RocCurveDisplay)
    from tensorflow.keras.callbacks import EarlyStopping

    # Load training data
    training_data = session.table(training_table)

    # Train test split
    Data_train, Data_test = training_data.random_split(
        [1 - test_size, test_size], seed=random_state)
    pd_Data_train = Data_train.to_pandas()
    pd_Data_test = Data_test.to_pandas()

    # Model building
    tf = build_tf_model(pd_Data_train[features])

    # Define early stopping: training exits once the loss has not
    # improved for `patience` consecutive epochs.
    custom_early_stopping = [
        EarlyStopping(
            monitor='loss',
            patience=10,
            mode='auto'
        )]

    # Model training using the pipeline built above, remember
    # you have to prefix with classifier__ for every model
    # parameter you like to use.
    tf.fit(pd_Data_train[features], pd_Data_train[Y],
           classifier__epochs=epochs,
           classifier__batch_size=batchsize,
           classifier__verbose=1,
           classifier__callbacks=custom_early_stopping)

    # Upload model to stage, here you try to save both model
    # files where the keras model is saved as .h5 and
    # the pipeline is saved as pickle .pkl file.
    save_pipeline_tf_file(session, tf,
                          tf.named_steps['classifier'], "@sp_tf_models",
                          model_name + "_keras_model_1M.h5",
                          model_name + "_sklearn_pipe_1M.pkl")
    # also make sure you pay attention that tf is the model
    # and tf.named_steps['classifier'] is the classifier
    # specific to keras

    # Score Model
    score = tf.score(pd_Data_test[features], pd_Data_test[Y])

    y_pred = tf.predict(pd_Data_test[features])

    # Evaluate Metrics (true labels first, predictions second)
    df_classification_report = (
        get_classification_report(pd_Data_test[Y], y_pred)
        .reset_index()
        .rename(columns={"index": "class"})
        .reset_index(drop=True))
    df_model_info = get_model_info('keras_model_1M.h5',
                                   test_size, random_state, epochs, batchsize)
    # Repeat the single model-info row once per report row so that
    # the two frames line up when joined on their index
    df_model_info = pd.concat(
        [df_model_info] * len(df_classification_report), ignore_index=True)
    df_model_output = df_classification_report.join(df_model_info)
    session.create_dataframe(df_model_output) \
        .write.mode("append") \
        .save_as_table("TF_MODEL_OUTPUT")

    print(f"model score on validation data: {score}")
    # The stored proc is declared to return a string
    return str(df_model_output)
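
The training function repeats the single row of model information so that it lines up with the rows of the classification report. Here is a minimal pandas sketch of that index-based join, with placeholder values that are not real results.

```python
import pandas as pd

# Placeholder report rows (illustrative values only)
report = pd.DataFrame({
    "class": ["0", "1", "accuracy", "macro avg", "weighted avg"],
    "precision": [0.8, 0.6, 0.7, 0.7, 0.72],
})

# One row of run metadata, as a function like get_model_info would return
info = pd.DataFrame([["toy_model", 0.25, 43]],
                    columns=["model", "test_size", "random_state"])

# Repeat the metadata once per report row, then join on the index
info_repeated = pd.concat([info] * len(report), ignore_index=True)
joined = report.join(info_repeated)

# Without the repetition, only the first report row gets metadata
unrepeated = report.join(info)

print(joined)
print(unrepeated["model"].isna().sum(), "rows without model info when not repeated")
```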

12. Let’s register the stored proc as usual and list all the required packages, such as scikit-learn, tensorflow and dill.

# Registering the function as a Stored Procedure
sp_tf_sproc = sk_tf_session.sproc.register(
    func=sp_train_tf_model,          # training function defined above
    name='sp_train_tf_model',        # stored proc name registered in Snowflake
    is_permanent=True,               # permanent stored proc
    replace=True,                    # replace if it already exists
    stage_location='@sp_tf_models',  # stage location used for the stored proc
    packages=['snowflake-snowpark-python',  # required libraries
              'scikit-learn',
              'joblib', 'dill',
              'tensorflow'])
12. The training function is now registered as a stored proc. As I said earlier, TensorFlow models are memory intensive, and in this use case I used a training dataset of around 1 million rows, so it is a good idea to unleash a medium-size Snowpark-optimized warehouse.

sk_tf_session.sql("use warehouse HMWH").collect()
print('Current Warehouse:', sk_tf_session.get_current_warehouse())
print("Warehouse set up:")
sk_tf_session.sql("show warehouses like 'HMWH'").collect()

13. Let’s begin the training and wait for fireworks.

table_name = 'DATA_TRAIN_1M'
test_size = 0.25
model_name = 'data_train'
random_state = 43
epochs = 200
batchsize = 100
print("Tensorflow classifier report")
print(sp_tf_sproc(table_name,
                  model_name,
                  features,
                  'LABEL',
                  test_size,
                  random_state,
                  epochs,
                  batchsize))

Result

Tensorflow classifier report
class precision recall ... random_state epochs batchsize
0 0 0.835987 0.719037 ... 43 200 100
1 1 0.583333 0.736031 ... 43 200 100
2 accuracy 0.724956 0.724956 ... 43 200 100
3 macro avg 0.709660 0.727534 ... 43 200 100
4 weighted avg 0.747991 0.724956 ... 43 200 100

Fortunately, there are no fireworks: we are dealing with the stability of Snowflake, so any fireworks would have come from bad coding on my part. You can now see the classification report for the classes we defined in the stored proc.

14. Let’s check whether the models were saved in the stage location. As expected, both model files were saved.

sk_tf_session.sql("list @sp_tf_models").collect()

Result

Row(name='sp_tf_models/data_train_keras_model_1M.h5.gz',
size=772000, md5='7ef4507a910b78f3b3b9a287d46039c8',
last_modified='Wed, 4 Jan 2023 04:23:31 GMT'),
Row(name='sp_tf_models/data_train_sklearn_pipe_1M.pkl.gz',
size=819632, md5='fe33abc4a99b6d4e019cc9aee5a34fab',
last_modified='Wed, 4 Jan 2023 04:23:33 GMT')

15. From here on, we don’t need the Snowpark-optimized warehouse. Luckily, Snowflake supports auto-suspend, and since we have it enabled, the warehouse will suspend automatically after a period of inactivity. Now let’s switch to the xsmall warehouse.

sk_tf_session.sql("use warehouse xsmall_wh").collect()

16. Check the model output, saved alongside the model name in a Snowflake table as defined in the stored proc.

sk_tf_session.sql("""select class,precision,recall,f1score,model 
from tf_model_output """).show(1000)

Returns

---------------------------------------------------------------------------------------------------
|"CLASS" |"PRECISION" |"RECALL" |"F1SCORE" |"MODEL" |
---------------------------------------------------------------------------------------------------
|0 |0.8359867828176629 |0.7190373956532705 |0.7731143572427797 |keras_model_1M.h5 |
|1 |0.5833325730967477 |0.7360314938876994 |0.6508455944098652 |keras_model_1M.h5 |
|accuracy |0.724956200663104 |0.724956200663104 |0.724956200663104 |keras_model_1M.h5 |
|macro avg |0.7096596779572053 |0.727534444770485 |0.7119799758263224 |keras_model_1M.h5 |
|weighted avg |0.747990880850823 |0.724956200663104 |0.7305298696154544 |keras_model_1M.h5 |
---------------------------------------------------------------------------------------------------

17. Now that the model is saved in the stage location, let’s perform inference. For that, we need to import udf from the Snowpark library and add the model files (.h5 and .pkl) saved in the stage location as imports.

import sys
import cachetools
import os
import joblib
from snowflake.snowpark.functions import udf
sk_tf_session.add_import("@sp_tf_models/data_train_keras_model_1M.h5")
sk_tf_session.add_import("@sp_tf_models/data_train_sklearn_pipe_1M.pkl")

18. Define the user-defined function for inference and call it predict_tf_model.

# Cache the loaded model so it is read from disk only once
@cachetools.cached(cache={})
def read_file(filename):
    import_dir = sys._xoptions.get("snowflake_import_directory")
    if import_dir:
        with open(os.path.join(import_dir, filename), 'rb') as file:
            m = joblib.load(file)
            return m

@udf(name="predict_tf_model",
     is_permanent=True,
     stage_location="@sp_tf_models",
     replace=True)
def predict(X1: float, X2: float, X3: float, X4: float, X5: float,
            X6: float, X7: float, X8: float, X9: float) -> int:
    m = read_file('data_train_sklearn_pipe_1M.pkl')
    # Build a one-row DataFrame from the arguments, in feature order
    row = pd.DataFrame([locals()], columns=features)
    return m.predict(row)[0]
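
The caching decorator on read_file matters because the UDF runs once per row, and reloading the model each time would be wasteful. Here is a local sketch of the same idea. As assumptions for illustration, functools.lru_cache stands in for cachetools.cached, pickle stands in for joblib, and the "model" is just a small dictionary.

```python
import functools
import os
import pickle
import tempfile

# Counts how many times the file is actually read from disk
load_calls = {"count": 0}


@functools.lru_cache(maxsize=None)
def read_file(path):
    """Load a pickled object; repeat calls with the same path hit the cache."""
    load_calls["count"] += 1
    with open(path, "rb") as file:
        return pickle.load(file)


with tempfile.TemporaryDirectory() as tmp_dir:
    model_path = os.path.join(tmp_dir, "toy_model.pkl")
    with open(model_path, "wb") as out:
        pickle.dump({"weights": [0.1, 0.2]}, out)
    first = read_file(model_path)
    second = read_file(model_path)  # served from the cache, no disk read

print("disk reads:", load_calls["count"])
```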

19. Let’s run inference on unseen data by passing the input features to the UDF, which predicts the labels. I am also calling other UDFs, defined in other notebooks with a random forest classifier and a decision tree classifier, so that I can compare them with the KerasClassifier predictions side by side.

qry="""SELECT 
substr(Identifier,5,5) Identifier,
LABEL AS ORIGINAL_LABEL,
predict_tf_model(X1,X2,X3,X4,X5,X6,X7,X8,X9) as PRED_tf_LABEL,
predict_rf_model(X1,X2,X3,X4,X5,X6,X7,X8,X9) as PRED_rf_LABEL,
predict_dtree_model(X1,X2,X3,X4,X5,X6,X7,X8,X9) as PRED_dtree_LABEL
FROM (DATA_TEST)"""
predict_snowpark_df = sk_tf_session.sql(qry)
predict_snowpark_df.show(1000)

Returns :

------------------------------------------------------------------------------------------
|"Identifier"|"ORIGINAL_LABEL" |"PRED_TF_LABEL" |"PRED_RF_LABEL" |"PRED_DTREE_LABEL" |
------------------------------------------------------------------------------------------
|09295 |1 |1 |0.0 |0.0 |
|09340 |0 |0 |0.0 |0.0 |
|09624 |0 |0 |0.0 |0.0 |
|09649 |0 |0 |0.0 |0.0 |
|10008 |0 |0 |0.0 |0.0 |
|10092 |0 |0 |0.0 |0.0 |
|10418 |1 |1 |1.0 |1.0 |
|12242 |0 |0 |0.0 |0.0 |
|12623 |0 |0 |0.0 |0.0 |
|14138 |0 |0 |0.0 |0.0 |
|15150 |1 |1 |1.0 |1.0 |
|15180 |0 |0 |0.0 |0.0 |
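
To put a number on the side-by-side comparison, here is a small sketch that computes how often each model agrees with the original label. It uses only the 12 rows displayed above, so treat it as an illustration of the comparison and not as an evaluation of the models.

```python
# Labels copied from the 12 rows displayed above
original = [1, 0, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0]
predictions = {
    "tf": [1, 0, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0],
    "rf": [0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0],
    "dtree": [0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 1, 0],
}


def agreement(labels, preds):
    """Fraction of rows where the prediction matches the label."""
    matches = sum(int(a == b) for a, b in zip(labels, preds))
    return matches / len(labels)


rates = {name: agreement(original, preds) for name, preds in predictions.items()}
for name, rate in rates.items():
    print(f"{name}: {rate:.3f}")
```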

Bingo! We can now successfully build, train and run inference with a TensorFlow model in a sklearn pipeline.

Happy skiing at Snowpark.

Karuna Nadadur
Snowflake Builders Blog: Data Engineers, App Developers, AI/ML, & Data Science

Karuna is a Principal Data Cloud Architect at Snowflake with rich experience in data analytics and data science to make insightful decisions for your business.