What do Generative AI and Anomaly Detection have in common?

A Gentle Introduction to Variational Autoencoders and how to use them for Time Series Anomaly Detection purposes.

Giulio Nenna
Data Reply IT | DataTech
16 min readJul 25, 2024

--

This article explores Time Series Anomaly Detection using Variational Autoencoders. We begin with an introduction to the Autoencoder concept and its more advanced variant, the Variational Autoencoder, a versatile tool applicable to various tasks, including generation and anomaly detection. Following this, we delve into the core principles of time series anomaly detection and demonstrate how variational autoencoders can be effectively utilized for this purpose.

What is an Autoencoder

A visual representation of an Autoencoder

An autoencoder, in the simplest terms, consists of two main functions: the encoder and the decoder. The encoder e(x) is a function that takes the original data x and returns a representation of x (that we call z) in a latent space that is dimensionally smaller than the original space. On the other hand, the decoder d(x) takes the latent representation z and tries reconstructing the original data x. In an autoencoder, the encoder and decoder are sequentially arranged, allowing the input data to be first dimensionally reduced and subsequently reconstructed. The best possible autoencoder is the Encoder-Decoder pair that minimizes the reconstruction loss, meaning the difference between input data and reconstructed output data.

The simplest training objective of an autoencoder

In classical machine learning, Principal Component Analysis (PCA) is, for all intents and purposes, an autoencoder since its main objective is to find a suitable subspace SV such that the projection onto S from V minimizes the loss of information. Essentially, PCA compresses high-dimensional data into a lower-dimensional form while preserving as much variability as possible. This process of dimensionality reduction closely aligns with the goals of autoencoders in modern machine learning paradigms.

In a deep learning setting, the simplest form of an autoencoder is a stack of two fully connected layers such that the dimension of the hidden layer is smaller than the starting dimension. This hidden layer effectively serves as an embedding layer, which generates a compact representation of the input data, capturing its most essential features.

The product of this embedding phase is a vector, often referred to as an embedding vector, that serves as an array of meaningful features distilled from the original high-dimensional input. This vector can be leveraged for various tasks beyond mere data reconstruction, which is the primary goal of autoencoders. For instance, this array of features can act as an input to other machine learning models, improving their performance by providing a more manageable and informative representation of the data.

Furthermore, these embedding vectors can be used in clustering, visualization, or as inputs to downstream machine learning tasks such as classification, regression, or anomaly detection. The versatility and efficiency of these embedded arrays make them powerful tools in the machine learning toolkit, enabling more sophisticated and effective models that can handle complex datasets with ease.

A spicier autoencoder: the Variational Autoencoder

A simple autoencoder could easily be trained to minimize the reconstruction loss, such autoencoder would be very effective if the only purpose of our training is to reduce the dimensionality of the data while retaining most of the information without any loss during the encoding-decoding pipeline. However this approach could easily lead to overfitting and, most importantly, to severe irregularity of the latent space preventing us from using it as a sample space for generative purposes.

By overfitting in an autoencoder we mean the phenomenon where the latent variable becomes a meaningless mapping instead of a rich representation. For example, an autoencoder composed of a stack of two fully connected layers that has a latent space dimension of 1, can be easily trained to map each input datapoint to an integer value.

An overfitting example: each datapoint is mapped into a 1D value

An autoencoder of this nature would be minimally useful, as the primary goal is to discover a rich and significant representation of the latent space, which can also be used to sample data for generative tasks. This overfitting phenomenon can easily happen if we don’t allow for regularization of the training function.

What would be preferable instead is a regularized latent space that is, to some extent, even interpretable.

For example imagine we are trying to fit an autoencoder on images of faces. It would be of great use if each face is encoded in two interpretable parameters (smile value and pose value) instead of two meaningless parameters. This is useful since once the autoencoder is trained we obtain both an interpretable encoding and a decoder that, given two interpretable parameters, is able to generate new data according to the given specifications.

