Predictive Maintenance — Failure Prediction powered by Snowflake

Utilizing Snowflake’s Snowpark ML capabilities has been pivotal in swiftly delivering insightful predictive maintenance solution to streamline fleet management.

Introduction

Welcome to the Manufacturing 4.0! In this blog post, we’ll explore how Snowflake’s Snowpark ML capabilities are revolutionizing fleet management and asset maintenance in the automotive and manufacturing sectors. By harnessing the power of advanced analytics and machine learning, organizations can proactively anticipate equipment failures, minimize downtime, and maximize operational efficiency and profitability.

Understanding Predictive Maintenance

Predictive maintenance employs data and analytics to anticipate equipment failures before they happen, by utilizing historical data and real-time monitoring alongside machine learning algorithms. This proactive strategy enables timely repairs or replacements, minimizing downtime, reducing costs associated with unexpected breakdowns, and optimizing asset performance. In manufacturing and automotive sectors, where downtime entails substantial production losses and maintenance expenses, predictive maintenance is indispensable. Accurate predictions facilitate scheduled maintenance during planned downtimes, averting unplanned shutdowns and disruptions to production lines. This approach enhances equipment uptime, operational efficiency, and overall productivity and profitability.

Translating Insights into Action — The Application Perspective

Our operationalized fleet dashboard application empowers stakeholders with actionable insights into vehicle health. Through an operationalized fleet dashboard, organizations gain comprehensive visibility into vehicle conditions, enabling informed decision-making regarding maintenance scheduling and resource allocation. This proactive approach streamlines fleet management, minimizing production disruptions and maximizing productivity.

Fleet 360 dashboard

Utilization of Model Predictions

By integrating model results into our dashboard, we fuel informed decision-making and drive operational improvements. By surfacing predictions through an intuitive dashboard, stakeholders can prioritize maintenance tasks, allocate resources efficiently, and minimize vehicle downtime. This seamless integration of predictive insights into the operational workflow enhances overall productivity and profitability.

Unveiling Failure Indicators - Model Explanation

Understanding the rationale behind model predictions is essential for trust and interpretability. We leverage SHAP (SHapley Additive exPlanations) values provide insights into feature contributions to model predictions. Analyzing these contributions helps organizations identify key factors influencing vehicle failures and refine predictive models accordingly.

Global SHAP Explanation

Analysis of SHAP values reveals crucial insights into the factors driving vehicle failures. Environmental conditions, such as high wind speeds, emerge as significant predictors of potential failures, emphasizing the importance of incorporating third-party weather data from Snowflake Marketplace into predictive models. From the visualization above, we can also see that high battery voltage readings may indicate impending failures.

Snowflake Marketplace Weather Source

Laying the Groundwork — Data in Predictive Maintenance

Data Model

Our foundational data consolidates Vehicle IoT sensor data spanning 300+ parameters with new readings coming in every minute along with 700+ Diagnostic Trouble Codes (DTC) that were being generated by these vehicles. This IoT data is unified with meta vehicle information and service data. As mentioned above, we are also leveraging weather data from Snowflake Marketplace data — Weather Source within our data model. Challenges associated with IoT data, such as volume and velocity, are seamlessly addressed through Snowflake’s scalable architecture, separating compute and storage resources while offering instant elasticity and built-in optimization.

Empowering Scalable Data Transformation — Snowpark Dataframe & ML Preprocessing APIs

A Forbes survey reveals that data scientists dedicate 80% of their time to data preparation, underscoring the pivotal role of feature engineering in data science. Ordinarily, this would require data moving in out of the data stores and moved to the local disk of our data scientists to be wrangled leading to serious security risks and memory issues. This vital step is made efficient and streamlined when we leverage Snowpark ML Preprocessing APIs coupled with Snowpark DataFrame.

