Getting Started with Snowpark Model Registry

The fundamentals of Snowpark MLOps and beyond: a Pipeline deployed to the Model Registry and invoked from a Task DAG. With Snowpark, pipelines can be reused without pickling and reloading them via joblib, which simplifies the process.

Model Training & Deployment

Background

Not long ago, Chase Romano and I had the opportunity to demonstrate the capabilities of the Snowpark Model Registry hosted by Felipe Hoffa.

Not long before the presentation, we did a fair amount of refactoring to move our code from the preview version of the API to the new public API, which saw some beneficial updates. I like what we have now, and I want to summarize a few of these changes before discussing other ways to improve our existing solution.

What Changed?

  1. Transition to Native Snowflake Objects: Previously, metadata was stored in tables, and models were stored in stages. However, now models are treated as native schema-level objects within Snowflake. This change simplifies the storage and management of models, allowing users to interact with them directly using SQL or Python, without requiring any specific schema preparation.
  2. Enhanced Model Accessibility and Deployment: Before, deploying models and controlling access to them was manual and limited. Now, models have built-in methods that can be called directly from SQL or Python, eliminating manual deployment. Plus, access to specific models can be granted to roles, giving greater control over who can use them (see the sketch after this list).
  3. Simplified API Interaction: It is easier to set and update tags and metrics, log models, and update metadata or versioning.
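As a quick, hypothetical illustration of the role-based access mentioned in point 2, granting usage on a registered model might look something like this (the model and role names here are placeholders, not part of our demo):

from snowflake.ml.utils.connection_params import SnowflakeLoginOptions
from snowflake.snowpark import Session

session = Session.builder.configs(SnowflakeLoginOptions()).getOrCreate()

# Let another role run a registered model's methods.
# TITANIC and ML_CONSUMER are illustrative names.
session.sql("GRANT USAGE ON MODEL TITANIC TO ROLE ML_CONSUMER").collect()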

These changes to the Snowpark Model Registry improve the user experience and functionality, making it more integrated, flexible, and easier to use within the Snowflake ecosystem. If you want to go deeper with the Model Registry, please read this blog.

So… Could we make this even easier?

In the video, we manually use several Snowpark ML preprocessing steps and set up a Snowpark ML GridSearchCV to find the classifier and hyperparameters with the best accuracy. Much of this is to demonstrate how the steps operate and run in Snowflake. But what if we could do these steps sequentially and automatically?

We can. We can use a pipeline.

What’s a pipeline?

Pipelines apply a list of transformations and a final estimator sequentially. This encapsulates preprocessing steps and model training into a single object, which can be used like any other estimator.

  1. Simplicity and Clarity: By bundling the preprocessing steps and model training into one, pipelines make the code more readable and concise. Instead of having multiple transformations and data manipulations scattered throughout your code, a pipeline consolidates these operations into a clear, linear sequence.
  2. Ease of Experimentation: Pipelines simplify changing models and preprocessing steps. Encapsulated in one object, you can swap models or add/remove preprocessing without extensive code rewrite. This eases rapid experimentation and testing of different model configurations.
  3. Facilitating Cross-Validation and Grid Searches: You can do hyperparameter tuning and cross-validation on the whole pipeline, including preprocessing and model hyperparameters. This results in a more comprehensive optimization process.

We can define a pipeline for imputing, encoding, performing grid searches, and cross-validation.

Pipeline Code
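As a minimal sketch of what such a pipeline can look like, here is one built with Snowpark ML; the column names, the XGBoost classifier, and the hyperparameter grid are illustrative assumptions, not the exact configuration from our demo:

from snowflake.ml.modeling.impute import SimpleImputer
from snowflake.ml.modeling.model_selection import GridSearchCV
from snowflake.ml.modeling.pipeline import Pipeline
from snowflake.ml.modeling.preprocessing import OneHotEncoder
from snowflake.ml.modeling.xgboost import XGBClassifier

# Illustrative Titanic-style columns; adjust to match the training DataFrame.
NUMERIC_COLS = ["AGE", "FARE"]
CATEGORICAL_COLS = ["SEX", "EMBARKED", "PCLASS"]

pipeline = Pipeline(
    steps=[
        (
            "imputer",
            SimpleImputer(
                input_cols=NUMERIC_COLS,
                output_cols=NUMERIC_COLS,
                strategy="median",
            ),
        ),
        (
            "encoder",
            OneHotEncoder(
                input_cols=CATEGORICAL_COLS,
                output_cols=CATEGORICAL_COLS,
                handle_unknown="ignore",
            ),
        ),
        (
            "grid_search",
            GridSearchCV(
                estimator=XGBClassifier(),
                param_grid={"n_estimators": [100, 300], "max_depth": [3, 5]},
                scoring="accuracy",
                label_cols=["SURVIVED"],
                output_cols=["OUTPUT_SURVIVED"],
            ),
        ),
    ]
)

Because the grid search omits input_cols, it uses every column left after the preprocessing steps except the label, which is convenient since one-hot encoding expands the categorical columns.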

How do we use it?

It’s just as simple to use the pipeline as it would be to use any other preprocessing transformation or classifier training.

pipeline.fit(train_df)

Since GridSearchCV is the final step, the pipeline itself exposes predict, so if we want to make predictions on our training set, we can get and preview the results with the following code.

pipeline.predict(train_df).show()

What’s great about this is that we can change the size of our warehouse before and after the fitting to ensure we are making the most of our computing capabilities.
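A minimal sketch, assuming the COMPUTE_WH warehouse that appears later in this post (the sizes are illustrative):

# Scale up for the expensive fit, then scale back down afterward.
session.sql("ALTER WAREHOUSE COMPUTE_WH SET WAREHOUSE_SIZE = 'LARGE'").collect()
pipeline.fit(train_df)
session.sql("ALTER WAREHOUSE COMPUTE_WH SET WAREHOUSE_SIZE = 'XSMALL'").collect()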

We can explore the pipeline as a native scikit-learn object as well.

pipeline.to_sklearn()
scikit-learn Pipeline
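If we keep a reference to that converted object, we can inspect it like any other scikit-learn pipeline. Assuming the fitted grid search carries over in the conversion, something like this would surface the winning hyperparameters:

skl_pipeline = pipeline.to_sklearn()

# The final step should be the fitted grid search; best_params_ holds the winning combination.
print(skl_pipeline.steps[-1][1].best_params_)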

How can we evaluate it?

Snowflake makes these metric functions available, and they operate similarly to their native scikit-learn equivalents, but they require keyword arguments. Here are a few metrics we can explore and later log to the registry.

from snowflake.ml.modeling.metrics import (
    accuracy_score,
    confusion_matrix,
    f1_score,
    precision_score,
    recall_score,
)

# result_df holds the predictions (e.g., from pipeline.predict) with both the
# SURVIVED label and the OUTPUT_SURVIVED prediction columns.
metrics = {
    "Accuracy": accuracy_score(
        df=result_df, y_true_col_names="SURVIVED", y_pred_col_names="OUTPUT_SURVIVED"
    ),
    "Precision": precision_score(
        df=result_df, y_true_col_names="SURVIVED", y_pred_col_names="OUTPUT_SURVIVED"
    ),
    "Recall": recall_score(
        df=result_df, y_true_col_names="SURVIVED", y_pred_col_names="OUTPUT_SURVIVED"
    ),
    "F1 Score": f1_score(
        df=result_df, y_true_col_names="SURVIVED", y_pred_col_names="OUTPUT_SURVIVED"
    ),
    "Confusion Matrix": confusion_matrix(
        df=result_df, y_true_col_name="SURVIVED", y_pred_col_name="OUTPUT_SURVIVED"
    ).tolist(),
}

Model Registry

Now, we’re ready to deploy this pipeline.

from snowflake.ml.registry import Registry

reg = Registry(session=session)

# Log the fitted pipeline as a new version of the TITANIC model, along with its metrics.
reg.log_model(
    model_name="TITANIC",
    version_name=get_next_version(reg, "TITANIC"),
    model=pipeline,
    metrics=metrics,
)

We can see it by running show_models, which returns a pandas DataFrame with our model names and their versions. In the previous version of the Model Registry, navigating the results was challenging; now we can quickly get the model and see its available versions, along with their metrics, by running the following code.

reg.get_model("TITANIC").show_versions()
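And if we only want the metrics we logged for a particular version, the ModelVersion object exposes them as well; for example, on the default version:

# Returns the metrics dictionary logged with log_model.
reg.get_model("TITANIC").default.show_metrics()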

How do we use it?

In just a few lines of code, we can run inference. Our pipeline handles all of the preprocessing for us, so we don’t need to reference any temporary or permanent objects persisted by the transformations.

m = reg.get_model("TITANIC")
mv = m.default  # the default version of the model
mv.run(test_df, function_name="predict").show()

This could be used in a new notebook, a SQL session, and more. The possibilities are endless. Speaking of SQL, I know not everybody likes Python, so here’s how to use that pipeline with SQL, using the newer “EXCLUDE” keyword.

SELECT
    *,
    TITANIC!PREDICT(*):OUTPUT_SURVIVED::INT AS OUTPUT_SURVIVED
FROM (
    SELECT * EXCLUDE SURVIVED
    FROM DATA
);

This all runs on Snowflake compute, meaning any machine could use this pipeline for inference. The data doesn’t leave Snowflake unless you trigger an evaluation operation, like a collect, show, or to_pandas.
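In other words, building the prediction DataFrame moves nothing to the client; only an action does. A small illustration:

# Lazily defines the predictions; all computation stays in Snowflake.
predictions = mv.run(test_df, function_name="predict")

# Rows only leave Snowflake when an action like this one runs.
preview = predictions.limit(10).to_pandas()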

Task Setup

The Snowflake Python API can be used to set up a Task DAG to chain this process together. That’s another topic, but I wanted to provide a sample of what this would look like.

Assuming we convert the train and infer processes into stored procedures, they can be called with tasks.
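As a rough, hypothetical sketch of that conversion, the training procedure could be registered along these lines (the body is elided; the real procedure would build and fit the pipeline above and log it to the registry):

from snowflake.snowpark import Session


def train(session: Session) -> str:
    # Build the pipeline, fit it on the training table, and log it to the registry.
    ...
    return "training complete"


# Register it as a permanent stored procedure so a task can CALL TRAIN().
session.sproc.register(
    func=train,
    name="TRAIN",
    is_permanent=True,
    stage_location="@PYTHON_CODE",
    packages=["snowflake-snowpark-python", "snowflake-ml-python"],
    replace=True,
)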

The code to do this would require the snowflake package to be installed.

from snowflake.core import Root
from snowflake.core._common import CreateMode
from snowflake.core.task import Cron
from snowflake.core.task.dagv1 import DAG, DAGOperation, DAGTask
from snowflake.ml.utils.connection_params import SnowflakeLoginOptions
from snowflake.snowpark import Session

session = Session.builder.configs(SnowflakeLoginOptions()).getOrCreate()
root = Root(session)

with DAG(
    "TITANIC_DAG",
    schedule=Cron("0 8 * * 1", "America/Los_Angeles"),
    stage_location="@PYTHON_CODE",
    packages=["snowflake-snowpark-python", "snowflake-ml-python"],
    use_func_return_value=True,
) as dag:
    # Train the model, then run batch inference once training succeeds.
    train_task = DAGTask(
        name="train_model",
        definition="CALL TRAIN();",
        warehouse="COMPUTE_WH",
    )
    batch_infer_task = DAGTask(
        name="batch_infer",
        definition="CALL BATCH_INFER();",
        warehouse="COMPUTE_WH",
    )
    train_task >> batch_infer_task

# Deploy the DAG to the current database and schema.
schema = root.databases[session.connection.database].schemas[session.connection.schema]
op = DAGOperation(schema)
op.deploy(dag, mode=CreateMode.or_replace)

Check out this article to learn more!

Conclusion

Features continue to be added to the Model Registry; while the Pipeline capabilities are comparable to scikit-learn’s, not everything is available yet. I’m looking forward to the additions and improvements and am excited to start building more with the Model Registry.

This isn’t intended to be a copy-paste demo; while parts of it could certainly be reused, it’s more about going a little deeper into the Snowpark Model Registry’s role in MLOps.

Extras

We wrote a few cool utilities to make this easier. They might not be perfect, but they made testing simpler.

Here are a couple!

import ast
import json


def get_version_with_highest_accuracy(reg, model_name: str) -> str:
    """Return the name of the model version with the highest logged accuracy."""
    model_versions = reg.get_model(model_name).show_versions()
    model_versions["accuracy"] = model_versions["metadata"].apply(
        lambda x: json.loads(x).get("metrics", {}).get("Accuracy", None)
    )
    return (
        model_versions.sort_values(by="accuracy", ascending=False)
        .head(1)["name"]
        .values[0]
    )


def get_next_version(reg, model_name: str) -> str:
    """Return the next sequential version name (V_1, V_2, ...) for a model."""
    models = reg.show_models()
    if models.empty:
        return "V_1"
    elif model_name not in models["name"].to_list():
        return "V_1"
    max_version = max(
        ast.literal_eval(models.loc[models["name"] == model_name, "versions"].values[0])
    )
    return f"V_{int(max_version.split('_')[-1]) + 1}"