This is solved in the variational autoencoder approach by encoding each input not as point in a latent space but instead as a probability distribution (as is in its parameters) from which we could then sample from and obtain a latent sample that can be then fed through the decoder function. In this particular setting the encoder returns the mean and the variance µ, σ of a multivariate normal distribution from which we then sample z ∼ N (µ, σ) that we use as input of the decoding space:

To enforce the above mentioned regularity of the latent space, a regularization term is used. In practice the regularization is done by enforcing distribution to be close to a standard normal distribution. This way, we require the covariance matrices to be close to the identity, preventing punctual distributions, and the mean to be close to 0, preventing encoded distribution to be too far apart from each others.

I will now present a concise mathematical-probabilistic framework to justify this approach. It is quite theoretical and may be readily skipped if preferred.

The main hypothesis is that input data are generated from a latent process z that we assume to be distributed from an apriori distribution p(z) (For example a standard normal distribution). We hence assume to have the following generative process:

Bayesian representation of the hypothesis under the Variational Autoencoder

The main goal is to infer p(z|x) namely, the distribution of z (our latent variable) knowing the data x that would be generated from it.

We can imagine a parallelism between this generative framework and the autoencoder framework: p(z|x) is the distribution of the latent variable given the input data and can be thought as an encoder while p(x|z) is the distribution of the data given the corresponding latent variable and can be thought as a decoder.

In practical terms, we implement an approximation of both p(z|x) and p(x|z) as two fully connected layers. The first takes data as input and generate as output the parameters of the latent distribution (a standard normal distribution) from which the latent variable is then sampled and fed to the second fully connected layer that decodes the latent variable.

Without delving too much into the mathematical assumptions under this model, the objective is to maximize the log-likelihood of our data, that can be written in the following terms:

the above equation states that maximizing the log-likelihood of the data is equivalent to maximizing a lower bound that is composed of the blue and the red term in the equation. Maximizing the blue term is equivalent to maximizing the likelihood of the data with respect to the latent variable while minimizing the red term means minimizing the Kullback-Leibler divergence between q (our encoding approximation) and p(z) (the a-priori distribution of the latent variable). This D-KL minimization term is exactly the regularization term that avoids overfitting by forcing the encoder distribution to be “similar to” a pre-defined distribution that, in our case, will be a standard normal distribution.

The main takeaway of this mathematical excursus is that, by formulating the loss as the above shown equation (which, in this case, is a log-likelihood so it needs to be maximized) the model can be regularized to avoid overfitting in the latent space so that the latent representation of data (embeddings) can be used for generative purposes and, to some extent, even interpreted.

How are Autoencoders used for Anomaly Detection?

Before delving into how exactly the variational autoencoder architecture is used for time series anomaly detection, we will introduce a general methodology for this task that can be adapted and used with Variational Autoencoders.

Model-Based Point-Outlier Anomaly Detection

A point outlier is a datum that behaves unusually in a specific time instant when compared either to the other values in the time series (global outlier) or to its neighboring points (local outlier). Moreover point outliers can be univariate or multivariate depending on whether they affect one or more time-dependent variables respectively. The most intuitive definition for point outlier is a point in time that significantly deviates from its expected value.

When a univariate time series is under scrutiny, then each point can be defined as an outlier if the difference from its expected value exceeds a given threshold τ.

Rule for determining whether a point x is an outlier

This concept may seem pretty simple, but it covers the difficult task of estimating observations from time series data which constitutes its whole research area. Note that we used the term estimating and not predicting since a point can be estimated using only previous observations or neighbor observations including future observations. This distinction is crucial since methods that only rely on past data to estimate new observation can be defined as online algorithms meaning that they can be deployed in real-time application where future data is not available. On the other hand, methods that leverage future data cannot be deployed in real-time for obvious reasons, unless a certain delay is allowed.

Outlier detection that make use of point estimation are based on a model that is adequately fitted to the data in order to infer the behavior of the stochastic process underlying the data. For this reason these approaches are also called model-based and they differentiate one from another based on the model used to fit the data and the methodologies used for estimation. One example of model that can be used to estimate data is the ARIMA model from which one can estimate future data and compute its variance to evaluate whether an observation is anomalous or not.

