Simple implementation of HeartBeat in GoLang

Sai Teja
5 min read · Feb 2, 2023

--

Context:-

Have you ever wondered how a load balancer tracks the available nodes and distributes the load among them 🤔? It's quite simple: just maintain a list of available nodes 😁. But you might have questions like… what if a node goes down and recovers later 🤔? How does the load balancer know that a node is alive?

Solution:-

We can simply use a heartbeat: each node periodically tells the server/load balancer that it is alive and ready to serve traffic. It is just like how our heart beats ❤️ and signals that we are alive 😁.

HeartBeat :-
At regular intervals, each node informs the server that it is ALIVE. (In this demo, each node waits a randomly chosen 4, 6, 8 or 11 seconds between heartbeats.)

// SayYouAreAlive repeatedly waits a randomly chosen delay (4, 6, 8 or 11
// seconds) and then sends this node's name to the server, until ctx is done.
// It calls n.wg.Done when it returns, so start it only after n.wg.Add(1).
func (n *Node) SayYouAreAlive(ctx context.Context) {
	defer n.wg.Done()

	// push signal randomly
	delay := []time.Duration{time.Second * 4, time.Second * 6, time.Second * 8, time.Second * 11}
	for {
		select {
		case <-ctx.Done():
			return
		case <-time.After(delay[rand.Intn(len(delay))]):
			// The send also watches ctx; a bare send would block forever
			// if the server has stopped reading from the channel.
			select {
			case n.ServerCommunicationChan <- n.Name:
			case <-ctx.Done():
				return
			}
		}
	}
}

Node Keeper:-
It records the latest heartbeat timestamp for each node and keeps track of the available nodes. (A node may send a heartbeat after it recovers; in that case the keeper adds it back to the current list.)

// Keeper updates the latest heart beat from nodes
func (s *Server) Keeper(ctx context.Context) {
defer s.wg.Done()
for {
select {
case <-ctx.Done():
return
case x := <-s.ClientCommunicationChan:
_, ok := s.clients[x]
if !ok {
fmt.Println(“node recovered “, x, “at”, time.Now())
s.clients[x] = Client{HeartBeat: HearBeat{Name: x, LastHeartBeatTime: time.Now()}}
} else {
fmt.Println(“got update from node “, x, “at”, time.Now())
s.clients[x] = Client{HeartBeat: HearBeat{Name: x, LastHeartBeatTime: time.Now()}}
}

}
}
}

Remove Unavailable Nodes:-
It periodically checks each node's latest heartbeat. If the time since that heartbeat exceeds the configured timeout (5 seconds here, checked every 2 seconds), the node is treated as dead and removed from the current list.

func (s *Server) CheckForUnavailableClients(ctx context.Context) {
defer s.wg.Done()
for {
select {
case <-ctx.Done():
return
case <-time.After(time.Second * 2):
for _, v := range s.clients {
diff := time.Now().Sub(v.HeartBeat.LastHeartBeatTime)
if diff > (time.Second * 5) {
fmt.Println(“node “, v.HeartBeat.Name, “ is unavailable | last available at “, v.HeartBeat.LastHeartBeatTime)
delete(s.clients, v.HeartBeat.Name)
}
}

}
}
}

I have written the complete code below with comments. Please go through it and let me know if you have any corrections, suggestions, or questions 👍

package main

import (
"context"
"fmt"
"math/rand"
"strings"
"sync"
"time"
)

const (
	// bufferSize is the capacity of the registration and heartbeat channels
	// and the initial size hint for the server's client map.
	bufferSize = 10

	// checkInterval is how often CheckForUnavailableClients sweeps the client list.
	checkInterval = 2 * time.Second

	// heartBeatTimeout is how long a node may stay silent before it is treated
	// as dead. NOTE: this is shorter than the longest delay a node waits between
	// heartbeats (11s in SayYouAreAlive), so a healthy node can be reported
	// unavailable and then "recover" on its next heartbeat.
	heartBeatTimeout = 5 * time.Second
)

// Client is the server-side record kept for every node the server knows about.
type Client struct {
	HeartBeat HearBeat
}

// HearBeat holds a node's name and the time its most recent heartbeat (or
// registration) was recorded. The type name keeps its original spelling so
// existing references still compile.
type HearBeat struct {
	Name              string
	LastHeartBeatTime time.Time
}

// Server tracks which nodes are alive based on the heartbeats they send.
// Create it with NewServer; the zero value is not usable.
type Server struct {
	// RegisterCli is where NewClient announces a new node. NewServer sets it to
	// the same channel RegisterNode consumes, so registrations are seen.
	RegisterCli chan string

	// mu guards clients: RegisterNode, Keeper and CheckForUnavailableClients
	// run in separate goroutines and all touch the map.
	mu      sync.Mutex
	clients map[string]Client

	// ClientCommunicationChan receives heartbeats (node names) from nodes.
	ClientCommunicationChan chan string

	// heartBeats carries heartbeats forwarded by UpdateHeartBeat to Keeper.
	// It is separate from ClientCommunicationChan so the bridge never consumes
	// its own forwarded messages.
	heartBeats chan string

	// wg counts the server's background goroutines.
	wg *sync.WaitGroup
}

