Creating a Crypto Analytics Platform with Kafka, ClickHouse and Cryptofeed

David Pedersen
Sep 16, 2021 · 9 min read


We will explore the first step in developing an analytics infrastructure for properly researching cryptocurrency exchange dynamics: collecting and storing the data. Future posts will cover big data processing (Spark), machine learning/analytics, and dashboards.

Market microstructure is a specialized field within economics and finance that studies the dynamics of financial markets, focusing on the players (market makers, speculators, and noise traders) and on exchange structure (fragmentation, dealer markets vs. limit order books, and market transparency). Exchanges generate massive amounts of trade and order book data: a week of trading data for one asset can easily reach hundreds of millions of data points on a single exchange. To collect this data effectively, we need a reliable system that can handle high-frequency, large-scale data.

Note that we will not go into deep detail about the mechanics of ClickHouse and Kafka; only an introductory explanation of why these tools were selected will be given. The priority of this blog is to show how to build a cryptocurrency data collection and storage system from start to finish. Links for a deeper understanding of the tools will be provided throughout the tutorial.

Problem 1: Where to store our massive datasets? Introducing ClickHouse

ClickHouse is a column-oriented database management system built to aggregate millions of data points within milliseconds. ClickHouse is an OLAP (online analytical processing) database, which means it is designed for high insert throughput: inserts of hundreds of thousands to millions of rows at a time, rather than individual inserts. The upside is that ClickHouse has unmatched speed for analytics; the downside is that it does not handle a continuous stream of single-row inserts well. This is where Kafka comes in.

Problem 2: How to produce high throughput? Introducing Kafka

Apache Kafka is a distributed publish-subscribe messaging system designed for high throughput. When collecting high-frequency exchange data, it is vital to have a tool that is fast, fault-tolerant and scalable. Kafka meets all of these needs through a well-balanced combination of producers, consumers and brokers.

The brokers are the middlemen: they relay messages from producers (senders of data) to consumers (receivers of data). Brokers do this by storing messages in topics, which group messages by type, such as transactions, IoT readings or tracking events. The topic is what determines the path from producer to consumer.

Problem 3: Where to get the data? Introducing CryptoFeed

Cryptofeed, created by Bryant Moscon, is a Python library that uses asyncio and WebSockets to provide cryptocurrency exchange feeds from a large variety of exchanges. With Cryptofeed, you can combine multiple data streams from crypto exchanges into one unified feed. Cryptofeed provides many examples of how to integrate it with multiple backends, including Kafka.

Setting up Kafka and ClickHouse Containers

Now it is time to build our data collection and storage system, using ClickHouse for massive data storage and Kafka for effective data ingestion into ClickHouse. Everything will run in Docker containers for easy reproducibility and deployment.

Step 1: Install Docker and Docker Compose

We will be using Docker to deploy ClickHouse and Kafka, so it is recommended that you have your Docker environment ready before continuing.

Step 2: Creating Clickhouse-Kafka Docker Compose

The first step is to create a directory where we will store our dockerfiles, requirements file and main.py. I called mine CryptoDB.

When in the directory, create a docker-compose.yml file and edit the compose file as below; then we will go through what is happening in each service.
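Here is a minimal sketch of what the compose file can look like. The image tags, listener settings and service names (clickhouse-server, zookeeper, kafka, cryptofeed) are assumptions for a single-host setup; adjust them for your environment.

    version: "3"

    services:
      clickhouse-server:
        image: yandex/clickhouse-server
        ports:
          - "8123:8123"   # HTTP interface
          - "9000:9000"   # native TCP interface
        volumes:
          - clickhouse-volume:/var/lib/clickhouse

      zookeeper:
        image: wurstmeister/zookeeper
        ports:
          - "2181:2181"

      kafka:
        image: wurstmeister/kafka
        depends_on:
          - zookeeper
        ports:
          - "9092:9092"
        environment:
          KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
          KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092

      cryptofeed:
        build: .           # built from the dockerfile described later
        depends_on:
          - kafka

    volumes:
      clickhouse-volume: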

ClickHouse-Server: The service that creates our ClickHouse database and exposes ports 8123 and 9000 for HTTP/TCP. All of the data is stored on a named volume, clickhouse-volume.

Zookeeper: Required for managing Kafka nodes.

Kafka: The service that creates our Kafka node. The environment variables are essential for Kafka to run properly and will change depending on where you are hosting Kafka. This tutorial gives a great explanation of what these environment variables mean. Overall, most of them determine how Kafka communicates with other services. The primary port Kafka uses is 9092.

CryptoFeed: Builds a Python environment and runs main.py to create our Kafka producers/topics and start our WebSocket connections.

With our docker-compose file prepared, there is one last step before we start the containers: writing the CryptoFeed script and setting up the CryptoFeed dockerfile.

Preparing CryptoFeed Script

Step 1: Writing the CryptoFeed Script

Before we use the ClickHouse Kafka engine, we first need to create a script using CryptoFeed, so that we know how to structure our tables in ClickHouse. We will collect BTC-USD trade data and level 2 limit order book data from two cryptocurrency exchanges. BitFinex captures approximately 3.5% of total 24h volume worldwide and Binance captures approximately 25.5%. I selected these two exchanges because of the interesting dynamics we may capture due to the large fragmentation of crypto markets: a massive exchange such as Binance may show unique market microstructure behavior compared to a smaller exchange such as BitFinex.
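A minimal main.py could look like the sketch below. Treat it as a starting point: the exact Kafka backend arguments (host/port vs. a bootstrap parameter) and the symbol strings each exchange accepts vary between cryptofeed versions.

    from cryptofeed import FeedHandler
    from cryptofeed.backends.kafka import BookKafka, TradeKafka
    from cryptofeed.defines import L2_BOOK, TRADES
    from cryptofeed.exchanges import Binance, Bitfinex


    def main():
        # 'kafka' must match the Kafka service name in docker-compose;
        # cryptofeed otherwise defaults to localhost, which will not work
        # between containers. Depending on your cryptofeed version the
        # Kafka backends take host/port or a single bootstrap argument.
        kafka_args = {'host': 'kafka', 'port': 9092}

        f = FeedHandler()

        # Note: Binance may list the pair as BTC-USDT rather than BTC-USD.
        f.add_feed(Bitfinex(symbols=['BTC-USD'], channels=[TRADES, L2_BOOK],
                            max_depth=25,
                            callbacks={TRADES: TradeKafka(**kafka_args),
                                       L2_BOOK: BookKafka(**kafka_args)}))
        f.add_feed(Binance(symbols=['BTC-USD'], channels=[TRADES, L2_BOOK],
                           max_depth=25,
                           callbacks={TRADES: TradeKafka(**kafka_args),
                                      L2_BOOK: BookKafka(**kafka_args)}))

        # A Coinbase L3 (order-by-order) feed could be added the same way
        # with the L3_BOOK channel, but we do not cover L3 data in this post.

        f.run()


    if __name__ == '__main__':
        main()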

Note: the hostname must be the same as the Kafka service name from the docker-compose file; otherwise the default of localhost is used, which will not work in our multi-container setup.

We add feeds to our feed handler, which connects to the exchanges' APIs via WebSocket and then sends the data via Kafka producers to our Kafka brokers, which store our topics. CryptoFeed creates its own Kafka producer and topic when you use TradeKafka and BookKafka.

There will be a unique topic for every channel, exchange and symbol, so in total we create 4 topics from our 2 exchanges (Binance and BitFinex), 2 channels (TRADES and L2_BOOK) and one symbol (BTC-USD).

The structure of the topic names is:

<channel>-<EXCHANGE>-<SYMBOL> (ex. trades-COINBASE-BTC-USD)

An example of how to add level 3 limit order book data from Coinbase is also provided, but commented out. Feel free to explore CryptoFeed, but note that we will not be covering how to collect level 3 order book data.

Knowing that we will be using L2 book data and trade data, the structure of our final tables will look something like this:

Trades: {feed, symbol, timestamp, receipt_timestamp, side, amount, price, order_type, id}

Book: {timestamp, receipt_timestamp, delta, bid[0–25], ask[0–25], _topic}

Both the bid and ask sides of the limit order book will have a maximum depth of 25 price levels.

An extra column _topic, the Kafka topic, is added to the order book table because the original data does not contain exchange and symbol. _topic will be used as a filtering method for later analytics.

Step 2: Creating CryptoFeed dockerfile

For our CryptoFeed script to run in its container we need a Python environment with all the necessary packages. You can create the environment as you wish; the way I created it is below.

My dockerfile just uses a standard python slim image, installs the requirements and runs the main.py script. Very simple.
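For reference, a dockerfile along these lines does the job (the Python version and file names here are assumptions matching the setup above):

    FROM python:3.9-slim

    WORKDIR /app

    # Install the Python dependencies first so they are cached between builds
    COPY requirements.txt .
    RUN pip install --no-cache-dir -r requirements.txt

    # Copy the cryptofeed script and run it
    COPY main.py .
    CMD ["python", "main.py"]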

The requirements are:

aiokafka is required to create our Kafka producers.
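Putting that together, a minimal requirements.txt only needs the feed library and the Kafka client (versions left unpinned here; pin them as you see fit):

    cryptofeed
    aiokafka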

With everything set we can start our docker compose with the standard:

docker-compose up -d

When all of our containers are up and running, our Kafka broker should start collecting exchange data into the auto-generated topics. Now it's time to store that data for the long term.

Preparing Clickhouse-Kafka Setup

The next step is to create our consumers to ingest the data and then store it in ClickHouse tables. I used this helpful guide for my Kafka-ClickHouse setup and will walk you through it below.

In order to have a ClickHouse table collect data from a Kafka topic, three steps must be completed:

  1. Create the final MergeTree tables in ClickHouse to store data for long-term analysis
  2. Create Kafka engine tables to structure the topic data into ClickHouse Format
  3. Generate a materialized view to move data from the Kafka engine tables to the MergeTree tables

Step 1: Creating ClickHouse MergeTree Tables

Access your ClickHouse server either via the CLI or your favorite database management tool (e.g. DBeaver or HouseOps).

First, create a new database to store the crypto exchange tables.

Then create the L2 order book and trades MergeTree tables. It is very important that the type is correct for each column; look here if you need to explore which types to use.
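Below is a sketch of that DDL. The database and table names (crypto, trades, l2_orderbook) and the exact column types are assumptions you can adapt; the timestamps, for example, could just as well be DateTime64.

    CREATE DATABASE IF NOT EXISTS crypto;

    -- Trades table: one row per trade message
    CREATE TABLE IF NOT EXISTS crypto.trades
    (
        feed              LowCardinality(String),
        symbol            LowCardinality(String),
        timestamp         Float64,            -- exchange timestamp (epoch seconds)
        receipt_timestamp Float64,            -- time cryptofeed received the message
        side              LowCardinality(String),
        amount            Float64,
        price             Float64,
        order_type        Nullable(String),
        id                String
    )
    ENGINE = MergeTree()
    ORDER BY (feed, symbol, timestamp);

    -- L2 order book table: bid/ask stored as {price: volume} maps of strings
    CREATE TABLE IF NOT EXISTS crypto.l2_orderbook
    (
        timestamp         Float64,
        receipt_timestamp Float64,
        delta             UInt8,              -- update vs. full snapshot flag
        bid               Map(String, String),
        ask               Map(String, String)
    )
    ENGINE = MergeTree()
    ORDER BY timestamp;

    -- The _topic column must be added AFTER creation, and only on the MergeTree table
    ALTER TABLE crypto.l2_orderbook ADD COLUMN _topic String;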

An important thing to note for the l2_orderbook table is the complexity of storing bid and ask. The Map type was required to store the {price: volume} data at a depth of 25 levels. Map does not support float types, so to preserve that data, the price key and volume value had to be stored as strings; the types can easily be converted back in the data processing phase. If you plan to use L3 data, an extra level of complexity is added because you will have to nest the bid and ask columns to store sub-columns of order id, price and volume pairs.

The last thing to note is that in order to use the virtual column _topic, you must add the column AFTER creating the table, and the column should only be present in the MergeTree table, not the Kafka table. (Yes, I know, I don't understand why either.)

Step 2: Creating Kafka Engine tables

Create the Kafka engine tables with the same column structure as the MergeTree tables. Each Kafka engine table subscribes to the two topics matching the data it collects (trades or book) and to the broker that stores those topics. For local testing the Kafka broker is located at kafka:9092; since ClickHouse and Kafka share the same Docker network, we can use the Kafka service name as the host.
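Here is a sketch of the two Kafka engine tables. The topic names below are placeholders: use the exact topic names cryptofeed created (you can list them from the Kafka container, as shown in the troubleshooting section). The bid/ask Map type here reflects the LowCardinality workaround described next.

    -- Kafka engine table for trades; subscribes to both trade topics
    CREATE TABLE IF NOT EXISTS crypto.trades_queue
    (
        feed              LowCardinality(String),
        symbol            LowCardinality(String),
        timestamp         Float64,
        receipt_timestamp Float64,
        side              LowCardinality(String),
        amount            Float64,
        price             Float64,
        order_type        Nullable(String),
        id                String
    )
    ENGINE = Kafka
    SETTINGS kafka_broker_list = 'kafka:9092',
             kafka_topic_list = 'trades-BINANCE-BTC-USD,trades-BITFINEX-BTC-USD',
             kafka_group_name = 'clickhouse_trades',
             kafka_format = 'JSONEachRow';

    -- Kafka engine table for the L2 book; bid/ask arrive as {price string: volume float}
    CREATE TABLE IF NOT EXISTS crypto.l2_orderbook_queue
    (
        timestamp         Float64,
        receipt_timestamp Float64,
        delta             UInt8,
        bid               Map(LowCardinality(String), Float64),
        ask               Map(LowCardinality(String), Float64)
    )
    ENGINE = Kafka
    SETTINGS kafka_broker_list = 'kafka:9092',
             kafka_topic_list = 'book-BINANCE-BTC-USD,book-BITFINEX-BTC-USD',
             kafka_group_name = 'clickhouse_book',
             kafka_format = 'JSONEachRow';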

If you wish to know more about how the settings of a Kafka engine table work, look here. Furthermore, you can see how the bid/ask complexity was handled with a trick to ingest the {price: volume} structure. Since the topic stores {price: volume} as {string: float}, LowCardinality is used as a workaround to temporarily hold the data before it is batched over to the MergeTree table. LowCardinality is generally not suited to high-cardinality data like ours, but since it only holds data temporarily in the Kafka table, it is viable here.

Step 3: Transfer data between tables with materialized views

The last step is to create the materialized views that act as our insert mechanism from the Kafka engine tables into the MergeTree tables. You can see that the CAST function is used to convert the bid and ask columns to the correct data types and that the _topic virtual column is selected. Virtual columns do not need to exist in the Kafka table definition in order to insert them into the MergeTree table.
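With the table names assumed above, the two materialized views could look like this:

    -- Move trades from the Kafka engine table into the final MergeTree table
    CREATE MATERIALIZED VIEW IF NOT EXISTS crypto.trades_mv TO crypto.trades AS
    SELECT feed, symbol, timestamp, receipt_timestamp, side, amount, price, order_type, id
    FROM crypto.trades_queue;

    -- Move book updates, casting the maps back to strings and keeping the _topic virtual column
    CREATE MATERIALIZED VIEW IF NOT EXISTS crypto.l2_orderbook_mv TO crypto.l2_orderbook AS
    SELECT
        timestamp,
        receipt_timestamp,
        delta,
        CAST(bid, 'Map(String, String)') AS bid,
        CAST(ask, 'Map(String, String)') AS ask,
        _topic
    FROM crypto.l2_orderbook_queue;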

With the materialized views in place, the data that has been accumulating in the topics will start flowing into our final MergeTree tables.

Troubleshooting and Testing

Now that our data collection and storage system is running, it is time to test that everything is functional. We start with the easiest troubleshooting methods to catch issues and move up in complexity if the issue persists.

Simple checks with ClickHouse

We can first check that all of our tables have been created and whether there is data in our final MergeTree tables. You can use the ClickHouse CLI or, my favorite, an SQL console in DBeaver.
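A few quick checks, assuming the table names used above:

    -- Are all of the tables and views there?
    SHOW TABLES FROM crypto;

    -- Is data reaching the final MergeTree tables?
    SELECT count() FROM crypto.trades;
    SELECT * FROM crypto.l2_orderbook ORDER BY timestamp DESC LIMIT 5;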

If you reach the point where there is no data in your Kafka tables, it is likely due to a Kafka producer issue or an incorrectly typed column. If you get a "cannot parse" error, it is likely improper typing; if not, it is likely an incorrect Kafka broker list when creating your Kafka tables.

Always make sure that your Kafka broker list matches your Kafka advertised listeners.

Checking Kafka brokers and topics

If the above issues were not the cause of your empty tables, it is time to check whether you have any data stored in your topics at all. To check that the topics have been created and are receiving data, we can open a shell in the Kafka container and run a few commands.
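For example (the exact location of the Kafka scripts depends on the Kafka image you use, and the topic name should be one of the names shown by --list):

    # Open a shell inside the Kafka container
    docker-compose exec kafka bash

    # List the topics cryptofeed has created
    kafka-topics.sh --bootstrap-server localhost:9092 --list

    # Tail one of the topics to confirm messages are flowing
    kafka-console-consumer.sh --bootstrap-server localhost:9092 \
        --topic trades-BITFINEX-BTC-USD --max-messages 5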

If you followed the tutorial, you should see all four topics, along with thousands of messages per second streaming across your terminal. If you do see your data here, go back to the ClickHouse troubleshooting and check the logs; if not, it is time to check our Kafka producers and WebSocket connections.

Checking CryptoFeed Logs

If you have no data in your topics, it is time to check the CryptoFeed logs. The feed handler produces logs when main.py starts running, showing whether the WebSocket connections were successful and whether any errors occurred while creating the Kafka producers. These logs are stored in the same directory as the docker-compose file.

Note: LEADER_NOT_AVAILABLE is a common error that occurs due to an improperly configured broker host. If you separate ClickHouse and Kafka onto different host machines, this error becomes likely. Look at this link for how the error occurs and how to fix it.

Conclusion

This tutorial covers the first and most essential step in creating your very own cryptocurrency analytics platform: the data collection and storage system. Without data, we would be building a house with a hammer and no nails. With a functional ClickHouse-Kafka system (and plenty of storage space), we can easily store cryptocurrency trade data from dozens of exchanges. The next step is to preprocess our data with Apache Spark in order to prepare for building prediction models (LSTM and XGBoost) and causal inference models, along with some exploratory analysis, to gain deep insight into the market microstructure of cryptocurrency exchanges.
