Anomaly Detection for Time series Data in Snowflake Data Cloud

Image Credit : Forbes

In today’s fast-paced smart manufacturing 4.0 industry, organizations are faced with the challenge of dealing with an increasing number of sensors and finding cost-effective solutions for data transmission, management and storage. With the rise of Industrial Internet of Things (IIoT), industrial systems such as power plants, wind turbines, engines, and more generate large amounts of time series data tags during their regular operations.

Monitoring these systems is crucial to identifying abnormal behaviors that could cause reliability issues. Anomalies, also known as outliers, can occur due to sensor failure, human error, or mechanical issues. Undetected anomalies can lead to expensive downtime, equipment damage, and safety hazards. Developing an effective anomaly detection system that quickly detects abnormal behavior and alerts maintenance teams is necessary to avoid such situations. Early detection helps organizations reduce downtime, improve productivity, and save significant costs.

In this context, monitoring the condition of factory floor assets can help organizations :

  • Reduce downtime.
  • Increase process efficiency and improve material flow control.
  • Extend the lifespan of assets.
  • Maximize return on investments.
  • Make informed decisions on when to repair, replace, or retire assets.
  • Eliminate the need for unnecessary repairs and replacements.

Now that we have defined why there is an urging need and the case statement, lets solve it!

Outlier Detection on Time Series Data using an unsupervised approach

Lets understand what is Time Series Data and why an unsupervised approach is instrumental here.

By definition, a time series is a series of data points indexed (or listed or graphed) in time order[Wikipedia]. A time series is a sequence taken at successive equally spaced points in time. When working with large data sets, it’s common to encounter complex patterns. This is particularly true for time series data, which is characterized by high velocity, volume, and variety. Such complexity can make it challenging to identify anomalous patterns that may indicate potential issues and insights manually.

To tackle this issue, data scientists have created specific algorithms that automatically detect anomalies in time series data. These algorithms analyze patterns and trends in large data sets to spot deviations from expected patterns, which can suggest the presence of anomalies.

Machine learning can rapidly detect anomalies in real-time by processing vast amounts of data produced by industrial systems. An unsupervised machine learning algorithm, like deep learning with an auto-encoder model, can be utilized to identify crucial events and sensor attributes.

And why use an unsupervised approach? In a multivariate time series with high complexity, a definite, rule-based, or supervised approach may not be effective and may break quickly due to the number and variety of data points. Additionally, outliers, which are deviations from patterns, can occur in any single channel or in the correlation of channels.

The challenge lies with the millions of data points emitted from a number of sensors and devices and not to forget we are moving towards a billions of devices by 2025.

  1. How can we extract insights from such a vast amount of data points?
  2. How can we translate tags into logic, patterns, and effective business strategies?
  3. How can we implement this on a large scale while keeping costs minimal?

The Snowflake Solution

Snowflake Data Cloud supports organizations by providing a secure and governed platform with unlimited compute for handling and processing large volumes of data generated by Industrial IoT devices. It also serves as a centralized location for all data-related operations, enabling data-driven digital insights.

Snowflake had launched the Manufacturing Data Cloud earlier this year — a global network designed to help manufacturers unlock the value of their critical data and collaborate with their partners, suppliers, and customers in a secure and scalable way. The Manufacturing Data Cloud is part of the Snowflake platform, which offers a single, fully managed, multi-cloud platform for data consolidation, governance, and performance. Snowflake’s ecosystem of 50 manufacturing partners delivers pre-built solutions and industry data sets to support a diverse set of manufacturing and industrial use cases.

Data Ingestion :

One of the challenges of the IoT is data ingestion, which can be complicated and time-consuming. However, using an MQTT Broker can simplify this process and make it more efficient. How to achieve this efficiently and easily with an IOT Bridge in Snowflake has been covered in a separate post. Also there are many other posts written around how to construct a Machine learning pipeline using Snowpark and hence this post will not go into step by step details of that.