// Node is a client that periodically tells the server it is alive.
type Node struct {
	Name string

	// ServerCommunicationChan is the server's heartbeat channel.
	ServerCommunicationChan chan string

	// wg lets the owner wait for SayYouAreAlive to return; call wg.Add(1)
	// before starting it.
	wg *sync.WaitGroup
}

func main() {
	// The server runs for 30s; each node stops sending heartbeats after 24s.
	ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
	defer cancel()

	// registration channel to register nodes/clients
	reg := make(chan string, bufferSize)
	svc := NewServer(ctx, reg)

	var nodes []*Node
	for i := 0; i < 2; i++ {
		nodeCtx, nodeCancel := context.WithTimeout(context.Background(), time.Second*24)
		// Deferred on purpose: each node context should live until main returns.
		defer nodeCancel()

		n := NewClient(nodeCtx, svc)
		n.wg.Add(1)
		go n.SayYouAreAlive(nodeCtx)
		nodes = append(nodes, n)
	}

	// Hold main until the server's lifetime ends, then wait for every
	// goroutine to actually exit instead of abandoning them.
	<-ctx.Done()
	svc.wg.Wait()
	for _, n := range nodes {
		n.wg.Wait()
	}
}

// NewClient creates a node with a random name and registers it with svc.
// If ctx is already done before the registration can be delivered, the node
// is returned unregistered rather than blocking forever. The node announces
// itself again with every heartbeat, and Keeper adds unknown names back.
func NewClient(ctx context.Context, svc *Server) *Node {
	nodeName := "node-" + randName()
	n := &Node{
		Name:                    nodeName,
		ServerCommunicationChan: svc.ClientCommunicationChan,
		wg:                      &sync.WaitGroup{},
	}
	fmt.Println("new node is generating ", nodeName, "at ", time.Now())

	// registers new node in server
	select {
	case svc.RegisterCli <- nodeName:
	case <-ctx.Done():
	}
	return n
}

// NewServer creates a Server and starts its four background goroutines
// (registration, heartbeat bridge, heartbeat keeper and dead-node sweep).
// They all return once ctx is done; svc.wg.Wait blocks until they have.
// reg is the registration channel and becomes svc.RegisterCli; if reg is nil,
// a buffered channel is created.
func NewServer(ctx context.Context, reg chan string) *Server {
	if reg == nil {
		reg = make(chan string, bufferSize)
	}
	svc := &Server{
		RegisterCli:             reg,
		clients:                 make(map[string]Client, bufferSize),
		ClientCommunicationChan: make(chan string, bufferSize),
		heartBeats:              make(chan string, bufferSize),
		wg:                      &sync.WaitGroup{},
	}

	svc.wg.Add(4)
	go svc.RegisterNode(ctx, reg)
	go svc.CheckForUnavailableClients(ctx)
	go svc.Keeper(ctx)
	go svc.UpdateHeartBeat(ctx)
	return svc
}

// RegisterNode adds every name received on reg to the client list, stamped
// with the current time, until ctx is done.
func (s *Server) RegisterNode(ctx context.Context, reg chan string) {
	defer s.wg.Done()
	for {
		select {
		case <-ctx.Done():
			return
		case name := <-reg:
			now := time.Now()
			s.mu.Lock()
			s.clients[name] = Client{HeartBeat: HearBeat{Name: name, LastHeartBeatTime: now}}
			s.mu.Unlock()
			fmt.Println("new node registered ", name, "at", now)
		}
	}
}

// SayYouAreAlive repeatedly waits a randomly chosen delay (4, 6, 8 or 11
// seconds) and then sends n.Name to the server, until ctx is done. It calls
// n.wg.Done when it returns, so start it only after n.wg.Add(1).
func (n *Node) SayYouAreAlive(ctx context.Context) {
	defer n.wg.Done()

	// push signal randomly
	delays := []time.Duration{time.Second * 4, time.Second * 6, time.Second * 8, time.Second * 11}
	for {
		select {
		case <-ctx.Done():
			return
		case <-time.After(delays[rand.Intn(len(delays))]):
			fmt.Println("sending heart beat ", n.Name, "at", time.Now())
			// A bare send would block forever if the server stopped reading.
			select {
			case n.ServerCommunicationChan <- n.Name:
			case <-ctx.Done():
				return
			}
		}
	}
}

// UpdateHeartBeat acts as a bridge between the nodes and Keeper: it forwards
// every heartbeat received on ClientCommunicationChan to the server's internal
// heartBeats channel, until ctx is done.
func (s *Server) UpdateHeartBeat(ctx context.Context) {
	defer s.wg.Done()
	for {
		select {
		case <-ctx.Done():
			return
		case name := <-s.ClientCommunicationChan:
			fmt.Println("updating heart beat ", name, "at", time.Now())
			// Keeper may already have exited; don't block forever on the forward.
			select {
			case s.heartBeats <- name:
			case <-ctx.Done():
				return
			}
		}
	}
}

