Pub/Sub in Golang: An Introduction

Vaibhav Chichmalkar
Globant
Jan 27, 2023
Image Credits: https://mailtrap.io/blog/golang-send-email

Today, we will dive into the world of pub/sub in Golang. Pub/sub, short for “publish-subscribe,” is a messaging pattern that decouples the systems that send messages from the systems that receive them. In this post, we walk through a detailed example of pub/sub implemented in Go, along with an explanation of the code. We hope this provides a solid foundation for anyone looking to implement pub/sub in their own Go projects. So, let’s get started!

As the name indicates, a pub/sub system has two types of entities: Publishers and Subscribers. Publishers produce messages and publish them to a specific topic, whereas Subscribers receive messages by subscribing to a specific topic. The pub/sub pattern is often used to decouple different parts of a system, particularly in cloud environments. For example, a message queue or message bus can act as a pub/sub system, allowing different parts of a system to communicate without being directly connected to each other. This makes complex systems easier to manage and improves scalability and reliability.

In a pub/sub system, publishers do not send messages directly to subscribers. Instead, the messages are routed through a broker, which is a middleman responsible for receiving messages from publishers and forwarding them to the appropriate subscribers. This allows subscribers to receive messages without needing to know the identities of the publishers and vice versa.

Image Credits: https://www.arothuis.nl/posts/messaging-pub-sub/

Coming to Golang, the pub/sub pattern can be implemented using channels and goroutines. Channels are the way goroutines communicate, allowing them to send and receive values. In its simplest form, one goroutine (the publisher) sends values to a channel, and another goroutine (the subscriber) receives those values from it. Note that a single channel delivers each value to only one receiver, so broadcasting to several subscribers requires one channel per subscriber, as in the agent built later in this post.

Let’s take a basic example of how the pub/sub pattern might be implemented in Golang:

package main

import (
	"fmt"
	"time"
)

// channel to publish messages to
var msgChannel = make(chan string)

// function to publish messages to the channel
func publishingMessage(message string) {
	msgChannel <- message
}

// function to receive messages from the channel
func receivingMessage() {
	for {
		msg := <-msgChannel
		fmt.Println("Received message:", msg)
	}
}

func main() {
	// goroutine to publish messages
	go publishingMessage("Hello from Globant")

	// goroutine to receive messages
	go receivingMessage()

	// crude wait so both goroutines get a chance to run before main
	// returns; real code would use sync.WaitGroup or a done channel
	time.Sleep(100 * time.Millisecond)
}

In the above example, the publishingMessage function publishes messages to the msgChannel channel, and the receivingMessage function receives messages from the same channel. Each function runs in its own goroutine, so the two run concurrently and communicate only through msgChannel. This is the pub/sub pattern in its simplest form: one publisher, one subscriber, and no topics.

Let’s take an even more detailed example. Consider a pub/sub agent that allows publishers to publish messages on specific topics and subscribers to subscribe to and receive those messages.

I will start by defining an Agent struct that implements a simple pub/sub agent in Golang. The Agent struct has a mu field of type sync.Mutex, a subs field of type map[string][]chan string, a quit field of type chan struct{}, and a closed field of type bool. The mu field is used to protect access to the subs and closed fields using a mutex, a synchronization mechanism that allows only one goroutine to access these fields at a time. The subs field is a map that holds a list of channels for each topic, allowing subscribers to receive messages published to that topic. The quit field is a channel that is closed when the agent is closed, allowing goroutines that are blocked on the channel to unblock and exit. The closed field is a flag that indicates whether the agent has been closed.

package main

import (
	"fmt"
	"sync"
)

type Agent struct {
	mu     sync.Mutex
	subs   map[string][]chan string
	quit   chan struct{}
	closed bool
}

Now let’s create a NewAgent function that creates and returns a new Agent struct. This function initializes the subs field as a new empty map and the quit field as a new channel. The Publish method allows a message to be published to a topic. This method acquires a lock on the mu mutex, and then sends the message to all channels in the subs map for the specified topic. The Subscribe method allows a goroutine to subscribe to a topic. This method acquires a lock on the mu mutex, then creates a new channel and adds it to the list of channels for the specified topic in the subs map. This method returns the newly-created channel, allowing the subscriber to receive messages published to the topic.

func NewAgent() *Agent {
	return &Agent{
		subs: make(map[string][]chan string),
		quit: make(chan struct{}),
	}
}

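Now the Publish method takes a topic and a message as arguments and sends the message to all subscribers of that topic. It acquires a lock on the agent’s mutex (mu) before reading the subscriptions map, so that it is safe to call from multiple goroutines. Because the subscriber channels are unbuffered, each send blocks until that subscriber receives the message, and the lock is held for the duration.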

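func (b *Agent) Publish(topic string, msg string) {
	b.mu.Lock()
	defer b.mu.Unlock()

	// nothing to do once the agent has been closed
	if b.closed {
		return
	}

	// deliver the message to every subscriber of the topic
	for _, ch := range b.subs[topic] {
		ch <- msg
	}
}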
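Since a blocking send means one slow subscriber can stall Publish, a common variation is to give each subscriber a buffered channel and skip the send when the buffer is full. The sketch below is not part of the original agent; the trySend helper and the buffer size of one are assumptions chosen for illustration.

```go
package main

import "fmt"

// trySend attempts to deliver msg without blocking.
// It reports false when the channel cannot accept the message right now.
// (Hypothetical helper, not part of the original agent.)
func trySend(ch chan<- string, msg string) bool {
	select {
	case ch <- msg:
		return true
	default:
		return false
	}
}

func main() {
	ch := make(chan string, 1) // buffer of one message (an assumed size)

	fmt.Println(trySend(ch, "first"))  // true: the buffer has room
	fmt.Println(trySend(ch, "second")) // false: the buffer is full, so the message is dropped
	fmt.Println(<-ch)                  // first
}
```

The trade-off is that messages can be dropped, so this only suits cases where losing a message is acceptable.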

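The Subscribe method takes a topic as an argument and returns a receive-only channel on which the subscriber can receive messages for that topic. It also acquires a lock on the agent’s mutex before modifying the subscriptions map. If the agent has already been closed, it returns nil; callers should check for this, because receiving from a nil channel blocks forever.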

func (b *Agent) Subscribe(topic string) <-chan string {
	b.mu.Lock()
	defer b.mu.Unlock()

	if b.closed {
		return nil
	}

	ch := make(chan string)
	b.subs[topic] = append(b.subs[topic], ch)
	return ch
}

The Close method closes the agent and all channels in the subs map. This method acquires a lock on the mu mutex, sets the closed flag to true, closes the quit channel, and then closes all channels in the subs map. This allows goroutines that are blocked on these channels to unblock and exit.

func (b *Agent) Close() {
	b.mu.Lock()
	defer b.mu.Unlock()

	if b.closed {
		return
	}

	b.closed = true
	close(b.quit)

	for _, chans := range b.subs {
		for _, ch := range chans {
			close(ch)
		}
	}
}

The main function creates a new agent, subscribes to the “foo” topic, publishes a message to the “foo” topic, receives and prints the message, and then closes the agent.

func main() {
	// Create a new agent
	agent := NewAgent()

	// Subscribe to a topic
	sub := agent.Subscribe("foo")

	// Publish a message to the topic
	go agent.Publish("foo", "hello world")

	// Print the message
	fmt.Println(<-sub)

	// Close the agent
	agent.Close()
}

You can access the entire code from the GitHub repo.

Conclusion

The pub/sub pattern is useful in Golang because it lets different parts of an application communicate without being tightly coupled, which makes it easier to write modular, scalable, and maintainable code. It is a valuable tool in the Golang developer’s toolkit and can be implemented with little more than channels and goroutines.