We will explore how machine learning detects contextual anomalies in IoT data. Contextual anomalies occur within a specific context or situation. For instance, if a factory floor sensor’s temperature readings go above the upper threshold, that’s a contextual anomaly. Detecting these is crucial for businesses to address issues quickly. Using machine learning, multivariate anomaly detection techniques can be applied to take corrective action before problems become bigger.

A typical machine learning algorithm begins its learning process with a series of data points, such as time series data corresponding to a set of good ventilator valves. It is trained on this information in an unsupervised fashion without any labeling, and over time, it can determine what constitutes a good valve.

Machine Learning Pipeline — High-Level Architecture

Fig 1. End to end ML pipeline leveraging Snowpark

Our objective is to detect outliers in a time series data by analyzing the vibrational sensor readings sourced from the NASA Acoustics and Vibration Database. The dataset can be obtained from the NASA Bearings dataset from here. Our analysis will be based on a single dataset, consisting of readings taken at 10-minute intervals. Each file about 20,480 sensor data points that were obtained from readings of the bearing sensors.

We will use an auto encoder neural network which is created within the is created using Long Short-Term Memory (LSTM) recurrent neural network (RNN) cells within the Keras / TensorFlow framework. Long Short Term Memory networks — usually just called “LSTMs” — are a special kind of recurrent neural network (RNN), capable of learning long-term dependencies.

Let’s start with the Data Preprocessing. In any real-life situation, the bearings experience mechanical degradation over time. Hence, we analyze using one datapoint every 10 minutes over some good set of data. We aggregate each 10-minute datapoint by taking the mean absolute value of the vibration recordings comprising of all the 20,480 datapoints in the file. Finally, we merge the data into a single data frame.

Snowpark library provides an intuitive library for querying and processing data at scale in Snowflake. It simplifies the process of building complex data pipelines and allows you to interact with Snowflake directly without moving data to the system where your application code runs. We will be leveraging Snowpark for preprocessing, model training and inference.

Import Packages

import json
from snowflake.snowpark.session import Session
import snowflake.snowpark.functions as F
import snowflake.snowpark.types as T
from snowflake.snowpark.functions import *
#from sklearn.metrics import classification_report,accuracy_score, confusion_matrix,roc_auc_score, precision_recall_curve, roc_curve, auc, average_precision_score,plot_roc_curve
# import libraries

import os, configparser ,json ,logging
import pandas as pd
from datetime import datetime
import numpy as np
from sklearn.preprocessing import MinMaxScaler
import joblib
import seaborn as sns
sns.set(color_codes=True)
from IPython.display import display, HTML, Image , Markdown
import matplotlib.pyplot as plt
%matplotlib inline

from numpy.random import seed
import tensorflow as tf


import keras
from keras.layers import Input, Dropout, Dense, LSTM, TimeDistributed, RepeatVector
from keras.models import Model
from keras import regularizers

Snowflake Environment Setup and establishing a connection to Snowflake

1. Establish a Snowpark Session

Create a Snowpark session by following the steps here. A snippet is given below:

# Get account credentials from a json file
with open("data_scientist_auth.json") as f:
data = json.load(f)
username = data["username"]
password = data["password"]
account = data["account"]

# Specify connection parameters
connection_parameters = {
"account": account,
"user": username,
"password": password,
"role": "accountadmin",
"warehouse": "ml_wh",
"database": "dev_db",
"schema": "public",
}

# Create Snowpark session
session = Session.builder.configs(connection_parameters).create()

2. Read Data

Let’s read an excel file containing the NASA Bearing Sensor prepared data and write it to a table in Snowflake.

folder_path = '../../data/NASA_Dataset/'
for file in os.listdir(folder_path):
if file.endswith('.csv'): # Check if the file is a CSV file
csv_file = file
break

if csv_file is not None:
# Read the CSV file into a pandas dataframe
snow_pd = pd.read_csv(os.path.join(folder_path, csv_file))
#snow_pd.set_index('MEASURE_TS', inplace=True)
print(snow_pd)
else:
print("No CSV file found in the data folder.")

