Snowpark for Data Science Playbook, FAQ style — Part 1

This playbook walks you through data science best practices in Snowpark, in a frequently asked questions (FAQ) style drawn from our Snowflake community. There are many references available for building your own data science notebook in Snowpark; the intent behind this playbook is to simplify your understanding of how to build machine learning models in Snowpark for various use cases.

The data science workflow provides a quick reference for building machine learning models in Snowpark, starting from setup, data ingestion, data engineering, feature engineering, building the model pipeline, training models, deploying the model pipeline, monitoring model performance, and finally inferring with the deployed model.

Snowpark for Data Science ML Model development flow

What notebook IDEs are supported by snowpark?

Snowpark supports many notebook IDEs such as Jupyter Notebook, Spyder, and Hex. If you want a rich experience building solutions in Snowpark, we highly recommend Hex, which offers both SQL and scripting-language kernels and makes it easy to build visualizations from either. Try it yourself here.

How do I know what packages are supported by snowpark for data science?

You can easily find the packages that are available in Snowpark in two ways.

Method 1: Scan this list of all Snowflake Snowpark supported packages for Python:

https://repo.anaconda.com/pkgs/snowflake/

A sample of the libraries supported in Snowflake Snowpark for Python (check the link above for the exhaustive list).

Method 2: Execute the following SQL in your Snowflake account against the Information Schema. Any package that is included in the default Anaconda installation, such as scikit-learn, should be available.

select package_name, language, max(version)
from "YOUR_DB"."INFORMATION_SCHEMA"."PACKAGES"
WHERE PACKAGE_NAME LIKE '%scikit%'
group by package_name, language
order by 1,2;
This shows the scikit-learn-related packages with the most recent version available in Snowpark.
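
If you prefer to stay in your notebook, the same check can be run through the Snowpark session (a minimal sketch, assuming the session created later in this playbook and that a database is set in the session context):

packages_df = session.sql("""
    select package_name, language, max(version) as latest_version
    from information_schema.packages
    where package_name like '%scikit%'
    group by package_name, language
    order by 1, 2
""")
packages_df.show()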

How do I access new packages (not listed above) in Snowpark for a machine learning use case?

Step 1 : Download the .whl or .tar.gz file of the package you want from PyPI or another distribution channel.

Step 2 : Upload the package file to the internal stage where the model files are going to be saved.

Step 3 : Add session.add_import(import_path) so the package is available to the function (UDF or stored procedure) where the model uses it.

Remember that you can only add/include pure-Python packages, not packages that have or depend on C extensions.
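
For example, a minimal sketch (the stage name, local path, and package filename are assumptions for illustration):

# Upload the pure-Python package archive to the stage that holds model artifacts
session.file.put('file:///Users/username/Downloads/some_pure_py_pkg-1.0.tar.gz',
                 '@stage_models', auto_compress=False, overwrite=True)

# Make the package available to UDFs/stored procedures created in this session
session.add_import('@stage_models/some_pure_py_pkg-1.0.tar.gz')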

How do I check if my Snowpark installation is set up correctly for data science?

If you have already installed Anaconda through Navigator and installed Snowpark, then try:

from snowflake.snowpark.session import Session
from snowflake.snowpark import version
print(f"snowflake snowpark version is: {version.VERSION}")

And you should see the following output or something similar

snowflake snowpark version is: (0, 10, 0)

Then, all it takes is importing the following basic Snowpark libraries. snowflake.snowpark.functions and snowflake.snowpark.types are integral to Snowpark and are primarily used for data/feature engineering activities.

from snowflake.snowpark.functions import avg, sum, col, lit
from snowflake.snowpark.functions import udf, sproc, col
from snowflake.snowpark.types import IntegerType, FloatType, LongType, \
    DoubleType, DecimalType, StringType, BooleanType, Variant
from snowflake.snowpark.types import PandasSeries, PandasDataFrame
from snowflake.snowpark import functions as fn
from snowflake.snowpark import version

If the above throws an error, then you have not installed Snowpark correctly; use this Snowpark QuickStart as an installation guide.

Or, if you are just getting started, install Snowpark with these simple steps:

1. Install Anaconda for macOS or Windows from https://www.anaconda.com/products/distribution

2. After Anaconda is fully installed, open Anaconda Navigator, create a new environment, and call it snowpark

3. Once the environment is set up, click the play button to activate it

4. Then click Home on the left pane. On the Home screen, select the snowpark environment you created

5. Finally, click Launch under JupyterLab to open the notebook for model building. If this is your first time, click Install under JupyterLab before you click Launch.

6. Go back and run the Snowpark imports below to check whether the install was successful

from snowflake.snowpark.functions import avg, sum, col, lit
from snowflake.snowpark.functions import udf, sproc, col
from snowflake.snowpark.types import IntegerType, FloatType, LongType, \
    DoubleType, DecimalType, StringType, BooleanType, Variant
from snowflake.snowpark.types import PandasSeries, PandasDataFrame
from snowflake.snowpark import functions as fn
from snowflake.snowpark import version

  • If you would rather use conda/Miniconda from the command line with Python 3.8, follow these steps to install:
conda create --name snowpark -c https://repo.anaconda.com/pkgs/snowflake python=3.8
conda activate snowpark
conda install -c https://repo.anaconda.com/pkgs/snowflake snowflake-snowpark-python pandas notebook

Remember, Snowpark currently supports Python 3.8.
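
A quick way to confirm that the kernel you launched is on the supported version (a minimal sketch):

import sys
print(sys.version)  # should report a 3.8.x interpreter for Snowpark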

How do I connect to Snowflake for data science from the notebook?

Step 1 : Build a credentials file cred.json using the following Snowflake parameters

{
"account": "your_snowflake_account_name",
"user": "userid",
"password": "password",
"database": "database",
"schema": "schema",
"warehouse": "warehouse"
}

Step 2 : Now, using the above cred.json, create a new Snowpark session by connecting to Snowflake

import json

# Load the connection parameters from cred.json
with open('cred.json') as f:
    snowflake_connection_cfg = json.loads(f.read())

# Creating Snowpark Session
session = Session.builder.configs(snowflake_connection_cfg).create()

Can I validate my connection to snowflake?

Validate the connection using the following session calls:

print('Current Database:', session.get_current_database())
print('Current Schema:', session.get_current_schema())
print('Current Warehouse:', session.get_current_warehouse())

If the connection succeeded, you should get the following output with the database, schema, and warehouse names you used in cred.json.

Current Database: "database"
Current Schema: "schema"
Current Warehouse: "warehouse"

Can I have multiple active sessions simultaneously?

Yes, you can create and activate multiple sessions (Snowflake connections) simultaneously, similar to Snowflake worksheets. Make sure you are NOT mixing sessions across notebooks.
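
For example, a minimal sketch (the two credential files, cred_dev.json and cred_prod.json, are hypothetical names):

import json
from snowflake.snowpark.session import Session

with open('cred_dev.json') as f:
    session_dev = Session.builder.configs(json.load(f)).create()

with open('cred_prod.json') as f:
    session_prod = Session.builder.configs(json.load(f)).create()

# Keep each session scoped to its own notebook/workflow
print(session_dev.get_current_warehouse(), session_prod.get_current_warehouse())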

What is the best way to access data in Snowpark if my data is stored as files?

The best way to access ad hoc files is to ingest your data into Snowflake as a table (permanent, transient, or temporary). If you have files stored on your local drive, you can load the data using SnowSQL; see the SnowSQL quick start to install SnowSQL and get started. The PUT command works only with SnowSQL on the command prompt, not in a Snowflake worksheet.

Another way to execute the PUT command is from your notebook cell, as explained in Step 2 below.

Step 1: Create a stage location using the snowflake session created above

session.sql("CREATE OR REPLACE STAGE STAGE_DATA").collect()

You should see the following output

[Row(status='STAGE_DATA created successfully, statement succeeded.')]

Step 2: Let’s load the file from a local folder to the internal stage created above using the PUT command

With SQL call

session.sql("put file:///Users/usename/Doc/file.csv 
@stage_data").collect();

With Snowpark Dataframe

session.file.put('file.csv', '@stage_data')

Step 3 : Execute list on internal stage

session.sql("list @stage_data").collect();

It should show that the file was loaded successfully:

[Row(name='stage_data/file.csv.gz', size=1664,
md5='54d6a4fb52738250c895b3e18b47df36',
last_modified='Mon, 7 Nov 2022 19:33:43 GMT')]

Note: Snowflake compresses staged files by default; you can choose NOT to compress by adding AUTO_COMPRESS=FALSE to the PUT command (or auto_compress=False with session.file.put).
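
For example (the local path is illustrative):

session.sql("put file:///Users/username/Doc/file.csv @stage_data AUTO_COMPRESS=FALSE").collect()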

Step 4 : Now let’s ingest the staged data into a Snowflake table, but first review the structure of the input file before you create the table in Snowflake.

With SQL based table creation

session.sql("CREATE OR REPLACE table TABLE_NAME(ID VARCHAR,
COLUMN1 VARCHAR,COLUMN2 NUMBER,COLUMN3 NUMBER,COLUMN4 NUMBER,COLUMN5 NUMBER,
COLUMN6 NUMBER,COLUMN7 NUMBER,COLUMN8 NUMBER,COLUMN9 NUMBER,
COLUMN10 NUMBER,COLUMN11 NUMBER,COLUMN12 NUMBER)").collect()

Returns

[Row(status='Table TABLE_NAME successfully created.')]

With Snowpark Dataframe

from snowflake.snowpark import types as T

schema_log = T.StructType([T.StructField("DATE", T.TimestampType()),
                           T.StructField("COLUMN1", T.StringType()),
                           T.StructField("MODEL_NAME", T.StringType()),
                           T.StructField("COLUMN2", T.StringType()),
                           T.StructField("CLASSIFICATION_REPORT", T.VariantType())])
log_df = session.create_dataframe([], schema=schema_log)
log_df.write.mode('overwrite').save_as_table('DATABASE.SCHEMA.TABLE_NAME')
# Just use TABLE_NAME if the database and schema are already set in the session context

Step 5 : Finally, let’s load the data into the Snowflake table you created

With SQL

session.sql("copy into table_name from @stage_data/file.csv.gz 
ON_ERROR = CONTINUE").collect()

Returns

[Row(file='stage_data/file.csv.gz', status='PARTIALLY_LOADED', 
rows_parsed=82, rows_loaded=81, error_limit=82, errors_seen=1,
first_error="Numeric value 'COLUMN1' is not recognized",
first_error_line=1, first_error_character=13,
first_error_column_name='"TABLE"["COLUMN1":3]')]

With Snowpark Dataframe

# file_name should be a stage path, e.g. '@stage_data/file.csv.gz'
load_df = session.read \
    .option("FIELD_DELIMITER", ',') \
    .option("SKIP_HEADER", 1) \
    .option("ON_ERROR", "CONTINUE") \
    .schema(schema_something) \
    .csv(file_name) \
    .copy_into_table(table_name)

If you have loaded the file into a pandas dataframe, you can also create the table in Snowflake without having to define a table structure.

session.create_dataframe(df).write.mode("overwrite").\
save_as_table("table_name")

Step 6 : Let’s check some stats on the loaded table before we conclude the data load.

snowpark_df = session.table(table_name)
# Describing the data
print('Rows in dataset:', f"{snowpark_df.count():,}")
print('Data before imputation:')
display(snowpark_df.describe().sort('SUMMARY').to_pandas())

What is the best way to profile data to understand the variables?

The most popular profiling package is pandas-profiling. You can achieve something similar using Snowpark DataFrame references such as:

Snowpark Dataframe Stats

Snowpark Dataframe find missing values

Snowpark Dataframe count

For other references, use the Snowpark API reference for Python.
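
As a quick illustration, a minimal sketch of basic profiling with Snowpark (assuming the snowpark_df table handle created above):

from snowflake.snowpark import functions as fn
from snowflake.snowpark import types as T

# Summary statistics (count, mean, stddev, min, max)
snowpark_df.describe().show()

# Row count and missing values per column
print('Rows:', snowpark_df.count())
missing = snowpark_df.select(
    [fn.sum(fn.col(c).is_null().cast(T.IntegerType())).alias(c) for c in snowpark_df.columns]
)
missing.show()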

A notebook that does something similar to pandas-profiling with Streamlit is a work in progress; I will share it soon.

How do I find correlations and covariance between X variables and Y in snowpark?

Yes, both can be computed directly on a Snowpark DataFrame.

Correlation, using the following:

>>> df = session.create_dataframe([[0.1, 0.5], [0.2, 0.6], [0.3, 0.7]], 
schema=["a", "b"])
>>> df.stat.corr("a", "b")
0.9999999999999991

Snowpark reference for Correlation

Covariance, using the following:

>>> df = session.create_dataframe([[0.1, 0.5], [0.2, 0.6], [0.3, 0.7]], 
schema=["a", "b"])
>>> df.stat.cov("a", "b")
0.010000000000000037

Snowpark reference for Covariance

Thereafter, you can build correlation and covariance matrices and visualize them using Hex or matplotlib. A notebook that builds a covariance matrix similar to sklearn's, using Streamlit, is a work in progress; I will share it soon.
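
As a quick illustration, a minimal sketch that assembles a pairwise correlation matrix from a Snowpark DataFrame into pandas (the sample data and column names are assumptions):

import pandas as pd

df = session.create_dataframe([[0.1, 0.5, 1.0], [0.2, 0.6, 0.9], [0.3, 0.7, 0.8]],
                              schema=["a", "b", "c"])
cols = df.columns
corr_matrix = pd.DataFrame(
    [[df.stat.corr(c1, c2) for c2 in cols] for c1 in cols],
    index=cols, columns=cols)
print(corr_matrix)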

How do I impute, scale, or normalize the X variables in Snowpark?

This can be achieved by building a model pipeline where you impute, normalize, and scale features, and define the model classifier with its parameters.

import pandas as pd

def build_rf_model(p_df: pd.DataFrame, ne, nj, cw, md):
    from sklearn.pipeline import Pipeline
    from sklearn.impute import SimpleImputer
    from sklearn.preprocessing import StandardScaler, OneHotEncoder
    from sklearn.compose import ColumnTransformer
    from sklearn.ensemble import RandomForestClassifier
    # Identify numeric features to impute
    numeric_features = p_df.select_dtypes(include=['int64', 'float64']).columns.tolist()
    # Identify categorical features to impute
    categorical_features = p_df.select_dtypes(include=['object']).columns.tolist()
    feature_names = numeric_features + categorical_features
    # Apply imputation and scaling for numeric features
    numeric_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='mean')),
        ('scaler', StandardScaler(with_mean=True, with_std=True))])
    # Apply imputation and encoding for categorical features
    categorical_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='constant', fill_value='missing')),
        ('onehot', OneHotEncoder(handle_unknown='ignore'))])
    # Apply preprocessor for categorical and numeric features
    preprocessor = ColumnTransformer(
        transformers=[
            ('num', numeric_transformer, numeric_features),
            ('cat', categorical_transformer, categorical_features)])
    # Build the model pipeline with the preprocessor and the classifier
    model = Pipeline(steps=[
        ('preprocessor', preprocessor),
        ('classifier', RandomForestClassifier(n_estimators=ne,
                                              n_jobs=nj,
                                              class_weight=cw,
                                              max_depth=md))])
    return model

Can we train models in snowpark and use snowflake credits?

You can train ML models in Snowflake by registering the model pipeline function built above as a stored procedure and then calling the stored procedure. Refer to the diagram above if you feel confused.

This will utilize the snowflake compute power and credits.

Step 1: Define the Model Function

def train_rf_model(session: Session, training_table: str, model_name: str,
                   features: list, Y: str, test_size: float, random_state: int,
                   ne: int, nj: int, cw: str, md: int) -> str:
    from sklearn.metrics import (accuracy_score, classification_report,
                                 precision_score, recall_score, confusion_matrix)
    training_data = session.table(training_table)
    # Split into train and test sets
    Data_train, Data_test = training_data.random_split([1 - test_size, test_size],
                                                       seed=random_state)
    pd_Data_train = Data_train.to_pandas()
    pd_Data_test = Data_test.to_pandas()
    # Model pipeline built in the previous step
    rf = build_rf_model(pd_Data_train[features], ne, nj, cw, md)
    rf.fit(pd_Data_train[features], pd_Data_train[Y])
    # Save the model file in the internal stage
    # (save_file_to_stage is defined in the "How do I save the trained model?" section below)
    model_dir = '@stage_models'
    model_fl = model_name + '.joblib'
    save_file_to_stage(session, rf, model_dir, model_fl)
    # Score the model
    score = rf.score(pd_Data_test[features], pd_Data_test[Y])
    y_pred = rf.predict(pd_Data_test[features])
    # Evaluate model metrics
    df_classification_report = get_classification_report(y_pred, pd_Data_test[Y]).\
        reset_index().rename(columns={"index": "class"}).reset_index(drop=True)
    df_model_info = get_model_info(model_fl, test_size, random_state, ne, nj, cw, md)
    df_model_info = df_model_info.append([df_model_info] * 5, ignore_index=True)
    # Write the evaluation metrics and input parameters to a Snowflake table
    session.create_dataframe(df_classification_report.join(df_model_info)).\
        write.mode("append").save_as_table("model_output")
    return df_classification_report.join(df_model_info).to_string()

Step 2: Register Stored Proc

rf_sproc = session.sproc.register(func=train_rf_model,
                                  name='train_rf_model',
                                  is_permanent=True,
                                  replace=True,
                                  stage_location='@stage_models',
                                  packages=['snowflake-snowpark-python',
                                            'scikit-learn', 'joblib'])

Step 3: Execute Stored Proc for training

table_name = 'table_name'
model_name = 'rf_model_iteration1.joblib'
features = ['column1', 'column2', 'column3', 'column4']
target = 'LABEL'
test_size = 0.25
random_state = 43
n_estimator = 4
n_jobs = 1
class_weight = 'balanced'
max_depth = 15
print(rf_sproc(table_name,
               model_name,
               features,
               target,
               test_size,
               random_state,
               n_estimator,
               n_jobs,
               class_weight,
               max_depth))

How do I confirm if the training was pushed down to snowflake?

After you execute the notebook cell which calls the stored procedure (above),

Step 1 : Log in to the Snowflake account you are using for the notebook session

Step 2 : Click on the History tab and click refresh on the right side.

Step 3 : You should see a new query submitted in Snowflake where the stored procedure is running, with a query id assigned.

Snowflake history tab to validate model training from stored proc

The history tab shows the query id, the stored procedure CALL, the warehouse, its size, and, most importantly, the duration it took to complete the training.

You can also narrow down all your training runs executed via stored procedures, and the duration each took, with:

select query_id,
       substr(query_text, 55, 20) model,
       warehouse_name,
       execution_status,
       TIMESTAMPDIFF(second, start_time, end_time) tot_dur_insecs
from table(information_schema.QUERY_HISTORY_BY_WAREHOUSE('warehouse_name'))
WHERE query_text like '%CALL%' and execution_status = 'SUCCESS'
order by start_time desc;

And you get the output as this

|QUERY_ID          |MODEL                 |WAREHOUSE_NAME |STATUS  |TOT_DUR_INSECS |
|------------------|----------------------|---------------|--------|---------------|
|0057-3c030165c1f6 |M', 'logres_staples_  |WRHS           |SUCCESS |245            |
|0057-3c030165b42a |'dtree_staples_model  |WRHS           |SUCCESS |167            |
|0057-3c030165c11e |'rf_staples_model',   |WRHS           |SUCCESS |230            |

Can I register individual pieces in the model pipeline as separate stored proc?

Yes, you can take an independent piece of a model pipeline, such as the scaler shown below, and register it as a stored procedure, which will be pushed down to utilize Snowflake compute resources and credits.

Step 1: Define the function min_max_scaler and establish the input table and columns for scaling. Most importantly, "preprocessing" here is a helper module of built-in functions for imputation, encoding, and scaling. This package needs to be imported before min_max_scaler is registered as a stored procedure. The "preprocessing" utility scripts are linked here. Below is how to use the preprocessing utility and register it as an individual stored procedure for reusability.

import io
import joblib

def save_file(session, model, path):
    input_stream = io.BytesIO()
    joblib.dump(model, input_stream)
    session._conn._cursor.upload_stream(input_stream, path)
    return "successfully created file: " + path

def min_max_scaler(session: Session, input_table: str, input_cols: list,
                   output_table: str, output_cols: list) -> str:
    import preprocessing as pp
    df_input = session.table(input_table)
    mms = pp.MinMaxScaler(input_cols=input_cols, output_cols=output_cols)
    mms.fit(df_input)
    mms_tr_df = mms.transform(df_input)
    save_file(session, mms, "@model_STAGE/min_max_scaler.joblib")
    mms_tr_df.write.mode("overwrite").save_as_table(output_table)
    return "SUCCESS"

Step 2: Register the function as a stored procedure

session.clear_imports()
session.clear_packages()
session.add_import("preprocessing")
session.add_packages('snowflake-snowpark-python', 'joblib', 'scipy', 'numpy')
from snowflake.snowpark import functions as F
min_max_scaler_sp = F.sproc(min_max_scaler, replace=True, is_permanent=False,
                            session=session)

Step 3: Call the stored procedure by passing the input

min_max_scaler_sp("scaler_input_table", ["input1", "input2"], 
"scaler_output", ["input1_scaled", "input2_scaled"])

Similarly, you can build independent pieces from model pipelines, such as encoding, correlation analysis, imputation, etc., as independent stored procedures so that you can reuse them across different use cases.
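
For instance, a minimal sketch of a reusable mean-imputation stored procedure built only with Snowpark functions (the table and column names are assumptions for illustration):

from snowflake.snowpark.session import Session
from snowflake.snowpark import functions as F

def mean_imputer(session: Session, input_table: str, input_cols: list,
                 output_table: str) -> str:
    df = session.table(input_table)
    # Replace NULLs in each input column with that column's mean
    for c in input_cols:
        mean_val = df.select(F.avg(F.col(c))).collect()[0][0]
        df = df.with_column(c, F.coalesce(F.col(c), F.lit(mean_val)))
    df.write.mode("overwrite").save_as_table(output_table)
    return "SUCCESS"

mean_imputer_sp = F.sproc(mean_imputer, replace=True, is_permanent=False,
                          packages=['snowflake-snowpark-python'], session=session)
mean_imputer_sp("scaler_input_table", ["input1", "input2"], "imputed_output")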

Can I train multiple models in snowpark simultaneously?

Yes, you can train multiple models and there are multiple options to get there.

Option 1 : Register multiple stored procedures and execute them independently, which generates different model files. Use a timestamp as a suffix to get unique model filenames (see the sketch after the Option 2 code below).

Option 2 : Register one stored procedure as above and then execute it with different parameters based on your model pipeline definition, saving different model files.

table_name = 'table_name'
features = ['column1', 'column2', 'column3', 'column4']
target = 'LABEL'
test_size = 0.25
random_state = 43
n_estimator = 4
n_jobs = 1
class_weight = 'balanced'

### Model 1
model_name = 'rf_model_iteration_md15.joblib'
max_depth = 15
print(rf_sproc(table_name,
               model_name,
               features,
               target,
               test_size,
               random_state,
               n_estimator,
               n_jobs,
               class_weight,
               max_depth))

### Model 2
model_name = 'rf_model_iteration_md25.joblib'
max_depth = 25
print(rf_sproc(table_name,
               model_name,
               features,
               target,
               test_size,
               random_state,
               n_estimator,
               n_jobs,
               class_weight,
               max_depth))
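
For Option 1, a minimal sketch of generating unique model filenames with a timestamp suffix (the naming pattern is an assumption for illustration):

from datetime import datetime

suffix = datetime.now().strftime('%Y%m%d_%H%M%S')
model_name = f'rf_model_{suffix}.joblib'  # e.g. rf_model_20221208_053215.joblib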

Can I increase compute power while the model training is in progress?

No. After training has begun, the query has been submitted to Snowflake and the warehouse that was assigned earlier will be used to train the model. You can change the warehouse anytime, but this will not take effect on a model training run that is already in progress.

Can I increase compute power before I train the model?

You can always cancel the current execution and resize the warehouse. Once the warehouse has been resized, you can execute the stored procedure for model training again.

If you want to resize the warehouse you are currently using:

session.sql("alter warehouse APP_WH SET WAREHOUSE_SIZE='MEDIUM';

If you want to use a different warehouse that is already configured with a medium size:

session.sql("use warehouse medium_wrhs");

Can I evaluate trained models in snowpark?

Yes, by defining a function that writes the metrics to Snowflake. Here is an example that derives a classification report and is called in the model definition above; it works only for classification use cases. If you are building other kinds of models, you can build a different function along the same lines.

import pandas as pd

def get_classification_report(y_test, y_pred):
    from sklearn import metrics
    report = metrics.classification_report(y_test, y_pred, output_dict=True,
                                           target_names=['0', '1'])
    df_classification_report = pd.DataFrame(report).transpose()
    return df_classification_report

To save the model parameters in a Snowflake table, you can define a function that captures all your inputs to the model and saves them alongside the model output.

def get_model_info(model_name, test_size, random_state, ne, nj, cw, max_depth):
    data = [[model_name, test_size, random_state, ne, nj, cw, max_depth]]
    df_model_info = pd.DataFrame(data, columns=['model', 'test_size',
                                                'random_state', 'nestimator', 'njobs',
                                                'classweight', 'max_depth'])
    return df_model_info

How do I save the trained model?

Before we use the model for inference, the trained model should be saved in snowflake internal stage where all the model dependencies are saved.

Step 1 : Define the function to save the model

def save_file_to_stage(session, obj, stage, name):
    import io
    import joblib
    import os
    model_output_dir = '/tmp'
    model_file = os.path.join(model_output_dir, name)
    joblib.dump(obj, model_file)
    session.file.put(model_file, stage, overwrite=True, auto_compress=False)

Step 2 : Execute the function by passing the stage directory where you want the model saved

model_dir = '@stage_models'
model_fl = model_name + '.joblib'
save_file_to_stage(session, rf, model_dir, model_fl)

Step 3 : After the training is completed, check the saved model file as below and review the results

session.sql("list @stage_models").collect()

This returns the model files saved in the internal stage, including their size:

[Row(name='_stage_models/dtree_model.joblib', size=999872, 
md5='029bac10a06ebbfc98c1544536553750',
last_modified='Thu, 8 Dec 2022 05:41:15 GMT'),
Row(name='_stage_models/rf_model.joblib', size=4588736,
md5='37750de293d2a9e5e42371010cb2f3e2',
last_modified='Thu, 8 Dec 2022 05:32:27 GMT')]

How do I go back and check all the models I have trained in snowflake?

Now that we have done all the heavy lifting of building the pipeline and training the model, it is very simple to evaluate all the models you trained using SQL queries. But remember: you should have saved the model output to a Snowflake table by defining the table, the input parameters, and the model reports as part of the stored procedure.

Step 1 : Remember to create a model_output snowflake table

session.sql("create or replace table model_output 
(class varchar, precision double, recall double, f1score double,
support double, model varchar,test_size float, random_state int,
nestimator int, njobs int, classweight varchar, max_depth int)").collect()

Step 2: Include these two function calls when registering the stored procedure above. This writes the model output to the Snowflake table MODEL_OUTPUT.

### Call the get_classification_report function and write to pandas dataframe
df_classification_report = get_classification_report(y_pred,pd_Data_test[Y]).\
reset_index().rename(columns={"index": "class"}).reset_index(drop=True)

### Call the get_model_info function and write to pandas dataframe
df_model_info = get_model_info(model_fl,test_size,random_state,ne,nj,cw,md)

### Merge both these dataframes
df_model_info=df_model_info.append([df_model_info]*5,ignore_index=True)

### Write the model evaluation metrics and input parameters to snowflake table
session.create_dataframe(df_classification_report.join(df_model_info)).\
write.mode("append").save_as_table("MODEL_OUTPUT")

Step 3: Use this SQL to query model output.

select class,precision,recall,f1score,model 
from model_output order by f1score desc

Now we can check the model output, where you can query it like a regular table and build visualizations of model performance.

CLASS        |PRECISION|RECALL   |F1SCORE  |MODEL
-------------|---------|---------|---------|--------------------
0            |0.80053  |0.722841 |0.759705 |logres_model.joblib
weighted avg |0.72790  |0.716395 |0.719575 |logres_model.joblib
accuracy     |0.71639  |0.716395 |0.716395 |logres_model.joblib
macro avg    |0.70491  |0.714355 |0.706872 |logres_model.joblib
0            |0.66581  |0.750339 |0.705554 |rf_model.joblib
0            |0.65679  |0.747357 |0.699154 |dtree_model.joblib
accuracy     |0.68862  |0.688626 |0.688626 |rf_model.joblib
macro avg    |0.69175  |0.688971 |0.687593 |rf_model.joblib
weighted avg |0.69190  |0.688626 |0.687492 |rf_model.joblib
accuracy     |0.68336  |0.683368 |0.683368 |dtree_model.joblib
macro avg    |0.68700  |0.684339 |0.682494 |dtree_model.joblib
weighted avg |0.68747  |0.683368 |0.682237 |dtree_model.joblib
1            |0.71769  |0.627603 |0.669633 |rf_model.joblib
1            |0.71721  |0.621321 |0.665833 |dtree_model.joblib
1            |0.60930  |0.705868 |0.654040 |logres_model.joblib

A Streamlit app for evaluating trained models is currently in the works and will be shared when available.

How do I perform inference on a model that was trained and saved?

Step 1 Define the inference function

import sys
import os
import joblib
import cachetools
from snowflake.snowpark.functions import udf

session.add_import("@stage_models/rf_model.joblib")

@cachetools.cached(cache={})
def read_file(filename):
    import_dir = sys._xoptions.get("snowflake_import_directory")
    if import_dir:
        with open(os.path.join(import_dir, filename), 'rb') as file:
            m = joblib.load(file)
            return m

Step 2 Register the model inference

@udf(name="predict_rf_model", is_permanent=True, 
stage_location="@_stage_models", replace=True)
def predict(features1: float, features2: float, features3: float,
features4: float) -> float:
m = read_file('rf_model.joblib')
row = pd.DataFrame([locals()], columns=features)
return m.predict(row)[0]

Step 3 Execute the inference udf through simple SQL statements

qry="""SELECT
Identifier,
LABEL AS ORIGINAL_LABEL,
predict_rf_model(feature1, feature2, feature3,feature4) as PRED_rf_LABEL,
predict_dtree_model(feature1, feature2, feature3,feature4) as PRED_dtree_LABEL
FROM data_test"""
limitrecords = 1000
predict_snowpark_df = session.sql(qry)
predict_snowpark_df.show(limitrecords)

And the results will look something like this when you call the UDFs (user defined functions) to predict the labels from the input features, calling two different UDFs (Random Forest and Decision Tree classifiers) simultaneously.

Can I use the inference outside of snowflake?

Yes, you can use the inference UDF from pretty much any app, microservice, lambda function, or analytics tool such as Tableau, Power BI, etc.
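
For example, a minimal sketch using the Snowflake Python connector from an external app (connection parameters and feature values are assumptions):

import snowflake.connector

conn = snowflake.connector.connect(account='your_snowflake_account_name',
                                   user='userid', password='password',
                                   database='database', schema='schema',
                                   warehouse='warehouse')
cur = conn.cursor()
cur.execute("select predict_rf_model(0.1, 0.2, 0.3, 0.4)")
print(cur.fetchone()[0])
conn.close()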

You can also use Streamlit to host an app within Snowflake and share it across your user base. More to come in this space in the next parts.

What are some of the clean ups available in snowpark?

To clear the packages or imports

session.clear_packages()
session.clear_imports()

To close a session that was activated for snowpark

session.close()
print('Finished!!!')

What are some readily available notebooks available to get started?

From our Snowpark Python demos repository, please use these projects to get familiar with Snowpark, and use the above playbook to deepen your understanding. These are just some starter notebooks.

Regression Models to predict housing prices

Classification Model for credit card fraud detection

Customer Churn Analytics

Credit Scoring Model

Advertisement Spend ROI prediction

Sentiment Analytics

Advertisement ROI Prediction

What are you waiting for? Get started with Snowpark. Share your Snowpark experience on your blog, and I will be very happy to learn from your experience.

Happy holiday everyone and I wish you all a very happy new year 2023.


Karuna Nadadur

Karuna is a Sr. Data Cloud Architect at Snowflake with rich experience in data analytics and data science, helping you make insightful decisions for your business.