Analyzing cryptocurrency time series data using Apache Spark, Flint and Spark Notebook
This article is based on my Spark notebook and highly inspired by @sarahpan's great article on the analysis of cryptocurrency data using PostgreSQL. The main goal of this article is not to provide you with Ethereum, Bitcoin and other cryptocurrency insights (for those, please refer to @sarahpan's article) but to get you familiar with the tools the Apache Spark ecosystem provides for performing this kind of analysis.
In this notebook we perform time series data analysis using Apache Spark, Flint (a time series library for Apache Spark), and the interactive computation and visualization capabilities of Spark Notebook.
Data
Direct link to download the dataset. A detailed description of the dataset can be found there as well.
We are interested in two files:
- btc_prices.csv — A CSV file with the daily BTC price data (OHLCV format), spanning seven years from 2010 to 2017 across multiple currencies (e.g. USD, CAD, EUR, CNY, etc.)
- crypto_prices.csv — A CSV file with the daily cryptocurrency price data (OHLCV format) for over 1200 cryptocurrencies since 2013
This is a small dataset, and Spark launched on a laptop in local mode should be enough to work with it.
Requirements
- Spark 2.0 or higher
- Scala 2.11.7 or higher
- Spark Notebook. Get the distribution (for Spark 2.X.X) and run:

```
$ unzip spark-notebook.zip
$ cd spark-notebook-0.7.0-scala-2.11.8-spark-2.0.2-hadoop-2.7.2-with-hive/
$ ./bin/spark-notebook
```

Then open http://localhost:9001 in your browser.
Or build it from the source code:

```
$ sbt -Dspark.version=2.1.1 -Dhadoop.version=2.7.0 -Dscala.version=2.11.8
[spark-notebook] $ start
```
Navigate to `cryptocurrency_ts.snb` from the Spark Notebook browser window and click to launch it.
We also need to provide a custom dependency on the Flint library. It's not published to Maven Central, but you can simply build it from source and publish it locally by running
sbt publishLocal
After that, provide the dependency coordinates in the customDeps section of the notebook metadata (Edit -> Edit Notebook Metadata):
"customDeps": [ "com.twosigma % flint_2.11 % 0.2.0-SNAPSHOT" ]
Reading the data
Apache Spark supports reading CSV files directly and can try to infer the schema automatically. But we can also provide our own schema when reading CSV data. This avoids a full scan of the CSV file for schema inference, and it's also more accurate if we know what we're doing.
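As a sketch, reading with an explicit schema might look like the following. The column names here are assumptions based on the OHLCV description above (the actual file header may differ); `sparkSession` is the session predefined in Spark Notebook, and the final `withColumn` is the timestamp step discussed later.

```scala
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._

// Hypothetical schema for btc_prices.csv (OHLCV + datetime + currency code).
val btcPricesSchema = StructType(Seq(
  StructField("datetime", StringType),
  StructField("currency_code", StringType),
  StructField("open", DoubleType),
  StructField("high", DoubleType),
  StructField("low", DoubleType),
  StructField("close", DoubleType),
  StructField("volume", DoubleType)
))

val btcPricesDF = sparkSession.read
  .option("header", "true")
  .schema(btcPricesSchema)      // providing the schema skips inference (no extra scan)
  .csv("btc_prices.csv")
  .withColumn("time", unix_timestamp($"datetime", "yyyy-MM-dd HH:mm:ssX"))
```

`cryptoPricesDF` can be read the same way with its own schema.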
btcPricesDF.show(5)
cryptoPricesDF.show(5)
OHLC charts
Since the data is presented in OHLCV format, the natural way to plot it is with OHLC charts. Spark Notebook comes with built-in support (CustomPlotlyChart) for the Plotly JavaScript API for data visualization. For examples of CustomPlotlyChart usage, refer to the notebooks in the notebooks/viz directory that comes with the Spark Notebook distribution, or explore them online with nbviewer.
The resulting Plotly charts are interactive, so feel free to zoom and hover over the data directly from the notebook.
TimeSeriesRDD
Now let's start with some time series analysis. From the Flint getting started guide you can find that the entry point into all time series analysis functionality in Flint is the TimeSeriesRDD class. We can create one from an existing DataFrame; for that, we have to make sure the DataFrame contains a column named "time" of type LongType. That's why we performed the extra .withColumn("time", unix_timestamp($"datetime", "yyyy-MM-dd HH:mm:ssX")) step when creating the btcPricesDF and cryptoPricesDF dataframes.
After creating our TimeSeriesRDD we can perform various transformations on it. If we want to perform some grouping or aggregation on our time series data, Flint provides several options depending on your needs. Let's say we want to split our data into 14-day time buckets and obtain some summary information for each bucket.
Generating clockTS RDD for time buckets
Time buckets can be defined by another TimeSeriesRDD. Its timestamps are used to define intervals, i.e. two sequential timestamps define an interval.
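Flint can generate such a "clock" TimeSeriesRDD for us. A sketch, where the begin/end dates and the exact frequency string are assumptions chosen to roughly match the dataset's time span:

```scala
import com.twosigma.flint.timeseries.Clocks

// A uniform clock whose timestamps are 14 days apart; each pair of
// consecutive timestamps defines one time bucket.
val clockTS = Clocks.uniform(
  sparkContext,
  frequency = "14d",
  beginDateTime = "2010-07-17",
  endDateTime = "2017-10-09"
)
```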
Now that we have defined the time intervals, we can apply the available summarizers to each interval.
So we obtained the mean closing price in USD for each 14-day interval.
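The summarization step above can be sketched like this, assuming a TimeSeriesRDD of USD-only BTC prices (the names `btcPricesUsdTS` and `close` are illustrative) and the clockTS generated earlier:

```scala
import com.twosigma.flint.timeseries.Summarizers

// Mean closing price per 14-day bucket defined by the clock's timestamps.
val meanClosePerBucket = btcPricesUsdTS.summarizeIntervals(
  clockTS,
  Summarizers.mean("close")
)
```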
Spark SQL Window Functions
Another useful tool in the Apache Spark toolbox is Spark SQL window functions. What can they be useful for? They provide the ability to perform calculations across a set of rows, like computing a moving average, computing a cumulative sum, or accessing the value of a row appearing before the current row.
Let's take the example of calculating the day-by-day volatility of BTC, where we calculate the BTC daily return as a factor of the previous day's rate.
For that we can use the DataFrame API, or use SQL expressions by registering a temporary view of our dataframe.
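Both variants can be sketched as follows, assuming `btcPricesDF` has `currency_code`, `time` and `close` columns (names are assumptions):

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Daily return as a factor of the previous day's closing price,
// using the lag window function over rows ordered by time.
val w = Window.partitionBy($"currency_code").orderBy($"time")

val btcReturnsDF = btcPricesDF
  .withColumn("prev_close", lag($"close", 1).over(w))
  .withColumn("daily_factor", $"close" / $"prev_close")

// The same computation via a SQL expression over a temporary view:
btcPricesDF.createOrReplaceTempView("btc_prices")
val btcReturnsSQL = sparkSession.sql("""
  SELECT *,
         close / lag(close, 1) OVER
           (PARTITION BY currency_code ORDER BY time) AS daily_factor
  FROM btc_prices
""")
```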
Volumes by currency
Now let’s get back to TimeSeriesRDD
and refresh our knowledge on time seriese data summarization with one more example. Let's say we want to track changes in volume of BTC in different fiat currencies in 14 day intervals. For that we can use already seen .summarizeIntervals
method with additional key
argument to group results by currency_code
.
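A sketch of the keyed summarization, assuming the BTC prices TimeSeriesRDD is named `btcPricesTS` and reusing the clockTS from before (column names are assumptions):

```scala
import com.twosigma.flint.timeseries.Summarizers

// One summary row per currency_code per 14-day interval.
val volumePerCurrency = btcPricesTS.summarizeIntervals(
  clockTS,
  Summarizers.sum("volume"),
  key = Seq("currency_code")
)
```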
And to take a closer look at the CNY currency:
Temporal Join
Now we want to obtain ETH prices in fiat currencies. But in the crypto_prices table we only have BTC prices for all the other cryptocurrencies, while prices in fiat currencies are available only for BTC, in the btc_prices table.
And this is where a JOIN comes in handy. But in the case of time series data it is a temporal join, and again Flint provides several options for it.
Temporal join functions define a matching criterion over time. It can be an exact match, or it can look backward or forward in time to find the closest row from the other table with a timestamp within some tolerance interval.
So given BTC prices in fiat currencies at some timestamp in the btc_prices table, we want to find the closest ETH prices in BTC within 1 day from the crypto_prices table.
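A sketch of such a temporal join with Flint's leftJoin, assuming `btcPricesTS` is the BTC fiat-prices TimeSeriesRDD and `ethPricesTS` is the ETH subset of the crypto prices TimeSeriesRDD (both names are my own):

```scala
// For each row of the left table, find the single closest row of the right
// table whose timestamp lies within the 1-day tolerance window.
val ethFiatPrices = btcPricesTS.leftJoin(ethPricesTS, tolerance = "1d")
```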
We keep only those records for which the matching criterion is met.
Prices in different fiat currencies might be on very different scales, as with USD and CNY, so plotting them on the same chart with a single y-axis might not be a good idea. Instead, one can plot them on the same chart with multiple y-axes, or use subplots.
Another good example of using temporal joins is calculating the total transaction volume in USD for all cryptocurrencies in the dataset over the past week.
An important thing to note here is that when performing a temporal join, the join function tries to find the single closest row in the right table. But in our case several rows from the crypto_prices table, corresponding to different currencies, share the same timestamp, and we want to join BTC prices onto all of them. The solution is to group all rows sharing exactly the same timestamp in the crypto_prices table using the .groupByCycle function.
After this we can use the explode function to create a new row for each element of the rows array, which contains all the rows from the crypto_prices table sharing exactly the same timestamp.
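The group-join-flatten sequence can be sketched like this; `cryptoPricesTS` and `btcUsdPricesTS` are assumed names for the two TimeSeriesRDDs:

```scala
import org.apache.spark.sql.functions._

// Collapse rows sharing exactly the same timestamp into one row whose
// "rows" column holds them all, so the temporal join matches once per cycle.
val groupedCrypto = cryptoPricesTS.groupByCycle()

// Temporal join of BTC/USD prices onto each grouped cycle (within 1 day).
val joined = groupedCrypto.leftJoin(btcUsdPricesTS, tolerance = "1d")

// Flatten back: one output row per element of the "rows" array.
val exploded = joined.toDF.withColumn("row", explode($"rows"))
```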
Now we can perform the required aggregation on both the crypto_btc_usd_prices and btc_usd_prices tables and union the results.
Conclusion
I hope this notebook has given you some ideas on how you can use great tools like Apache Spark, the Flint library, Spark Notebook and the Plotly graphing library for time series data analysis. And given that Apache Spark is a fast and general engine for big data processing, you can use all these tools on much larger datasets in a cluster computing environment.
And again, many thanks to @sarahpan for sharing her ideas on the analysis of this dataset.