Analyzing cryptocurrency time series data using Apache Spark, Flint and Spark Notebook

Andrej
5 min read · Jul 24, 2017


This article is based on my Spark notebook and is highly inspired by @sarahpan's great article on analyzing cryptocurrency data with PostgreSQL. The main goal of this article is not to provide you with insights into Ethereum, Bitcoin and other cryptocurrencies (for that, please refer to @sarahpan's article) but to get you familiar with the tools the Apache Spark ecosystem provides for performing similar kinds of analysis.

In this notebook we perform time series data analysis using Apache Spark, Flint (a time series library for Apache Spark) and the interactive computation and visualization capabilities of Spark Notebook.

Data

Direct link to download the dataset. A detailed description of the dataset can also be found here.

We are interested in two files:

  • btc_prices.csv — A CSV file with the daily BTC price data (OHLCV format), spanning seven years from 2010 to 2017 across multiple currencies (e.g. USD, CAD, EUR, CNY, etc.)
  • crypto_prices.csv — A CSV file with the daily cryptocurrency price data (OHLCV format) for over 1200 cryptocurrencies since 2013

This is a small dataset, and Spark launched on a laptop in local mode should be enough to work with it.

Requirements

Spark 2.0 or higher, Scala 2.11.7 or higher.

Spark Notebook. Get it via

(for Spark 2.X.X)
$ unzip spark-notebook.zip
$ cd spark-notebook-0.7.0-scala-2.11.8-spark-2.0.2-hadoop-2.7.2-with-hive/
$ ./bin/spark-notebook

Then open http://localhost:9001 in your browser.

or from the source code:

$ sbt -Dspark.version=2.1.1 -Dhadoop.version=2.7.0 -Dscala.version=2.11.8
[spark-notebook] $ start

Navigate to `cryptocurrency_ts.snb` from the Spark Notebook browser window and click it to launch.

We also need to provide a custom dependency on the Flint library. It's not published on the Maven Central repository, but you can simply build it from source and publish it locally by running

sbt publishLocal

After that, provide the dependency coordinates in the customDeps section of the notebook metadata (Edit -> Edit Notebook Metadata):

"customDeps": [    "com.twosigma % flint_2.11 % 0.2.0-SNAPSHOT"  ]

Reading the data

Apache Spark supports reading CSV files directly and can try to infer the schema automatically. But we can also provide our own schema while reading the CSV data. This avoids a full scan of the CSV file for schema inference, and it's also more accurate if we know what we're doing.
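As a rough sketch, btcPricesDF could be created like this (the column names below are assumptions based on the OHLCV format and the columns used later on; adjust them to the actual CSV header), with cryptoPricesDF read the same way:

import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import sparkSession.implicits._  // sparkSession is provided by Spark Notebook

// Assumed schema for btc_prices.csv
val btcPricesSchema = StructType(Seq(
  StructField("datetime", StringType),
  StructField("currency_code", StringType),
  StructField("open", DoubleType),
  StructField("high", DoubleType),
  StructField("low", DoubleType),
  StructField("close", DoubleType),
  StructField("volume", DoubleType)
))

val btcPricesDF = sparkSession.read
  .option("header", "true")
  .schema(btcPricesSchema)  // explicit schema: no full scan for inference
  .csv("/path/to/btc_prices.csv")
  .withColumn("time", unix_timestamp($"datetime", "yyyy-MM-dd HH:mm:ssX"))  // needed by Flint later

// cryptoPricesDF is read the same way from crypto_prices.csv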

btcPricesDF.show(5)
cryptoPricesDF.show(5)

OHLC charts

Since the data is presented in OHLCV format, the natural way to plot it is with OHLC charts. Spark Notebook comes with built-in support (CustomPlotlyChart) for the Plotly JavaScript API for data visualization. For examples of CustomPlotlyChart usage, refer to the notebooks in the notebooks/viz directory that comes with the Spark Notebook distribution, or explore them online with nbviewer.
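As an illustration, an OHLC chart of BTC in USD might look roughly like the sketch below, following the CustomPlotlyChart call style from the notebooks/viz examples (the OHLC column names are assumptions about the dataset):

// OHLC chart for BTC in USD; 'candlestick' is a standard Plotly trace type.
CustomPlotlyChart(
  btcPricesDF.where("currency_code = 'USD'"),
  layout = "{title: 'BTC price in USD', xaxis: {title: 'Date'}}",
  dataOptions = "{type: 'candlestick'}",
  dataSources = "{x: 'datetime', open: 'open', high: 'high', low: 'low', close: 'close'}"
)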

The resulting Plotly charts are interactive, so feel free to zoom and hover over the data directly from the notebook.

TimeSeriesRDD

Now let's start with some time series analysis. From the Flint getting started guide you can find that the entry point into all time series functionality in Flint is the TimeSeriesRDD class. We can create one from an existing DataFrame; for that, we have to make sure the DataFrame contains a column named "time" of type LongType. That's why we performed the extra .withColumn("time", unix_timestamp($"datetime", "yyyy-MM-dd HH:mm:ssX")) step when creating the btcPricesDF and cryptoPricesDF DataFrames.
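With the time column in place, creating the TimeSeriesRDDs looks roughly like this (our "time" column holds unix seconds, hence timeUnit = SECONDS):

import com.twosigma.flint.timeseries.TimeSeriesRDD
import scala.concurrent.duration._

// isSorted = false lets Flint sort the rows by time itself.
val btcPricesTS = TimeSeriesRDD.fromDF(dataFrame = btcPricesDF)(isSorted = false, timeUnit = SECONDS)
val cryptoPricesTS = TimeSeriesRDD.fromDF(dataFrame = cryptoPricesDF)(isSorted = false, timeUnit = SECONDS)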

After creating our TimeSeriesRDD we can perform various transformations on it. If we want to perform some grouping or aggregation on our time series data, Flint provides several options depending on our needs. Let's say we want to split our data into 14-day time buckets and obtain some summary information for each bucket.

Generating clockTS RDD for time buckets

Time buckets can be defined by another TimeSeriesRDD. Its timestamps are used to define intervals, i.e. two sequential timestamps define an interval.
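One convenient way to generate such a clock is Flint's Clocks.uniform. A sketch with ticks every 14 days (the begin and end dates here are just assumptions covering the dataset's range):

import com.twosigma.flint.timeseries.Clocks

// Each pair of consecutive ticks defines one 14-day bucket.
val clockTS = Clocks.uniform(
  sparkSession.sparkContext,
  frequency = "14d",
  beginDateTime = "2010-07-01",
  endDateTime = "2017-07-01"
)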

Now that we have defined the time intervals, we can apply the available summarizers to each interval.
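For example, a sketch of summarizing the mean closing price over USD rows per bucket (the currency_code and close column names are assumptions):

import com.twosigma.flint.timeseries.Summarizers

// Keep only USD rows, then take the mean closing price inside each 14-day bucket.
val btcUsdPricesTS = btcPricesTS.keepRows(_.getAs[String]("currency_code") == "USD")
val meanClosePerBucket = btcUsdPricesTS.summarizeIntervals(clockTS, Summarizers.mean("close"))
meanClosePerBucket.toDF.show(5)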

So we obtained the mean closing price in USD for each 14-day interval.

Spark SQL Window Functions

Another useful thing in the Apache Spark toolbox is Spark SQL window functions. What can they be useful for? They provide the ability to perform calculations across a set of rows, such as computing a moving average, a cumulative sum, or accessing the values of a row appearing before the current row.

Let's take the example of calculating the day-by-day volatility of BTC, where we want to calculate the BTC daily return as a factor of the previous day's rate.

For that we can use the DataFrame API, or SQL expressions by registering a temporary view of our DataFrame.
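A sketch of this calculation with the DataFrame API, using the lag window function (column names are assumptions):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import sparkSession.implicits._

// Daily return as a factor of the previous day's closing price.
val w = Window.partitionBy($"currency_code").orderBy($"time")

val btcReturnsDF = btcPricesDF
  .where($"currency_code" === "USD")
  .withColumn("prev_close", lag($"close", 1).over(w))
  .withColumn("daily_return", $"close" / $"prev_close")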

Volumes by currency

Now let's get back to TimeSeriesRDD and refresh our knowledge of time series data summarization with one more example. Let's say we want to track changes in BTC volume in different fiat currencies over 14-day intervals. For that we can use the already familiar .summarizeIntervals method with an additional key argument to group the results by currency_code.
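A sketch of that call (the volume column name is again an assumption):

import com.twosigma.flint.timeseries.Summarizers

// Total BTC volume per currency in each 14-day bucket.
val volumesByCurrency = btcPricesTS.summarizeIntervals(
  clockTS,
  Summarizers.sum("volume"),
  key = Seq("currency_code")
)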

And to take a closer look at the CNY currency:

Temporal Join

Now we want to obtain ETH prices in fiat currencies. But in the crypto_prices table we only have prices in BTC for all the other cryptocurrencies, while prices in fiat currencies are available only for BTC, in the btc_prices table.

And this is where JOIN comes in handy. But in the case of time series data it would be a temporal join. And again, Flint provides several options for that.

Temporal join functions define a matching criterion over time. It can be an exact match, or it can look to the past or to the future to find the closest row from the other table with a timestamp located within some tolerance interval.

So, given BTC prices in fiat currencies for some timestamp in the btc_prices table, we want to find the closest ETH prices in BTC within 1 day from the crypto_prices table.
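With Flint this can be expressed as a leftJoin with a tolerance of one day. A sketch, assuming the ETH rows can be filtered out of cryptoPricesTS by a currency_code column:

// Look up to one day back for the closest ETH-in-BTC row for each BTC-in-fiat row.
// rightAlias prefixes the right table's columns to avoid name clashes.
val ethBtcPricesTS = cryptoPricesTS.keepRows(_.getAs[String]("currency_code") == "ETH")

val ethFiatPricesTS = btcPricesTS.leftJoin(ethBtcPricesTS, tolerance = "1d", rightAlias = "eth")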

We keep only those records for which the matching criterion is met.

Prices in different fiat currencies might be on different scales, as for USD and CNY, so plotting them on the same chart with a single y-axis might not be a good idea. Instead, one can plot them on the same chart with multiple y-axes or use subplots.

Another good example of using temporal joins is calculating the total transaction volume in USD for all the cryptocurrencies in the dataset over the past week.

An important thing to note here is that when performing a join, the temporal join function tries to find one closest row from the right table. But in our case several rows from the crypto_prices table, corresponding to different currencies, share the same timestamp, and we want to join BTC prices to all of them. The solution is to group all rows sharing exactly the same timestamp in the crypto_prices table using the .groupByCycle function.

After this we can use the explode function to create a new row for each element of the rows array, which contains all the rows from the crypto_prices table sharing exactly the same timestamp.
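Putting these two steps together, a sketch might look like this (btcUsdPricesTS and the column names are assumptions):

import org.apache.spark.sql.functions.explode
import sparkSession.implicits._

// Group rows that share a timestamp, temporally join BTC/USD prices,
// then explode the grouped rows back into one row per cryptocurrency.
val cryptoBtcUsdPricesDF = cryptoPricesTS
  .groupByCycle()                                  // adds a "rows" column with the grouped rows
  .leftJoin(btcUsdPricesTS, tolerance = "1d")
  .toDF
  .withColumn("crypto_row", explode($"rows"))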

Now we can perform the required aggregation on both the crypto_btc_usd_prices and btc_usd_prices tables and unite the results.
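A hypothetical sketch of that final step, assuming the joined results are registered as temp views named crypto_btc_usd_prices and btc_usd_prices, already filtered to the last week and already carrying a volume_usd column:

import org.apache.spark.sql.functions.sum
import sparkSession.implicits._

// Total USD volume per currency for the joined crypto prices and for BTC itself, united together.
val cryptoVolumes = sparkSession.table("crypto_btc_usd_prices")
  .groupBy($"currency_code")
  .agg(sum($"volume_usd").as("total_volume_usd"))

val btcVolume = sparkSession.table("btc_usd_prices")
  .groupBy($"currency_code")
  .agg(sum($"volume_usd").as("total_volume_usd"))

val totalVolumes = cryptoVolumes.union(btcVolume).orderBy($"total_volume_usd".desc)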

Conclusion

I hope this notebook has given you some ideas on how you can use great tools like Apache Spark, the Flint library, Spark Notebook and the Plotly graphing library for time series data analysis. Also, given that Apache Spark is a fast and general engine for big data processing, you can use all of these tools on much larger datasets in a cluster computing environment.

And again, many thanks to @sarahpan for sharing her ideas on the analysis of this dataset.

By Andrey Romanov
Software engineer, ML pipelines builder, GPU burner
GitHub, Twitter, LinkedIn
