Snowflake ML — An Example Project

A brief walkthrough of a template project for Snowflake ML.

Image generated with DALL·E 3: a colorful, fictional world of the 1800s, featuring a polar bear and two human data scientists.

Background

The Snowflake ML Modeling API has been generally available since early December 2023, and the Snowflake Model Registry has been in public preview since January 2024.

There are a few examples of working with these packages out there that we frequently visit for inspiration and reference. Chase Romano and Sikha Das have shared some great material on this subject.

These examples use notebooks, which are great but can often be challenging to maintain in repositories. We frequently encounter scenarios where users want to keep their code in Python scripts and operationalize it via traditional orchestration tools and/or Snowflake tasks.

Kirk Mason and I opted to implement our project as a Python module. This helps avoid needing to append a relative path to the system path to specify imports, which we often see in notebooks. The scripts we will walk through can be executed directly in Python or registered via Snowpark as Stored Procedures, later executed as Snowflake tasks in a DAG.
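
For example, instead of the sys.path patching that often appears at the top of notebooks, installing the project as a package makes its modules importable from anywhere. A minimal sketch using this project's module names:

# The notebook-style workaround this structure avoids:
# import sys
# sys.path.append("..")

# With the package installed, imports resolve directly:
from my_project.common import get_next_version, get_performance_metrics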

We think having a well-organized project structure is essential, so that’s what we’ll outline here.

The Project Structure

Having a template or framework to build on makes starting any project easier, and it helps with onboarding new team members as well. While there may be variations of this to fit specific needs, here's an outline of what we find to be a good starting point.

Example Project Structure
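
Here's a sketch of that layout, inferred from the sections below (the exact file names inside my_project are assumptions):

my_project_repo/
├── docs/                     # project documentation
├── scratch/                  # notebooks and experiments, not executed automatically
├── my_project/               # the Python package itself (or src/)
│   ├── __init__.py
│   ├── common.py             # shared helpers, e.g. get_next_version
│   └── train.py              # model training entry point
├── register_deploy_dags.py   # registers sprocs and deploys the DAG
├── pyproject.toml
└── README.md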

Let’s break down these sections and the various files at the top level.

docs

The docs folder contains any project-specific documentation needed to capture requirements and bring future developers and stakeholders up to speed.

scratch

The scratch folder contains any notebook code, experimentation, or exploration that should not be executed automatically.

my_project (or src)

This folder serves as the “heart” of our project. Treating code as a Python package has several advantages. It enhances modularity, organization, and readability by separating related functionalities into distinct modules. It also provides namespace isolation to avoid naming clashes and promotes reusability, minimizing code duplication.

I mentioned earlier that it was important to be able to run these Python scripts directly or to register the functions as stored procedures. Here's how we did that. The code is shortened for brevity, since the model training itself isn't the main focus here.

import logging

from snowflake.ml.modeling.impute import SimpleImputer
from snowflake.ml.modeling.pipeline import Pipeline
from snowflake.ml.modeling.xgboost import XGBRegressor
from snowflake.ml.registry import Registry
from snowflake.snowpark import Session
from snowflake.snowpark import functions as F
from snowflake.snowpark import types as T

from my_project.common import get_next_version
from my_project.common import get_performance_metrics


def train(session: Session) -> str:
    logger = logging.getLogger(__name__)
    logger.info(
        "{'message':'Begin model training procedure', 'unit':'analytics'}"
    )
    ...

    # Build and fit the preprocessing + model pipeline (steps elided).
    pipeline = Pipeline(
        [
            ...
        ]
    )
    pipeline.fit(train_df)

    logger.info(
        "{'message':'Obtain metrics', 'unit':'analytics'}"
    )

    # Score both splits and collect metrics to store with the model.
    train_result_df = pipeline.predict(train_df)
    test_result_df = pipeline.predict(test_df)

    combined_metrics = dict(
        train_metrics=get_performance_metrics(
            "REGRESSION", train_result_df, "PRICE", "OUTPUT_PRICE"
        ),
        test_metrics=get_performance_metrics(
            "REGRESSION", test_result_df, "PRICE", "OUTPUT_PRICE"
        ),
    )

    # Log the fitted pipeline, with its metrics, to the Model Registry.
    reg = Registry(session=session, schema_name="MODELS")

    model_name = "MY_MODEL"
    model_version = get_next_version(reg, model_name)

    reg.log_model(
        model_name=model_name,
        version_name=model_version,
        model=pipeline,
        metrics=combined_metrics,
    )

    logger.info(
        "{'message':'Finished training and registering', 'unit':'analytics'}"
    )

    return f"Model {model_name}.{model_version} is trained and deployed."


if __name__ == "__main__":
    # Running the script directly creates its own session and context.
    session = Session.builder.getOrCreate()
    session.use_warehouse("ML_BIG")
    session.use_database("ML_EXAMPLES")
    session.use_schema("DIAMONDS")
    # Print the status message; exiting with code 0 signals success.
    print(train(session))

We define our train function, which can be registered directly as a stored procedure in a later step (register_deploy_dags.py).

As an alternative to registering as a stored procedure, invoking this script directly will hit the __main__ condition, establish a connection to Snowflake, and execute the train function. This allows using this code with an orchestration tool if you do not intend to use Snowflake tasks.
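
For example, with the package installed, the training script can be run from a terminal, or wrapped in an orchestrator task, like any other Python script:

python my_project/train.py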

pyproject.toml

The pyproject.toml file contains the information needed to set up the project and the Python environment. Here’s what ours looks like.

[build-system]
requires = ["setuptools", "wheel"]
build-backend = "setuptools.build_meta"

[project]
name = "my_project"
description = "A Snowpark ML project."
version = "0.1.0"
readme = "README.md"
dependencies = [
    "snowflake-snowpark-python==1.14.0",
    "numpy==1.26.3",
    "scikit-learn==1.3.0",
    "snowflake[ml]",
    "xgboost==1.7.3",
]

[tool.setuptools.packages.find]
include = ["my_project"]

[project.optional-dependencies]
dev = ["nbqa[toolchain]", "jupyter"]

Having this file in the root of our repo will allow us to easily install our project with pip by executing the following command:

pip install .

Or, if you wish to perform an editable install, you can also do the following:

pip install -e .

Since we have specified a “dev” extra in our configuration file, we can also install dev dependencies like this:

pip install -e ".[dev]"

register_deploy_dags.py

In this code, we’re registering stored procedures from our Python module and setting them up to create a DAG in Snowflake.

# Register train() from the module file as a permanent stored procedure.
session.sproc.register_from_file(
    file_path="my_project/train.py",
    func_name="train",
    name="TRAIN_MODEL",
    is_permanent=True,
    packages=["snowflake-snowpark-python", "snowflake-ml-python"],
    imports=["my_project"],
    stage_location="@PYTHON_CODE",
    replace=True,
    execute_as="caller",
)
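
After registration, a quick smoke test from the same session confirms the procedure runs (a hypothetical check, not part of the deployment script):

result = session.call("TRAIN_MODEL")
print(result)  # e.g. "Model MY_MODEL.<version> is trained and deployed."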

Later, we can deploy our DAG in the same script.

from snowflake.core.task import Cron
from snowflake.core.task.dagv1 import DAG, DAGTask

with DAG(
    "EXAMPLE_DAG",
    schedule=Cron("0 8 * * 1", "America/New_York"),
    stage_location="@PYTHON_CODE",
    use_func_return_value=True,
) as dag:
    train_task = DAGTask(
        name="TRAIN_MODEL_TASK",
        definition="CALL TRAIN_MODEL();",
        warehouse="COMPUTE_WH",
    )
    set_default_task = DAGTask(
        name="SET_DEFAULT_VERSION",
        definition="CALL SET_DEFAULT_VERSION('DIAMONDS', 'rmse', True);",
        warehouse="COMPUTE_WH",
    )
    # Train first, then promote the best-performing version to default.
    train_task >> set_default_task
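
Note that the with block only defines the DAG; pushing it to Snowflake takes one more call. A minimal sketch, assuming the Snowflake Python API's Root and DAGOperation objects (the database and schema match the earlier __main__ block):

from snowflake.core import CreateMode, Root
from snowflake.core.task.dagv1 import DAGOperation

root = Root(session)
schema = root.databases["ML_EXAMPLES"].schemas["DIAMONDS"]
dag_op = DAGOperation(schema)
dag_op.deploy(dag, mode=CreateMode.or_replace)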

If you want to learn more about how this works, please check out the docs.

Conclusion

We hope this gives you a foundation for getting started and is flexible enough to meet your needs.

We encourage you to try this, make any necessary changes, and let us know how it works. Please don't hesitate to share your experiences or challenges, as they might help others.

A working demonstration of this can be found at the following repo:
