Making Real-Time Trading App with Golang, Kafka & Websockets: Consumer Service & Websockets

Mohanish Patel
6 min read · Sep 23, 2023


As engineers, we constantly wonder how different systems work under the hood. Since I invest a portion of my earnings every month, I have had my fair share of experience with various trading apps: TradingView, Binance, and Zerodha Kite, to name a few.

We will be dividing this blog into 4 parts:
1. Introduction & Setup
2. Setting up Kafka with Golang
3. Consumer Service & Websockets
4. Frontend

Creating Consumer Service & Websockets

So far in this series, we have covered a basic introduction to the tech stack, the high-level architecture, and the implementation of a producer service with Kafka and Golang.

So far so good. We will now go through listening for Kafka events and pushing ticker data to the frontend via WebSockets.

Why WebSockets and not Kafka on the frontend?

A quick recap of the introductory post: I mentioned there that browsers have next to no support for Kafka.

Some of the reasons:

  1. Kafka speaks its own binary protocol over raw TCP, and browsers cannot open raw TCP connections; they are limited to HTTP and WebSockets.
  2. Kafka’s policy is to distribute messages across the consumers in a group. Each tab of our app would be treated as a separate consumer, so no single tab would receive all the data points; they would instead be spread across tabs and across every other device consuming the topic.
  3. Kafka consumers use a fair amount of resources to keep track of offsets, message reads, and overall state. WebSockets are lightweight by comparison.

Let’s not go too deep.

I think that is justification enough for using WS to deliver data to the frontend. Now we can start implementing the consumer part.

Folder Structure.

As we did in the last post, we will create the folders and files from scratch. The consumer service we are building will not only consume the Kafka stream but will also host an API that lists the available tickers and a WebSocket endpoint to subscribe to them. You can give the module whatever name you want.

Run the below command inside the consumer project folder.

go mod init <consumer_app_name>

Dependencies for this app (go.mod).

module app

go 1.20

require (
	github.com/gofiber/contrib/websocket v1.2.1
	github.com/gofiber/fiber/v2 v2.49.1
	github.com/joho/godotenv v1.5.1
	github.com/segmentio/kafka-go v0.4.42
)

require (
	github.com/andybalholm/brotli v1.0.5 // indirect
	github.com/fasthttp/websocket v1.5.4 // indirect
	github.com/google/uuid v1.3.1 // indirect
	github.com/klauspost/compress v1.16.7 // indirect
	github.com/mattn/go-colorable v0.1.13 // indirect
	github.com/mattn/go-isatty v0.0.19 // indirect
	github.com/mattn/go-runewidth v0.0.15 // indirect
	github.com/pierrec/lz4/v4 v4.1.15 // indirect
	github.com/rivo/uniseg v0.4.4 // indirect
	github.com/savsgio/gotils v0.0.0-20230208104028-c358bd845dee // indirect
	github.com/valyala/bytebufferpool v1.0.0 // indirect
	github.com/valyala/fasthttp v1.49.0 // indirect
	github.com/valyala/tcplisten v1.0.0 // indirect
	golang.org/x/sys v0.12.0 // indirect
)

You don’t need to know about all of these dependencies; we’re using Fiber just to serve a basic HTTP endpoint, and the rest are carried forward from the previous producer article. If you create the files first, a quick go mod tidy will fetch all of these for you.

Let’s go through the files one by one. First, let’s create the core structure of our app.

.env

KAFKA_HOST=127.0.0.1
KAFKA_PORT=9092


#Zookeeper
ALLOW_ANONYMOUS_LOGIN=yes
ZOO_PORT_NUMBER=2181


#Kafka
KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
ALLOW_PLAINTEXT_LISTENER=yes
KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092
TICKERS="btcusdt,ethusdt,busdusdt,bnbusdt,ltcusdt,xrpusdt,maticusdt"

core/settings.go

package core

import (
	"log"
	"os"
	"strings"

	"github.com/joho/godotenv"
)

var TICKERS []string
var KAFKA_HOST string
var KAFKA_PORT string

func Load() {
	err := godotenv.Load()
	if err != nil {
		log.Fatal("Failed to load environment file")
	}

	t := os.Getenv("TICKERS")
	// Use "=" rather than ":=" here: ":=" would declare a new local variable
	// and shadow the package-level TICKERS, leaving it nil.
	TICKERS = strings.Split(t, ",")
	LoadTickers(TICKERS)

	KAFKA_HOST = os.Getenv("KAFKA_HOST")
	KAFKA_PORT = os.Getenv("KAFKA_PORT")
}

core/ticker.go

package core

import "strings"

var tickerSet map[string]struct{}

// GetAllTickers returns every allowed ticker as a slice.
func GetAllTickers() []string {
	tickerList := []string{}
	for key := range tickerSet {
		tickerList = append(tickerList, key)
	}
	return tickerList
}

// IsTickerAllowed reports whether the given ticker is in the allow-list.
func IsTickerAllowed(ticker string) bool {
	_, ok := tickerSet[strings.ToLower(ticker)]
	return ok
}

// LoadTickers normalizes the raw ticker strings (lowercased, with stray
// backslashes and quotes trimmed) and stores them in the set.
func LoadTickers(tickers []string) {
	if tickerSet == nil {
		tickerSet = make(map[string]struct{})
	}
	for _, t := range tickers {
		tickerSet[strings.ToLower(strings.Trim(strings.Trim(t, "\\"), "\""))] = struct{}{}
	}
}

settings.go does nothing but load the env variables, and ticker.go holds the ticker set and the helper functions we will use later.
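As a quick sanity check, here is a throwaway snippet (a sketch, not part of the app itself) showing how the loading and lookup behave, given the trimming and lowercasing above:

package main

import (
	"fmt"

	"app/core"
)

func main() {
	// LoadTickers lowercases each ticker and trims stray quotes/backslashes.
	core.LoadTickers([]string{"BTCUSDT", "\"ethusdt\""})

	fmt.Println(core.IsTickerAllowed("btcusdt"))  // true: stored lowercase
	fmt.Println(core.IsTickerAllowed("ETHUSDT"))  // true: the lookup is lowercased too
	fmt.Println(core.IsTickerAllowed("dogeusdt")) // false: not in the set
}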

Let’s configure this part in our main.go

package main

import (
	"app/core"
)

func main() {
	core.Load()
}

Nothing complicated, just a simple core.Load(), and our env file is loaded.
To check, you can print the tickers:

fmt.Println(core.GetAllTickers())

Now the only part left to build is the API and the WebSocket connection.

api/ticker.go

package api

import (
	"context"
	"log"
	"strings"

	"github.com/gofiber/contrib/websocket"
	"github.com/gofiber/fiber/v2"
	"github.com/segmentio/kafka-go"

	"app/core"
)

func GetAllTickers(c *fiber.Ctx) error {
	return c.Status(200).JSON(core.GetAllTickers())
}

func ListenTicker(conn *websocket.Conn) {
	currTicker := conn.Params("ticker")
	log.Println("Current ticker: ", currTicker)

	if !core.IsTickerAllowed(currTicker) {
		// Note: CloseUnsupportedData is a close *code*, not a message type,
		// so we send a plain text message and then close the connection.
		conn.WriteMessage(websocket.TextMessage, []byte("Ticker is not allowed"))
		log.Println("Ticker not allowed: ", currTicker)
		conn.Close()
		return
	}

	topic := "trades-" + strings.ToLower(currTicker)
	reader := kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{core.KAFKA_HOST + ":" + core.KAFKA_PORT},
		Topic:   topic,
	})
	reader.SetOffset(kafka.LastOffset)

	conn.SetCloseHandler(func(code int, text string) error {
		reader.Close()
		log.Printf("Received connection close request. Closing connection .....")
		return nil
	})

	defer reader.Close()
	defer conn.Close()

	go func() {
		for {
			code, _, err := conn.NextReader()
			if err != nil {
				log.Println("Error reading from WS connection. Exiting ...")
				conn.Close()
				return
			}
			log.Printf("Received message of type %d from client", code)
		}
	}()

	for {
		message, err := reader.ReadMessage(context.Background())
		if err != nil {
			log.Println("Error: ", err)
			return
		}
		log.Println("Reading..... ", string(message.Value))

		if err := conn.WriteMessage(websocket.TextMessage, message.Value); err != nil {
			log.Println("Error writing to WS connection: ", err)
			return
		}
	}
}

I will be explaining the ListenTicker function in detail. Let’s break it down into parts. Have a look at the comments.

func ListenTicker(conn *websocket.Conn) {
	// This is the part where we get the ticker value to subscribe to and set
	// up the initial connection. Our path will look like this:
	// example: ws://127.0.0.1:8000/ws/trades/btcusdt
	currTicker := conn.Params("ticker")
	log.Println("Current ticker: ", currTicker)

	if !core.IsTickerAllowed(currTicker) {
		// We close the connection with a message if the ticker is not found.
		conn.WriteMessage(websocket.TextMessage, []byte("Ticker is not allowed"))
		log.Println("Ticker not allowed: ", currTicker)
		conn.Close()
		return
	}

	topic := "trades-" + strings.ToLower(currTicker) // topic
	reader := kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{core.KAFKA_HOST + ":" + core.KAFKA_PORT},
		Topic:   topic,
	}) // establishing the connection to Kafka
	reader.SetOffset(kafka.LastOffset) // we set the reader offset to latest
}
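A quick note on the offset: kafka.LastOffset is simply the constant -1 in kafka-go (and kafka.FirstOffset is -2), so the reader above starts from the newest message, which is what we want for live ticks. If you ever need to replay a topic from the beginning instead, it is a one-line change:

reader.SetOffset(kafka.LastOffset)  // -1: only new messages (live ticks)
// reader.SetOffset(kafka.FirstOffset) // -2: replay the topic from the beginning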

Once the connection part is done, we do some error handling.

func ListenTicker(conn *websocket.Conn) {
	... // previous code

	// This close handler shuts the Kafka reader down as soon as our WS
	// connection is closed for any reason.
	conn.SetCloseHandler(func(code int, text string) error {
		reader.Close()
		log.Printf("Received connection close request. Closing connection .....")
		return nil
	})

	defer reader.Close()
	defer conn.Close() // please check the previous article to know why we use defer

	// Continuously listen to the WS connection in a separate goroutine so that
	// if it closes due to some error we close the Kafka reader too.
	go func() {
		for {
			code, _, err := conn.NextReader()
			if err != nil {
				log.Println("Error reading from WS connection. Exiting ...")
				conn.Close()
				return
			}
			log.Printf("Received message of type %d from client", code)
		}
	}()

	// We read from Kafka and continuously send data to the frontend.
	for {
		message, err := reader.ReadMessage(context.Background())
		if err != nil {
			log.Println("Error: ", err)
			return
		}
		log.Println("Reading..... ", string(message.Value))

		if err := conn.WriteMessage(websocket.TextMessage, message.Value); err != nil {
			log.Println("Error writing to WS connection: ", err)
			return
		}
	}
}

Now let’s link all of this together and expose the WS and API routes.

api/routing.go

package api

import (
	"github.com/gofiber/contrib/websocket"
	"github.com/gofiber/fiber/v2"
)

func AddRoutes(app *fiber.App) {
	// Only let WebSocket upgrade requests through the /ws prefix.
	app.Use("/ws", func(c *fiber.Ctx) error {
		if websocket.IsWebSocketUpgrade(c) {
			c.Locals("allowed", true)
			return c.Next()
		}
		return fiber.ErrUpgradeRequired
	})

	app.Get("/ws/trades/:ticker", websocket.New(ListenTicker))
	app.Route("/api/v1", func(router fiber.Router) {
		router.Get("/tickers", GetAllTickers)
	})
}

main.go

package main

import (
	"app/api"
	"app/core"

	"github.com/gofiber/fiber/v2"
	"github.com/gofiber/fiber/v2/middleware/cors"
)

func main() {
	core.Load()

	app := fiber.New()
	app.Use(cors.New(cors.Config{
		AllowHeaders: "Origin,Content-Type,Accept,Content-Length,Accept-Language,Accept-Encoding,Connection,Access-Control-Allow-Origin",
		// Note: browsers reject credentialed requests combined with a wildcard
		// origin, so we leave AllowCredentials off while AllowOrigins is "*".
		AllowOrigins: "*",
		AllowMethods: "GET,POST,HEAD,PUT,DELETE,PATCH,OPTIONS",
	}))

	api.AddRoutes(app)
	app.Listen(":8000")
}

go build . && ./app 

Open the browser and go to the URL 127.0.0.1:8000/api/v1/tickers. You will get a response something like this.
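(The exact values below are an assumption based on the TICKERS list in our .env; Go map iteration order is random, so the order may differ.)

["btcusdt","ethusdt","busdusdt","bnbusdt","ltcusdt","xrpusdt","maticusdt"]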

Before moving forward, make sure the producer service is built and running so that current data is actually being pushed to Kafka.

Don’t forget to change KAFKA_HOST from 127.0.0.1 to kafka in the .env file if you are running with Docker. Ignore this if running locally.

You can quickly do it with this command.

go build . && ./<producer_app_name> #inside producer folder.  

# or

sudo docker-compose up -d

Now let’s test the WebSocket stream.
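Here is a minimal Go client sketch to watch the stream. It assumes the gorilla/websocket package (go get github.com/gorilla/websocket) in a separate throwaway module; any WebSocket client, such as wscat or the browser console, works just as well.

package main

import (
	"log"

	"github.com/gorilla/websocket" // assumed dependency, not part of the consumer app
)

func main() {
	// Connect to the consumer service we just started.
	conn, _, err := websocket.DefaultDialer.Dial("ws://127.0.0.1:8000/ws/trades/btcusdt", nil)
	if err != nil {
		log.Fatal("dial error: ", err)
	}
	defer conn.Close()

	// Print every trade event the server pushes.
	for {
		_, message, err := conn.ReadMessage()
		if err != nil {
			log.Println("read error: ", err)
			return
		}
		log.Println(string(message))
	}
}

Run it while both the producer and the consumer are up, and trade events should start printing as they arrive.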

If you see the data coming through as well, then your Kafka-backed backend is ready.

Moving towards the end.

We are almost done now: the data is flowing from the backend to the frontend. In the next article, we will render the data in charts and see how it all comes together.
