A Basic Data Science Workflow with Snowpark for Python

Learn about Snowflake’s Snowpark while helping food trucks find the best parking locations for their upcoming shifts.

End-to-end data science workflow overview with Snowflake.

We will look at using Snowpark for the following core components of a data science workflow:

  • Accessing Data
  • Exploring Data
  • Feature Engineering
  • Model Training
  • Model Deployment
  • Model Inference

To walk through the data science workflow, we will be helping (fake) food trucks find the best parking locations. To do this, we have built a model that predicts the shift sales of all parking locations for the upcoming morning and evening shifts.

Modeling scenario output example with predicted shift sales for each location.

All code is available in this step-by-step QuickStart Guide.

What is Snowpark for Python?

Snowpark allows developers to query and deploy functions in Python through APIs and DataFrame-style programming constructs that run on Snowflake’s elastic engine.

What is a Snowpark DataFrame?

A DataFrame represents a relational dataset that is evaluated lazily; it only executes when a specific action is triggered.

It allows users to work in a Python environment, chain transformations, and use built-in Snowflake functions without moving data out of Snowflake. Snowpark DataFrame transformations are translated and executed as SQL in Snowflake. This means the queries are fast and optimized.

For example, a series of Snowpark DataFrame transformations:

# Select
location_df = snowpark_df.select("date", "shift", "shift_sales", "location_id", "city")

# Filter
location_df = location_df.filter(col("location_id") == 1135)

# Sort
location_df = location_df.order_by(["date", "shift"], ascending=[0, 0])

# Display
location_df.show(n=20)

Will be executed as SQL in Snowflake (as seen under SQL Text in the Snowflake Query History):

Query history in the Snowflake Snowsight interface for the executed Snowpark DataFrame transformations.

What are Python Stored Procedures and User-Defined Functions?

Stored procedures and user-defined functions are the mechanisms for deploying Python code, often using Python packages, into Snowflake to be executed in a secure sandbox environment within the Snowflake platform.

Stored procedures work well for training because they can read data, hold an entire table in memory to find patterns, and write files (e.g. model files) back to the Snowflake database.

User-defined functions work well for inference because they return a single value for each row passed to the user-defined function. Because of this, they can easily be distributed to provide fast results.

1. Accessing Data

Creating a Snowpark Dataframe

We connect to Snowflake by creating a session with our Snowflake account credentials. Once connected, we use session.table to specify which Snowflake table or view we want to be working with as a DataFrame.

# Create Snowpark session
session = Session.builder.configs(connection_parameters).create()

# Create Snowpark DataFrame
snowpark_df = session.table("shift_sales_v")

2. Exploring Data

When using Snowpark to explore data, a common pattern is to use Snowpark to manipulate the data and then bring an aggregate table to our Python environment to visualize. This takes advantage of Snowflake’s native performance and scale for aggregating large datasets and the easy transfer of aggregate data to the client-side environment for visualization.

In this example, we group our shift sales data by city and calculate the average shift sales for each city. We then sort by the average. To create the bar chart, we use to_pandas() to bring the sorted aggregate table to our Python environment and then plot the horizontal bar chart.

# Group by city and average shift sales
analysis_df = snowpark_df.group_by("city").agg(mean("shift_sales").alias("avg_shift_sales"))

# Sort by average shift sales
analysis_df = analysis_df.sort("avg_shift_sales", ascending=True)

# Pull to pandas and plot
analysis_df.to_pandas().plot.barh(x="CITY", y="AVG_SHIFT_SALES")
Output plot showing sales shift average by city.

3. Feature Engineering

Aside from taking advantage of Snowflake’s ability to work with data at scale, Snowpark DataFrame syntax makes it easy to chain transformations and create new columns. We will use Snowpark to create a new feature, perform a series of transformations on existing columns, and split the data for training and testing.

Create a Rolling Aggregate Feature

# Create a window that paritions on location and includes all preceding rows
window_by_location_all_days = (
Window.partition_by("location_id", "shift")
.order_by("date")
.rows_between(Window.UNBOUNDED_PRECEDING, Window.CURRENT_ROW - 1)
)

# Average shift sales across the window
snowpark_df = snowpark_df.with_column(
"avg_location_shift_sales",
avg("shift_sales").over(window_by_location_all_days)
)

Impute, Encode, & Drop Columns

# Impute missing values
snowpark_df = snowpark_df.fillna(value=0, subset=["avg_location_shift_sales"])

# Encode categorical column
snowpark_df = snowpark_df.with_column("shift", iff(col("shift") == "AM", 1, 0))

# Drop columns
snowpark_df = snowpark_df.drop("location_id", "city", "date")

Split Data into Training and Testing

train_snowpark_df, test_snowpark_df = snowpark_df.randomSplit([0.8, 0.2])

For all of these transformations, no data was pulled out of Snowflake and no copies of the data were created.

4. Model Training

We will create a Snowflake stored procedure for model training. The first step is to create a function that performs model training and saves the model file. The second step is to register that function in Snowflake as a stored procedure.

Step 1. Create a Function for Training a Model

This function trains a Scikit-learn linear regression model and saves the model to a stage. Linear regression finds a line that best fits the data points used in training. We then use that line as an estimation of where output values will be for future scenarios.

Training will use historical shift sales and features in our data to predict future shift sales of locations where our food trucks can park.

  • Inputs: Training table name on Snowflake, feature column names, target column names, file name for the saved model
  • Outputs: Feature weights of the trained model
def train_linreg(
session: Session,
training_table: str,
feature_cols: list,
target_col: str,
model_name: str,
) -> T.Variant:

# Import packages
from sklearn.linear_model import LinearRegression
from joblib import dump

