Building a Second Generation Machine Learning Platform Using Snowpark

Endeavor: Data Blog
10 min read · Jul 24, 2023

By Ilya Galperin

Machine Learning at Endeavor

Behind the best events, you’ll often find a group of folks laser-focused on making a remarkable, lasting impression on attendees. When a fan streams a UFC card, catches the world’s top bull riders at a Professional Bull Riders tour, or bids on a classic car at Barrett-Jackson, their experience is powered by a likewise passionate team at Endeavor.

In the data science and engineering group, we champion this ethos by helping build highly personalized experiences and applications for fans across a wide range of Endeavor businesses through the power of data. Machine learning (ML) models play a critical role in our approach. They allow us to make predictions based on patterns within datasets, leading to valuable insights about our customers and highly informed decisions on the best ways to serve them.

Data scientists at Endeavor frequently build new ML models and tune existing ones in service of this goal. Running these models requires significant computational power and technical expertise. To meet these challenges and power our machine learning platform, we have adopted Snowpark, the set of libraries and runtimes that enable users to deploy models directly in Snowflake with ease and highly scalable processing capacity.

Guiding Principles

Before getting into the specifics of Snowpark, it’s helpful to take a look at the homegrown ML platform we had been using up to that point, including the product objectives it aimed to fulfill and the gaps we wanted to address.

Prior to adopting Snowpark, machine learning models were deployed as purpose-built containers orchestrated with Kubernetes. Many models had their own sets of dependencies and a good amount of custom code for loading, handling, and eventually saving data back into Snowflake. Getting a model running on our infrastructure required continuous collaboration between a data engineer and data scientist, with the former responsible for deployment and the latter for maintaining core model logic.

While the flexibility inherent to this architecture was a big plus, we found that taking a model to production or iterating on an existing model took days, and sometimes weeks, longer than we would have liked.

Our business objectives demand that we work quickly and on a relatively short turn-around time to deploy models to applications. Therefore, we focused on the following guiding principles when designing our second-generation machine-learning platform:

  1. Self-Serve: The design must enable a data scientist to move a machine learning model from inception all the way through deployment entirely by themselves. A successful end state here would remove the need for multiple engineers to be involved in the ML lifecycle. There should be zero dependency on a machine learning or DevOps engineer to get a model into production.
  2. Ease of Use: Given that data scientists would be responsible for model deployment, ease of use is critical. However, ease of use shouldn’t come at the expense of flexibility; we must still be able to run a large variety of models using a broad range of ML libraries and write custom Python code where needed.
  3. Repeatability: There must be a way to periodically retrain models in a simple manner. The lineage of models must be well understood and recreating an older version of a model possible at all times. Likewise, model training methods and routines should be stored in a git repo and be reviewable by others.
  4. Computationally Scalable and Cost Effective: As different models will have varying compute requirements, our host infrastructure must be capable of scaling up and down to accommodate a range of workloads.

Snowpark: What and Why?

Snowpark is the set of libraries and runtimes in Snowflake that securely deploy and process non-SQL code, including Python, Java, and Scala. This code can be run server-side inside the Snowflake infrastructure using a virtual warehouse. Snowpark also offers a set of client-side libraries including the Snowpark DataFrame API, which is useful for querying and manipulating data directly in Snowflake using familiar DataFrame-style programming.

With this framework, Snowflake enables users to work with relatively large datasets without needing to worry too much about the underlying implementation for data distribution or parallel processing. The benefit of this approach becomes readily apparent when working with the sizeable datasets that are used to train ML models. Since Snowflake operates the infrastructure for us, much of the complexity in maintaining and scaling a machine-learning platform is abstracted away from our data scientists.

To scale compute, for example, a user can toggle between different virtual warehouse sizes. And because the compute resource and data live in the same infrastructure, the latency involved in moving data is decreased significantly compared to having to fetch it from Snowflake and process it in an external cluster. At the sizes of data necessary for machine learning, this ends up saving substantial runtime.
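For instance, a larger warehouse can be swapped in before a heavy training run and scaled back down afterward. A minimal sketch, assuming an active Snowpark session and a hypothetical warehouse named ML_WH:

# Scale the warehouse up for a heavy training job, then back down afterward.
# "ML_WH" is a placeholder for your own virtual warehouse name.
session.sql("ALTER WAREHOUSE ML_WH SET WAREHOUSE_SIZE = 'LARGE'").collect()
# ... run training or feature engineering ...
session.sql("ALTER WAREHOUSE ML_WH SET WAREHOUSE_SIZE = 'XSMALL'").collect()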

A less tangible but significant advantage of Snowpark from our perspective was the existing familiarity of our users with the ecosystem. Our data scientists are well-versed in Snowflake and its abstractions (i.e. stored procedures, user-defined functions), making this framework close to off-the-shelf ready. Data scientists are able to train and deploy models directly within Snowpark, without having to switch between different tools or programming languages.

It is important to note that Snowpark currently supports only the packages available in the Snowflake conda channel (you can find supported packages and versions via select * from information_schema.packages where package_name = '<name>'). Thankfully, the list of packages is extensive and includes a wide range of machine-learning libraries, with new ones being made available over time. And with the recent announcement of Snowpark Container Services, this will eventually no longer be a hard limit.
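For example, the availability check can be run straight from a Snowpark session. A quick sketch, using scikit-learn as the package being looked up:

# List the versions of a package available in the Snowflake conda channel.
available = session.sql(
    "SELECT version FROM information_schema.packages "
    "WHERE package_name = 'scikit-learn' AND language = 'python'"
).collect()
print([row["VERSION"] for row in available])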

General Architecture

The diagram above shows the different stages of our machine learning model deployment lifecycle.

The first step is exploratory data analysis (EDA), feature engineering, and initial model development. Here, the Snowpark API allows our data scientists to easily interact with data directly in Snowflake, using virtual warehouses for data transformation and feature engineering (see related post about our feature store here).
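As an illustration, a data scientist might express feature engineering as DataFrame transformations that execute inside a virtual warehouse rather than pulling raw data out of Snowflake. The table and column names below are hypothetical:

from snowflake.snowpark.functions import col, count, sum as sum_

# Hypothetical example: aggregate raw event data into per-customer features,
# executed inside Snowflake via the Snowpark DataFrame API.
events = session.table("FAN_EVENTS")
features = events.group_by("CUSTOMER_ID").agg(
    count(col("EVENT_ID")).alias("EVENT_COUNT"),
    sum_(col("PURCHASE_AMOUNT")).alias("TOTAL_SPEND"),
)
features.write.mode("overwrite").save_as_table("FAN_FEATURES")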

The output of this step is a Python stored procedure capable of preparing data and training (or re-training) a machine learning model on demand. At the same time, we create a feature catalog. This artifact tells us what input data types our model expects and serves as a reference when we are ready to generate a SQL query that uses our model for inference.

Finally, we register a user-defined function (UDF) representing the model. This UDF loads our model from an internal Snowflake stage and allows it to be run against datasets living in our warehouse.

With self-service being the main goal in our design, it was crucial to ensure that each step is easy to perform both locally and via an automated deployment process. We accomplished this by creating a script around each discrete step and orchestrating them in a CI/CD pipeline operating in production and pre-production environments, i.e.:

register_training_sproc.py -> create_feature_catalog.py -> register_inference_udf.py

Now, a data scientist can deploy their model and navigate it through the above lifecycle by making commits to a code repository.

Running the training stored procedure and inference UDF is as simple as executing a SQL query. Models can be trained, tested, and used for inference directly in the Snowflake console for rapid iteration, or orchestrated by an external tool for periodic re-training and batch inference.

At each phase in the pipeline, we log metadata about artifacts such as model versions and other information about the training dataset. This makes the process repeatable and model lineage well documented and readily available.
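One lightweight way to do this is to append a row to a metadata table at the end of each pipeline step. The table name and fields below are hypothetical and only illustrate the idea:

from datetime import datetime, timezone

# Hypothetical metadata log: one row per training/deployment event.
session.create_dataframe(
    [("MY_MODEL", "v3", "TRAINING_TABLE", datetime.now(timezone.utc).isoformat())],
    schema=["MODEL_NAME", "MODEL_VERSION", "TRAINING_TABLE", "TRAINED_AT"],
).write.mode("append").save_as_table("ML_MODEL_METADATA")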

In the following sections, we’ll take a deeper look into some of these components to help illustrate how they fit into our overall design.

Specifics: Model Training

First, we create a script that is registered as a stored procedure that can later be called to train our machine-learning model. This stored procedure:
a) Handles and prepares the training dataset.
b) Fits the model based on the transformed training data.
c) Serializes and saves a trained model into an internal Snowflake stage.

from joblib import dump
from sklearn.tree import DecisionTreeClassifier
from snowflake.snowpark.session import Session

MODEL_NAME = "MY_MODEL"


def save_model(session: Session, model: object, model_name: str) -> str:
    """
    Saves a serialized ML model to an internal Snowflake stage.

    Args:
    - session: Snowpark session object
    - model: fitted machine learning model
    - model_name: name of the model

    Returns:
    - str: the stage path the serialized model was uploaded to
    """
    file_path = "/tmp/" + model_name + ".pkl"
    dump(model, file_path)
    session.file.put(file_path, "@MLP/models", auto_compress=False, overwrite=True)
    return "@MLP/models/" + model_name + ".pkl"


def main(session: Session, training_table: str) -> list:
    """
    The main model training procedure.

    Args:
    - session: Snowpark session object
    - training_table: Snowflake table to be used for the training task

    Returns:
    - list: here you can return feature importance or some other result of
      this training procedure to the caller.
    """
    training_data = session.table(training_table).to_pandas()

    # create_transformer is a custom function used to prepare the training data
    transformer = create_transformer(training_data)

    X = transformer.transform(training_data)
    y = training_data["target"]

    model = DecisionTreeClassifier()
    model.fit(X, y)

    result = save_model(session, model, MODEL_NAME)

    return [result, MODEL_NAME]

After the training script is prepared, we register it as a stored procedure using the Snowpark API.

# create_session() is a custom function used to connect to Snowpark
session = create_session(**snowflake_credentials)

# model_requirements.txt contains all third party packages used by the training stored procedure
session.add_requirements("model_requirements.txt")
session.add_packages("snowflake-snowpark-python")

session.sproc.register_from_file(
    file_path="training_sproc.py",
    func_name="main",
    name="SPROC_TRAIN_MY_MODEL",
    is_permanent=True,
    replace=True,
    stage_location="@MLP",
)

We can now easily call our procedure on demand or on a schedule to train and re-train our model using a SQL query like CALL SPROC_TRAIN_MY_MODEL('TRAINING_TABLE');.
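The same procedure can also be invoked from Python during development. A minimal sketch, reusing the session from the registration step (the table name is illustrative):

# Call the registered stored procedure directly from a Snowpark session.
result = session.call("SPROC_TRAIN_MY_MODEL", "TRAINING_TABLE")
print(result)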

Specifics: Feature Catalog

Our feature catalog serves as a reference point for the model’s features, their Snowpark data types and ordinal position.

Here, we pass in the model’s name, the name of the table used for training, and any columns we need to exclude from the feature set, such as the label.

def create_feature_catalog(
    session: Session, model_name: str, table_name: str, exclude_columns: list = None
) -> None:
    """
    Creates the {model}_feature_catalog table which will be used downstream by the
    model's scoring UDF.

    The result is a table containing a list of feature names, their Snowpark data types,
    and their ordinal position used when training the model.

    Args:
    - session: Snowpark session object
    - model_name: name of the model
    - table_name: Snowflake table containing a training dataset
    - exclude_columns: list of columns to exclude from the feature catalog, useful
      for the label, user identifiers, etc.
    """
    df = session.table(table_name)
    if exclude_columns:
        df = df.drop(exclude_columns)

    data = []
    for e, row in enumerate(df.schema.fields):
        data.append((row.name, str(row.datatype), e))

    df_output = session.create_dataframe(data, schema=["NAME", "DATA_TYPE", "POSITION"])

    df_output.write.mode("overwrite").save_as_table(f"{model_name}_FEATURE_CATALOG")

The result is a feature catalog table used in the next steps.
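A minimal invocation for the example model might look like this; the training table and excluded columns are placeholders for your own dataset:

# Build the MY_MODEL_FEATURE_CATALOG table from the training dataset.
create_feature_catalog(
    session,
    model_name="MY_MODEL",
    table_name="TRAINING_TABLE",
    exclude_columns=["TARGET", "USER_ID"],
)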

Specifics: Inference User-defined Function

To make use of our trained model, we write a UDF that loads our model from an internal stage, reads input as a Pandas dataframe, makes an inference and returns the result.

def inference_udf(df):
    import sys
    from joblib import load

    # The model file is attached to the UDF via its imports argument and is
    # available in the UDF's import directory at runtime.
    import_dir = sys._xoptions["snowflake_import_directory"]
    model_file = "MY_MODEL.pkl"
    model = load(import_dir + model_file)

    df.columns = feature_names

    return model.predict(df)

Note the variable feature_names. This variable is used to assign column names to the input dataframe. Since the UDF object only processes raw data and does not assign column labels, this step will be important if your model expects named features.

Referencing the feature catalog in the previous step, feature_names can be set like so:

catalog_names = session.sql("SELECT NAME FROM MY_MODEL_FEATURE_CATALOG ORDER BY POSITION")
feature_names = [row["NAME"] for row in catalog_names.collect()]

We can now register our user-defined function like so:

from snowflake.snowpark.types import (
    # import any Snowpark data types you will use here
    BooleanType,
    FloatType,
    PandasDataFrameType,
    PandasSeriesType,
    ...
)

session.add_requirements("model_requirements.txt")
session.udf.register(
    func=inference_udf,
    return_type=PandasSeriesType(return_type),
    input_types=[PandasDataFrameType(input_types)],
    name="UDF_MY_MODEL",
    stage_location="@MLP",
    max_batch_size=MAX_BATCH_SIZE,
    replace=True,
    is_permanent=True,
    imports=["@MLP/models/MY_MODEL.pkl"],
)

There are two important variables here to flag in the registration script: input_types and return_type.

return_type should be set to the Snowpark data type that the model is expected to return. For example, if we expect a boolean result, we would set return_type to BooleanType. If we expect a numeric value, we could set this to FloatType.

Our input_types variable will be constructed based on the data types stored in the feature catalog and tells the UDF what structure to expect from the input dataframe.

def get_input_types(feature_catalog: object) -> list:
    data_types = [row["DATA_TYPE"] for row in feature_catalog.collect()]
    input_types = []
    for data_type in data_types:
        type_object = type_converter(data_type)
        input_types.append(type_object)
    return input_types


def type_converter(data_type: str) -> object:
    # Map the string representation stored in the catalog back to a Snowpark
    # data type object; extend this mapping for any other types your model uses.
    if data_type == "BooleanType()":
        return BooleanType()
    elif data_type == "FloatType()":
        return FloatType()
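
Putting it together, the UDF's registration inputs can be derived from the catalog like so. This is a sketch that assumes the example model returns a boolean prediction:

# Read the feature catalog in positional order and derive the UDF's input types.
feature_catalog = session.sql(
    "SELECT DATA_TYPE FROM MY_MODEL_FEATURE_CATALOG ORDER BY POSITION"
)
input_types = get_input_types(feature_catalog)

# Assumption: this example model produces a boolean prediction.
return_type = BooleanType()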

After registration, the UDF is ready for use and can be executed via a SQL statement:

SELECT
    key,
    UDF_MY_MODEL(
        feature1,
        feature2,
        ...
    ) AS inference
FROM my_inference_table;

You can easily generate this select statement by referencing the feature catalog:

catalog_names = session.sql("SELECT NAME FROM MY_MODEL_FEATURE_CATALOG ORDER BY POSITION")
feature_names = ", ".join([row["NAME"] for row in catalog_names.collect()])
query = f"SELECT UDF_MY_MODEL({feature_names}) AS RESULT FROM <TOSCORE_TABLE>;"

Conclusion

In its current state, our Snowpark-powered machine learning platform has enabled data scientists to deploy ML models in a streamlined and automated fashion. The cost and time-saving benefits here are twofold: models can go to production days or even weeks faster than before, and engineers are freed up from having to deploy models into our infrastructure and troubleshoot code that they did not write.

Batch inference jobs also run significantly faster because modeling and inference code is brought to the data. Most importantly, data scientists have full ownership of the model deployment process and work with tooling and abstractions native to Snowflake that they are well accustomed to.

Recently, Snowflake announced expanded ML offerings, including a dedicated Machine Learning API, a containerization service, and DevOps enhancements such as local testing and native git integration. As these features enter public preview and general availability, we feel confident that continued investment in our Snowpark ML platform will allow our team to keep providing best-in-class machine-learning models powering unforgettable experiences.

To learn more, reach out to the writer here.