#Persist the data into a table in Snowflake
sp_session.write_pandas(snow_pd,'SENSOR_PREPARED',auto_create_table=True, overwrite=True)

3. Preprocessing

Transform the data file index to date time and sort in chronological order taking advantage of the capability to switch between pandas and snowpark constructs.

import datetime
# Define the format strings
iso_format = '%Y-%m-%dT%H:%M:%S%z'
new_format = '%Y.%m.%d.%H.%M.%S'

# Convert the time_data column to the desired format
snow_pd['MEASURE_TS'] = snow_pd['MEASURE_TS'].apply(lambda x: datetime.datetime.strptime(x, iso_format).strftime(new_format))
snow_pd.set_index('MEASURE_TS', inplace=True)
snow_pd.index = pd.to_datetime(snow_pd.index, format='%Y.%m.%d.%H.%M.%S')
snow_pd = snow_pd.sort_index()
snow_pd.index.name ='MEASURE_TS'
snow_pd = snow_pd.round(6)
snow_pd = snow_pd.dropna(axis=1)
snow_pd.head()

4. Data Exploration

An initial exploratory data analysis of the Bearings is seen below.

Fig 2. Basic Data Exploration

5. Split Data into Training and Testing

To define the training and testing data, we split the dataset into two parts. The first part, which represents normal operating conditions, is used for training. The remaining parts of the dataset leading up to the bearing failure are used for testing. We will save our training and test datasets to a Snowflake table.

train = snow_pd[: '2023-02-15 10:52:39']
test = snow_pd['2023-02-15 12:52:39':]
snow_pd.shape

#Write the train and test data to a Snowflake table
sp_session.write_pandas(train,"AE_TRAIN",auto_create_table=True, overwrite=True)
sp_session.write_pandas(test,"AE_TEST",auto_create_table=True, overwrite=True)

6. Model Training

Here, we see a typical data science workflow. We are finished preparing our data and now move on to training in a Python stored procedure on Snowflake. The model created from this stored procedure will be our tool for automating decisions around sensors to maximize the revenue. We’ll surface the mean absolute errors (model inference) on future data using a Python user-defined function to detect outliers.

Snowflake Stored Procedures work well for training because they can read data, hold an entire table in memory to find patterns, and write files (e.g. model files) back to the Snowflake database.

Snowflake User-Defined Functions work well for inference because they return a single value for each row passed to the user-defined function. Because of this, they can easily be distributed to provide fast results.

End Outcome: Effortless, scalable, and secure processing without data movement across compute environments.

Fig 3 End to End ML Pipeline with Snowpark

We will create a stored Procedure to do Model Training using Snowpark and a Keras LSTM Auto-encoder. I have readings from 4 sensors and we will be calculating the loss for each sensor. We will also register the stored procedure. These will create a model file that will get uploaded to an internal stage called MODEL_STG.

from sklearn.metrics import mean_squared_error
import snowflake.snowpark
from snowflake.snowpark import functions as F
from snowflake.snowpark.session import Session
from snowflake.snowpark import version as v
import snowflake.snowpark.types as T
from snowflake.snowpark.window import Window

#from sklearn.model_selection import train_test_split
import os
# Snowpark for Python
from snowflake.snowpark.session import Session
from snowflake.snowpark.types import IntegerType, StringType, StructType, FloatType, StructField, DateType, Variant
from snowflake.snowpark.functions import udf, sum, col,array_construct,month,year,call_udf,lit
from snowflake.snowpark.version import VERSION
import random
random.seed(392)

def sproc_train_lstm_model(session: Session) -> Variant:
X_train = session.table("AE_TRAIN")
X_train = X_train.to_pandas()
X_train = X_train.to_numpy()
X_train = X_train.reshape(X_train.shape[0], 1, X_train.shape[1])

