Image Source

Anomalies detection using River

From a proof of concept to predicting millions of transactions

Matias Aravena Gamboa
Published in
5 min readJul 29, 2021

--

Introduction

From Wikipedia:

In data analysis, anomaly detection (also outlier detection) is the identification of rare items, events or observations which raise suspicions by differing significantly from the majority of the data.

Anomaly detection it’s a common machine learning application nowadays. It can be used for detecting and clean noise in data before training a model, fraud detection, detecting errors on sensors data and a lot of useful application in different industries.

There are a lot of algorithms for anomalies detections such as: isolation forests, one-class SVM, auto encoders, etc. If you have labeled anomalies, you can even train classifiers like LightGBM or XGBoost and use them to predict if some instance is anomalous or not. But in many scenarios labeling the anomalies it’s hard and unsupervised algorithms work pretty well.

At Spike we have used Isolation Forests in past projects and they perform pretty good in batch escenarios, but in this post we will talk about incremental learning and how we deployed an anomalies detector in the context of streaming/online data.

Incremental learning with River 🌊

River is a Python package which is focused on incremental learning. Basically, models can be trained one observation per time and this allows to models to be trained on the fly as long the stream of data goes trough the model. The key idea of incremental learning is that models can be adapted to new data without forgetting what they learned in the past.

This has a lot of advantages from a productive perspective:

  • Models don’t need to be retrained; they are always learning from new data.
  • They can adapt quickly to drift and changes in data.
  • Deployment and maintenance of models are often easy.
  • Developing models using simulated data as it will look at production simplifies the deployment and integration steps.

A feature that I really love 🥰 from River is that models are trained with Python dictionaries where the keys represent the feature names. Once again, this is what should be excepted in a productive system.

One of the most mind blowing 🤯 River’s feature (for me), is that you can calculate statistics (or create features) on the fly, and update them while you are training the model; stats and features are stored internally into the model. This really simplifies the deployment, because in a batch scenario you need to store the data, calculate the statistics, update them and serve them into a feature store, or database when you want to do online inference.

Model development

For developing our model we have transactional data that comes from a Pub/Sub stream, which is ingested into other systems (BigQuery for example). We realized that we can use this stream of data in an incremental learning context for training models and predict some useful stuff in real time.

Transaction messages contains information about retail shopping purchase like: items the client bought, items features, stores features, pricing paid, discounts applied, payment method, etc. From the items data we create aggregated features like how many different departments we have in the transaction, mechanism of discounts applied and others relevant features.

As I said before, something really cool about River is that allows you to create aggregation on the fly and update them in real time. In this example we create a RollingMean of the TotalPaid and PercentageDiscountfeatures using the last WINDOWS_SIZE(this parameter can be tuned) observations per StoreID . As we feed the model with new data, these features are updated and used for prediction.

For our anomaly model we used a Half Space Tress algorithm (you can read the algorithm paper here) which is an online variant of Isolation Forests. Those are unsupervised anomaly detections models, which means that we don’t need a label for the classification of anomaly data, they generate an anomaly score and then you can set a threshold to target something as normal or anomalous.

Using River our model looks like this:

Something interesting about the implementation of River, is that the preprocessing modules are incremental too. This means that we don’t need to know a priori all the posible values of categorical features when we use an OneHotEncoder , or if we want to scale the data using MinMaxScaler (which scales the data between 0 to 1). Once again, this simplifies a lot the deployment.

At production our training process looks pretty much like this:

Basically, we parse the Pub/Sub transaction message using the incredible Pydantic library. We have a create_features function which add and calculate some features, next we transform everything into a Dict . Then we use the model get the anomaly score! 🤖. After generate some logs, stream the prediction into BigQuery and a few more steps we feed the model with the new instance and the model learn from it.

Deployment

For deployment, we just put all our Python scripts into a Docker container and then run everything in a e2-micro (2 cores and 1 GB of RAM) instance in Compute Engine. In Pub/Sub we have a pull subscription so the script is receiving messages all the time. After the prediction is done, we use the BigQuery streaming inserts to feed the predictions into a table. Finally use Data Studio for developing reports and to check the transactions scored as anomalous.

The architecture looks pretty much like this, where data from retail servers are streamed into Pub/Sub.

Solution architecture

Results and final thoughts

Some highlights of our solution and learnings:

  • Model’s inference times are pretty fast. This is due the implementation of HalfSpaceTrees. It took less than 400 ms all the process.
  • With a single e2-micro instance we are able to make predictions for hundred of thousands transactions per day.
  • Less than the 1% of transactions are anomalies, which is expected due anomalies should be that…anomalies.

This started as a proof of concept but ended up very similar to our final solution. This was the first time we were able to use an incremental learning algorithm effectively, and migrating from batch to incremental learning was pretty fun because there are very different approach of how handle the data, extract features, etc.

River is an amazing library. It helped a lot for the implementation and deployment of everything. We are using others algorithms implemented by River for classification problems and it’s been very easy to use and deploy.

At Spike, we are now planning using River and incremental learning in other use cases like recommender engines, classification and regression problems. We think that River is very useful in many application when we know that we will have a streaming input of data in our systems.

--

--

Matias Aravena Gamboa

Machine Learning Engineer and beer enthusiast. Valdivia, Chile