When dealing with multivariate time series data for point outlier detection various techniques can be adopted to overcome the dimensionality of the data. First of all, multivariate data can be treated as multiple univariate time series hence univariate techniques can be adopted in an ensemble fashion to perform anomaly detection. This approach unfortunately disregards any dependencies that may exist between the variables. Another approach to leverage well established univariate point outlier detection algorithms with multivariate data is to use techniques to aggregate multiple features into independent time series. This means to apply a preprocessing method to the multivariate time series to find a new set of uncorrelated variables where univariate techniques can be applied. This falls under the umbrella of dimensionality reduction.

In contrast to this, multivariate data can be analyzed using natively multivariate techniques. This means that a model is fitted on multivariate time series data and is used to infer estimations on observations that will be leveraged to classify whether if an observation is an anomaly or not. Once an estimation has been computed, a simple anomaly measure based on distance can be used as a decision rule:

Variational Autoencoders for Model-Based AD

A variational autoencoder, in its complete form (encoder and decoder in cascade), is a model that takes as input data and outputs the same data after it has been encoded and then reconstructed. Variational Autoencoders can accept inputs of any dimension so multivariate time series can be easily fed to them to output a multivariate time series with the same dimension. Let’s assume that the autoencoder in our hands accepts time series of length T with K dimensions (and outputs a time series with the same dimensions), then the Anomaly detection framework can be summarized in the following steps:

  1. Assuming to have a time series dataset of length T’>>T that can be split into a part that do not contain anomalous behavior (training data) and a part that contain anomalous timestamps (test data), training data is split into sliding windows of length T that will constitute the training dataset of our variational autoencoder.
  2. Each TS window from the training dataset is fed to the autoencoder to train it. Training an autoencoder is an unsupervised process since loss is based on the difference between the input data and output data.
  3. In the evaluation stage, test data is fed to the autoencoder (in a sliding window fashion) and reconstruction error is computed for each timestamp. The reconstruction error in its simplest form is the euclidean distance between input data and output data and can be treated as an “anomaly score” of each timestamp of the test data
  4. A threshold on anomaly score is set using various techniques such as SPOT algorithm (out of the scope of this article) and all observations above a certain threshold are considered anomalous.
  5. If labeled data is available (a binary label for each test timestamps that indicates whether any observation is anomalous or not) then the model can be evaluated with classical machine learning metrics used in classification (F1 Score) or, if we want to decouple the problem of finding the threshold from the model evaluation, using the AUC-ROC score.

Since the variational autoencoder has been trained on non anomalous data the key concept is that, when fed with anomalous data, the reconstruction error will be much greater then in the training phase.

Python Implementation of a VAE

Now onto the fun part, the code implementation. For this demo we will need to install pytorch-lightning, a layer on top of pytorch that handles tedious tasks such as the training loop.

!pip install pytorch-lightning

import os, sys
from argparse import Namespace
import numpy as np
import pandas as pd
from pathlib import Path
from collections import OrderedDict
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import sklearn
from sklearn import preprocessing
import matplotlib
import matplotlib.pyplot as plt
import pytorch_lightning as pl
from pytorch_lightning.loggers import WandbLogger
import wandb

We first define a Dataset class that takes data from either the train set or the test set. It is also able to filter out some of the columns.

Note: For VAE training, we want to use the column value as a feature rather than a label, because we are doing unsupervised learning. In that case, we set the argument lbl_as_feat to True. If we were training a supervised model, then we would be able to use the same Dataset class but we would need to set lbl_as_feat to False, and the dataset would return value as a label.

