Realtime Crypto Tracker with Kafka and QuestDB

Yitaek Hwang
Feb 17 · 7 min read

Analyze cryptocurrency price trends in realtime via Kafka and store for further investigation in a timeseries database.


“Bitcoin soars past $50,000 for the first time” — CNN

“Tesla invests $1.5 billion in bitcoin, will start accepting it as payment” — Washington Post

Not a day goes by without some crypto news stealing the headlines these days. From institutional support of Bitcoin to central banks around the world exploring some form of digital currency, interest in cryptocurrency has never been higher. This is also reflected in the daily exchange volume:

As someone interested in the future of DeFi (decentralized finance), I wanted to better track the prices of different cryptocurrencies and store them in a timeseries database for further analysis. I found an interesting talk by Ludvig Sandman and Bruce Zulu at Kafka Summit London 2019, “Using Kafka Streams to Analyze Live Trading Activity for Crypto Exchanges”, so I decided to adapt their Kafka-based approach for my own use. In this tutorial, we will use Python to send real-time cryptocurrency metrics to Kafka topics, store these records in QuestDB, and calculate moving averages on this time series data with NumPy.

Project Setup

At a high level, this project polls the public Coinbase API for the prices of Bitcoin, Ethereum, and Chainlink. Each price is then published to its own Kafka topic (e.g. topic_BTC). Kafka Connect sends the raw price information to QuestDB to populate the timeseries database. At the same time, a separate consumer reads the same data and calculates a moving average for quick trend analysis.

The codebase is organized into three parts:

  • docker-compose: holds the docker-compose file that starts Kafka (Zookeeper, broker, Kafka Connect) and QuestDB, plus the JSON file that initializes Kafka Connect
  • docker: Dockerfile to build the Kafka Connect image (a pre-built image is referenced in the docker-compose file)
  • Python files: grab the latest pricing information from Coinbase, publish it to Kafka, and calculate a moving average

If you would like to analyze different cryptocurrencies or extend the simple moving average example with a more complicated algorithm like relative strength index (RSI) analysis, feel free to fork the repo on GitHub.

Prerequisites

  • Docker (with at least 4GB memory): if using Docker Desktop, go to Settings -> Resources -> Memory and increase the default limit from 2GB to 4GB
  • Python 3.7+

Setting up Kafka & QuestDB

Before pulling data from Coinbase, we need a running Kafka cluster and QuestDB instance. The repo includes a working docker-compose file with Confluent Kafka components (i.e. Zookeeper, broker, Kafka Connect) and QuestDB. If you would rather run Kafka in the cloud or install it locally without Docker, follow the instructions on the Confluent website. Otherwise, simply start the services with docker-compose:

cd docker-compose
docker-compose up -d

The docker-compose file runs the following services:

  • Zookeeper
  • Kafka Broker
  • Kafka Connect with JDBC driver
  • QuestDB

The Kafka Connect image is based on . If you wish to modify this image (e.g. add a new connector to MongoDB or modify the bootup process), you can override the Dockerfile and build it locally.

Wait for the Kafka cluster to come up. Watch the Kafka Connect container logs until you see the following messages:

[2021-02-17 01:55:54,456] INFO [Worker clientId=connect-1, groupId=compose-connect-group] Starting connectors and tasks using config offset -1 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[2021-02-17 01:55:54,456] INFO [Worker clientId=connect-1, groupId=compose-connect-group] Finished starting connectors and tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[2021-02-17 01:55:54,572] INFO [Worker clientId=connect-1, groupId=compose-connect-group] Session key updated (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
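Instead of watching logs by hand, you can poll the Kafka Connect REST API on port 8083 (the same endpoint the connector definition is posted to in the next section) until it answers. This is a minimal standard-library sketch; the function name wait_for_kafka_connect and the timeout values are illustrative assumptions, not part of the repo.

```python
import http.client
import json
import time
import urllib.error
import urllib.request


def wait_for_kafka_connect(base_url="http://localhost:8083", timeout_s=120.0, interval_s=2.0):
    """Poll GET /connectors until Kafka Connect answers with a JSON list.

    Returns True when the worker is ready, or False once timeout_s has elapsed.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        try:
            with urllib.request.urlopen(f"{base_url}/connectors", timeout=interval_s) as resp:
                # urlopen raises HTTPError for non-2xx codes, so reaching here means success.
                if isinstance(json.load(resp), list):
                    return True
        except (OSError, ValueError, http.client.HTTPException):
            # URLError/HTTPError subclass OSError; a half-started worker may also
            # reset the connection or return a non-JSON body (ValueError).
            pass
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval_s)
```

Calling wait_for_kafka_connect() before posting the connector definition avoids a connection error while the worker is still booting. A responding REST API is a reasonable proxy for the log messages above, though not an exact equivalent.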

Configuring Postgres Sink

At this point, we have a healthy Kafka cluster and a running QuestDB instance, but they are not connected. Since QuestDB supports the PostgreSQL wire protocol, we can use the Kafka Connect JDBC sink connector with the PostgreSQL driver to populate our database automatically. Post this connector definition to the Kafka Connect container:

# Make sure you're inside the docker-compose directory
$ curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" --data @postgres-sink-btc.json http://localhost:8083/connectors

postgres-sink-btc.json holds the following configuration details:

{
  "name": "postgres-sink-btc",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "topic_BTC",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "connection.url": "jdbc:postgresql://questdb:8812/qdb?useSSL=false",
    "connection.user": "admin",
    "connection.password": "quest",
    "key.converter.schemas.enable": "false",
    "value.converter.schemas.enable": "true",
    "auto.create": "true",
    "insert.mode": "insert",
    "pk.mode": "none"
  }
}

Some important fields to note:

  • topics: the Kafka topic to consume and write to QuestDB
  • connection.url, connection.user, connection.password: the default QuestDB credentials (admin/quest) on port 8812, QuestDB's PostgreSQL wire protocol port
  • value.converter: this example uses JSON with schema, but you can also use Avro or raw JSON. If you would like to override the default configuration, refer to the Kafka Sink Connector Guide from MongoDB.
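The curl call can also be scripted, which helps when registering one sink per topic (topic_ETH, topic_LINK). Below is a minimal standard-library sketch; the helper names jdbc_sink_config and create_connector are hypothetical, and the field values simply copy postgres-sink-btc.json.

```python
import json
import urllib.request


def jdbc_sink_config(topic, name, host="questdb", port=8812, user="admin", password="quest"):
    """Build a JDBC sink definition mirroring postgres-sink-btc.json for any topic."""
    return {
        "name": name,
        "config": {
            "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
            "tasks.max": "1",
            "topics": topic,
            "key.converter": "org.apache.kafka.connect.storage.StringConverter",
            "value.converter": "org.apache.kafka.connect.json.JsonConverter",
            "connection.url": f"jdbc:postgresql://{host}:{port}/qdb?useSSL=false",
            "connection.user": user,
            "connection.password": password,
            "key.converter.schemas.enable": "false",
            "value.converter.schemas.enable": "true",
            "auto.create": "true",
            "insert.mode": "insert",
            "pk.mode": "none",
        },
    }


def create_connector(definition, connect_url="http://localhost:8083"):
    """POST a connector definition to Kafka Connect, like the curl command above."""
    req = urllib.request.Request(
        f"{connect_url}/connectors",
        data=json.dumps(definition).encode("utf-8"),
        headers={"Accept": "application/json", "Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req) as resp:
        return json.load(resp)
```

For example, create_connector(jdbc_sink_config("topic_ETH", "postgres-sink-eth")) registers an Ethereum sink. urlopen raises urllib.error.HTTPError if Kafka Connect rejects the request, for example when a connector with that name already exists.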

Poll Coinbase for Latest Crypto Prices

Now that our Kafka-QuestDB connection is in place, we can start pulling data from Coinbase. The Python code requires the packages listed in requirements.txt. Install them with pip and run the script:

$ pip install -r requirements.txt
$ python getData.py

The script prints debug messages with pricing information as well as the schema used to populate QuestDB:

Initializing Kafka producer at 2021-02-17 14:38:18.655069
Initialized Kafka producer at 2021-02-17 14:38:18.812354
API request at time 2021-02-17 14:38:19.170623
Record: {'schema': {'type': 'struct', 'fields': [{'type': 'string', 'optional': False, 'field': 'currency'}, {'type': 'float', 'optional': False, 'field': 'amount'}, {'type': 'string', 'optional': False, 'field': 'timestamp'}], 'optional': False, 'name': 'coinbase'}, 'payload': {'timestamp': datetime.datetime(2021, 2, 17, 14, 38, 19, 170617), 'currency': 'BTC', 'amount': 50884.75}}
API request at time 2021-02-17 14:38:19.313046
Record: {'schema': {'type': 'struct', 'fields': [{'type': 'string', 'optional': False, 'field': 'currency'}, {'type': 'float', 'optional': False, 'field': 'amount'}, {'type': 'string', 'optional': False, 'field': 'timestamp'}], 'optional': False, 'name': 'coinbase'}, 'payload': {'timestamp': datetime.datetime(2021, 2, 17, 14, 38, 19, 313041), 'currency': 'ETH', 'amount': 1809.76}}
API request at time 2021-02-17 14:38:19.471573
Record: {'schema': {'type': 'struct', 'fields': [{'type': 'string', 'optional': False, 'field': 'currency'}, {'type': 'float', 'optional': False, 'field': 'amount'}, {'type': 'string', 'optional': False, 'field': 'timestamp'}], 'optional': False, 'name': 'coinbase'}, 'payload': {'timestamp': datetime.datetime(2021, 2, 17, 14, 38, 19, 471566), 'currency': 'LINK', 'amount': 31.68216}}
API request at time 2021-02-17 14:38:23.978928
Record: {'schema': {'type': 'struct', 'fields': [{'type': 'string', 'optional': False, 'field': 'currency'}, {'type': 'float', 'optional': False, 'field': 'amount'}, {'type': 'string', 'optional': False, 'field': 'timestamp'}], 'optional': False, 'name': 'coinbase'}, 'payload': {'timestamp': datetime.datetime(2021, 2, 17, 14, 38, 23, 978918), 'currency': 'BTC', 'amount': 50884.75}}
...
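Each record above follows the schema/payload envelope that Kafka Connect's JsonConverter expects when value.converter.schemas.enable is true. Below is a minimal sketch of fetching a price and building such a record. It assumes Coinbase's public v2 spot price endpoint and its {"data": {"base", "currency", "amount"}} response shape; the function names are hypothetical and this is not the repo's getData.py. Unlike the debug output, it serializes the timestamp as an ISO 8601 string so the payload matches the declared string type (json.dumps cannot encode a datetime directly).

```python
import json
import urllib.request
from datetime import datetime

# Assumed endpoint: Coinbase's public v2 spot price API, e.g. /v2/prices/BTC-USD/spot
SPOT_URL = "https://api.coinbase.com/v2/prices/{symbol}-USD/spot"

# Field list copied from the schema printed by getData.py
COINBASE_SCHEMA = {
    "type": "struct",
    "fields": [
        {"type": "string", "optional": False, "field": "currency"},
        {"type": "float", "optional": False, "field": "amount"},
        {"type": "string", "optional": False, "field": "timestamp"},
    ],
    "optional": False,
    "name": "coinbase",
}


def parse_spot_price(response):
    """Extract (currency, amount) from a spot price response body (assumed shape)."""
    data = response["data"]
    return data["base"], float(data["amount"])


def fetch_spot_price(symbol):
    """Call the assumed Coinbase endpoint and return (currency, amount)."""
    with urllib.request.urlopen(SPOT_URL.format(symbol=symbol), timeout=10) as resp:
        return parse_spot_price(json.load(resp))


def to_connect_record(currency, amount, ts=None):
    """Wrap one price in the schema/payload envelope; returns UTF-8 JSON bytes."""
    if ts is None:
        ts = datetime.now()
    record = {
        "schema": COINBASE_SCHEMA,
        "payload": {"timestamp": ts.isoformat(), "currency": currency, "amount": amount},
    }
    return json.dumps(record).encode("utf-8")
```

The bytes returned by to_connect_record can be passed as the message value to a Kafka producer client, for example producer.send("topic_BTC", value) with kafka-python.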

Query Data on QuestDB

QuestDB is a fast, open-source timeseries database with SQL support, which makes it a good candidate for storing financial market data for historical trend analysis and generating trade signals. By default, QuestDB ships with a web console on port 9000. Navigate to localhost:9000 and query the topic_BTC table to see Bitcoin price data stream in.

You can repeat this process for the other topics as well. If you prefer to run without a UI, you can also use the REST API to check:

$ curl -G \
--data-urlencode "query=select * from topic_BTC" \
http://localhost:9000/exp
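The /exp endpoint exports results as CSV. QuestDB also exposes /exec, which returns JSON with columns and dataset arrays and is easier to consume from code. Below is a minimal standard-library sketch; the helper names exec_url, rows_as_dicts and run_query are hypothetical, and the response layout should be checked against the REST API docs for your QuestDB version.

```python
import json
import urllib.parse
import urllib.request


def exec_url(query, base_url="http://localhost:9000"):
    """Build a URL for QuestDB's /exec endpoint, which returns JSON."""
    return f"{base_url}/exec?" + urllib.parse.urlencode({"query": query})


def rows_as_dicts(result):
    """Turn an /exec response ({'columns': [...], 'dataset': [...]}) into a list of dicts."""
    names = [col["name"] for col in result["columns"]]
    return [dict(zip(names, row)) for row in result["dataset"]]


def run_query(query, base_url="http://localhost:9000"):
    """Run a SQL query over HTTP and return the rows keyed by column name."""
    with urllib.request.urlopen(exec_url(query, base_url)) as resp:
        return rows_as_dicts(json.load(resp))
```

For example, run_query("select * from topic_BTC") returns the stored Bitcoin prices as a list of dictionaries, one per row.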

QuestDB console UI also provides the ability to generate basic graphs. Click on the Chart tab underneath the Tables. Select as the chart type, as the label, and click :

Unfortunately, the QuestDB native charting capabilities are currently limited. For more advanced visualization, check out my previous guide on streaming heart rate data to QuestDB under the “Visualizing Data with Grafana” section.

Calculate Moving Average

While we store the raw data in QuestDB for more sophisticated analysis, we can also consume from the same topics to calculate a quick moving average. This is useful if you want to publish the results to another Kafka topic for a dashboard or for alerts on pricing trends.

In a separate terminal, run the moving average script:

$ python movingAverage.py

It calculates the moving average over the last 25 data points and publishes it to a new topic (e.g. topic_BTC_ma_25):

Starting Apache Kafka consumers and producer
Initializing Kafka producer at 2021-02-17 16:28:33.584649
Initialized Kafka producer at 2021-02-17 16:28:33.699208
Consume record from topic 'topic_BTC' at time 2021-02-17 16:28:34.933318
Produce record to topic 'topic_BTC_ma_25' at time 2021-02-17 16:28:35.072581
Produce record to topic 'topic_BTC_ma_25' at time 2021-02-17 16:28:35.075352
Produce record to topic 'topic_BTC_ma_25' at time 2021-02-17 16:28:35.077106
Produce record to topic 'topic_BTC_ma_25' at time 2021-02-17 16:28:35.088821
Produce record to topic 'topic_BTC_ma_25' at time 2021-02-17 16:28:35.091865
Produce record to topic 'topic_BTC_ma_25' at time 2021-02-17 16:28:35.094458
Produce record to topic 'topic_BTC_ma_25' at time 2021-02-17 16:28:35.096814
Produce record to topic 'topic_BTC_ma_25' at time 2021-02-17 16:28:35.098512
Produce record to topic 'topic_BTC_ma_25' at time 2021-02-17 16:28:35.100150
Produce record to topic 'topic_BTC_ma_25' at time 2021-02-17 16:28:35.103512
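The core of the calculation is a fixed-size window over incoming prices. Below is a standard-library sketch of a simple moving average updated one record at a time. MovingAverage is a hypothetical class, not the repo's movingAverage.py (which, per the introduction, uses NumPy).

```python
from collections import deque


class MovingAverage:
    """Simple moving average over the last `window` prices, updated one record at a time."""

    def __init__(self, window=25):
        self.window = window
        # deque(maxlen=...) silently drops the oldest price once the window is full
        self.values = deque(maxlen=window)

    def update(self, price):
        """Add a price; return the current average once the window is full, else None."""
        self.values.append(price)
        if len(self.values) < self.window:
            return None
        # Re-summing 25 values is cheap and avoids drift from a running total
        return sum(self.values) / self.window
```

A consumer would call update() with each price it reads from topic_BTC and publish only the non-None results. For prices already loaded into a list, numpy.convolve(prices, numpy.ones(25) / 25, mode="valid") yields the same 25-point averages in one vectorized call.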

If you also want to store these data points in QuestDB, add schema information to the JSON data, similar to the schema shown in the getData.py output above. Then create another Postgres sink via curl with topics set to the moving average topic (e.g. topic_BTC_ma_25).
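A moving-average record needs the same schema/payload envelope as the raw prices. Below is a minimal sketch; the schema name coinbase_ma, the field moving_average and the helper ma_record are hypothetical choices, and "double" is one of the JsonConverter primitive types.

```python
import json

# Hypothetical schema for moving-average records; field names are illustrative.
MA_SCHEMA = {
    "type": "struct",
    "fields": [
        {"type": "string", "optional": False, "field": "currency"},
        {"type": "double", "optional": False, "field": "moving_average"},
        {"type": "string", "optional": False, "field": "timestamp"},
    ],
    "optional": False,
    "name": "coinbase_ma",
}


def ma_record(currency, moving_average, timestamp):
    """Wrap a moving-average value in the JsonConverter schema/payload envelope."""
    return json.dumps({
        "schema": MA_SCHEMA,
        "payload": {"currency": currency, "moving_average": moving_average, "timestamp": timestamp},
    }).encode("utf-8")
```

With records in this shape, a copy of postgres-sink-btc.json with a new name and "topics" set to the moving average topic lets the JDBC sink auto-create a matching table.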

Wrapping Up

To stop streaming data, simply stop the Python scripts. To destroy the Kafka cluster and QuestDB, run:

$ docker-compose down

While this is a simple example, you can extend this to optimize the data format with Avro, connect it with your Coinbase account to execute trades based on trading signals, or test out different statistical methods on the raw data. Feel free to submit a PR to make this repo more useful.

The Startup

Get smarter at building your thing. Join The Startup’s +725K followers.

Yitaek Hwang

Written by

Sr. Software Engineer at Axoni writing about cloud, DevOps/SRE, and crypto topics: https://yitaekhwang.com
