Tracking Cryptocurrencies Exchange Trades with Google Cloud Platform in Real-Time — Part 1

Authors: Ivo Galic, Mike Altarace
Contributors: Daniel De Leo, Morgante Pell, Yonni Chen, Stefan Nastic
TL;DR: GitHub Link

The last year has been like a roller coaster for the cryptocurrency market. At the end of 2017, the value of bitcoin (BTC) almost reached $20,000, only to fall below $4,000 less than a year later. What if there is a pattern in the high volatility of the cryptocurrency market? If so, can we learn from it and get an edge on future trends? Is there a way to observe all exchanges in real time and visualize them on a single chart?

In this tutorial we will graph the trades, the volume, and the time delta between trade execution and the moment the trade reaches our system (an indicator of how close to real time we can get the data).

The goal of the tutorial: a real-time multi-exchange observer

To achieve that, as a first step, we need to capture as much real-time trading data as possible for analysis. However, the large volume of currency and exchange data requires a scalable system that can ingest and store it while keeping latency low. Otherwise, the system will fall out of sync with the exchanges' streams.

In this article, we use Cloud Dataflow and Cloud Bigtable to satisfy those requirements. Dataflow provides low-latency streaming data ingestion, while Bigtable provides low-latency storage and time-series querying at scale.

Requirements / Solutions

Architectural overview

The “usual” requirement for trading systems is low-latency data ingestion. We extend this requirement with near real-time data storage and querying at scale. The following list summarizes what you can learn from this tutorial:

  1. Ingest real-time trading data with low latency from globally distributed data sources / exchanges, with the option to adapt the location of the ingestion worker pipeline and to easily add trading pairs / exchanges. Solution: Dataflow + XChange Reactive Websockets Framework
  2. Demonstrate unbounded streaming source code that can run on multiple runners. Solution: Apache Beam
  3. Strong consistency + linear scalability + super low latency for querying the trading data. Solution: Bigtable
  4. Easy, automated setup with a project template for orchestration, including an example of dynamic variable insertion from a Terraform template into the GCP Compute Engine instance. Solution: Terraform
  5. Querying and visualization: execute time-series queries on Bigtable and visualize the results on a web page. Solution: Python Flask + Vis.js + Google Cloud Bigtable Python client

Architecture/How it works

The source code is written in Java 8, Python 2.7 and JavaScript, and we use Maven and pip for dependency and build management.

The code can be divided into five main framework units:

  1. Data ingestion: XChange Stream framework (GitHub link)
    A Java library providing a simple and consistent streaming API for interacting with Bitcoin and other cryptocurrency exchanges via the WebSocket protocol. It builds on the XChange library, providing new interfaces for a streaming API, and users can subscribe to live updates via RxJava reactive streams.
    We use this Java 8 framework to connect to and configure several exchanges (BitFinex, Poloniex, BitStamp, OkCoin, Gemini, HitBTC, Binance…).
    Link to the exchange / trading pair configuration code
  2. Parallel processing: Apache Beam (GitHub link)
    Apache Beam is an open source unified programming model to define and execute data processing pipelines, including ETL, batch and stream (continuous) processing. Supported runners: Apache Apex, Apache Flink, Apache Gearpump, Apache Samza, Apache Spark, and Google Cloud Dataflow.
    We demonstrate how to create an unbounded streaming source/reader and manage basic watermarking, checkpointing and record IDs for data ingestion.
    Link to the bridge between Beam and the XChange Stream framework
  3. Bigtable sink: Cloud Bigtable with Beam using the HBase API (GitHub link), a connector and writer for Bigtable.
    Here we explain how to create a row key and a Bigtable mutation function before writing to Bigtable.
    Link to the Bigtable key creation / mutation function
  4. Real-time API endpoint: a Flask web server on port 5000 plus the Bigtable client (GitHub link), used to query Bigtable and serve as the API endpoint.
    Link to the Bigtable query builder + results retrieval and sampling
  5. JavaScript visualization: a Vis.js Flask template that queries the real-time API endpoint every 500 ms.
    Link to the HTML template file
The Flask web server runs on the GCP VM instance.
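The bridge itself lives in the repository and is not reproduced here. As a rough, stdlib-only illustration of the idea behind items 1 and 2, the sketch below shows how a push-based trade feed (for example an RxJava subscription callback from XChange Stream) could be buffered in a thread-safe queue and pulled by a reader whose methods loosely mirror Beam's `UnboundedReader` (`start`, `advance`, `getCurrent`, `getWatermark`, `getCurrentRecordId`). The class `TradeQueueReader` and its `Trade` record are hypothetical, the class does not implement any Beam interface, and the watermark rule (latest event time seen so far) is a simplification.

```java
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch: bridges a push-based trade feed (such as an RxJava
// subscription callback) to a pull-based reader loosely modeled on Beam's
// UnboundedReader methods. It does not implement any Beam interface.
final class TradeQueueReader {

    /** Hypothetical minimal trade record; the real TradeLoad DTO lives in the repo. */
    static final class Trade {
        final String id;
        final Instant timestamp;

        Trade(String id, Instant timestamp) {
            this.id = id;
            this.timestamp = timestamp;
        }
    }

    // Thread-safe buffer: the websocket thread offers, the runner thread polls.
    private final ConcurrentLinkedQueue<Trade> buffer = new ConcurrentLinkedQueue<>();
    private Trade current;
    private Instant watermark = Instant.EPOCH;

    /** Push side: would be called from the exchange subscription callback. */
    void onTrade(Trade trade) {
        buffer.offer(trade);
    }

    /** Analogous to UnboundedReader.start(): attempt to read the first element. */
    boolean start() {
        return advance();
    }

    /** Analogous to advance(): returns false when no trade is buffered yet. */
    boolean advance() {
        Trade next = buffer.poll();
        if (next == null) {
            return false; // a runner would simply call advance() again later
        }
        current = next;
        // Simplistic watermark: the latest event time seen so far.
        if (next.timestamp.isAfter(watermark)) {
            watermark = next.timestamp;
        }
        return true;
    }

    Trade getCurrent() {
        if (current == null) {
            throw new NoSuchElementException("no trade read yet");
        }
        return current;
    }

    Instant getWatermark() {
        return watermark;
    }

    /** Analogous to getCurrentRecordId(): a stable id lets a runner deduplicate. */
    byte[] getCurrentRecordId() {
        return getCurrent().id.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        TradeQueueReader reader = new TradeQueueReader();
        reader.onTrade(new Trade("t1", Instant.ofEpochMilli(1546547940918L)));
        while (reader.advance()) {
            System.out.println(reader.getCurrent().id + " watermark=" + reader.getWatermark());
        }
    }
}
```

In a real Beam source the reader would also return a checkpoint mark and use Joda-Time instants, and a watermark that only tracks the maximum event time treats every out-of-order trade as late.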

Pipeline definition

For every exchange and trading pair combination, we create a separate pipeline instance. Each pipeline consists of three steps:

  1. UnboundedStreamingSource, which contains the Unbounded Streaming Source Reader (bitStamp2)
  2. Bigtable pre-write mutation and row key definition (ETH-USD Mut2)
  3. Bigtable write step (ETH-USD2)
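Since each exchange and trading pair combination gets its own pipeline, a launcher has to enumerate the combinations. The following is a minimal sketch, assuming job names are derived from the exchange and pair; `PipelinePlan` and the naming scheme are hypothetical and not taken from the repository. As far as we know, Dataflow job names may only contain lowercase letters, digits and hyphens, must start with a letter and end with a letter or digit, so characters such as `/` in `ETH/USD` are replaced.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

// Hypothetical sketch: one pipeline (and therefore one Dataflow job) per
// exchange + trading pair. Exchange and pair names here are only examples.
final class PipelinePlan {

    /**
     * Derives a job name restricted to [-a-z0-9].
     * Assumes the exchange name starts with a letter.
     */
    static String jobName(String exchange, String pair) {
        String raw = (exchange + "-" + pair).toLowerCase(Locale.ROOT);
        return raw.replaceAll("[^-a-z0-9]", "-") // e.g. '/' in "eth/usd" becomes '-'
                  .replaceAll("-+", "-")         // collapse repeated hyphens
                  .replaceAll("^-|-$", "");      // no leading or trailing hyphen
    }

    /** Cartesian product of exchanges and pairs, one job name per combination. */
    static List<String> plan(List<String> exchanges, List<String> pairs) {
        List<String> names = new ArrayList<>();
        for (String exchange : exchanges) {
            for (String pair : pairs) {
                names.add(jobName(exchange, pair));
            }
        }
        return names;
    }

    public static void main(String[] args) {
        List<String> exchanges = Arrays.asList("Bitfinex", "BitStamp", "Gemini");
        List<String> pairs = Arrays.asList("BTC/USD", "ETH/USD");
        // First lines printed: bitfinex-btc-usd, bitfinex-eth-usd
        for (String name : plan(exchanges, pairs)) {
            System.out.println(name);
        }
    }
}
```

Keeping one job per combination isolates failures and lets each pipeline's worker location be chosen close to its exchange, at the cost of running more jobs.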

Bigtable row key design decisions

Our DTO looks like this:

We formulated the row key structure in the following way: TradingCurrency#Exchange#SystemTimestampEpoch#SystemNanosTime.

For example, a row key might look like BTC/USD#Bitfinex#1546547940918#63187358085

BTC/USD — Trading Pair
Bitfinex — Exchange
1546547940918 — Epoch timestamp (more info)
63187358085 — System Nano time (more info)
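A minimal sketch of assembling and splitting this key in Java 8, assuming the components are plain strings that contain no `#`. `TradeRowKey` is a hypothetical helper, not the repository's mutation function. Note that `System.nanoTime()` is a monotonic counter with an arbitrary origin, so the last component only disambiguates keys within one JVM; it is not a wall-clock time.

```java
// Hypothetical sketch of the row key layout described above:
// TradingCurrency#Exchange#SystemTimestampEpoch#SystemNanosTime
final class TradeRowKey {

    private static final String SEP = "#";

    /** Joins the four key components in the documented order. */
    static String build(String pair, String exchange, long epochMillis, long nanoTime) {
        return pair + SEP + exchange + SEP + epochMillis + SEP + nanoTime;
    }

    /** Uses the current wall-clock millis and the JVM's monotonic nano counter. */
    static String now(String pair, String exchange) {
        return build(pair, exchange, System.currentTimeMillis(), System.nanoTime());
    }

    /** Splits a key back into its four components (assumes no '#' inside them). */
    static String[] parse(String rowKey) {
        return rowKey.split(SEP, -1);
    }

    public static void main(String[] args) {
        // prints BTC/USD#Bitfinex#1546547940918#63187358085
        System.out.println(build("BTC/USD", "Bitfinex", 1546547940918L, 63187358085L));
    }
}
```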

Why do we add nanotime at the end of our key?
Our design decision is to avoid storing different trades as multiple versions of the same row. When TradeLoad DTOs arrive as a rapid stream, two DoFn mutations might execute within the same epoch millisecond and would then produce identical row keys. Appending the nano time adds nanosecond resolution (one million slots per millisecond), which makes such collisions unlikely.

If this is not enough for your needs, we recommend hashing the volume / price ratio and appending the hash to the end of the row key.
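One way to follow that recommendation, sketched under our own assumptions: hash the string form of `volume / price` with SHA-256 and append the first 8 hex characters as a fifth key component. `RowKeySuffix`, the choice of SHA-256 and the 8-character length are all hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical sketch: append a short hash of the volume / price ratio to an
// existing row key to further reduce the chance of key collisions.
final class RowKeySuffix {

    /** First 4 bytes (8 hex chars) of SHA-256 over the ratio's string form. */
    static String ratioHash(double volume, double price) {
        double ratio = volume / price;
        try {
            MessageDigest sha = MessageDigest.getInstance("SHA-256");
            byte[] digest = sha.digest(Double.toString(ratio).getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            for (int i = 0; i < 4; i++) {
                hex.append(String.format("%02x", digest[i] & 0xff));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // Every Java SE platform is required to support SHA-256.
            throw new IllegalStateException("SHA-256 unavailable", e);
        }
    }

    /** Appends the hash as an extra '#'-separated component. */
    static String withSuffix(String rowKey, double volume, double price) {
        return rowKey + '#' + ratioHash(volume, price);
    }

    public static void main(String[] args) {
        System.out.println(withSuffix("BTC/USD#Bitfinex#1546547940918#63187358085", 2.5, 3800.0));
    }
}
```

Two trades with identical volume and price that also share the same millisecond and nano time would still collide, so the suffix lowers the collision risk rather than eliminating it.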

Row cells will contain an exact schema replica of the exchange TradeLoad DTO (see the table above). The row key order, from the specific (trading pair, exchange) to the less specific (timestamp, nanotime), keeps each pair and exchange in one contiguous key range and helps us avoid hotspots when you query the data.
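Because the key starts with the trading pair and exchange, a time-window query for one pair on one exchange maps to a single contiguous row range. The sketch below computes such a half-open range `[start, end)` and checks membership the way a byte-ordered store would; `TradeScanRange` is hypothetical, and the actual query code in the repository uses the Python Bigtable client.

```java
// Hypothetical sketch: the half-open row key range [start, end) that covers one
// trading pair on one exchange over a time window, relying on the layout
// pair#exchange#epochMillis#nanoTime described above.
final class TradeScanRange {

    static String prefix(String pair, String exchange) {
        return pair + "#" + exchange + "#";
    }

    /** Inclusive start: keys with millis >= fromMillis sort at or after it. */
    static String startKey(String pair, String exchange, long fromMillis) {
        return prefix(pair, exchange) + fromMillis;
    }

    /** Exclusive end: keys with millis >= toMillis sort after it. */
    static String endKey(String pair, String exchange, long toMillis) {
        return prefix(pair, exchange) + toMillis;
    }

    /**
     * Lexicographic check, as Bigtable orders row keys by bytes. String.compareTo
     * matches byte order here because the keys are plain ASCII. Numeric and
     * lexicographic order of the millis agree only while they have the same
     * number of digits (13 digits covers 2001 through 2286).
     */
    static boolean contains(String rowKey, String start, String end) {
        return rowKey.compareTo(start) >= 0 && rowKey.compareTo(end) < 0;
    }

    public static void main(String[] args) {
        String start = startKey("BTC/USD", "Bitfinex", 1546547940000L);
        String end = endKey("BTC/USD", "Bitfinex", 1546547941000L);
        // prints true
        System.out.println(contains("BTC/USD#Bitfinex#1546547940918#63187358085", start, end));
    }
}
```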

Costs

This tutorial uses billable components of Google Cloud Platform, including Dataflow, Compute Engine, Google Cloud Storage and Bigtable.

We recommend cleaning up the project after finishing this tutorial to avoid ongoing costs. Use the Pricing Calculator to generate a cost estimate based on your projected usage.

Environment setup

If you are familiar with Terraform, use the Terraform instructions; they can save you a lot of time. Otherwise, just continue.

We assume you already have a Google Cloud project associated with a billing account (otherwise, check out the getting-started section). Log in to the console and activate a Cloud Shell session.

We’ll need a VM to drive the creation of the pipeline so let’s create one with the following command:

Note that we use the Compute Engine service account with the Cloud API scope; we need it to build up the environment easily.

Wait for the VM to come up and SSH into it.

Install the necessary tools, such as Java, Git, Maven, pip, Python 2.7 and the Cloud Bigtable command-line tool cbt, using the following command:

We’ll now enable some APIs, create a Bigtable instance and a bucket:

In this scenario, we use one column family called ‘market’ to simplify the schema design. For more on column families, see this link.

Now we are ready to go. Clone the repo:

Now we will build the pipeline:

If everything is OK, you should see this at the end:

Now we can finally start the pipeline:

Please ignore illegal thread pool exceptions. After a few minutes we can observe the incoming trades by peeking into the Bigtable table.

To observe the Dataflow pipeline, navigate to the Dataflow page in the console, click the pipeline, and check that the job status is Running:

To run the Flask front-end visualization server, navigate to the frontend directory inside the VM and build the Python package:

Visualization

Open firewall port 5000 for visualization:

Link the VM to the firewall rule:

Navigate to the frontend directory:

Find your VM's external IP in the Compute Engine console and open it in your browser on port 5000, e.g.
http://external-ip:5000/stream

You should see a visualization of the BTC/USD pair aggregated across several exchanges (without the predictor part).

The goal of the tutorial: a real-time ‘periscope’ multi-exchange observer

Enjoy!

Cleanup

To save on costs, clean up the pipeline by running the following command:

Empty and delete the bucket:

Delete the Bigtable instance:

Exit the VM and delete it from the console.

Conclusion

In this article, we have discussed the most important design and technical decisions: (i) how to set up and configure a pipeline for ingesting real-time time-series data from various crypto exchanges, and (ii) how to design a suitable data model that facilitates querying and graphing at scale.
Finally, we have provided a tutorial on how to set up and deploy the proposed architecture using GCP. By following the tutorial steps, we established a connection to multiple exchanges, subscribed to their trade feeds, and extracted and transformed these trades into a flexible format that can be stored in Bigtable, graphed and analyzed.

If our readers show interest (please follow us to let us know), we will extend the tutorial with a second part in which we build ML models to predict trends. Do not hesitate to ask questions in the comment section.