class TSDataset(Dataset):
def __init__(self, split, cont_vars=None, cat_vars=None, lbl_as_feat=True):
"""
split: 'train' if we want to get data from the training examples, 'test' for
test examples, or 'both' to merge the training and test sets and return samples
from either.
cont_vars: List of continuous variables to return as features. If None, returns
all continuous variables available.
cat_vars: Same as above, but for categorical variables.
lbl_as_feat: Set to True when training a VAE -- the labels (temperature values)
will be included as another dimension of the data. Set to False when training
a model to predict temperatures.
"""
super().__init__()
assert split in ['train', 'test', 'both']
self.lbl_as_feat = lbl_as_feat
if split == 'train':
self.df = pd.read_csv(datasets_root/'train.csv')
elif split == 'test':
self.df = pd.read_csv(datasets_root/'test.csv')
else:
df1 = pd.read_csv(datasets_root/'train.csv')
df2 = pd.read_csv(datasets_root/'test.csv')
self.df = pd.concat((df1, df2), ignore_index=True)

# Select continuous variables to use
if cont_vars:
self.cont_vars = cont_vars
# If we want to use 'value' as a feature, ensure it is returned
if self.lbl_as_feat:
try:
assert 'value' in self.cont_vars
except AssertionError:
self.cont_vars.insert(0, 'value')
# If not, ensure it not returned as a feature
else:
try:
assert 'value' not in self.cont_vars
except AssertionError:
self.cont_vars.remove('value')

else: # if no list provided, use all available
self.cont_vars = ['value', 'hour_min', 'gap_holiday', 't']

# Select categorical variables to use
if cat_vars:
self.cat_vars = cat_vars
else: # if no list provided, use all available
self.cat_vars = ['day', 'month', 'day_of_week', 'holiday']

# Finally, make two Numpy arrays for continuous and categorical
# variables, respectively:
if self.lbl_as_feat:
self.cont = self.df[self.cont_vars].copy().to_numpy(dtype=np.float32)
else:
self.cont = self.df[self.cont_vars].copy().to_numpy(dtype=np.float32)
self.lbl = self.df['value'].copy().to_numpy(dtype=np.float32)
self.cat = self.df[self.cat_vars].copy().to_numpy(dtype=np.int64)

def __getitem__(self, idx):
if self.lbl_as_feat: # for VAE training
return torch.tensor(self.cont[idx]), torch.tensor(self.cat[idx])
else: # for supervised prediction
return torch.tensor(self.cont[idx]), torch.tensor(self.cat[idx]), torch.tensor(self.lbl[idx])

def __len__(self):
return self.df.shape[0]

Now we move on to the model itself. First, define individual modules. The core of our neural network will be a sequence of fully connected layers, whose number and sizes are passed through the hyperparameter layer_dims (a list of integers). In our experiment, we used 64,128,256,128,64, i.e. 5 layers of dimensions 64, 128, 256, 128 and 64. Each layer can be batch-normalised. The input into the first layer is an aggregation of the continuous variables and embedding vectors encoding the categorical variables.

class Layer(nn.Module):
'''
A single fully connected layer with optional batch normalisation and activation.
'''
def __init__(self, in_dim, out_dim, bn = True):
super().__init__()
layers = [nn.Linear(in_dim, out_dim)]
if bn: layers.append(nn.BatchNorm1d(out_dim))
layers.append(nn.LeakyReLU(0.1, inplace=True))
self.block = nn.Sequential(*layers)

def forward(self, x):
return self.block(x)


class Encoder(nn.Module):
'''
The encoder part of our VAE. Takes a data sample and returns the mean and the log-variance of the
latent vector's distribution.
'''
def __init__(self, **hparams):
super().__init__()
self.hparams = Namespace(**hparams)
self.embeds = nn.ModuleList([
nn.Embedding(n_cats, emb_size) for (n_cats, emb_size) in self.hparams.embedding_sizes
])
# The input to the first layer is the concatenation of all embedding vectors and continuous
# values
in_dim = sum(emb.embedding_dim for emb in self.embeds) + len(self.hparams.cont_vars)
layer_dims = [in_dim] + [int(s) for s in self.hparams.layer_sizes.split(',')]
bn = self.hparams.batch_norm
self.layers = nn.Sequential(
*[Layer(layer_dims[i], layer_dims[i + 1], bn) for i in range(len(layer_dims) - 1)],
)
self.mu = nn.Linear(layer_dims[-1], self.hparams.latent_dim)
self.logvar = nn.Linear(layer_dims[-1], self.hparams.latent_dim)