// Keeper records the time of the latest heartbeat from each node. A name that
// is not in the list (for example, a node previously removed as unavailable)
// is added back and reported as recovered.
func (s *Server) Keeper(ctx context.Context) {
	defer s.wg.Done()
	for {
		select {
		case <-ctx.Done():
			return
		case name := <-s.heartBeats:
			now := time.Now()
			s.mu.Lock()
			_, known := s.clients[name]
			s.clients[name] = Client{HeartBeat: HearBeat{Name: name, LastHeartBeatTime: now}}
			s.mu.Unlock()

			if known {
				fmt.Println("got update from node ", name, "at", now)
			} else {
				fmt.Println("node recovered ", name, "at", now)
			}
		}
	}
}

// CheckForUnavailableClients removes unavailable nodes: every checkInterval it
// deletes each client whose last heartbeat is older than heartBeatTimeout,
// until ctx is done.
func (s *Server) CheckForUnavailableClients(ctx context.Context) {
	defer s.wg.Done()
	ticker := time.NewTicker(checkInterval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			s.removeStaleClients()
		}
	}
}

// removeStaleClients deletes, under s.mu, every client whose last heartbeat is
// older than heartBeatTimeout.
func (s *Server) removeStaleClients() {
	s.mu.Lock()
	defer s.mu.Unlock()
	// Deleting the current entry while ranging over a map is allowed in Go.
	for name, c := range s.clients {
		last := c.HeartBeat.LastHeartBeatTime
		if time.Since(last) > heartBeatTimeout {
			fmt.Println("node ", c.HeartBeat.Name, " is unavailable | last available at ", last)
			delete(s.clients, name)
		}
	}
}

// randName returns a 5-character name of the form
// lowercase, uppercase, digit, lowercase, uppercase (e.g. "sF0kD").
//
// It does not reseed math/rand. Reseeding the global source from the clock on
// every call can give identical names to calls made within one clock tick, and
// rand.Seed is deprecated. The global source is seeded automatically since
// Go 1.20.
func randName() string {
	smallLetters := "abcdefghijklmnopqrstuvwxyz"
	capLetters := strings.ToUpper(smallLetters)
	digits := "1234567890"

	pick := func(set string) byte { return set[rand.Intn(len(set))] }
	return string([]byte{
		pick(smallLetters),
		pick(capLetters),
		pick(digits),
		pick(smallLetters),
		pick(capLetters),
	})
}



Sample output:-
new node is generating node-sF0kD at 2023-02-02 20:10:20.695799 +0530 IST m=+0.000341921
new node is generating node-nC7wC at 2023-02-02 20:10:20.696029 +0530 IST m=+0.000572149
sending heart beat node-sF0kD at 2023-02-02 20:10:31.696547 +0530 IST m=+11.000759940
node recovered node-sF0kD at 2023-02-02 20:10:31.696647 +0530 IST m=+11.000860512
sending heart beat node-nC7wC at 2023-02-02 20:10:31.696492 +0530 IST m=+11.000705042
updating heart beat node-nC7wC at 2023-02-02 20:10:31.696713 +0530 IST m=+11.000926418
node recovered node-nC7wC at 2023-02-02 20:10:31.696752 +0530 IST m=+11.000965150
node node-sF0kD is unavailable | last available at 2023-02-02 20:10:31.69666 +0530 IST m=+11.000872839
node node-nC7wC is unavailable | last available at 2023-02-02 20:10:31.696761 +0530 IST m=+11.000974564
sending heart beat node-sF0kD at 2023-02-02 20:10:37.696879 +0530 IST m=+17.000912439
updating heart beat node-sF0kD at 2023-02-02 20:10:37.696922 +0530 IST m=+17.000955570
node recovered node-sF0kD at 2023-02-02 20:10:37.696931 +0530 IST m=+17.000964439
sending heart beat node-sF0kD at 2023-02-02 20:10:41.69816 +0530 IST m=+21.002073412
updating heart beat node-sF0kD at 2023-02-02 20:10:41.698296 +0530 IST m=+21.002208862
got update from node node-sF0kD at 2023-02-02 20:10:41.698328 +0530 IST m=+21.002241449
sending heart beat node-nC7wC at 2023-02-02 20:10:42.697381 +0530 IST m=+22.001264105
updating heart beat node-nC7wC at 2023-02-02 20:10:42.697432 +0530 IST m=+22.001315581
node recovered node-nC7wC at 2023-02-02 20:10:42.697469 +0530 IST m=+22.001353523
node node-sF0kD is unavailable | last available at 2023-02-02 20:10:41.698344 +0530 IST m=+21.002256878
node node-nC7wC is unavailable | last available at 2023-02-02 20:10:42.697478 +0530 IST m=+22.001360697

--

--