Real-time Crypto Price Anomaly Detection with Deep Learning and Band Protocol

Immediate and accurate analysis of financial time series data is crucial to the price discovery mechanism that is at the heart of capital markets.

We’ll show you how insights can be derived from financial time series data in real time using machine learning. In particular, we provide a Keras model implementing an LSTM neural network for anomaly detection.

The data we’re using comes from the Band Protocol public dataset available in Google BigQuery. Band Protocol is a cross-chain data oracle platform that aggregates and connects real-world data and APIs to smart contracts. This sample query gives you the latest and historical prices in USD of 100 currency pairs, including $ETH, $BTC and $BAND.

Analysis at a glance

The whole process can be broken down into two parts:

Part 1: Metrics calculation using Dataflow. The following metrics and technical indicators are included: Relative Strength Index (RSI), Moving Average (MA), and Open, High, Low, Close (OHLC).

Part 2: Anomaly detection. We are using an LSTM model implemented in Keras.

Now, let’s go through the details.

Part 1: Metrics Calculation

Start by checking out the source code repository:

git clone https://github.com/blockchain-etl/band-dataflow-sample-applications.git
cd band-dataflow-sample-applications
cd timeseries-streaming/timeseries-java-applications

Bootstrap the metrics from historical data

The BandDataBootstrapGenerator pipeline allows you to calculate metrics for historical oracle requests. The data are sourced from the BigQuery table public-data-finance.crypto_band.oracle_requests.

Initialize variables:

PROJECT=$(gcloud config get-value project 2> /dev/null)    
# Make sure to update the date to yesterday's date
TIMESTAMP_THRESHOLD="2020-10-01T07:00:00.0Z"
DATASET_NAME=crypto_band
TABLE_NAME=metrics
TEMP_BUCKET=<replace_with_your_temporary_bucket>
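
The comment above asks you to set the threshold to yesterday's date. If you would rather generate the value than edit it by hand, the following is a minimal Python sketch. It assumes midnight UTC is an acceptable time of day (the sample value above uses 07:00:00), and the helper name timestamp_threshold is purely illustrative, not part of the repository:

```python
from datetime import datetime, timedelta, timezone


def timestamp_threshold(now=None):
    """Return yesterday's date at 00:00 UTC, formatted like the sample value."""
    now = now or datetime.now(timezone.utc)
    yesterday = (now - timedelta(days=1)).date()
    # Assumption: midnight UTC is used; adjust the time of day if you need another cut-off.
    return f"{yesterday.isoformat()}T00:00:00.0Z"


if __name__ == "__main__":
    print(timestamp_threshold())
```

You could then paste the printed value into TIMESTAMP_THRESHOLD before running the pipeline.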

Create a BigQuery dataset:

bq --location=US mk --dataset $DATASET_NAME 

Run the pipeline:

./gradlew generate_band_bootstrap_data --args="\
--bigQueryTableForTSAccumOutputLocation=$PROJECT:$DATASET_NAME.$TABLE_NAME \
--timestampThreshold="$TIMESTAMP_THRESHOLD" \
--runner=DataflowRunner \
--tempLocation=gs://$TEMP_BUCKET/temp \
--maxNumWorkers=1 \
--region=us-central1 \
"

The pipeline will be deployed to Dataflow and will take between 30 and 60 minutes depending on the amount of historical data.

The “bootstrap metrics” Dataflow pipeline

The pipeline uses 10-minute aggregation windows for OHLC and 60-minute rolling windows for moving average and RSI calculation by default. You can customize the parameters with --typeOneComputationsLengthInSecs and --typeTwoComputationsLengthInSecs, respectively.
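
To make the two window types concrete, here is a small pure-Python sketch of the same idea: fixed 10-minute windows produce OHLC bars, and a rolling 60-minute window (six bars) feeds the moving average and RSI. It is an illustration only. The function names are hypothetical, and the RSI shown is the simple non-smoothed variant, which may differ from the formulation used by the Java pipeline:

```python
def ohlc(ticks, window_secs=600):
    """Group (unix_seconds, price) ticks into fixed windows of OHLC values."""
    bars = {}
    for ts, price in sorted(ticks):
        start = ts - ts % window_secs  # lower boundary of the window
        bar = bars.get(start)
        if bar is None:
            bars[start] = {"open": price, "high": price, "low": price, "close": price}
        else:
            bar["high"] = max(bar["high"], price)
            bar["low"] = min(bar["low"], price)
            bar["close"] = price
    return bars


def moving_average(values):
    """Arithmetic mean of the values in one rolling window."""
    return sum(values) / len(values)


def rsi(closes):
    """Simple (non-smoothed) RSI over the closes in one rolling window."""
    changes = [b - a for a, b in zip(closes, closes[1:])]
    gains = sum(c for c in changes if c > 0)
    losses = sum(-c for c in changes if c < 0)
    if gains + losses == 0:
        return 50.0  # flat prices: treated as neutral in this sketch
    return 100.0 * gains / (gains + losses)


# Made-up ticks, for illustration only.
ticks = [(0, 100.0), (300, 104.0), (600, 102.0), (900, 101.0), (1200, 105.0)]
bars = ohlc(ticks)
closes = [bar["close"] for bar in bars.values()]
window = closes[-(3600 // 600):]  # last 60 minutes of 10-minute bars
print(bars[0])
print(moving_average(window), rsi(window))
```

Changing window_secs and the rolling length here plays the same role as the two command-line parameters mentioned above.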

You can observe the results in the public table band-etl.crypto_band.metrics that we set up earlier:

SELECT * 
FROM `band-etl.crypto_band.metrics`
--WHERE timeseries_key = 'ETH'
ORDER BY lower_window_boundary DESC
LIMIT 1000

band-etl.crypto_band.metrics table in the BigQuery console

You can query RSI for $ETH for the previous day (144 rows, assuming one row per default 10-minute window):

SELECT upper_window_boundary as time, data.dbl_data as RSI 
FROM `band-etl.crypto_band.metrics`
CROSS JOIN UNNEST(data) AS data
WHERE timeseries_key = 'ETH'
AND data.metric = 'RELATIVE_STRENGTH_INDICATOR'
ORDER BY lower_window_boundary DESC
LIMIT 144

And visualize it in Data Studio right from the BigQuery console:

Ethereum Relative Strength Index (RSI)

Calculate metrics from a Pub/Sub stream

The BandDataStreamGenerator pipeline allows you to calculate metrics for oracle requests pulled from a Pub/Sub subscription to the public topic projects/public-data-finance/topics/crypto_band.oracle_requests. It outputs the results to a specified BigQuery table and a specified Pub/Sub topic.

Create a Pub/Sub subscription for Band Protocol oracle requests:

gcloud pubsub subscriptions create crypto_band.oracle_requests.metrics \
--topic=crypto_band.oracle_requests \
--topic-project=public-data-finance

Initialize the variables:

PROJECT=$(gcloud config get-value project 2> /dev/null)    
# Make sure to update the date to yesterday's date
TIMESTAMP_THRESHOLD="2020-10-01T07:00:00.0Z"
DATASET_NAME=crypto_band
TABLE_NAME=metrics
TEMP_BUCKET=<replace_with_your_temporary_bucket>
TOPIC_NAME=crypto_band.metrics

Start the pipeline in Dataflow:

./gradlew run_band_example --args="\
--pubSubSubscriptionForOracleRequests=projects/$PROJECT/subscriptions/crypto_band.oracle_requests.metrics \
--bigQueryTableForTSAccumOutputLocation=$PROJECT:$DATASET_NAME.$TABLE_NAME \
--timestampThreshold="$TIMESTAMP_THRESHOLD" \
--pubSubTopicForTSAccumOutputLocation=projects/$PROJECT/topics/$TOPIC_NAME \
--runner=DataflowRunner \
--maxNumWorkers=1 \
--workerMachineType=n1-standard-1 \
--diskSizeGb=30 \
--region=us-central1 \
"

The output will be streamed to the specified BigQuery table and the Pub/Sub topic.

Part 2: Anomaly Detection

The examples provided below are intended to explore the data engineering needed to work with Band Protocol data and deliver it to an LSTM autoencoder-decoder.

Part 2 can be broken down into three steps:

  1. Generating TF.Examples for training the model.
  2. Training an LSTM model on training data.
  3. Batch anomaly detection.

Generating TF.Examples with Band Protocol data

To build the model, you first need to run the generator job BandDataBootstrapGenerator.java, which creates the training data as TF.Examples:

git clone https://github.com/blockchain-etl/band-dataflow-sample-applications.git
cd band-dataflow-sample-applications/timeseries-streaming/timeseries-java-applications

Run the Dataflow job (make sure to replace <your_temp_bucket> with your value and update the TIMESTAMP_THRESHOLD variable):

TIMESTAMP_THRESHOLD="2020-10-03T00:00:00.0Z"
BUCKET=<your_temp_bucket>
./gradlew generate_band_bootstrap_data --args="\
--interchangeLocation=gs://$BUCKET/band_bootstrap_tfexamples/run0 \
--timestampThreshold="$TIMESTAMP_THRESHOLD" \
--runner=DataflowRunner \
--tempLocation=gs://$BUCKET/temp \
--maxNumWorkers=1 \
--region=us-central1 \
"

Once the job is done, download the generated files from gs://$BUCKET/band_bootstrap_tfexamples/run0 and update the information in config.py to match your local environment.

Train the model

Set up a virtual environment:

virtualenv -p python3.7 streaming-tf-consumer
source streaming-tf-consumer/bin/activate

Install the dependencies:

git clone https://github.com/blockchain-etl/band-dataflow-sample-applications.git
cd band-dataflow-sample-applications/timeseries-streaming/timeseries-python-applications
cd MLPipeline
pip install -e .
cd ..
ls

Run the command with the virtual-env activated:

python MLPipelineExamples/test_pipelines/timeseries_local_simple_data.py

You should see the model building as below:

....
Epoch 26/30
280/280 [==============================] - 6s 21ms/step - loss: 119.8072 - mean_absolute_error: 6.8178 - val_loss: 684588.3750 - val_mean_absolute_error: 670.2068
Epoch 27/30
280/280 [==============================] - 7s 23ms/step - loss: 119.6002 - mean_absolute_error: 6.8087 - val_loss: 203.5257 - val_mean_absolute_error: 8.6160
Epoch 28/30
280/280 [==============================] - 6s 20ms/step - loss: 119.5842 - mean_absolute_error: 6.8084 - val_loss: 41512.6406 - val_mean_absolute_error: 160.1564
Epoch 29/30
280/280 [==============================] - 6s 20ms/step - loss: 119.5832 - mean_absolute_error: 6.8084 - val_loss: 5213.2568 - val_mean_absolute_error: 58.4286
Epoch 30/30
280/280 [==============================] - 5s 19ms/step - loss: 119.5791 - mean_absolute_error: 6.8083 - val_loss: 24351.4551 - val_mean_absolute_error: 151.2784

This will output a serving_model_dir under the location you specified for PIPELINE_ROOT in the config.py file.
With this you can now follow the rest of the steps outlined in Option 1 but using your own model.

Batch inference

The pipeline defined in batch_inference.py reads the given TF.Examples and the saved model, runs inference to get a predicted value, and compares this value with the actual value. If the difference is greater than the threshold, it reports the value as anomalous.
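
The comparison step can be sketched in a few lines of plain Python. This is only an illustration of the thresholding idea, not the code in batch_inference.py. The function name, the sample values and the threshold of 100.0 are all assumptions made for the example:

```python
def find_outliers(actual, predicted, threshold):
    """Return (index, actual, predicted, difference) for each value whose
    absolute difference from the model output exceeds the threshold."""
    outliers = []
    for i, (a, p) in enumerate(zip(actual, predicted)):
        diff = abs(a - p)
        if diff > threshold:
            outliers.append((i, a, p, diff))
    return outliers


# Illustrative values only: two poorly reconstructed points and one good one.
actual = [388.89, 392.10, 390.00]
predicted = [109.14, 81.37, 389.50]
THRESHOLD = 100.0  # hypothetical; the right value depends on the model and metric

for index, a, p, diff in find_outliers(actual, predicted, THRESHOLD):
    print(f"Outlier at index {index}: input {a} output {p} difference {diff:.2f}")
```

A lower threshold flags more points as anomalous, so in practice it would be tuned against the reconstruction error seen on normal data.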

Run the command with the virtual-env activated. Set --saved_model_location to the location of the model built in the previous step, and --tfrecord_folder to the location of the generated data you downloaded from the GCS bucket:

python MLPipelineExamples/test_pipelines/batch_inference.py \
--saved_model_location=./build/Trainer/model/5/serving_model_dir \
--tfrecord_folder=/<your-directory>/data/*

You will see messages for the detected outliers:

...
Outlier detected for ETH-value-LAST at 2020-08-23 21:20:00 - 2020-08-23 22:20:00 Difference was 279.74764251708984 for value input 388.8900146484375 output 109.14237213134766 with raw data [388.8900146484375]
Outlier detected for ETH-value-FIRST at 2020-08-22 14:20:00 - 2020-08-22 15:20:00 Difference was 310.73509216308594 for value input 392.1044921875 output 81.36940002441406 with raw data [392.1044921875]
Outlier detected for ETH-value-LAST at 2020-08-22 14:20:00 - 2020-08-22 15:20:00 Difference was 308.1451416015625 for value input 392.1044921875 output 83.9593505859375 with raw data [392.1044921875]
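
If you want to post-process these messages, they are regular enough to parse. The sketch below uses only the Python standard library. The regular expression is inferred from the three sample lines above and is an assumption about the log format, and parse_outlier is a hypothetical helper, not part of the repository:

```python
import re
from datetime import datetime

# Pattern inferred from the sample messages; adjust it if your output differs.
OUTLIER_RE = re.compile(
    r"Outlier detected for (?P<key>\S+) at (?P<start>.+?) - (?P<end>.+?) "
    r"Difference was (?P<difference>[\d.]+) for value input (?P<input>-?[\d.]+) "
    r"output (?P<output>-?[\d.]+)"
)
TS_FORMAT = "%Y-%m-%d %H:%M:%S"


def parse_outlier(line):
    """Parse one outlier message into a dict, or return None if it does not match."""
    m = OUTLIER_RE.search(line)
    if m is None:
        return None
    return {
        "key": m["key"],
        "start": datetime.strptime(m["start"], TS_FORMAT),
        "end": datetime.strptime(m["end"], TS_FORMAT),
        "difference": float(m["difference"]),
        "input": float(m["input"]),
        "output": float(m["output"]),
    }


sample = (
    "Outlier detected for ETH-value-LAST at 2020-08-23 21:20:00 - 2020-08-23 22:20:00 "
    "Difference was 279.74764251708984 for value input 388.8900146484375 "
    "output 109.14237213134766 with raw data [388.8900146484375]"
)
record = parse_outlier(sample)
print(record["key"], record["end"] - record["start"], record["difference"])
```

Parsing the sample also makes the message easier to read: the reported difference is the input minus the model output, and the window spans 60 minutes.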

Note 1: the intent of this sample is to demonstrate the data engineering effort needed to support data generated from a streaming Beam pipeline and delivered to an LSTM autoencoder-decoder. It is not intended to demonstrate state-of-the-art machine learning approaches to anomaly detection, and more work needs to be done to optimize the model for better performance.

Note 2: the LSTM used here is pre-trained and deployed, and not learning online. Extending this sample for online learning in micro-batches is left as an exercise for the reader.

References

This article is based on the work of Reza Rokni. For more examples of time series data processing in Dataflow, refer to this repository:

https://github.com/GoogleCloudPlatform/dataflow-sample-applications/tree/master/timeseries-streaming

A collection of technical articles and blogs published or curated by Google Cloud Developer Advocates. The views expressed are those of the authors and don't necessarily reflect those of Google.

Evgeny Medvedev

Creator of https://github.com/blockchain-etl, Co-founder of https://d5.ai and https://nansen.ai, Google Cloud GDE, AWS Certified Solutions Architect