def forward(self, x_cont, x_cat):
x_embed = [e(x_cat[:, i]) for i, e in enumerate(self.embeds)]
x_embed = torch.cat(x_embed, dim=1)
x = torch.cat((x_embed, x_cont), dim=1)
h = self.layers(x)
mu_ = self.mu(h)
logvar_ = self.logvar(h)
return mu_, logvar_, x # we return the concatenated input vector for use in loss fn

class Decoder(nn.Module):
'''
The decoder part of our VAE. Takes a latent vector (sampled from the distribution learned by the
encoder) and converts it back to a reconstructed data sample.
'''
def __init__(self, **hparams):
super().__init__()
self.hparams = Namespace(**hparams)
hidden_dims = [self.hparams.latent_dim] + [int(s) for s in reversed(self.hparams.layer_sizes.split(','))]
out_dim = sum(emb_size for _, emb_size in self.hparams.embedding_sizes) + len(self.hparams.cont_vars)
bn = self.hparams.batch_norm
self.layers = nn.Sequential(
*[Layer(hidden_dims[i], hidden_dims[i + 1], bn) for i in range(len(hidden_dims) - 1)],
)
self.reconstructed = nn.Linear(hidden_dims[-1], out_dim)

def forward(self, z):
h = self.layers(z)
recon = self.reconstructed(h)
return recon

Finally, the full VAE (Variational Auto Encoder) module:

class VAE(pl.LightningModule):
def __init__(self, **hparams):
super().__init__()
self.save_hyperparameters()
self.encoder = Encoder(**hparams)
self.decoder = Decoder(**hparams)

def reparameterize(self, mu, logvar):
'''
The reparameterisation trick allows us to backpropagate through the encoder.
'''
if self.training:
std = torch.exp(0.5 * logvar)
eps = torch.randn_like(std) * self.hparams.stdev
return eps * std + mu
else:
return mu

def forward(self, batch):
x_cont, x_cat = batch
assert x_cat.dtype == torch.int64
mu, logvar, x = self.encoder(x_cont, x_cat)
z = self.reparameterize(mu, logvar)
recon = self.decoder(z)
return recon, mu, logvar, x

def loss_function(self, obs, recon, mu, logvar):
recon_loss = F.smooth_l1_loss(recon, obs, reduction='mean')
kld = -0.5 * torch.mean(1 + logvar - mu ** 2 - logvar.exp())
return recon_loss, kld

def training_step(self, batch, batch_idx):
recon, mu, logvar, x = self.forward(batch)
# The loss function compares the concatenated input vector including
# embeddings to the reconstructed vector
recon_loss, kld = self.loss_function(x, recon, mu, logvar)
loss = recon_loss + self.hparams.kld_beta * kld

self.log('total_loss', loss.mean(dim=0), on_step=True, prog_bar=True,
logger=True)
self.log('recon_loss', recon_loss.mean(dim=0), on_step=True, prog_bar=True,
logger=True)
self.log('kld', kld.mean(dim=0), on_step=True, prog_bar=True, logger=True)
return loss

def test_step(self, batch, batch_idx):
recon, mu, logvar, x = self.forward(batch)
recon_loss, kld = self.loss_function(x, recon, mu, logvar)
loss = recon_loss + self.hparams.kld_beta * kld
self.log('test_loss', loss)
return loss

def configure_optimizers(self):
opt = torch.optim.AdamW(self.parameters(), lr=self.hparams.lr,
weight_decay=self.hparams.weight_decay,
eps=1e-4)
sch = torch.optim.lr_scheduler.CosineAnnealingWarmRestarts(
opt, T_0=25, T_mult=1, eta_min=1e-9, last_epoch=-1)
return [opt], [sch]

def train_dataloader(self):
dataset = TSDataset('train', cont_vars=self.hparams.cont_vars,
cat_vars = self.hparams.cat_vars, lbl_as_feat=True
)
return DataLoader(dataset, batch_size=self.hparams.batch_size, num_workers=2,
pin_memory=True, persistent_workers=True, shuffle=True
)