# Get training data
df = session.table(training_table).to_pandas()

# Set inputs X and outputs y
X = df[feature_cols]
y = df[target_col]

# Train model
model = LinearRegression().fit(X, y)

# Get feature weights
feature_weights = pd.DataFrame({"Feature": model.feature_names_in_, "Weight": model.coef_}).to_dict()

# Save model
dump(model, "/tmp/" + model_name)
session.file.put(
"/tmp/" + model_name,
"@MODEL_STAGE",
auto_compress=False,
overwrite=True
)

# Return feature contributions
return feature_weights

Step 2. Register the Function on Snowflake

To register the function on Snowflake as a stored procedure, we need to specify what Python packages are required in the function. Here we specify:

  • Snowpark
  • Scikit-learn (for training our model)
  • Joblib (for creating a model file)

Scikit-learn is a popular Python library for Machine Learning. We will be able to leverage its functionality in Snowflake in our deployed stored procedure.

train_linreg_snowflake = session.sproc.register(
func=train_linreg,
name="sproc_train_linreg",
is_permanent=True,
replace=True,
stage_location="@MODEL_STAGE",
packages=["snowflake-snowpark-python", "scikit-learn", "joblib"]
)

Train the Model

Once registered, we can train our model in Snowflake by calling our stored procedure. If more memory is required, a Snowpark Optimized Warehouse could be used. In this case, a standard Snowflake warehouse is sufficient.

feature_weights = train_linreg_snowflake(
session, training_table, feature_cols, target_col, model_name
)

5. Model Deployment

We will deploy the model as a user-defined function in Snowflake. This will make the model available in a centralized location, where the new data is coming in. This will make it easy to get updated predictions for shift sales. Just as when creating a stored procedure, we first define a function that does inference and then register it to Snowflake.

Step 1. Create a Function for Model Inference

This function loads the saved model and predicts shift sales from the input features.

  • Inputs: Features
  • Outputs: Predicted shift sales

Because we specified the input as a pandas DataFrame and output as a pandas Series, the user-defined function will execute in batches instead of one row at a time. In many cases, this reduces the overall execution time for the query calling the user-defined function.

We also use the Python library cachetools to cache the model that is returned from the load_model function. This ensures that the model is loaded once per user-defined function call, instead of for each row or batch in the data.

# Function to load the model from file and cache the result
@cachetools.cached(cache={})
def load_model(filename):

# Import packages
import sys
import os
import joblib

# Get the import directory where the model file is stored
import_dir = sys._xoptions.get("snowflake_import_directory")

# Get the import directory where the model file is stored
if import_dir:
with open(os.path.join(import_dir, filename), 'rb') as file:
m = joblib.load(file)
return m

# Function to predict shift sales
def linreg_predict_location_sales(X: pd.DataFrame) -> pd.Series:

# Load the model
model = load_model("linreg_location_sales_model.sav")

# Get predictions
predictions = model.predict(X)

# Return rounded predictions
return predictions.round(2)

Step 2. Register the Function on Snowflake

To register the function on Snowflake as a user-defined function, we specify the Python packages required. Here we specify:

  • Scikit-learn (for making predictions)
  • Joblib (for loading the model from file)
  • Cachetools (for caching the loaded model)

Additionally, we specify which files to import with the user-defined function. Here, we specify our trained model file as an import.

session.udf.register(
func=linreg_predict_location_sales,
name="udf_linreg_predict_location_sales",
stage_location="@MODEL_STAGE",
input_types=[T.FloatType()] * len(feature_cols),
return_type=T.FloatType(),
replace=True,
is_permanent=True,
imports=["@MODEL_STAGE/linreg_location_sales_model.sav"],
packages=["scikit-learn", "joblib", "cachetools"]
)

6. Model Inference

Now that our user-defined function is deployed, we can use it in Snowflake on our data to get shift sales predictions.

Scale Up Snowflake Compute

We can perform distributed model inferencing across the nodes of our Snowflake warehouse by scaling up to a multi-node warehouse. By scaling up to a medium warehouse, the processing will distribute across all four warehouse nodes instead of using single-threaded execution on an x-small warehouse with 1 node. A medium warehouse was not required up to this point in our data science workflow, but we can easily adjust our warehouse now to meet the compute requirements for this portion of the workflow.

session.sql("ALTER WAREHOUSE tasty_dsci_wh SET WAREHOUSE_SIZE = MEDIUM").collect()

Call the Inference Function

We can call the user-defined function in the select statement with the call_udf function.

predictions_df = test_snowpark_df.select(
"shift_sales",
call_udf("udf_linreg_predict_location_sales", [col(c) for c in feature_cols]).alias("prediction")
)

Streamlit

We used this same user-defined function to power a Streamlit application (shown below). The application allows truck drivers to navigate to their city and see predicted shift sales displayed on a map. Streamlit (acquired by Snowflake in March 2022) is a Python library that makes it easy to create and share custom web apps.

Streamlit application that gets model predictions for locations in the selected city through the user-defined function deployed in Snowflake.

Summary

The Main Benefits of Using Snowpark for Data Science

  • Working with data efficiently at scale
  • No movement or copies of data
  • Leveraging Snowflake scalable compute
  • Snowflake governance is maintained
  • Centralized deployment of functions

Try it out!

If you want to try out the end-to-end workflow, you can follow this step-by-step QuickStart Guide to go from getting data from the Snowflake Data Marketplace, training the model, to creating the Streamlit application.

--

--

Marie Coolsaet
Snowflake Builders Blog: Data Engineers, App Developers, AI/ML, & Data Science

Senior Data Science Specialist at Snowflake. Bachelors in Electrical Engineering from Queen’s University.