The Snowpark ML Preprocessing functions use the SQL engine in Snowflake for computation, enabling distributed multi-node execution. This approach offers several advantages: it allows data processing to be handled within the Snowflake environment, facilitating seamless integration with Snowpark DataFrames. By leveraging the snowflake-ml library, users can code in a manner similar to Scikit-Learn, accessing data through a Snowpark DataFrame. This setup enables complex data transformations and feature engineering tasks to be executed with push-down onto Snowflake compute, ensuring data security and governance while streamlining the preprocessing workflow. All this without our data ever moving outside the Snowflake environment.

Below is a code snippet of where we leveraged Label encoder from snowflake.ml.modeling.preprocessing to encode categorical columns in our dataset.

# Encode categorical columns w Snowpark Modeling API's Label Encoder
import snowflake.ml.modeling.preprocessing as snowml

# List of categorical columns in our dataset
cat_cols = [...]

cat_enc_df = df.select(cat_cols)
for inp_col in cat_cols:
enc_col = 'ENC_' + inp_col
snowml_oe = snowml.LabelEncoder(input_cols=inp_col, output_cols=enc_col)
cat_enc_df = snowml_oe.fit(cat_enc_df).transform(cat_enc_df)
cat_enc_df = cat_enc_df.withColumn(enc_col+'_NEW',F.cast(enc_col, \
T.IntegerType())).drop(enc_col).rename(enc_col+'_NEW',enc_col)

Learning from our data — Model Training

Train models using Snowpark ML Xgboost classifier

Snowflake’s Snowpark ML modeling API leverages optimized compute infrastructure to train machine learning models at scale. We leverage snowflake.ml.modeling to train Xgboost classifier models to predict failure types using the prepped train data. The algorithms provided in snowflake.ml.modeling are equivalent to those found in popular open-source libraries such as Scikit-Learn, XGBoost, and LightGBM. To train a model in Snowflake using snowflake-ml, we use model.fit() function which creates a temporary stored procedure in the background enabling us to leverage Snowflake compute to train the model and run it in the secure Python sandbox.

In the snippet below, we train our models and once trained we deploy those onto Snowflake stage.

# Import XGBClassifier from snowflake.ml.modeling.
from snowflake.ml.modeling.xgboost import XGBClassifierdef model_training(session, \
train_table, \
model_name, \
feat_cols, \
target_col,\
output_col, \
stage):
# Create Snowpark Dataframe from training table
train_df = session.table(train_table)

# Define the XGBClassifier
neg_count = session.table(train_table).where(F.col(target_col) == F.lit(0)).count()
pos_count = session.table(train_table).where(F.col(target_col) == F.lit(1)).count()
scale_pos_weight = neg_count/pos_count
params = {
"eta": 0.1,
"objective": "binary:logistic",
"subsample": 0.5,
"base_score": 0.5,
"eval_metric": "aucpr",
"scale_pos_weight" : scale_pos_weight
}

model = XGBClassifier(
input_cols= feat_cols,
label_cols= [target_col],
output_cols=[output_col],
**params
)

# Train
model.fit(train_df)

# Save our model
xgbmodel = model.to_xgboost()

# Pickling model
joblib.dump(xgbmodel, model_name+'.joblib')

# Saving pickled object on to the stage
session.file.put(model_name+'.joblib', '@'+stage, overwrite=True)
return model_name + ' model trained successfully'

# Provide the training table & Snowflake Stage
train_table = ...
stage = ...
# Add the features utilized
feat_cols = [...]

# Define parameters
model_name = ...
target_col = ...
output_col = 'PRED_'+target_col

# Call function for model training
model_training(session, train_table, model_name, feat_cols, target_col, \
output_col, stage)

Realizing Predictive Insights — Model Inference

Model inference, facilitated by Snowflake’s Snowpark stored procedures, enables real-time deployment of trained models for predictions. By embedding model inference capabilities directly within the Snowflake platform, we ensure democratized access to predictive insights while maintaining data security and governance. These insights drive informed decision-making and optimize fleet performance.

Creating a Snowpark stored procedure is simple two step process
1. Creating a python function — loads deployed model, obtains predictions and does any post processing required.

