Pub/Sub Model in Go

Sai Teja
2 min readNov 28, 2022

--

Basic pub-sub model in Go

I know people look for code 😋 instead of explanations.

Please go through the code below, along with its comments, and let me know about any mistakes or suggestions 👍

package main

import (
"context"
"fmt"
"math/rand"
"time"
)

// Subscribers is the package-level registry of subscriber channels, keyed by
// subscriber name. The map is created at declaration, so it is non-nil and
// empty before any other code in the package runs.
var Subscribers = make(map[string]chan interface{})

// SubscriberClient is the handle given to a subscriber: the name it registered
// under and the channel it receives published values on.
type SubscriberClient struct {
// Name identifies the subscriber; RegisterClient uses the same value as the
// key in Subscribers.
Name string
// Ch is the channel the subscriber reads published values from.
Ch chan interface{}
}

// main registers two subscribers, starts them and one publisher as goroutines,
// each with its own timeout, and exits after a fixed 13-second delay.
func main() {
	// Register every client before the publisher goroutine starts, so the
	// Subscribers map is fully populated before anything ranges over it.
	subscriberOne := RegisterClient("one")
	subscriberTwo := RegisterClient("two")

	root := context.Background()

	// Each goroutine gets a context derived directly from root. Deriving
	// them from one another would make every later timeout silently capped
	// by the earlier ones.

	// Subscriber one stops after 12 seconds.
	subOneCtx, cancelSubOne := context.WithTimeout(root, 12*time.Second)
	defer cancelSubOne()
	go subscriberOne.sub(subOneCtx)

	// Subscriber two stops after 8 seconds.
	subTwoCtx, cancelSubTwo := context.WithTimeout(root, 8*time.Second)
	defer cancelSubTwo()
	go subscriberTwo.sub(subTwoCtx)

	// The publisher stops after 6 seconds.
	pubCtx, cancelPub := context.WithTimeout(root, 6*time.Second)
	defer cancelPub()
	go pub(pubCtx)

	// Keep main alive long enough for all three goroutines to hit their
	// timeouts and print their exit messages. This is only for the demo: in
	// a long-running service (e.g. an API server) the publisher would simply
	// run until the program exits. A one-shot timer is used because the wait
	// happens exactly once.
	<-time.After(13 * time.Second)
	fmt.Println("exiting main")
}

// RegisterClient creates a subscriber called name, records its channel in the
// global Subscribers map under that name, and returns the client so the caller
// can read from the channel.
func RegisterClient(name string) *SubscriberClient {
	client := &SubscriberClient{
		Name: name,
		// Buffered (capacity 4) so a send does not have to wait for the
		// subscriber to be receiving; an unbuffered channel would make
		// every send synchronous.
		Ch: make(chan interface{}, 4),
	}

	// Make the channel visible to the publishing side.
	Subscribers[name] = client.Ch

	return client
}

// pub publishes a random integer to every registered subscriber once every
// two seconds, and returns when ctx is cancelled or its deadline passes.
func pub(ctx context.Context) {
	// One ticker is created up front and stopped on return. Calling
	// time.Tick inside the loop would create a brand-new ticker on every
	// iteration, none of which could ever be stopped.
	ticker := time.NewTicker(2 * time.Second)
	defer ticker.Stop()

	for {
		select {
		// Stop condition: the caller decides when the publisher ends.
		case <-ctx.Done():
			fmt.Println("exiting publisher")
			return

		// Publish on every tick; any other trigger could be used here.
		case <-ticker.C:
			publishToAllConsumers(rand.Int())
		}
	}
}

// sub is the subscriber's receive loop: it prints every value that arrives on
// s.Ch and returns once ctx is cancelled or its deadline passes.
func (s *SubscriberClient) sub(ctx context.Context) {
	stop := ctx.Done()
	for {
		select {
		case msg := <-s.Ch:
			fmt.Println(msg, " value from subscriber ", s.Name)
		case <-stop:
			// Stop condition: the caller decides when the subscriber ends.
			fmt.Println("exiting subscriber ", s.Name)
			return
		}
	}
}

// publishToAllConsumers sends data to the channel of every subscriber in the
// global Subscribers map, one after another (linear in the number of
// subscribers). Each send is a plain channel send, so it waits until that
// subscriber's channel can accept the value.
func publishToAllConsumers(data interface{}) {
	for name, ch := range Subscribers {
		fmt.Println("sending data to ", name)
		ch <- data
	}
}

Sample output:

sending data to one
sending data to two
5577006791947779410 value from subscriber two
5577006791947779410 value from subscriber one
sending data to one
sending data to two
8674665223082153551 value from subscriber one
8674665223082153551 value from subscriber two
exiting publisher
exiting subscriber two
exiting subscriber one
exiting main

--

--