How to build and deploy Scalable ML Models in Snowflake Using Snowpark, Streamlit and Python

A simple and effective way to build ML Models in Snowflake!

Naman Gupta
7 min readDec 19, 2023
Photo by Bailey Zindel on Unsplash

🌍 Who should read this article?

This article is for data enthusiasts, ML practitioners, and technical business leaders seeking to streamline their Machine Learning operations within Snowflake.

🎯 Problem Statement:

You are given an input dataset consisting of 10 attributes and your job is to come up with a machine learning model that will predict the median house value (price).

This is a Regression problem, where you have to predict a continuous number.

🚀 In this article you will going to learn:

  1. How to ingest data from local CSV file into the Snowflake table.
    (This table will act as our input dataset, therefore we will be using this table for data pre-processing and model building)
  2. How to do data pre-processing and model training using scikit-learn.
    (We will use RandomForestRegressor algorithm)
  3. After model training is done, how to output the model serialized file.
    (In our case it will be a joblib file but it can also be a pickle file based on the requirement)
  4. How to perform model inference or prediction serving using UDF.
    (Here, we will read the above serialized output file and come up with the prediction)
  5. How to build Streamlit based data app inside Snowflake for model inference.
    (Here, we will use Snowflake’s Snowsight editor to create a simple prediction serving data app)

Prerequisites

To implement the solution provided in this article, you should have an Snowflake account and familiarity with Python, ML and Snowflake concepts.

Solution Implementation:

Download the data and code from my Github repository.

Initial Setup

Make a new ‘virtual env’ using the provided ‘requirements.txt’ file and use that newly created ‘virtual env’ to execute the below code snippets.

Now, create the ‘.env’ file with below parameters and instead of this ‘xxx-xxx’ give your own credential’s values.

account_name = "xxx-xxx"
user_name = "xxx-xxx"
password = "xxx-xxx"
role = "xxx-xxx"
warehouse = "xxx-xxx"
database = "xxx-xxx"
schema = "xxx-xxx"

Importing packages

import os
import io
import sys
import joblib
import cachetools
import numpy as np
import pandas as pd

import snowflake.snowpark
from snowflake.snowpark import Session
from snowflake.snowpark.functions import udf
from snowflake.snowpark import functions as F
from snowflake.snowpark.functions import sproc


from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.compose import ColumnTransformer
from sklearn.metrics import mean_squared_error
from sklearn.preprocessing import OneHotEncoder
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import train_test_split

Establishing connection to the Snowflake by using Snowpark

def initiateSession(): 

load_dotenv()
connection_parameters = {
"account": os.getenv('account_name'),
"user": os.getenv('user_name'),
"password": os.getenv('password'),
"role": os.getenv('role'),
"warehouse": os.getenv('warehouse'),
"database": os.getenv('database'),
"schema": os.getenv('schema')
}

session = Session.builder.configs(connection_parameters).create()
return session

session = initiateSession()

#----------------------------------------------------------------------#

session.add_packages('snowflake-snowpark-python', 'scikit-learn',
'pandas', 'numpy', 'joblib', 'cachetools')

Ingest data into the Snowflake table.


# Creating the table in which we will upload the CSV file data
query = """
create or replace table HOUSING_DATA (LONGITUDE float,
LATITUDE float,
HOUSING_MEDIAN_AGE float,
TOTAL_ROOMS float,
TOTAL_BEDROOMS float,
POPULATION float,
HOUSEHOLDS float,
MEDIAN_INCOME float,
MEDIAN_HOUSE_VALUE float,
OCEAN_PROXIMITY varchar(255))
"""

session.sql('select * from HOUSING_DATA').show()

#----------------------------------------------------------------------#

# Reading the local CSV file as a pandas dataframe
df = pd.read_csv('housing.csv')

# Converting the df columns to upper case
df.columns = [col.upper() for col in df.columns]

# Now writing the 'df' data into the above created table
session.write_pandas(df, "HOUSING_DATA")

# Querying the table (just to see the data)
session.sql('select * from HOUSING_DATA limit 10').show()

Now, before doing data pre-processing and model building:
We will create some internal stages in Snowflake to hold our UDFs (User difined functions) and Sprocs (Stored Procedures). It is not mandatory to create the stages, its just a good practice to do it and by creating stages it becomes easy to manage the UDFs and Sprocs.

we will going to create three stages:
One stage for the Sprocs, another stage to store the output of our serialized model (.joblib file) and third stage to hold the model serving UDF.

