Stateless WebSocket Server using Redis PubSub

Victor Wu
9 min read · Aug 3, 2023


Managing hub<>client relations in a shared Redis server

Intro

Currently, our WebSocket server manages the relationship between hubs and clients in process memory. For context, a hub represents a board and clients are distinct users. When a user logs into the application and visits a board, they create a WebSocket connection to the backend server so they can perform live, interactive actions. Under the hood, the server handles the request by assigning the client connection to the appropriate board hub. If the board hub does not exist, the server creates it and stores it in an in-memory map of board hubs, where each hub tracks its own clients.

This implementation works for a single node but does not scale if we want to distribute connections and resources across multiple nodes. If we spun up a cluster to support thousands of concurrent WebSocket connections, we would inevitably split clients that belong to the same board hub across different nodes.

Clients split between two machines

In the diagram above, we have two server processes, each running on a separate machine. Clients C, F, P and I, M, N are split up even though they all share the same hub B. As a result, any WebSocket events generated by clients on Machine 1 won’t be communicated to the clients on Machine 2, and vice versa.
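To make the failure mode concrete, here is a minimal sketch that models each machine as its own in-memory map of hubs. The `server` type and its `join`, `broadcast`, and `received` methods are hypothetical names used only for illustration; they are not the project's actual Hub implementation.

```go
package main

import "fmt"

// server is a hypothetical stand-in for one WebSocket process. Its hubs map
// lives in process memory, so no other process can see it.
type server struct {
	hubs map[string]map[string][]string // boardID -> clientID -> received messages
}

func newServer() *server {
	return &server{hubs: make(map[string]map[string][]string)}
}

// join registers a client on a board hub, creating the hub if needed.
func (s *server) join(boardID, clientID string) {
	if _, ok := s.hubs[boardID]; !ok {
		s.hubs[boardID] = make(map[string][]string)
	}
	s.hubs[boardID][clientID] = nil
}

// broadcast delivers msg to every client this process knows about on the board.
func (s *server) broadcast(boardID, msg string) {
	for clientID := range s.hubs[boardID] {
		s.hubs[boardID][clientID] = append(s.hubs[boardID][clientID], msg)
	}
}

// received reports how many messages a client has received on a board.
func (s *server) received(boardID, clientID string) int {
	return len(s.hubs[boardID][clientID])
}

func main() {
	machine1, machine2 := newServer(), newServer()
	machine1.join("B", "C")
	machine1.join("B", "F")
	machine2.join("B", "I")

	machine1.broadcast("B", "post created")

	fmt.Println(machine1.received("B", "F")) // 1: same process as the sender
	fmt.Println(machine2.received("B", "I")) // 0: hub B on machine 2 never saw the event
}
```

Both processes have a hub named B, but they are two unrelated maps, so a broadcast on one never reaches the other.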

One solution to this problem is to offload the responsibility of managing hub and client relations to a separate service. We can do this efficiently using Redis PubSub. The WebSocket servers will still maintain the client connections, but each client connection will now publish and subscribe to a shared channel that will act as our board hub.

Clients publish/subscribe to a shared Redis PubSub channel
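The idea can be sketched without Redis at all. In the block below, a small in-memory `broker` type stands in for the shared Redis server; it is a hypothetical illustration of the publish/subscribe pattern, not the Redis client API, and the channel name `board-B` is made up.

```go
package main

import (
	"fmt"
	"sync"
)

// broker is an in-memory stand-in for Redis PubSub, used only for illustration.
// In the real setup this role is played by the shared Redis server.
type broker struct {
	mu   sync.Mutex
	subs map[string][]chan string // channel name (board ID) -> subscriber inboxes
}

func newBroker() *broker {
	return &broker{subs: make(map[string][]chan string)}
}

// subscribe returns a buffered inbox that receives every message published to name.
func (b *broker) subscribe(name string) <-chan string {
	b.mu.Lock()
	defer b.mu.Unlock()
	inbox := make(chan string, 16)
	b.subs[name] = append(b.subs[name], inbox)
	return inbox
}

// publish fans msg out to all subscribers of name and returns how many received it.
func (b *broker) publish(name, msg string) int {
	b.mu.Lock()
	defer b.mu.Unlock()
	delivered := 0
	for _, inbox := range b.subs[name] {
		select {
		case inbox <- msg:
			delivered++
		default: // drop the message if a subscriber's inbox is full
		}
	}
	return delivered
}

func main() {
	shared := newBroker()
	// Clients connected to different machines subscribe to the same board channel.
	clientOnMachine1 := shared.subscribe("board-B")
	clientOnMachine2 := shared.subscribe("board-B")

	shared.publish("board-B", "post created")

	fmt.Println(<-clientOnMachine1) // post created
	fmt.Println(<-clientOnMachine2) // post created
}
```

Because every subscriber talks to the same broker, it no longer matters which machine holds a given client connection.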

Refactor

The first step in refactoring our implementation is to start up a Redis server. We can do this by adding a Redis service, based on the official Redis Docker image, to our docker-compose.yaml file. We’ll name the container redis-ws, which we’ll reference later when making the connection.

  redis-ws:
    container_name: redis-ws
    image: redis:latest
    ports:
      - "6379:6379"

Now, when we run the docker-compose up command, Docker should pull down that image and start the Redis server. Next, let’s connect to it using the Go Redis client. We’ll install it with the following command:

go get github.com/redis/go-redis/v9

In our ws.go file, we can connect to our Redis server and set it as a property on our WebSocket struct. This will allow client connections to access the Redis Go Client API and publish/subscribe to channels.

import "github.com/redis/go-redis/v9"

func NewWebSocket(
	userService user.Service,
	boardService board.Service,
	postService post.Service,
	jwtService jwt.Service,
	rdbConfig config.RedisConfig,
) *WebSocket {
	rdb := redis.NewClient(&redis.Options{
		Addr:     fmt.Sprintf("%v:%v", rdbConfig.Host, rdbConfig.Port),
		Password: "", // no password set
		DB:       0,  // use default DB
	})

	boardHubs := make(map[string]*Hub)
	destroy := make(chan string)

	go handleDestroy(destroy, boardHubs)

	return &WebSocket{
		userService:  userService,
		boardService: boardService,
		postService:  postService,
		jwtService:   jwtService,
		rdb:          rdb,
		boardHubs:    boardHubs,
		destroy:      destroy,
	}
}
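One small detail worth noting: formatting the address with "%v:%v" works for hostnames and IPv4 addresses, but an IPv6 literal needs square brackets. A minimal sketch of an alternative is shown below. The `redisAddr` helper is hypothetical, and the parameter types are assumptions, since the fields of config.RedisConfig are not shown here.

```go
package main

import (
	"fmt"
	"net"
	"strconv"
)

// redisAddr is a hypothetical helper that builds the "host:port" string
// expected by redis.Options.Addr. The parameter types are assumptions.
func redisAddr(host string, port int) string {
	return net.JoinHostPort(host, strconv.Itoa(port))
}

func main() {
	fmt.Println(redisAddr("redis-ws", 6379)) // redis-ws:6379
	fmt.Println(redisAddr("::1", 6379))      // [::1]:6379
}
```

For the docker-compose setup above, where the host is the container name redis-ws, both approaches produce the same string.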

Let’s create a helper function that subscribes a client to a channel. It should take a board ID parameter and subscribe to the channel with that name. Any messages received via the PubSub channel should be forwarded to the client connection. We’ll also need a way to close this subscription and free up resources in case the client connection is lost. We’ll manage this by storing a map of subscription cancellation channels on the client, keyed by board ID.

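// subscribe subscribes a client to a Redis board channel and forwards every
// published message to the client until the subscription is cancelled.
func (c *Client) subscribe(boardID string) {
	pubsub := c.ws.rdb.Subscribe(context.Background(), boardID)
	defer pubsub.Close()

	// Buffered so that a single cancel signal never blocks the sender,
	// even if this goroutine has already returned.
	cancel := make(chan bool, 1)
	c.subscriptions[boardID] = cancel

	ch := pubsub.Channel()
	fmt.Printf("Channel created for board %v\n", boardID)
	for {
		select {
		case msg, ok := <-ch:
			if !ok {
				// The pubsub channel was closed, so there is nothing left to forward
				return
			}
			// Forward messages received from pubsub channel to client
			c.send <- []byte(msg.Payload)
		case <-cancel:
			fmt.Printf("Cancelling subscription %v\n", boardID)
			return
		}
	}
}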
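The shape of this loop can be tried out in isolation. The sketch below uses plain Go channels in place of the Redis PubSub channel and the client's send channel; the `forward` function is a hypothetical name, and it uses a `done` channel that is closed to cancel rather than a value sent on a bool channel, which is a variation on the approach above and not the code used in this project.

```go
package main

import "fmt"

// forward copies messages from src to dst until src is closed or done is
// signalled. It mirrors the shape of the subscribe loop, with plain channels
// standing in for the Redis PubSub channel and the client's send channel.
func forward(src <-chan string, dst chan<- string, done <-chan struct{}) {
	for {
		select {
		case msg, ok := <-src:
			if !ok {
				return // source closed: nothing more to forward
			}
			select {
			case dst <- msg:
			case <-done:
				return // cancelled while waiting on a slow receiver
			}
		case <-done:
			return
		}
	}
}

func main() {
	src := make(chan string)
	dst := make(chan string, 1)
	done := make(chan struct{})
	finished := make(chan struct{})

	go func() {
		forward(src, dst, done)
		close(finished)
	}()

	src <- "post created"
	fmt.Println(<-dst) // post created

	close(done) // closing the channel cancels without blocking the caller
	<-finished
	fmt.Println("subscription stopped")
}
```

Closing a channel wakes up every receiver and never blocks, which makes it a convenient cancellation signal when the goroutine might be busy or already gone.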

We will utilize our subscribe helper function inside the handler that connects a user to a board. Below is the original handler function. Notice how we have some logic after the authorization checks to create a board hub if it doesn’t exist and to run the board hub inside a goroutine.

func handleBoardConnect(c *Client, msgReq Request) {
	// Check if user is authenticated
	user := c.user
	if user == nil {
		closeConnection(c, websocket.ClosePolicyViolation, CloseReasonUnauthorized)
		return
	}
	// Unmarshal params
	var params ParamsBoardConnect
	if err := unmarshalParams(msgReq, &params, c); err != nil {
		return
	}
	// Check if user has access to board
	boardID := params.BoardID
	boardWithMembers, err := c.ws.boardService.GetBoardWithMembers(context.Background(), boardID)
	var msgRes ResponseBoardConnect
	// If no access, return error response
	if err != nil || !board.UserHasAccess(boardWithMembers, user.ID.String()) {
		msgRes = ResponseBoardConnect{
			ResponseBase: ResponseBase{
				Event:        EventBoardConnect,
				Success:      false,
				ErrorMessage: ErrMsgBoardNotFound},
		}
	} else {
		// If board does not exist as a hub, create one and run it
		if _, ok := c.ws.boardHubs[boardID]; !ok {
			c.ws.boardHubs[boardID] = newHub(boardID, c.ws.destroy)
			go c.ws.boardHubs[boardID].run()
		}

		// Store write permission on the client's boards map
		boardHub := c.ws.boardHubs[boardID]
		c.boards[boardID] = Board{
			canWrite: true,
		}
		existingUsers := boardHub.listConnectedUsers()
		boardHub.register <- c

		// Broadcast successful message response
		msgRes = ResponseBoardConnect{
			ResponseBase: ResponseBase{
				Event:   EventBoardConnect,
				Success: true,
			},
			Result: ResultBoardConnect{
				BoardID:        boardID,
				NewUser:        *user,
				ConnectedUsers: existingUsers,
			},
		}
	}
	msgResBytes, err := json.Marshal(msgRes)
	if err := handleMarshalError(err, "handleBoardConnect", c); err != nil {
		return
	}
	// Only broadcast message if successful, otherwise send only to the client
	if msgRes.Success {
		c.ws.boardHubs[params.BoardID].broadcast <- msgResBytes
	}
	c.send <- msgResBytes
}

Now that we don’t need that logic anymore, we can replace it with go c.subscribe(boardID) and update our broadcast logic to publish to the Redis channel instead:

func handleBoardConnect(c *Client, msgReq Request) {
	// Check if user is authenticated
	user := c.user
	if user == nil {
		closeConnection(c, websocket.ClosePolicyViolation, CloseReasonUnauthorized)
		return
	}
	// Unmarshal params
	var params ParamsBoardConnect
	if err := unmarshalParams(msgReq, &params, c); err != nil {
		return
	}
	// Check if user has access to board
	boardID := params.BoardID
	boardWithMembers, err := c.ws.boardService.GetBoardWithMembers(context.Background(), boardID)
	var msgRes ResponseBoardConnect
	// If no access, return error response
	if err != nil || !board.UserHasAccess(boardWithMembers, user.ID.String()) {
		msgRes = ResponseBoardConnect{
			ResponseBase: ResponseBase{
				Event:        EventBoardConnect,
				Success:      false,
				ErrorMessage: ErrMsgBoardNotFound},
		}
	} else {

		go c.subscribe(boardID)

		// Broadcast successful message response
		msgRes = ResponseBoardConnect{
			ResponseBase: ResponseBase{
				Event:   EventBoardConnect,
				Success: true,
			},
			Result: ResultBoardConnect{
				BoardID:        boardID,
				NewUser:        *user,
				ConnectedUsers: existingUsers, // existingUsers came from the board hub, which no longer exists
			},
		}
	}
	msgResBytes, err := json.Marshal(msgRes)
	if err := handleMarshalError(err, "handleBoardConnect", c); err != nil {
		return
	}
	// Only broadcast message if successful, otherwise send only to the client
	if msgRes.Success {
		c.ws.rdb.Publish(context.Background(), boardID, msgResBytes)
	}
	c.send <- msgResBytes
}

We have one problem with this refactor. We no longer have the ability to retrieve the current list of connected users from our (deleted) board hub object, and since clients are now spread across servers, an individual server may never have the complete list of users. Instead, we can maintain the list in a Redis hash. We can set the key to be the board ID and the fields to be the user IDs, with each value holding the JSON-encoded user. We can house this logic in a redis.go file under the same ws package.

package ws

import (
	"context"
	"encoding/json"

	"github.com/Wave-95/boards/backend-core/internal/models"
	"github.com/redis/go-redis/v9"
)

// setUser sets a user into the redis hash store organized by board ID. This hash store is used to
// manage the list of connected users.
func setUser(rdb *redis.Client, boardID string, user models.User) error {
	userBytes, err := json.Marshal(user)
	if err != nil {
		return err
	}
	return rdb.HSet(context.Background(), boardID, user.ID.String(), userBytes).Err()
}

// getUsers returns a map of all the connected users for a board, keyed by user ID.
// Each value is the JSON-encoded user.
func getUsers(rdb *redis.Client, boardID string) (map[string]string, error) {
	res, err := rdb.HGetAll(context.Background(), boardID).Result()
	if err != nil {
		return map[string]string{}, err
	}
	return res, nil
}

We’ll also need a way to delete the user from the hash store in case the user client disconnects from the board:

// delUser deletes a user from the redis hash store.
func delUser(rdb *redis.Client, boardID string, userID string) error {
	return rdb.HDel(context.Background(), boardID, userID).Err()
}
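To see how the three helpers fit together, here is a minimal in-memory model of the hash layout. The `presence` type and the trimmed-down `user` struct are hypothetical stand-ins (the real models.User has its own fields); the methods only mirror what HSET, HGETALL, and HDEL do to a single hash.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// user is a hypothetical, trimmed-down stand-in for models.User.
type user struct {
	ID   string `json:"id"`
	Name string `json:"name"`
}

// presence models the Redis hash layout in memory:
// key = board ID, field = user ID, value = JSON-encoded user.
type presence map[string]map[string]string

// set mirrors HSET: it adds or overwrites one field in the board's hash.
func (p presence) set(boardID string, u user) error {
	b, err := json.Marshal(u)
	if err != nil {
		return err
	}
	if p[boardID] == nil {
		p[boardID] = make(map[string]string)
	}
	p[boardID][u.ID] = string(b)
	return nil
}

// all mirrors HGETALL: it returns every field/value pair for the board.
func (p presence) all(boardID string) map[string]string {
	out := make(map[string]string, len(p[boardID]))
	for id, raw := range p[boardID] {
		out[id] = raw
	}
	return out
}

// del mirrors HDEL: it removes one field from the board's hash.
func (p presence) del(boardID, userID string) {
	delete(p[boardID], userID)
}

func main() {
	p := presence{}
	_ = p.set("board-B", user{ID: "u1", Name: "Jane Smith"})
	_ = p.set("board-B", user{ID: "u2", Name: "John Doe"})
	p.del("board-B", "u2")
	fmt.Println(p.all("board-B")) // map[u1:{"id":"u1","name":"Jane Smith"}]
}
```

Because the field is the user ID, setting the same user twice overwrites the entry instead of duplicating it.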

Putting everything back into the handleBoardConnect handler, this is our new and improved implementation:

// handleBoardConnect will attempt to connect a user to the board by subscribing the client
// to the board's Redis channel and recording the user in the Redis hash store. A successful
// board connect event will be broadcast to all connected clients. The response contains the new
// user that's been connected as well as existing users.
func handleBoardConnect(c *Client, msgReq Request) {
	// Check if user is authenticated
	user := c.user
	if user == nil {
		closeConnection(c, websocket.ClosePolicyViolation, CloseReasonUnauthorized)
		return
	}
	// Unmarshal params
	var params ParamsBoardConnect
	if err := unmarshalParams(msgReq, &params, c); err != nil {
		return
	}
	// Check if user has access to board
	boardID := params.BoardID
	boardWithMembers, err := c.ws.boardService.GetBoardWithMembers(context.Background(), boardID)
	var msgRes ResponseBoardConnect
	// If no access, return error response
	if err != nil || !board.UserHasAccess(boardWithMembers, user.ID.String()) {
		msgRes = ResponseBoardConnect{
			ResponseBase: ResponseBase{
				Event:        EventBoardConnect,
				Success:      false,
				ErrorMessage: ErrMsgBoardNotFound},
		}
	} else {
		rdb := c.ws.rdb

		go c.subscribe(boardID)

		// On any Redis failure, close the connection and stop handling the request
		if err := setUser(rdb, boardID, *user); err != nil {
			closeConnection(c, websocket.CloseProtocolError, CloseReasonInternalServer)
			return
		}

		mp, err := getUsers(rdb, boardID)
		if err != nil {
			closeConnection(c, websocket.CloseProtocolError, CloseReasonInternalServer)
			return
		}

		connectedUsers, err := formatConnectedUsers(mp)
		if err != nil {
			closeConnection(c, websocket.CloseProtocolError, CloseReasonInternalServer)
			return
		}

		// Broadcast successful message response
		msgRes = ResponseBoardConnect{
			ResponseBase: ResponseBase{
				Event:   EventBoardConnect,
				Success: true,
			},
			Result: ResultBoardConnect{
				BoardID:        boardID,
				NewUser:        *user,
				ConnectedUsers: connectedUsers,
			},
		}
	}
	msgResBytes, err := json.Marshal(msgRes)
	if err := handleMarshalError(err, "handleBoardConnect", c); err != nil {
		return
	}
	// Only broadcast message if successful, otherwise send only to the client
	if msgRes.Success {
		c.ws.rdb.Publish(context.Background(), boardID, msgResBytes)
	}
	c.send <- msgResBytes
}
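The handler relies on a formatConnectedUsers helper that isn’t shown in this post. One possible implementation is sketched below, assuming it decodes the user ID to JSON map returned by getUsers into a slice. The `User` struct is a hypothetical, trimmed-down stand-in for models.User, and sorting by ID is an added choice to keep the response order stable.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sort"
)

// User is a hypothetical, trimmed-down stand-in for models.User.
type User struct {
	ID   string `json:"id"`
	Name string `json:"name"`
}

// formatConnectedUsers decodes the userID -> JSON map returned by getUsers
// into a slice of users. This is one possible implementation, not
// necessarily the one used in the project.
func formatConnectedUsers(raw map[string]string) ([]User, error) {
	users := make([]User, 0, len(raw))
	for id, payload := range raw {
		var u User
		if err := json.Unmarshal([]byte(payload), &u); err != nil {
			return nil, fmt.Errorf("decode user %s: %w", id, err)
		}
		users = append(users, u)
	}
	// Map iteration order is random in Go, so sort for a stable response.
	sort.Slice(users, func(i, j int) bool { return users[i].ID < users[j].ID })
	return users, nil
}

func main() {
	users, err := formatConnectedUsers(map[string]string{
		"u2": `{"id":"u2","name":"John Doe"}`,
		"u1": `{"id":"u1","name":"Jane Smith"}`,
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(users) // [{u1 Jane Smith} {u2 John Doe}]
}
```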

It’s important that we properly close PubSub subscriptions when they’re no longer needed. We should close them when a client disconnects from the WebSocket server.

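// closeSubscriptions cancels every board subscription held by the client,
// removes the user from each board's hash store, and notifies other clients.
func (c *Client) closeSubscriptions() {
	rdb := c.ws.rdb
	for boardID, cancel := range c.subscriptions {
		cancel <- true
		if err := delUser(rdb, boardID, c.user.ID.String()); err != nil {
			log.Printf("Failed to delete user from board %v: %v", boardID, err)
		}
		if err := rdb.Publish(context.Background(), boardID, buildDisconnectMsg(c)).Err(); err != nil {
			log.Printf("Failed to publish disconnect for board %v: %v", boardID, err)
		}
	}
}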

In the above function, we iterate over a client’s map of subscriptions and access each cancel channel. We send true on the channel to signal to the subscription goroutine that it can break out of its loop and stop waiting for published messages. We also make sure to delete the user from the hash store since that user is no longer connected to the board. Finally, we publish a message to the channel to tell other clients that the user has disconnected. This ensures our frontend clients maintain an up-to-date list of connected users.
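The buildDisconnectMsg helper isn’t shown in this post. As a rough sketch of what such a payload could look like, the block below builds a JSON message from a board ID and a user ID. Everything here is an assumption made for illustration: the `disconnectMsg` struct, the "user.disconnect" event name, the field names, and the signature, which takes plain strings instead of a *Client so the example is self-contained.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// disconnectMsg is a hypothetical payload shape; the event name and field
// names are assumptions, not the project's actual wire format.
type disconnectMsg struct {
	Event   string `json:"event"`
	Success bool   `json:"success"`
	BoardID string `json:"board_id"`
	UserID  string `json:"user_id"`
}

// buildDisconnectMsg returns the JSON bytes to publish on the board channel
// when a user leaves.
func buildDisconnectMsg(boardID, userID string) ([]byte, error) {
	return json.Marshal(disconnectMsg{
		Event:   "user.disconnect",
		Success: true,
		BoardID: boardID,
		UserID:  userID,
	})
}

func main() {
	b, err := buildDisconnectMsg("board-B", "u1")
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b))
	// {"event":"user.disconnect","success":true,"board_id":"board-B","user_id":"u1"}
}
```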

We can call c.closeSubscriptions in the deferred function of our readPump goroutine to gracefully shut down our subscriptions.

func (c *Client) readPump() {
	defer func() {
		c.closeSubscriptions()
		c.conn.Close()
	}()
	c.conn.SetReadLimit(maxMessageSize)
	if err := c.conn.SetReadDeadline(time.Now().Add(pongWait)); err != nil {
		log.Printf("Failed to set read deadline: %v", err)
	}
	c.conn.SetPongHandler(func(string) error {
		if err := c.conn.SetReadDeadline(time.Now().Add(pongWait)); err != nil {
			log.Printf("Failed to set read deadline to new time: %v", err)
		}
		return nil
	})
	for {
		_, msg, err := c.conn.ReadMessage()
		if err != nil {
			if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
				log.Printf("error: %v", err)
			}
			break
		}
		handleMessage(c, msg)
	}
}

To ensure everything is working properly, let’s spin up a separate backend-core instance using the “Copy docker run” command. This command copies all of our env vars and makes sure the backend points to the same DB and Redis servers. The only config we need to change is the local port that is being exposed, since 8080 is already allocated. Once we’ve made that edit, we can start the backend server and test our implementation.

I am using Postman to send WebSocket requests and test whether a different user on a different backend process can access the same board and perform CRUD operations.

New process running on port 8082

Notice that the WebSocket server is running on port 8082 and is able to successfully connect and create a post. On the frontend, we see Jane Smith online, and her post is in the top left corner!