def test_dataloader(self):
dataset = TSDataset('test', cont_vars=self.hparams.cont_vars,
cat_vars=self.hparams.cat_vars, lbl_as_feat=True
)
return DataLoader(dataset, batch_size=self.hparams.batch_size, num_workers=2,
pin_memory=True, persistent_workers=True
)

At this point we have everything needed for training our model and start doing some anomaly detection. This method is suitable for both univariate and multivariate time series, since the architecture is designed to work with 3D tensors (where the third dimension is the batch size).

The following code will show a sample usage of the classes defined before, assuming a dataset has been loaded in the correct folder with the correct format:

cont_features = [] #continous dataset features
cat_features = [] # same for categorical features

embed_cats = [len(tr_data_scaled[c].unique()) for c in cat_features]

hparams = OrderedDict(
run='embsz16_latsz16_bsz128_lay64-128-256-128-64_ep100_cosineWR_v1',
cont_vars = cont_features,
cat_vars = cat_features,
embedding_sizes = [(embed_cats[i], 16) for i in range(len(embed_cats))],
latent_dim = 16,
layer_sizes = '64,128,256,128,64',
batch_norm = True,
stdev = 0.1,
kld_beta = 0.05,
lr = 0.001,
weight_decay = 1e-5,
batch_size = 128,
epochs = 60,
)
model = VAE(**hparams)
logger = WandbLogger(name=hparams['run'], project='VAE_Anomaly', version=hparams['run'],
save_dir='working/checkpoints'
)
ckpt_callback = pl.callbacks.ModelCheckpoint(dirpath='.', filename='vae_weights')
# Replace argument logger by None if you don't have a WandB account (and don't want to create one)
trainer = pl.Trainer(accelerator='gpu', devices=1, strategy='dp', logger=logger,
max_epochs=hparams['epochs'], auto_lr_find=False, benchmark=True,
callbacks=[ckpt_callback], gradient_clip_val=10., enable_model_summary=True,
)



trainer.fit(model)
trainer.test(model)

Once the model has been fitted, it’s time to perform the anomaly detection part by performing a forward pass (encoding and decoding) on the data, compute the loss, and classify anomalies based on the amount of reconstruction loss.

dataset = TSDataset('test', cont_vars=hparams['cont_vars'],
cat_vars=['day_of_week', 'holiday'],
lbl_as_feat=True)

trained_model = VAE.load_from_checkpoint('./vae_weights-v1.ckpt')
trained_model.freeze()

losses = []

# run predictions for the training set examples
for i in range(len(dataset)):
x_cont, x_cat = dataset[i]
x_cont.unsqueeze_(0)
x_cat.unsqueeze_(0)
recon, mu, logvar, x = trained_model.forward((x_cont, x_cat))
recon_loss, kld = trained_model.loss_function(x, recon, mu, logvar)
losses.append(recon_loss + trained_model.hparams.kld_beta * kld)

data_with_losses = dataset.df
data_with_losses['loss'] = np.asarray(losses)

Done! Now the dataframe data_with_losses will contain the losses computed through the VAE model. Now you can classify anomalies based on quantiles, spectral analysis or any outlier detection algorithm.

Conclusion

In conclusion, variational autoencoders (VAEs) represent a significant advancement in the field of unsupervised learning, offering a probabilistic approach that enhances the capabilities of traditional autoencoders. By incorporating a probabilistic framework, VAEs enable the generation of new data samples and provide a robust mechanism for anomaly detection. This probabilistic nature allows VAEs to learn complex data distributions, making them highly versatile for various applications, including image generation, text analysis, and anomaly detection.

The implementation of a VAE in Python, as demonstrated, showcases the practical aspects of this powerful model. By leveraging libraries such as TensorFlow or PyTorch, one can efficiently build and train VAEs to tackle real-world problems. The ability of VAEs to detect anomalies by learning the normal data distribution and identifying deviations makes them particularly valuable in domains where detecting unusual patterns is critical, such as fraud detection, network security, and medical diagnosis.

--

--