# In this stage we will upload our Spocs
query = """
create or replace stage house_model_training_sproc_stg
directory = (enable = true)
copy_options = (on_error='skip_file')
"""

session.sql(query)

#----------------------------------------------------------------------#

# In this stage we will save our trained model (.joblib file)
query = """
create or replace stage house_model_output_stg
copy_options = (on_error='skip_file')
"""

session.sql(query)

#----------------------------------------------------------------------#

# In this stage we will upload our model Serving UDF
query = """
create or replace stage house_model_serving_udf_stg
directory = (enable = true)
copy_options = (on_error='skip_file')
"""

session.sql(query)

#----------------------------------------------------------------------#

# To see the stages
session.sql("show stages").show()

Defining functions for data pre-processing and model training.

# To save the trained model
def save_file(session, model, path):
input_stream = io.BytesIO()
joblib.dump(model, input_stream)
session._conn._cursor.upload_stream(input_stream, path)
return "successfully created file: " + path

#----------------------------------------------------------------------#

# Model building
def train_model(session: snowflake.snowpark.Session) -> float:

snowdf = session.table("HOUSING_DATA")
snowdf_train, snowdf_test = snowdf.random_split([0.8, 0.2], seed=82)

snowdf_train.write.mode("overwrite").save_as_table("HOUSING_TRAIN")
snowdf_test.write.mode("overwrite").save_as_table("HOUSING_TEST")

housing_train = snowdf_train.drop("MEDIAN_HOUSE_VALUE").to_pandas()
housing_train_labels = snowdf_train.select("MEDIAN_HOUSE_VALUE").to_pandas()
housing_test = snowdf_test.drop("MEDIAN_HOUSE_VALUE").to_pandas()
housing_test_labels = snowdf_test.select("MEDIAN_HOUSE_VALUE").to_pandas()


housing_num = housing_train.drop("OCEAN_PROXIMITY", axis=1)

num_pipeline = Pipeline([
('imputer', SimpleImputer(strategy="median")),
('std_scaler', StandardScaler()),
])

num_attribs = list(housing_num)
cat_attribs = ["OCEAN_PROXIMITY"]

preprocessor = ColumnTransformer([
("num", num_pipeline, num_attribs),
("cat", OneHotEncoder(), cat_attribs)
])

full_pipeline = Pipeline([
('preprocessor', preprocessor),
('model', RandomForestRegressor(n_estimators=100, random_state=42)),
])

full_pipeline.fit(housing_train, housing_train_labels)

save_file(session, full_pipeline, "@house_model_output_stg/housing_price_reg.joblib")

housing_predictions = full_pipeline.predict(housing_test)
lin_mse = mean_squared_error(housing_test_labels, housing_predictions)
lin_rmse = np.sqrt(lin_mse)
return lin_rmse

Creating, deploying and invoking the stored procedure.

train_model_sp = sproc(train_model,
name = 'train_house_model',
stage_location = '@house_model_training_sproc_stg',
is_permanent = True,
replace = True)

#----------------------------------------------------------------------#

# Here will perform all the steps which we have written inside "train_model" func
invoke_result = train_model_sp()
print(invoke_result)

#----------------------------------------------------------------------#

# Just to see the stage content
session.sql("list @house_model_output_stg").show()

# Note: You should see joblib file as a output

Till now, we have trained the model and we have also saved our trained model as a joblib file. So now we will going to use that joblib file to make predictions.

Model Inference or Prediction Serving using UDFs.

# Create UDF for Prediction Serving
session.add_import("@house_model_output_stg/housing_price_reg.joblib")

We will going to use “@cachetools” because:
Lets say we need to call prediction UDF every 10 mins or evry hr, then with the help of “@cachetools” we will avoid repeating loading of the model artifacts (in this case its .joblib file) from our stage into the UDF. Avoid using “@cachetools” only when you have brand new model file (which is new joblib file)

# This func will read the joblib file
@cachetools.cached(cache={})
def read_file(filename):
import_dir = sys._xoptions.get("snowflake_import_directory")
if import_dir:
with open(os.path.join(import_dir, filename), 'rb') as file:
m = joblib.load(file)
return m

#----------------------------------------------------------------------#

features = ['LONGITUDE', 'LATITUDE', 'HOUSING_MEDIAN_AGE', 'TOTAL_ROOMS',
'TOTAL_BEDROOMS', 'POPULATION', 'HOUSEHOLDS', 'MEDIAN_INCOME',
'OCEAN_PROXIMITY']