def predict_fail( session: Session, \
input_table: str, \
feat_cols: list, \
output_table: str):
@cachetools.cached(cache={})
def load_model(filename):

# Import packages
import sys
import os
import joblib

# Get the import directory where the model file is stored
import_dir = sys._xoptions.get("snowflake_import_directory")

# Load and return the model
if import_dir:
with open(os.path.join(import_dir, filename), 'rb') as file:
m = joblib.load(file)
return m

df = session.table(input_table).toPandas()
dtc_types = session.table("CAT_COL_MAP").filter(F.col("COLUMN_NAME") == F.lit("SUMMARIZED_DTC_TYPE")) \
.select(["KEY"]).toPandas()["KEY"].str.upper()
key_cols = ['DATE', 'VEHICLE_ID']
prob_cols = []
for type in dtc_types:
# Load model
model = load_model(type+"_FLG.joblib.gz")
# Get predictions
df['PROB_'+type] = pd.Series(np.array(model.predict_proba(df[feat_cols]))[:,1]).astype('float')
prob_cols += ['PROB_'+type]

output_df = df[key_cols + prob_cols]
output_df = session.create_dataframe(output_df)

# Get max(predict_fail_type) based on all offsets for a given vehicle id & date
agg_df = output_df.group_by(["VEHICLE_ID", "DATE"]) \
.agg(max_("PROB_"+dtc_types[0]).alias("AGG_PROB_"+dtc_types[0]),max_("PROB_"+dtc_types[1]).alias("AGG_PROB_"+dtc_types[1]), \
max_("PROB_"+dtc_types[2]).alias("AGG_PROB_"+dtc_types[2]), max_("PROB_"+dtc_types[3]).alias("AGG_PROB_"+dtc_types[3]), \
max_("PROB_"+dtc_types[4]).alias("AGG_PROB_"+dtc_types[4]),min_("PROB_"+dtc_types[5]).alias("AGG_PROB_"+dtc_types[5]))

# Save prediction into snowflake table
agg_df.write.mode('overwrite').save_as_table(output_table)

return "Complete"

2. Register function as Snowpark Stored Procedure — We register the function on Snowflake as a stored procedure to democratize access to model inference.

# Register stored procedure
predict_failure_sp = session.sproc.register(
func= ..., # Provide the python function name
name= "...", # Provide the name for the procedure
is_permanent=True,
replace=True,
stage_location= ..., # Provide stage name where model is to be deployed
return_type=T.VariantType(),
imports=[...], # Provide model files to be imported
packages=["snowflake-snowpark-python","pandas","xgboost", "joblib", \
"cachetools"] # Packages required
)

There has been a new exciting development in the world of Snowpark ML since this project was built — Snowpark ML Operations (MLOps) — Model Registry. The two step process outlined above is further simplified by leveraging Snowpark Model Registry (currently in Public Preview). It allows us to deploy models within Snowflake using a Python API, eliminating the need to create a Python UDF/Stored Procedure from scratch.

Predictive maintenance fueled by Snowpark

In summary, Snowpark’s ML capabilities have transformed predictive maintenance in manufacturing and automotive sectors and enabled proactive asset failure anticipation and streamlined fleet management. Snowpark ML allows us to transform our raw data and train ML models all within Snowflake. It enables organizations to minimize downtime and optimize operational efficiency by addressing potential issues before they occur. Through streamlined pipelines and integrated model predictions, stakeholders gain valuable insights into vehicle health, facilitating informed decision-making on maintenance scheduling and resource allocation. This seamlessly integrated pipeline, empowers organizations to democratize model inference, maintain data security and ensure data governance.

Solution designed and developed by team frostbyte and Snowflake industry experts: Shriya Rai, Swathi Jasti, Jacob Kranzler, Rob Guglietti, Tim Long, Brendan Tisseur, Kaila Chen, Vernon Tan and HanSon Tieu.

Contact Snowflake:

Web: https://www.snowflake.com/contact/

Email: se-sit-frostbyte-DL@snowflake.com

--

--