#X_test = X_test.reshape(X_test.shape[0], 1, X_test.shape[1])


inputs = Input(shape=(X_train.shape[1], X_train.shape[2]))
#model with five LSTM layers
L1 = LSTM(16, activation='relu', return_sequences=True,
kernel_regularizer=regularizers.l2(0.00))(inputs)
L2 = LSTM(4, activation='relu', return_sequences=False)(L1)
L3 = RepeatVector(X_train.shape[1])(L2)
L4 = LSTM(4, activation='relu', return_sequences=True)(L3)
L5 = LSTM(16, activation='relu', return_sequences=True)(L4)
output = TimeDistributed(Dense(X_train.shape[2]))(L5)
model = Model(inputs=inputs, outputs=output)

optimizer=keras.optimizers.Adam(lr=.001)
model.compile(optimizer=optimizer, loss='mae')

# fit the model to the data
nb_epochs = 100
batch_size = 10
history = model.fit(X_train, X_train, epochs=nb_epochs, batch_size=batch_size,
validation_split=0.05).history

from joblib import dump
dump(model, '/tmp/'+ 'lstm_model.h5')
session.file.put('/tmp/'+ 'lstm_model.h5', '@MODEL_STG', auto_compress=False, overwrite=True)
return history
#Registering the function as a named Stored Procedure 
sp_session.use_warehouse('SNOWPARK_OPT_WH')
sproc_train_lstm_model = sp_session.sproc.register(func=sproc_train_lstm_model,
name='sproc_train_lstm_model',
is_permanent=True,
replace=True,
stage_location='@MODEL_STG',
packages=["snowflake-snowpark-python","tensorflow","scikit-learn","joblib","keras"]

Let us invoke the Stored Procedure to carry out Model Training and capture the loss

Fig 4 Model Loss
history = sproc_train_lstm_model(session=sp_session)
history = pd.DataFrame(eval(history))
history.head()
 loss val_loss
0 0.042043 0.017814
1 0.005972 0.000654
2 0.000983 0.000663
3 0.000901 0.000895
4 0.000905 0.001245

Distribution of Loss Function

To find an anomaly, we will plot the distribution of the calculated loss in the training set and determine the threshold value. By setting the threshold above the “noise level,” any anomalies flagged should be statistically significant above the background noise.

Fig 5 Loss Distribution

We can also see that the model is saved in the stage.

# The model is now stored in a Snowflake stage
pd.DataFrame(sp_session.sql('LIST @MODEL_STG').collect())

Create Vectorized User-Defined Function (UDF) using Batch API for inference

Here we will leverage the Python UDF Batch API to create a vectorized UDF that takes a Pandas Data frame as input.

def predict_anomaly(df: pd.DataFrame) -> pd.Series:


import os
import sys
from joblib import load
#tf.logging.set_verbosity(tf.logging.ERROR)


from keras.layers import Input, Dropout, Dense, LSTM, TimeDistributed, RepeatVector
from keras.models import Model
from keras.models import load_model
from keras import regularizers
# file-dependencies of UDFs are available in snowflake_import_directory
scaler = MinMaxScaler()
X_train_pd = scaler.fit_transform(df)



X = X_train_pd.reshape(X_train_pd.shape[0], 1, X_train_pd.shape[1])
#X = df.values.reshape(df.count(), 1, 4)
IMPORT_DIRECTORY_NAME = "snowflake_import_directory"
import_dir = sys._xoptions[IMPORT_DIRECTORY_NAME]
model_name = 'lstm_model.h5'
model = load_model(import_dir+model_name)
#scored_data = pd.Series(model.predict(X))
#column_0 = df.iloc[:, 0]
X_pred = model.predict(X)
X_pred = X_pred.reshape(X_pred.shape[0], X_pred.shape[2])
scored = np.abs(X_pred-X_train_pd)
return pd.Series(scored.tolist())
predict_anomaly = sp_session.udf.register(func=predict_anomaly, 
name="predict_anomaly",
stage_location='@LIB_STG',
input_types=[T.FloatType()]*len(Input_cols),
return_type = T.ArrayType(),
replace=True,
is_permanent=True,
imports=['@MODEL_STG/lstm_model.h5'],
packages=['scikit-learn==1.1.1','pandas','joblib', 'cachetools','keras','tensorflow'],
session=sp_session)

Based on the results of the model training, we now have an array of Loss Mean Absolute Errors across all individual sensors. To analyze these results further, we need to extract the values separately. This can be done using a function that splits the values from the array.

By parsing the array, we can obtain the reconstruction error for each sensor and use the maximum value as the threshold for that sensor. This threshold will be used to identify any anomalies in the test dataset.

To calculate the loss on the test set, we will use the vectorized UDF that was created earlier. This UDF will take a Pandas DataFrame as input and return a Pandas Series as output. The Pandas Series will contain the Loss Mean Absolute Errors for each sensor.

# define a function to extract values from the array
def get_value(string, idx):
return float(string.split(',')[idx].strip()[1:-1])

# Get reconstruction loss threshold for each sensor.

Threshold_TEMPERATURE= np.max(snow_X_pred_temp['TEMPERATURE_LOSS_MAE'])
print("Reconstruction error threshold: ", Threshold_TEMPERATURE)

# calculate the loss on the test set
X_test_snowdf = sp_session.table("AE_TEST")
X_pred_test = X_test_snowdf.with_column('Loss_mae', predict_anomaly([F.col(c) for c in X_test_snowdf.columns])).to_pandas()
X_pred_test.index = test.index
X_pred_test = X_pred_test.drop(['TEMPERATURE','VIBRATION','MOTOR_AMPS','MOTOR_RPM'],axis=1)

Detect Anomalies

Once we have the Loss Mean Absolute Errors for each sensor, we can compare them to the thresholds that were set earlier.

#Determine anomaly in each sensor
snow_X_pred_test['Anomaly_in_vibration_sensor'] = snow_X_pred_test['VIBRATION_LOSS_MAE'] > snow_X_pred_test['Threshold_VIBRATION']
snow_X_pred_test['Anomaly_in_temperature_sensor'] = snow_X_pred_test['TEMPERATURE_LOSS_MAE'] > snow_X_pred_test['Threshold_TEMPERATURE']
snow_X_pred_test['Anomaly_in_motor_amps_sensor'] = snow_X_pred_test['MOTOR_AMPS_LOSS_MAE'] > snow_X_pred_test['Threshold_MOTOR_AMPS']
snow_X_pred_test['Anomaly_in_motor_rpm_sensor'] = snow_X_pred_test['MOTOR_RPM_LOSS_MAE'] > snow_X_pred_test['Threshold_MOTOR_RPM']

If the Loss Mean Absolute Error for a particular sensor exceeds the threshold, then we can conclude that an anomaly has occurred in that sensor. These are marked with a red vertical line as you can observe in the below picture.

Fig 6 : Anomalies in time-series data

Overall, this process allows us to detect potential bearing malfunctions well ahead of the actual physical breakdown, enabling us to take preventative measures and avoid costly downtime.

Voila, we have developed a straightforward method to identify and mark any abnormal readings for monitoring purposes.

Summary

Monitoring the condition of the factory floor assets can help organizations get a better view of the device fleet so that one can make the most of the assets and extend their lifespan. By having access to real-time data on the condition of these assets, organizations can make informed decisions on when to repair, replace or retire assets. This can lead to significant cost savings, as it eliminates the need for unnecessary repairs and replacements.

In this post, we demonstrated the business value of monitoring the condition of factory floor assets to help organizations detect anomalies in sensor data on an IoT device using a Machine Learning model trained in Snowflake.

In Part -2 of this blog series, we will see how to build an Anomaly Detection Solution using Snowpark Cortex ML all within the security boundaries of the Snowflake Data Cloud.

--

--