Snowpark ML, A Machine Learning Toolkit for Snowflake

Update 2024–01–25: Snowpark ML modeling API is now in GA and the Model Registry is in Public Preview, so I have updated the post to reflect the latest changes.

At its annual Summit user conference in late June 2023, Snowflake announced the availability of Snowpark ML, a set of tools, including SDKs and underlying infrastructure, for building and deploying machine learning models.

Snowpark ML provides APIs to support each stage of an end-to-end Machine Learning development and deployment process, and includes two key components: Snowpark ML Modeling and Snowpark ML Ops (Public Preview).

With Snowpark ML, you can pre-process data and train, manage, and deploy ML models all within Snowflake. At every stage of the machine learning workflow, you benefit from Snowflake’s proven performance, scalability, stability and governance.

The Snowpark ML APIs are provided as a Python library, snowflake-ml-python, that is installed using pip or conda, and is built on top of the Snowpark DataFrame API.

The focus of this post will be the Snowpark ML Development component.

Snowpark ML Development

Snowpark ML includes a modeling package (snowflake.ml.modeling) and a set of framework connectors that provide optimized, secure and performant data delivery to the PyTorch and TensorFlow frameworks in their native data loader formats. This section focuses on the modeling package; if you are interested in the framework connectors, you can get more information here.

The snowflake.ml.modeling module provides APIs for data pre-processing, feature engineering and model training (including distributed hyperparameter tuning). All of these tasks are performed within Snowflake, and the APIs are based on familiar libraries such as scikit-learn, XGBoost and LightGBM.

Why is there a need for Snowpark ML?

A common pattern today, when the data is in Snowflake, is to pull the data out for feature engineering and model training, for example by using the to_pandas() method on a Snowpark DataFrame or by unloading the data to a file in an S3 bucket, and then run the pipeline on external compute. This not only adds cost and complexity to the architecture, but also presents security and governance challenges.

A high-level example of such a workflow is shown below.

Prior to Snowpark ML, the way to handle this was to use the Snowpark DataFrame API for feature engineering, a Python stored procedure for preprocessing (using, for example, Scikit-Learn's OrdinalEncoder) and training, and then a Python UDF to run the model inference in Snowflake. The approach is illustrated in the diagram below.

The advantage of this approach is that everything is done within Snowflake: there is no data movement, and the built-in security and governance can be used.

The code I need to write is the same as I would write for local training; the only difference is that the code has to live in a Python function that I then deploy to Snowflake. To do the inference in Snowflake with distributed execution, I also need to create a scoring function and deploy it to Snowflake as a Python UDF.
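To make this concrete, here is a minimal sketch of that pre-Snowpark ML pattern, with the training logic wrapped in a Python stored procedure. It reuses the diamonds table and stage from the examples later in this post; the function and procedure names are my own placeholders, not taken from an actual project.

import joblib
from snowflake.snowpark import Session

def train_diamonds_model(session: Session) -> str:
    # Everything in this function runs on Snowflake compute, but the
    # training itself happens in-memory on a single node
    from sklearn.compose import ColumnTransformer
    from sklearn.pipeline import Pipeline
    from sklearn.preprocessing import OrdinalEncoder
    from xgboost import XGBRegressor

    df = session.table("diamonds").to_pandas()
    X = df.drop(columns=["PRICE", "ROW"])
    y = df["PRICE"]

    pipe = Pipeline(steps=[
        ("prep", ColumnTransformer(
            [("ord", OrdinalEncoder(), ["CUT", "COLOR", "CLARITY"])],
            remainder="passthrough")),
        ("regressor", XGBRegressor()),
    ])
    pipe.fit(X, y)

    # Save the fitted pipeline to a stage so a Python UDF can load it for inference
    joblib.dump(pipe, "/tmp/diamonds_pipe.joblib")
    session.file.put("/tmp/diamonds_pipe.joblib", "@SNOWML_DEMO/DIAMONDS_ASSETS",
                     overwrite=True, auto_compress=False)
    return "SUCCESS"

# Register the function as a (temporary) Python stored procedure in Snowflake
session.sproc.register(
    func=train_diamonds_model,
    name="train_diamonds_model",
    packages=["snowflake-snowpark-python", "scikit-learn", "xgboost", "joblib"],
    replace=True,
)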

Most data scientists want to use familiar APIs and frameworks and have them just work in Snowflake, without having to think about how the backend handles the compute.

By using the snowflake-ml library, they can now use the same type of coding as they would when using, for example, Scikit-Learn or XGBoost with data in a Pandas DataFrame. The difference is that the processing is done on the Snowflake side and can be used directly on Snowpark DataFrames (as well as Pandas DataFrames).

This may not change the workflow on a higher level, as shown below, but it does simplify the way you write the code.

Instead of writing a stored procedure for the transformations and training, I can now use the transformers in snowflake.ml.modeling.preprocessing and the algorithms in snowflake.ml.modeling. By calling the fit() method of a transformer, such as OneHotEncoder, or of an algorithm, such as XGBRegressor, the processing is pushed down to Snowflake.

As shown below, the push down is done in two different ways, depending on whether I use a pre-processing function or an algorithm.

The pre-processing functions use the SQL engine in Snowflake, which means they run in distributed multi-node execution. The advantage is that I can now do this on large amounts of data without the risk of running into memory problems or high computational costs due to long running jobs.
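For example, a single transformer can be fitted and applied directly on a Snowpark DataFrame. A minimal sketch, using the diamonds table from the examples further down, could look like this:

from snowflake.ml.modeling.preprocessing import OrdinalEncoder

df = session.table("diamonds")

# Both fit and transform are executed as SQL inside Snowflake;
# no data is pulled back to the client
oe = OrdinalEncoder(input_cols=["CUT"], output_cols=["CUT_OE"])
df_encoded = oe.fit(df).transform(df)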

The algorithms available in snowflake.ml.modeling mirror those in the open source libraries Scikit-Learn, XGBoost and LightGBM; the library pushes the training of these into Snowflake and runs it in the secure Python sandbox. To train a model in Snowflake using snowflake-ml, all I have to do is define the model object and call its fit method.

To do preprocessing and training using snowflake.ml I can write the following code, where all the processing is done in Snowflake.

from snowflake.ml.modeling.pipeline import Pipeline
from snowflake.ml.modeling.preprocessing import MinMaxScaler, OrdinalEncoder
from snowflake.ml.modeling.metrics import mean_squared_error, mean_absolute_error, r2_score
from snowflake.ml.modeling.xgboost import XGBRegressor

# Random split
df_train, df_test = session.table("diamonds").drop('ROW').random_split(weights=[0.9, 0.1], seed=0)

cat_cols = ["CUT", "COLOR", "CLARITY"]
cat_cols_oe = ["CUT_OE", "COLOR_OE", "CLARITY_OE"]
num_cols = ["CARAT", "DEPTH", "TABLE_PCT", "X", "Y", "Z"]

# Define a pipeline that does the preprocessing and training of
# an XGBRegressor model
pipe = Pipeline(steps=[
    ("ord", OrdinalEncoder(input_cols=cat_cols, output_cols=cat_cols_oe)),
    ("scaler", MinMaxScaler(input_cols=num_cols, output_cols=num_cols)),
    ("regressor", XGBRegressor(input_cols=cat_cols_oe + num_cols,
                               label_cols=["PRICE"],
                               output_cols=["PREDICTION"],
                               n_jobs=-1)),
])

# Fit the pipeline
xgb_model = pipe.fit(df_train)

# Test the model
df_test_pred = xgb_model.predict(df_test)
mse = mean_squared_error(df=df_test_pred, y_true_col_names="PRICE",
                         y_pred_col_names="PREDICTION")
mae = mean_absolute_error(df=df_test_pred, y_true_col_names="PRICE",
                          y_pred_col_names="PREDICTION")
r2 = r2_score(df=df_test_pred, y_true_col_name="PRICE",
              y_pred_col_name="PREDICTION")
print(f'MSE: {mse}')
print(f'MAE: {mae}')
print(f'R2: {r2}')

As shown above, the code is very similar to what I would write with sklearn and xgboost; I basically only need to change the imports so that I am using snowflake.ml.modeling instead, and add some extra parameters. I can then call the relevant fit/predict methods directly on a Snowpark DataFrame.

I can also do distributed hyperparameter optimization, which allows me to train multiple models in parallel.

from snowflake.ml.modeling.model_selection import GridSearchCV

grid_search = GridSearchCV(
    estimator=XGBRegressor(),
    param_grid={
        "n_estimators": [100, 200, 300, 400, 500],
        "learning_rate": [0.1, 0.2, 0.3, 0.4, 0.5],
    },
    n_jobs=-1,
    scoring="neg_mean_absolute_percentage_error",
    input_cols=cat_cols + num_cols,
    label_cols="PRICE",
    output_cols="PREDICTION",
)

# Train
grid_search.fit(df_train)
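Once the search has finished, I can inspect the tuning results and score the held-out data. A small sketch, assuming the fitted object exposes the underlying scikit-learn estimator through to_sklearn():

# The best hyperparameters and cross-validation score live on the
# underlying scikit-learn GridSearchCV object
sk_grid = grid_search.to_sklearn()
print(sk_grid.best_params_)
print(sk_grid.best_score_)

# Predictions with the best estimator are pushed down to Snowflake
df_gs_pred = grid_search.predict(df_test)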

To use my fitted pipeline (the XGBoost model) on a Snowpark DataFrame, I can use the predict method; the library pushes this down to run in Snowflake as well.

df_pred = xgb_model.predict(df_new_data)

If I need to use the Snowpark ML object outside Snowflake, I can get a fitted sklearn object by using the to_sklearn method and then use it like any other sklearn object.

sklearn_pipe = xgb_model.to_sklearn()
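As a small sketch, the returned object can then be used locally on a Pandas DataFrame, assuming the converted pipeline accepts the same feature columns that were used for training:

# Pull a small sample to the client and predict locally with scikit-learn
# (column handling of the converted pipeline is assumed to match training)
pd_sample = df_test.drop("PRICE").limit(5).to_pandas()
local_pred = sklearn_pipe.predict(pd_sample)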

By using Snowpark ML Ops (Public Preview) I can save the Snowpark ML object into the Snowpark Model Registry:

from snowflake.ml.registry.registry import Registry

# Use the session default database and schema for the model registry
reg = Registry(session=session)

# Log the model together with the metrics computed earlier
mv = reg.log_model(model=xgb_model,
                   model_name="diamonds_model",
                   version_name="v1",
                   metrics={"mean_squared_error": mse,
                            "mean_absolute_error": mae,
                            "r2_score": r2})

When a model is logged, a Model object is created in Snowflake and I can call any of the available methods for the model (which depend on the algorithm I have used) directly, without having to take any additional steps.

remote_prediction = mv.run(df_test, function_name="predict")

To see what methods are available, I can use the show_functions method.

mv.show_functions()
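If I come back in a later session, I can retrieve the logged model from the registry and work with a specific version. A short sketch, using the same names as above:

# Get a reference to the logged model and one of its versions
model = reg.get_model("diamonds_model")
mv = model.version("v1")  # or model.default for the default version

remote_prediction = mv.run(df_test, function_name="predict")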

If I want to call the predict method using SQL, I can use the following code, which will use the default version of the model. The documentation provides more information on how to work with model objects using SQL.

SELECT diamonds_model!predict("CUT", "COLOR", "CLARITY", "CARAT",
                              "DEPTH", "TABLE_PCT", "X", "Y", "Z")
FROM new_data_table;

If I do not want to use the Snowpark Model Registry, I can use xgb_model.predict as part of a Python script or a Python stored procedure, or create a Python UDF using the underlying Scikit-Learn/XGBoost/LightGBM object.

import joblib
import snowflake.snowpark.functions as F
import snowflake.snowpark.types as T

# Get the Scikit-Learn object and save it as a file
sklearn_pipe = xgb_model.to_sklearn()
joblib.dump(sklearn_pipe, 'sklearn_pipe.joblib')

# Upload the file to an internal Snowflake stage
session.file.put('sklearn_pipe.joblib', "@SNOWML_DEMO/DIAMONDS_ASSETS",
                 overwrite=True, auto_compress=False)

# Use a vectorized UDF
@F.udf(name="predict_diamond_price_v2", is_permanent=False,
       imports=['@SNOWML_DEMO/DIAMONDS_ASSETS/sklearn_pipe.joblib'],
       packages=['pandas', 'scikit-learn==1.2.2', 'xgboost==1.7.3',
                 'joblib', 'cachetools'],
       replace=True, session=session)
def predict_diamond_price(pd_input: T.PandasDataFrame[str, str, str, float,
                                                      float, float, float,
                                                      float, float]
                          ) -> T.PandasSeries[float]:
    # Make sure we have the columns in the expected order in the Pandas DataFrame
    features = ["CUT", "COLOR", "CLARITY", "CARAT", "DEPTH", "TABLE_PCT",
                "X", "Y", "Z"]
    pd_input.columns = features
    model = read_file('sklearn_pipe.joblib')  # Same function as earlier
    prediction = model.predict(pd_input)
    return prediction
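The read_file helper referenced inside the UDF is not shown above; it needs to be defined before the UDF is registered. A common way to implement it is a sketch like the following, which loads the model from the UDF's import directory and caches it with cachetools so the file is only read once per process:

import os
import sys
import joblib
import cachetools

@cachetools.cached(cache={})
def read_file(filename):
    # Snowflake exposes the directory where UDF imports are staged
    import_dir = sys._xoptions.get("snowflake_import_directory")
    return joblib.load(os.path.join(import_dir, filename))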

Conclusion

As you can see, Snowpark ML Modeling allows you to write your data science pipelines in the same way as you do today when using, for example, Pandas DataFrames and Scikit-Learn, but with the computation pushed down to Snowflake instead.

With the Snowpark Model Registry (currently in Public Preview), the process becomes even simpler: you can also store your models in Snowflake and deploy them using a Python API, without having to know how to create a Python UDF, for example.

More information can be found in the documentation, and if you want to test it yourself, the Snowpark ML Quickstart or the HEX demo What's New in Snowpark Machine Learning are good places to start your journey with Snowpark ML.
