A Step-by-Step Guide to Building an end-to-end ML Pipeline for Demand Forecasting in Snowflake

Photo credit: Pixabay

Welcome back to the second part of our series on Demand Forecasting! (Please refer to the Snowflake documentation for the latest methods and syntax for the Model Registry feature.)

In the previous installment, we explored how demand forecasting can be approached for a pharmaceutical organization, and we learned how to use current signals, rather than just historical sales data, to keep the demand forecasting model relevant in the long term.

Today, we explore further how this can be put into action by training a regressor model using Snowpark ML and making use of the Snowpark ML Model Registry to complete the MLOps process. With the Snowpark ML Model Registry, Snowflake customers can conveniently handle and manage machine learning model deployments on Snowflake. This allows for seamless integration of models, their metadata, and their operational usage, all within a unified data platform.

A Unified, Secure, & Scalable Platform for ML with Snowflake*

*Some features in Preview at time of this writing

Snowpark ML

Snowpark ML is the umbrella term for a set of SDKs and components that help you build and deploy machine learning (ML) models in Snowflake. With Snowpark ML, you can manage and use ML features entirely inside Snowflake: choose an ML framework that fits your data, then carry out preprocessing, feature engineering, model training and model deployment without the data leaving the platform. You can also toggle between a warehouse and a GPU-based container depending on the workload size.

I will walk through the functions and routines needed to forecast demand for a product sold in a particular zip code. The same approach can be adapted to similar scenarios in other industries by modifying the features to match the nature of the business.

Data Preprocessing

For the purpose of this demonstration, we use the TPC-DS sample dataset, which is synthetic data readily available in every Snowflake account. We will add variables like weather, holiday, allergy indicators, product rating, brand preference and supply chain risk, combined with the past sales history of the allergy product, as discussed in Part 1 of the blog.

The query below fetches the raw orders along with the customer’s known-allergy indicator. This is typically data integrated from several source datasets, such as customer, order history and store, and it sits in the integration layer of the data architecture.

df_sales = sp_session.sql("""
    SELECT to_timestamp(SS_SOLD_DATE_SK/1000) AS ORDER_DATE, SS_ITEM_SK AS PRODUCT_ID, SS_CUSTOMER_SK AS CUSTOMER_ID,
           SS_ADDR_SK AS ADDRESS_ID, SS_STORE_SK AS STORE_ID, SS_TICKET_NUMBER AS ORDER_NUMBER, SS_QUANTITY AS QUANTITY,
           CA_STATE AS STATE, CA_ZIP AS CUSTOMER_ZIP, st.S_TAX_PRECENTAGE AS TAX_PERCENT_LEVIED, st.S_ZIP AS ZIPCODE,
           0 AS KNOWN_ALLERGIES
    FROM SNOWFLAKE_SAMPLE_DATA.TPCDS_SF10TCL.STORE_SALES ss
    JOIN SNOWFLAKE_SAMPLE_DATA.TPCDS_SF10TCL.CUSTOMER_ADDRESS ca ON ca.CA_ADDRESS_SK = ss.SS_ADDR_SK
    JOIN SNOWFLAKE_SAMPLE_DATA.TPCDS_SF10TCL.STORE st ON st.S_STORE_SK = ss.SS_STORE_SK
    WHERE CUSTOMER_ID IS NOT NULL AND ZIPCODE IS NOT NULL AND SS_ITEM_SK IS NOT NULL
      AND SS_STORE_SK IS NOT NULL AND ORDER_DATE IS NOT NULL
    LIMIT 1000000
""")

Multiple custom routines are created to construct the features, designed around the specific needs of the life sciences industry. In some cases, features were sourced externally from enterprise data or third-party providers.

  1. One of the first steps is to explore the trends and seasonality in the data. This helps us understand the patterns and fluctuations in order transactions.
  2. As part of the feature construction process, we include additional attributes such as the quarter, day of the month, and month in which these order transactions took place. This gives us more detailed information about the timing of the transactions (see the sketch after this list).
  3. To gain insights into the correlation between travel trends and sales, we fetch a holiday library and integrate it with the sales dataset. This allows us to examine how holidays and travel impact sales figures.
  4. We also consider factors such as humidity and temperature, which can have a significant impact on patients suffering from pulmonary disorders. These environmental factors can trigger asthmatic conditions and create a need for specific drugs. To incorporate this information into our analysis, we study and add humidity and temperature to our list of features. For this purpose, we utilize a weather dataset available in the Snowflake marketplace. One such example is the GLOBAL_WEATHER__CLIMATE_DATA_FOR_BI.STANDARD_TILE.HISTORY_DAY dataset.
  5. Another important factor that we take into account is the average product rating from customer reviews and social media. This helps us gauge the likelihood of customers making repeat purchases.
  6. Additionally, we consider pollen indicators from various datasets. These indicators further support our study of the relationship between environmental factors and product sales.
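
As a minimal sketch of steps 2 and 3 above, the snippet below derives the date-part attributes and a holiday flag with Snowpark. The holidays package, the US calendar and the year range are assumptions made purely for illustration.

import holidays
from snowflake.snowpark.functions import col, quarter, month, dayofmonth, to_date

# Date-part features derived from the order timestamp
df_features = (
    df_sales
    .with_column("QUARTER", quarter(col("ORDER_DATE")))
    .with_column("MONTH", month(col("ORDER_DATE")))
    .with_column("DAY_OF_MONTH", dayofmonth(col("ORDER_DATE")))
)

# Build a small holiday lookup and join it to flag orders placed on holidays
us_holidays = holidays.US(years=range(1998, 2004))  # assumed calendar and years
df_holidays = sp_session.create_dataframe(
    [[d.isoformat(), 1] for d in us_holidays.keys()],
    schema=["HOLIDAY_DATE", "IS_HOLIDAY"]
)
df_features = df_features.join(
    df_holidays,
    to_date(df_features["ORDER_DATE"]) == to_date(df_holidays["HOLIDAY_DATE"]),
    how="left"
).drop("HOLIDAY_DATE").fillna({"IS_HOLIDAY": 0})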

Finally, we have the integrated dataset and calculate the average sales of a product in a particular zip code over a period, for example as sketched below.
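
A minimal sketch of that aggregation, assuming we average QUANTITY per product, zip code and month; the grain and the output name are illustrative, and the result is joined back into the final integrated dataset (spdf_orders).

from snowflake.snowpark.functions import avg, col, date_trunc

# Average units sold per product, zip code and month
spdf_avg_sales = (
    df_features
    .with_column("PERIOD", date_trunc("MONTH", col("ORDER_DATE")))
    .group_by("PRODUCT_ID", "ZIPCODE", "PERIOD")
    .agg(avg(col("QUANTITY")).alias("PRODUCT_SALES"))
)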

spdf_orders.to_pandas().columns.tolist()

The list of features in the final integrated dataset is below:

['PRODUCT_ID', 'CUSTOMER_ID', 'ADDRESS_ID', 'STORE_ID', 'ORDER_NUMBER', 'QUANTITY', 'STATE', 'CUSTOMER_ZIP', 'TAX_PERCENT_LEVIED', 'ZIPCODE', 'KNOWN_ALLERGIES', 'QUARTER', 'MONTH', 'NUMBER_OF_DAYS_IN_MONTH', 'RATING', 'PRODUCT_SALES', 'AVG_TEMPERATURE', 'MAX_HUMIDITY', 'ORDER_DATE', 'PRODUCT_NAME', 'UNIT_PRICE', 'ALLERGYTRIGGER_GRASSES', 'ALLERGYTRIGGER_TREES', 'ALLERGYTRIGGER_WEEDS', 'ALLERGYINDEX', 'IS_HOLIDAY', 'DRUG_FAMILY']

Now that feature construction is complete, the next step is to encode categorical variables, transform features by scaling, and remove date columns along with other cleansing steps. The snowflake.ml.modeling.preprocessing module has encoders, scalers and KBinsDiscretizer to support this.
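
A minimal sketch of this step, assuming STATE and DRUG_FAMILY are the categorical columns to encode and QUANTITY and UNIT_PRICE are the columns to scale; the column choices are illustrative.

from snowflake.ml.modeling.preprocessing import OrdinalEncoder, MinMaxScaler

# Encode categorical columns into numeric codes
encoder = OrdinalEncoder(
    input_cols=["STATE", "DRUG_FAMILY"],          # assumed categorical columns
    output_cols=["STATE_ENC", "DRUG_FAMILY_ENC"]
)
spdf_orders = encoder.fit(spdf_orders).transform(spdf_orders)

# Scale numeric columns to the [0, 1] range
scaler = MinMaxScaler(
    input_cols=["QUANTITY", "UNIT_PRICE"],        # assumed numeric columns
    output_cols=["QUANTITY_SCALED", "UNIT_PRICE_SCALED"]
)
spdf_orders = scaler.fit(spdf_orders).transform(spdf_orders)

# Drop raw date columns before training
spdf_orders = spdf_orders.drop("ORDER_DATE")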

Model Building

The snowflake.ml.modeling APIs have classes that provide the desired functionality. Essentially, these are wrappers around scikit-learn, xgboost and lightgbm that offer improved performance and scalability by executing many of the preprocessing functions in a distributed fashion. You also do not need to write an additional stored procedure to package them, which is thankfully one less object to maintain.

For our demand forecasting model, we use an XGBRegressor from the snowflake.ml.modeling.xgboost module, and we use the random_split function on the final sales dataset to split it into training and test sets.

The random_split function takes a list of weights that determine the proportion of data assigned to each split; we allocate 90% of the data to the training set and 10% to the test set. I have also used an imputer to fill in any NaN values.
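
A minimal sketch of the split described above; the seed value is illustrative.

# 90/10 train/test split of the prepared sales dataset
train_df, test_df = spdf_orders.random_split(weights=[0.9, 0.1], seed=42)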

feature_column_names : the list of column names used as input features for the model, extracted from the sales dataset; ID and date columns are excluded.

label_column_name : the target variable, i.e. the actual product sales.

output_cols : the column that will hold the forecasted product sales produced by the model.
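
For reference, one possible way to define these variables (the list of excluded columns is an illustrative assumption); the pipeline and regressor below then consume them.

# Exclude identifiers, dates and the label itself from the feature set
exclude_cols = ["PRODUCT_ID", "CUSTOMER_ID", "ADDRESS_ID", "STORE_ID",
                "ORDER_NUMBER", "ORDER_DATE", "PRODUCT_NAME", "PRODUCT_SALES"]
feature_column_names = [c for c in train_df.columns if c not in exclude_cols]
label_column_name = ["PRODUCT_SALES"]
output_cols = ["FORECASTED_PRODUCT_SALES"]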

from snowflake.ml.modeling.xgboost import XGBRegressor
from snowflake.ml.modeling.impute import SimpleImputer
from snowflake.ml.modeling.pipeline import Pipeline
from snowflake.ml.modeling.compose import ColumnTransformer
from snowflake.ml.modeling.model_selection import GridSearchCV

# Preprocessing pipeline: fill NaN values in the feature columns with 0.0
model = Pipeline(steps=[
    (
        "preprocessing",
        ColumnTransformer(transformers=[
            ("fill_nan", SimpleImputer(strategy="constant", fill_value=0.0), feature_column_names)
        ])
    )
])

# XGBoost regressor reading the feature columns and predicting the label
regressor = XGBRegressor(
    input_cols=feature_column_names,
    label_cols=label_column_name,
    output_cols=output_cols
)
regressor.fit(train_df)

# Predict on the held-out test set
result = regressor.predict(test_df)

MAPE (Mean Absolute Percentage Error) is a widely used metric in time series forecasting to evaluate the accuracy of a prediction or forecasting model. It is particularly useful when dealing with data that has varying scales or magnitudes, because it expresses the error relative to the actual values. We chose MAPE because the dataset has been carefully curated, making outliers and asymmetry rare occurrences. One can employ MSE or other metrics as well, depending on the data.
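
For reference, a minimal illustration of how the metric is computed, assuming plain numpy arrays of actuals and predictions.

import numpy as np

def mape(y_true, y_pred):
    # Average of |actual - predicted| / |actual|
    y_true, y_pred = np.asarray(y_true, dtype=float), np.asarray(y_pred, dtype=float)
    return np.mean(np.abs((y_true - y_pred) / y_true))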

Hyperparameter tuning

When it comes to hyperparameter tuning, which is a crucial part of the data science workflow, the Snowpark ML library provides distributed versions of the scikit-learn GridSearchCV and RandomizedSearchCV APIs. These enable efficient hyperparameter tuning on both single-node and multi-node warehouses. GridSearchCV performs an exhaustive search over a specified hyperparameter grid: it systematically evaluates all possible combinations of hyperparameter values, making it more likely to find the optimal set of hyperparameters. In this case, the snowflake.ml.modeling.model_selection.GridSearchCV class was chosen to select the best model configuration.

# Hyperparameter grid for the XGBoost regressor
param_distributions = {
    "n_estimators": [100, 200, 300, 400, 500],
    "learning_rate": [0.1, 0.2, 0.3, 0.4, 0.5],
}

Gridsearch = GridSearchCV(
    estimator=XGBRegressor(),
    param_grid=param_distributions,
    n_jobs=-1,
    scoring="neg_mean_absolute_percentage_error",
    input_cols=feature_column_names,
    label_cols=label_column_name,
    output_cols=output_cols
)

We switch to a Snowpark-optimized warehouse before fitting the training data.

sp_session.sql('''CREATE OR REPLACE WAREHOUSE snowpark_ml
    WAREHOUSE_TYPE = 'SNOWPARK-OPTIMIZED'
    AUTO_SUSPEND = 200
    WAREHOUSE_SIZE = MEDIUM
    INITIALLY_SUSPENDED = TRUE''').collect()
sp_session.use_warehouse('snowpark_ml')

# Train
Gridsearch.fit(train_df)

The best_estimator_ attribute holds the trained model that achieved the best performance based on the specified scoring metric. So, after running Gridsearch.fit(train_df), you can access the best model using Gridsearch.to_sklearn().best_estimator_, as shown below. We can then use this best model to make predictions on new data or further analyze its properties.

import pandas as pd

best_estimator = Gridsearch.to_sklearn().best_estimator_
# Analyze grid search results
gs_results = Gridsearch.to_sklearn().cv_results_
n_estimators_val = []
learning_rate_val = []
for param_dict in gs_results["params"]:
    n_estimators_val.append(param_dict["n_estimators"])
    learning_rate_val.append(param_dict["learning_rate"])
mape_val = gs_results["mean_test_score"] * -1
# Collect the results into a DataFrame for the metadata lookup further below
gs_results_df = pd.DataFrame({"n_estimators": n_estimators_val,
                              "learning_rate": learning_rate_val,
                              "mape": mape_val})

In the snippet above, we retrieve the cross-validation results from the GridSearchCV object. The cv_results_ attribute contains a dictionary with information about the performance of each combination of hyperparameters during the grid search. We then collect the "n_estimators" and "learning_rate" values for each combination and calculate the MAPE values from the mean test scores (the scorer is negated, so we multiply by -1).

# Analyze results
from snowflake.ml.modeling.metrics import mean_absolute_percentage_error

mape = mean_absolute_percentage_error(df=result,
                                      y_true_col_names="PRODUCT_SALES",
                                      y_pred_col_names="FORECASTED_PRODUCT_SALES")
result.select("PRODUCT_SALES", "FORECASTED_PRODUCT_SALES").show()
print(f"Mean absolute percentage error: {mape}")
------------------------------------------------
|"PRODUCT_SALES" |"FORECASTED_PRODUCT_SALES" |
------------------------------------------------
|67.0 |64.97743225097656 |
|50.0 |68.40835571289062 |
|28.0 |38.66513442993164 |
|48.0 |52.33694839477539 |
|40.0 |36.26590347290039 |
|43.0 |54.94252014160156 |
|27.0 |37.24953842163086 |
|65.0 |66.995853424072266 |
|48.0 |53.990840911865234 |
------------------------------------------------
Mean absolute percentage error: 0.551858329714832

Next, let’s extract the optimal model and its metadata from the grid search results, particularly the hyperparameter values and the associated Mean Absolute Percentage Error (MAPE).

# Let's save our optimal model first and its metadata
optimal_model = Gridsearch.to_sklearn().best_estimator_
optimal_n_estimators = optimal_model.n_estimators
optimal_learning_rate = optimal_model.learning_rate
optimal_mape = gs_results_df.loc[(gs_results_df['n_estimators'] == optimal_n_estimators) &
                                 (gs_results_df['learning_rate'] == optimal_learning_rate),
                                 'mape'].values[0]

Snowpark ML Model Registry

The Snowpark ML Model Registry stores Python ML models so they can easily be found and used by others. It supports not only models trained with Snowpark ML but also a wide variety of models from external tools and open-source repositories. It packages them into a standard model entity in Snowflake and provides an easy-to-use deploy() API to create inference endpoints in a warehouse or in Snowpark Container Services. One can also track model versions and store metrics associated with models using the Model Registry.

from snowflake.ml.registry import model_registry
from snowflake.ml._internal.utils import identifier

# Sample input used to infer the model signature
X = train_df.select(feature_column_names).limit(100)
db = identifier._get_unescaped_name(sp_session.get_current_database())
schema = identifier._get_unescaped_name(sp_session.get_current_schema())

# Define model name and version
model_name = "demandforecast"
model_version = 3

# Create a registry and log the model
registry = model_registry.ModelRegistry(session=sp_session, database_name=db,
                                        schema_name=schema, create_if_not_exists=True)
registry.log_model(
    model_name=model_name,
    model_version=model_version,
    model=optimal_model,
    sample_input_data=X,
    options={"embed_local_ml_library": True,  # enabled to pull latest dev code changes
             "relax": True}                   # relax dependencies
)
registry.set_metric(model_name=model_name, model_version=model_version,
                    metric_name="mean_abs_pct_err", metric_value=optimal_mape)

The model list is a Snowpark DataFrame, so we can easily choose which columns to display and filter or sort it as desired.

registry.list_models().to_pandas()

One can deploy a model using two methods:

  1. Using a Model in a Snowflake Warehouse: To deploy a model to a warehouse, use the deploy method shown below. The registry generates a user-defined function (UDF), using a name you provide, that invokes the model’s predict method.
  2. Using a Model in an SPCS Compute Pool: This will be explored in another blog coming soon!

model_deployment_name = model_name + f"{model_version}" + "_UDF"
registry.deploy(model_name=model_name,
                model_version=model_version,
                deployment_name=model_deployment_name,
                target_method="predict",
                permanent=True,
                options={"relax_version": True})

# Check the list of deployments
registry.list_deployments(model_name, model_version).to_pandas()

Finally, let’s reference the model and call its predict method with the deployment name we specified when deploying it.

# Reference the registry using a function call
model_ref = model_registry.ModelReference(registry=registry, model_name=model_name, model_version=model_version)
# Use the deployed model to perform inference
result_sdf = model_ref.predict(deployment_name=model_deployment_name, data=test_df)
result_sdf.show()

Demand forecasting is not merely a technical exercise but a strategic imperative. Businesses that embrace forecasting as a strategic tool gain a nuanced understanding of market dynamics, empowering them to make informed decisions and stay resilient in the face of uncertainties. In this blog, we saw how to adopt adaptive and innovative features throughout the demand forecasting journey on the Snowflake Data Cloud and build an end-to-end, secure and governed ML application.

In the next episode, we will explore how to use the new Snowflake Python API to create and manage Snowflake tasks. This API allows us to programmatically schedule the ongoing demand forecasting workflow. It provides a convenient way to create a series of tasks and manage them in a directed acyclic graph (DAG).

Keep innovating in the Data Cloud!