@udf(name="predict_house_value", is_permanent=True,
stage_location="@house_model_serving_udf_stg", replace=True)
def predict_house_value(LONGITUDE: float,
LATITUDE: float,
HOUSING_MEDIAN_AGE: float,
TOTAL_ROOMS: float,
TOTAL_BEDROOMS: float,
POPULATION: float,
HOUSEHOLDS: float,
MEDIAN_INCOME: float,
OCEAN_PROXIMITY: str) -> float:

m = read_file('housing_price_reg.joblib')
row = pd.DataFrame([locals()], columns=features)
return m.predict(row)[0]

Run Predictions: Way-1

snowdf_test = session.table("HOUSING_TEST")
inputs = snowdf_test.drop("MEDIAN_HOUSE_VALUE")

snowdf_results = snowdf_test.select(predict_house_value(*inputs).alias('predicted_value'),
(F.col('MEDIAN_HOUSE_VALUE')).alias('actual_value')).limit(20)

snowdf_results = snowdf_results.to_pandas()
print(snowdf_results)

Run Predictions: Way-2

# From project's point of view this way is more useful..

query = ''' select predict_house_value(-122.26,37.85,50.0,1120.0,283.0,697.0,264.0,2.125,'NEAR BAY') '''
result = session.sql(query).collect()
print(result)

Okay so at this point, we have completed all the critical work related to the model building and model serving process. Now comes the interesting part, now we will see how to build the Streamlit based data app in Snowflake for our ml model.

Creating a Streamlit app by using Snowsight.

Will going to create this simple prediction serving Streamlit app in Snowflake

Note:

This feature is available to accounts in AWS commercial regions only.

To create a Streamlit app:

  1. Login to your Snowflake account through the web user interface.
  2. After login, from the sidebar (left navigation menu) select ‘Streamlit’.
  3. Now, at the top right corner you should see ‘+ Streamlit App’ button, click on that button and then the app config prompt will open, in that prompt give the same config parameter’s values as you have given in ‘.env’ file and then click on create.
  4. Now, you should see some sample code provided by Snowflake. Replace that sample code in the code editor by this or below code.
'''
Note:
This file's code is meant to run in a Snowsight Editor only.
'''

# Imaport packages
import streamlit as st
from snowflake.snowpark.context import get_active_session

#----------------------------------------------------------------------#

# Get current session
session = get_active_session()

# Set page config
st.set_page_config(layout="wide", initial_sidebar_state='collapsed')

#----------------------------------------------------------------------#

# App layout

col1, col2, col3 = st.columns([3.5,9,0.5])

with col1:
pass

with col2:
st.title('Housing Model Inference App')

with col3:
pass

st.write('---'*30)
st.info('Please fill the below mentioned input parameters inorder to get the prediction.')
st.write('')

col4, col5, col6 = st.columns([1,1,1])


with col4:
longitude_input = st.number_input('Longitude')
rooms_input = st.number_input('No. of Rooms')
households_input = st.number_input('Households')

with col5:
latitude_input = st.number_input('Latitude')
bedrooms_input = st.number_input('No. of Bedrooms')
proximity_list = ['NEAR BAY', '<1H OCEAN', 'INLAND', 'NEAR OCEAN', 'ISLAND']
ocean_proximity_input = st.selectbox('Ocean Proximity', options=proximity_list)

with col6:
house_age_input = st.number_input('House Age')
population_input = st.number_input('Population')
income_input = st.number_input('Income')


query = f'''select predict_house_value({longitude_input}, {latitude_input},
{house_age_input}, {rooms_input},
{bedrooms_input}, {population_input},
{households_input}, {income_input},
'{ocean_proximity_input}')'''

#----------------------------------------------------------------------#

# Model Inference

st.write('')
col7, col8, col9 = st.columns([2.5,2,1])


with col8:
predict_flag = False
if st.button('Predict', key='submit', type='primary'):
result = session.sql(query).collect()
price_value = result[0][0]
predict_flag = True

if predict_flag == True:
st.write('---'*30)
st.success(f'Based on the input parameters, the house price is = ${price_value}')

Thats all, we have built and deployed the ML Model in Snowflake. 🙂

❄️ Conclusion

So in this article we explored how to build and deploy ML Models in Snowflake. We saw how to create stages, how to work with UDFs, etc. But most importantly how to build streamlit app using Snowsight. Now, why is this important because by using Snowsight and Snowpark we just eliminate the need to move data outside of our snowflake servers. It’s like a complete package, where we just have to ingest the data in the Snowflake and then we can perform analytical operations, build ml models and create data apps, basically everything at one place and therefore its a great way to build ml models as its secure, easy to maintain and share.

Thanks for reading!!

--

--