Deep Reinforcement Learning for Stock Trading with Kafka and RLlib

An MLOps tutorial on building a low-latency, scalable, distributed data stream that serves real-time predictions

Ryan Martin
Geek Culture

Photo by Maxim Hopman on Unsplash

In this article, we will use the MLOps workflow to productionize a deep reinforcement learning agent with Kafka and Ray, using algorithmic trading as an example.

We will show how to obtain live stock trading data from a Python client library by creating a producer with Kafka. We will consume this stock data stream, structure it in KSQL, and then sink it to a new Kafka topic. Next, we will subscribe to this new topic in Faust and query our model, which serves real-time predictions through a REST interface. We will run this app locally on a Kafka cluster with Docker microservices.

We will assume the reader already knows a basic model training workflow. This article will provide a useful perspective on serving models in low-latency data streaming infrastructures.

The frameworks used in this lesson include:

  • Kafka — an open-source streaming data store that acts as a messaging queue with producers and consumer classes
  • ksqlDB — an event streaming processor for creating structured queries from Kafka topics using the KSQL querying language
  • Faust — a python client library for developing high performance distributed systems for real-time applications and microservices with Kafka clusters
  • Ray — a core API for building distributed applications in python
  • RLlib — a reinforcement learning library for training and deploying scalable RL agents
  • Ray Serve — a framework-agnostic toolkit to serve machine learning models with a scalable backend and web server endpoint

1. Start Kafka Broker and Dependencies with Docker Compose

We will build our stock trading application using a microservices architecture with Docker. In the following docker compose file, we can containerize our Kafka broker and zookeeper, as well as the ksqlDB dependencies.

Zookeeper will run on port “2181” and the Kafka broker will run on port “29092.” We will also have a separate port for the ksqlDB server on “8088.” Remembering these ports will help in streamlining the data engineering aspect of this project using the Kafka cluster.

The Kafka broker receives messages from producers, stores them in topics, and lets consumers anywhere in the Kafka cluster fetch them. Later stages in our development will demonstrate how this works.

To start the services defined in the docker-compose.yml file, we will enter into the command line:

docker-compose up

Unless we run the containers in detached mode (with the -d flag), we should see some output in our CLI from the broker and ksqlDB server, such as this:

Screenshot of running Kafka cluster

We are now running our Kafka cluster with all the necessary dependencies.
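If the log output is hard to read, a quick way to confirm that the three services are listening is to try a TCP connection to each port. The sketch below uses the ports named above and assumes the containers publish them on localhost; the service names and helper functions are only illustrative.

```python
import socket

# Ports come from the text above; the host is an assumption for a local Docker setup.
SERVICES = {"zookeeper": 2181, "kafka-broker": 29092, "ksqldb-server": 8088}


def port_open(host, port, timeout=1.0):
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


def cluster_status(host="localhost", services=SERVICES):
    """Map each service name to whether its port accepts connections."""
    return {name: port_open(host, port) for name, port in services.items()}


if __name__ == "__main__":
    for name, is_up in cluster_status().items():
        print(f"{name}: {'listening' if is_up else 'not reachable'}")
```

An open port only shows that the container is accepting connections; it does not prove that the broker has finished starting up.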

2. Create a Kafka Producer to Stream Data from the NYSE

Next, we will create a producer to stream one-minute bars from the NYSE in real time. We will use the polygon.io Python client as our WebSocket source, and sink this data into a Kafka topic, so that we can access this data stream anywhere in the Kafka cluster.

A subscription to polygon.io will be necessary to stream this data; however, we can apply this method to other WebSockets and other use cases.

In this script, we instantiate a Kafka Producer object, using the port from the Kafka broker, and serialize our message with JSON. Then, within the message handler, we call our Kafka producer’s send method, naming our topic ‘stock_min_bars’ (the same name the ksqlDB stream reads from in the next step) and setting our value to the message string.
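The producer logic described above can be sketched roughly as follows. This is not the original script: it assumes the kafka-python package, a broker reachable at localhost:29092, and a handler that receives each batch of bars as a list of dictionaries. The topic name is written with underscores so that it matches the `kafka_topic` used in the ksqlDB stream definition. The polygon.io client setup is left out, and the helper names are hypothetical.

```python
import json

# Assumed values: topic name matches the ksqlDB stream definition,
# broker address assumes the cluster runs on localhost.
TOPIC = "stock_min_bars"
BROKER = "localhost:29092"


def serialize(message):
    """Encode a Python object as UTF-8 JSON bytes for Kafka."""
    return json.dumps(message).encode("utf-8")


def build_producer(broker=BROKER):
    """Create a producer; requires the kafka-python package."""
    # Imported lazily so the helpers in this sketch work without kafka-python installed.
    from kafka import KafkaProducer

    return KafkaProducer(bootstrap_servers=broker, value_serializer=serialize)


def make_handler(producer, topic=TOPIC):
    """Return a callback that forwards every incoming bar to the Kafka topic."""

    def handle(bars):
        for bar in bars:
            producer.send(topic, value=bar)

    return handle
```

In use, the callback returned by `make_handler(build_producer())` would be passed to the WebSocket client as its message handler.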

With our Kafka topic, we now have the option of doing a variety of things with our data stream from the NYSE. Because we have a deep reinforcement learning agent, which predicts the best actions given the current state of the environment, we must restructure our data stream to reflect a specific batch size as defined by our trained model.

3. Structure Streaming Data with ksqlDB

While examining our model trained in RLlib, we see that the batch size is 30, meaning we should fetch 30 instances of data before the latest offset. This will effectively create a sliding window between the current data and the last 30 instances of data. When the next one-minute candlestick arrives from our datasource and sinks into our Kafka topic, our sliding window will advance by one offset to reflect the new, current window in the data stream.

Our model will make predictions on every new window in real time, and this process will continue as long as the source produces data while running our Kafka cluster.
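To make the sliding-window idea concrete, here is a small plain-Python illustration that does not involve Kafka at all. It keeps the latest 30 values in a fixed-size buffer and yields a new window every time another value arrives; the function name and the sample prices are made up for the example.

```python
from collections import deque


def sliding_windows(stream, size=30):
    """Yield the latest `size` items each time a new item completes a full window."""
    window = deque(maxlen=size)  # the oldest item is dropped automatically
    for item in stream:
        window.append(item)
        if len(window) == size:
            yield list(window)


closes = [100 + i for i in range(32)]  # 32 fake one-minute closing prices
windows = list(sliding_windows(closes, size=30))

print(len(windows))                     # 3
print(windows[0][0], windows[0][-1])    # 100 129
print(windows[-1][0], windows[-1][-1])  # 102 131
```

Each new bar shifts the window forward by exactly one position, which is the behavior we want from the data stream.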

To accomplish this, we will enter KSQL queries directly into the CLI, starting by accessing the container and configuring ksqlDB.

docker-compose exec ksqldb-cli ksql http://ksqldb-server:8088

Next, we will create a stream of data based on the values present in the Kafka topic that we created in the last step by entering:

CREATE STREAM stock_min_bars_stream (
    ev VARCHAR,
    sym VARCHAR,
    v DOUBLE,
    o DOUBLE,
    c DOUBLE,
    h DOUBLE,
    l DOUBLE,
    price DOUBLE
) WITH (
    kafka_topic = 'stock_min_bars',
    value_format = 'json'
);

With the ‘stock_min_bars_stream’ we can create a table that will act as a sliding window. In the same KSQL query, we will sink that table to a new topic called ‘stock_sliding_window’ using ‘AS SELECT’ to make the sliding window available within the Kafka cluster. The ‘LATEST_BY_OFFSET’ aggregate function, called with a count of 32, collects the 32 most recent values of each column for every symbol, ordered by offset. Here, we do not create a hopping or tumbling window, because we only want the most recent values up to our current offset, not a single aggregate value over a fixed timeframe. (Visit the ksqlDB docs on the Confluent website for more thorough explanations of these functions.)

CREATE TABLE stock_sliding_window AS
    SELECT sym AS ROWKEY,
        AS_VALUE(sym) AS SYMBOL,
        LATEST_BY_OFFSET(price, 32) AS PRICE,
        LATEST_BY_OFFSET(v, 32) AS V,
        LATEST_BY_OFFSET(o, 32) AS O,
        LATEST_BY_OFFSET(c, 32) AS C,
        LATEST_BY_OFFSET(h, 32) AS H,
        LATEST_BY_OFFSET(l, 32) AS L
    FROM stock_min_bars_stream
    GROUP BY sym
    EMIT CHANGES;

We can ensure that our KSQL queries worked as intended by issuing the following commands in the CLI:

“SHOW TABLES;” will list the tables we have created.
“SELECT * FROM STOCK_SLIDING_WINDOW EMIT CHANGES;” will return the actual contents of the table. Notice that the columns have “buffered up” a list of previous values based on the latest offset.

With this structured stream, we have effectively given our agent a mini-time series representation of the past 30 minutes for any given asset. This batch of data will enable our agent to make effective trade decisions.
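Before a table row can be passed to the agent, its per-column lists have to be turned into a single observation. The sketch below shows one way to do that in plain Python. It assumes each row arrives as a dictionary keyed by the table's column names, that the lists are ordered oldest to newest, and that the model expects one row per minute; the feature order and the function name are hypothetical and would need to match whatever the agent was trained on.

```python
# Column names come from the table definition; their order here is an assumption.
FEATURES = ("O", "H", "L", "C", "V", "PRICE")


def row_to_observation(row, window=30, features=FEATURES):
    """Return a window x features matrix, or None while the buffer is still filling."""
    columns = [row.get(name) for name in features]
    if any(col is None or len(col) < window for col in columns):
        return None
    # Keep the most recent `window` values per column, then transpose to time-major rows.
    trimmed = [col[-window:] for col in columns]
    return [list(step) for step in zip(*trimmed)]


row = {
    "SYMBOL": "AAPL",
    "O": [1, 2, 3], "H": [2, 3, 4], "L": [0, 1, 2],
    "C": [1.5, 2.5, 3.5], "V": [10, 20, 30], "PRICE": [1.4, 2.4, 3.4],
}
print(row_to_observation(row, window=2))
# [[2, 3, 1, 2.5, 20, 2.4], [3, 4, 2, 3.5, 30, 3.4]]
```

Returning None for short buffers lets the consumer skip rows emitted before enough bars have accumulated.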

4. Implement a Backend and Endpoint to Serve Our RL Agent

Note: Before we begin this demonstration, refer to the RLlib docs on the Ray website for a simpler tutorial on serving a “cart-pole” baseline agent.

After training an RL agent in RLlib, implementing a backend that will ingest the data in real time only requires two steps: loading the model weights and parsing the JSON requests.

The following code contains many configurations that we used to train our stock trading agent. RLlib provides an array of configurations to experiment with, allowing vast opportunities to build an effective agent for our use-case.

As a disclaimer, although this agent will achieve a solid reward on historical data, that does not mean that this agent will make us wealthy trading in a live environment. Many of the world’s brightest minds enter the field of high frequency trading to earn substantial incomes; however, even the best quants lose money at times because of many hazards: price slippage, overfitting, look-ahead bias, improper order execution, minor bugs in the code, quote stuffing, et cetera. This tutorial shows a simple “toy” stock trading RL agent based on an environment developed by Maxim Lapan in the essential Deep RL textbook, Deep Reinforcement Learning — Hands On.

To summarize the contents of the backend server script, lines 60–71 contain the “StockTradingModel” class, which loads the model weights from a checkpoint into the “self.agent” object. In production, these weights should come from an AWS S3 bucket or some other cloud storage. The async function on line 65 parses the incoming JSON requests and computes the agent’s action based on the observation in the JSON request. (The next section will go over how we send these requests asynchronously from the Kafka cluster in real time).

The sample of code below exposes the backend over HTTP and establishes an endpoint for the SAC agent to serve predictions.

The docs on Ray Serve show how to start this on the Ray cluster. In our project, we will simply run:

ray start --head
python deploy_sac_trader.py

To stop the Ray cluster, enter:

ray stop

5. How to Query our Backend Server with Faust

The Faust framework fits our use case well, since a crypto and stock brokerage, Robinhood, developed it to provide a framework for Python-based, high-performance distributed systems.

In this section, we will demonstrate how to use Faust to asynchronously query our RL agent server and use the response (predictions) to buy and sell stocks on the Alpaca REST API for paper trading.

In the script above, we implemented a Faust app that connects the structured data stream we built in ksqlDB to the Ray server, which allows our RL agent to make real-time predictions on the best trades every minute.

The Faust app accomplishes this by connecting to the Kafka cluster via “FAUST_BROKER_URL” on line 47, where it consumes the “STOCK_SLIDING_WINDOW” topic from ksqlDB. Then, in the method on line 93, we utilize the async/await paradigm to asynchronously query the server with the data stream. This allows a low latency response as the data arrives.
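The async/await pattern described here can be illustrated with the standard library alone. In the sketch below, an async generator stands in for the Faust stream and a fake coroutine stands in for the HTTP call to the model server; all of the names and the toy decision rule are hypothetical, and a real agent would post the window to the Ray endpoint with an async HTTP client instead.

```python
import asyncio


async def predict_for_windows(windows, query_model):
    """Await one prediction per incoming window, in arrival order."""
    actions = []
    async for window in windows:
        # While this request is in flight, the event loop can run other tasks.
        response = await query_model({"observation": window})
        actions.append(response["action"])
    return actions


async def fake_stream(items):
    """Stand-in for the stream of sliding windows consumed from Kafka."""
    for item in items:
        await asyncio.sleep(0)
        yield item


async def fake_query(payload):
    """Stand-in for the model server: 1 if the last value exceeds the first."""
    await asyncio.sleep(0)
    obs = payload["observation"]
    return {"action": int(obs[-1] > obs[0])}


actions = asyncio.run(predict_for_windows(fake_stream([[1, 2, 3], [3, 2, 1]]), fake_query))
print(actions)  # [1, 0]
```

Because the query is awaited rather than blocking, a slow response for one symbol does not stall the rest of the application.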

This shows an output of market orders during a paper trading test.

Once the server responds with the trade actions, we produce to a new topic, “STOCK_ORDERING_SYSTEM”, as an awaitable object on line 124. Awaiting the send lets the event loop keep processing other work while the server response is written to the new Kafka topic. We consume the stock orders from this new topic in another method, on line 87, to send the trade decisions to the brokerage API. The add_processor method on line 88 registers the order_signal method on line 57 as a callback function so that the stock orders reach the REST API asynchronously.
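As a rough sketch of the last hop, the code below turns trade signals into order payloads and awaits a broker call for each one. The mapping from action numbers to order sides is purely hypothetical, since the agent's action space is not shown here, and `submit_order` is a placeholder coroutine rather than a real brokerage client. The order fields are modeled on a typical market order and should be checked against the brokerage API before use.

```python
import asyncio

# Hypothetical mapping from a discrete action to an order side.
ACTION_TO_SIDE = {1: "buy", 2: "sell"}


def build_order(symbol, action, qty=1):
    """Translate an action into a market order payload, or None to hold."""
    side = ACTION_TO_SIDE.get(action)
    if side is None:
        return None  # any other action is treated as "do nothing"
    return {"symbol": symbol, "qty": qty, "side": side,
            "type": "market", "time_in_force": "day"}


async def forward_orders(signals, submit_order):
    """Await the broker call for every signal that results in an order."""
    sent = []
    for symbol, action in signals:
        order = build_order(symbol, action)
        if order is not None:
            await submit_order(order)
            sent.append(order)
    return sent


async def fake_submit(order):
    """Placeholder for the asynchronous call to the brokerage REST API."""
    await asyncio.sleep(0)


sent = asyncio.run(forward_orders([("AAPL", 1), ("MSFT", 0), ("TSLA", 2)], fake_submit))
print([(o["symbol"], o["side"]) for o in sent])  # [('AAPL', 'buy'), ('TSLA', 'sell')]
```

Separating order construction from submission keeps the decision logic easy to test without touching a paper trading account.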

Conclusion: Deep reinforcement learning combined with state-of-the-art data streaming frameworks has the potential to solve many business problems by automating processes at scale, including stock and crypto trading. However, one must use caution when testing algorithms in a live trading environment. An effective way to test a strategy involves paper trading, or simulated trading. Many online brokerage APIs provide these simulations, which can help more robust strategies evolve.

View the entire project on GitHub.

Visit my professional webpage for more information.

References:

Lapan, Maxim. Deep Reinforcement Learning Hands-On. 2nd ed., MITP, 2019.

Sutton, Richard S., and Andrew Barto. Reinforcement Learning: an Introduction. The MIT Press, 2018.

Ryan Martin
Geek Culture

MSCS Grad Northeastern University 2020. Former Data Scientist Intern @ Pacific Northwest National Laboratory. My ideas and opinions here are completely my own.