Pub/Sub in Golang: An Introduction
Today, we will dive into the world of pub/sub in Golang. Pub/sub, short for “publish-subscribe,” is a messaging pattern that decouples the systems that produce messages from the systems that consume them. In this blog, we walk through a detailed example of pub/sub implemented in the Go programming language and explain the code step by step. We hope this gives a solid foundation to anyone looking to implement pub/sub in their own Go projects. So, let’s get started!
As indicated in the name, a pub/sub system has two types of entities: Publishers and Subscribers. Publishers are entities that produce messages and publish them to a specific topic, whereas Subscribers are entities that receive messages by subscribing to a specific topic. The pub/sub pattern is often used to decouple different parts of a distributed or cloud-based system. For example, a message queue or message bus can act as a pub/sub system, allowing different parts of a system to communicate without being directly connected to each other. This makes complex systems easier to manage and can improve scalability and reliability.
In a pub/sub system, publishers do not send messages directly to subscribers. Instead, the messages are routed through a broker, which is a middleman responsible for receiving messages from publishers and forwarding them to the appropriate subscribers. This allows subscribers to receive messages without needing to know the identities of the publishers and vice versa.
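Coming to Golang, the pub/sub pattern can be implemented using channels and goroutines. Channels are the way goroutines communicate: one goroutine sends a value into a channel and another receives it. In the simplest form of the pattern, one goroutine (the publisher) sends values to a channel, and another goroutine (the subscriber) receives them. Note that each value sent on a channel is delivered to exactly one receiver, so when several subscribers must all see the same message, each subscriber needs its own channel, as in the agent built later in this post.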
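To make the delivery behavior of a single channel concrete, here is a minimal sketch. The countDeliveries helper is a hypothetical name introduced only for this illustration. It sends n messages on one shared channel to several receiver goroutines and counts how many messages were received in total.

```go
package main

import (
	"fmt"
	"sync"
)

// countDeliveries (hypothetical helper) sends n messages on one shared
// channel to `receivers` goroutines and returns the total number of
// messages received across all of them.
func countDeliveries(n, receivers int) int {
	ch := make(chan int)
	var (
		wg    sync.WaitGroup
		mu    sync.Mutex
		total int
	)
	for r := 0; r < receivers; r++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for range ch { // loop ends when ch is closed
				mu.Lock()
				total++
				mu.Unlock()
			}
		}()
	}
	for i := 0; i < n; i++ {
		ch <- i // each value is handed to exactly one receiver
	}
	close(ch)
	wg.Wait()
	return total
}

func main() {
	// 5 messages and 3 receivers give 5 deliveries in total, not 15.
	fmt.Println(countDeliveries(5, 3)) // prints 5
}
```

The receivers compete for messages rather than each getting a copy, which is why a pub/sub broker typically keeps one channel per subscriber.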
Let’s take a basic example of how the pub/sub pattern might be implemented in Golang:
package main

import (
	"fmt"
	"time"
)

// channel to publish messages to
var msgChannel = make(chan string)

// function to publish messages to the channel
func publishingMessage(message string) {
	msgChannel <- message
}

// function to receive messages from the channel
func receivingMessage() {
	for {
		msg := <-msgChannel
		fmt.Println("Received message:", msg)
	}
}

func main() {
	// goroutine to publish messages
	go publishingMessage("Hello from Globant")
	// goroutine to receive messages
	go receivingMessage()
	// give the goroutines time to run before main returns
	time.Sleep(100 * time.Millisecond)
}
In the above example, the publishingMessage function publishes messages to the msgChannel channel, and the receivingMessage function receives messages from the same channel. The publishingMessage function is run as a separate goroutine, allowing it to run concurrently with the receivingMessage function, which is also run as a separate goroutine. This allows the two functions to communicate with each other using the msgChannel channel, implementing the pub/sub pattern.
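A complete program also has to keep main alive until the goroutines have finished, otherwise it may exit before anything is printed. The following is a minimal sketch of one way to do that with sync.WaitGroup and by closing the channel. The relay function and the second message are assumptions made for this illustration and are not part of the example above.

```go
package main

import (
	"fmt"
	"sync"
)

// relay (hypothetical helper) publishes each message on a channel from one
// goroutine and returns what a second goroutine received, in order.
func relay(messages []string) []string {
	msgChannel := make(chan string)
	var received []string
	var wg sync.WaitGroup

	wg.Add(1)
	go func() { // subscriber
		defer wg.Done()
		for msg := range msgChannel {
			received = append(received, msg)
		}
	}()

	go func() { // publisher
		for _, m := range messages {
			msgChannel <- m
		}
		close(msgChannel) // signals that no more messages are coming
	}()

	wg.Wait() // block until the subscriber has drained the channel
	return received
}

func main() {
	for _, msg := range relay([]string{"Hello from Globant", "Goodbye"}) {
		fmt.Println("Received message:", msg)
	}
}
```

Closing the channel from the publisher side ends the subscriber's range loop, and the WaitGroup guarantees that the received slice is only read after the subscriber is done.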
Let’s take an even more detailed example. Consider a pub/sub agent that allows publishers to publish messages on specific topics and subscribers to subscribe to and receive those messages.
I will start by defining an Agent struct that implements a simple pub/sub agent in Golang. The Agent struct has a mu field of type sync.Mutex, a subs field of type map[string][]chan string, a quit field of type chan struct{}, and a closed field of type bool. The mu field is used to protect access to the subs and closed fields using a mutex, a synchronization mechanism that allows only one goroutine to access these fields at a time. The subs field is a map that holds a list of channels for each topic, allowing subscribers to receive messages published to that topic. The quit field is a channel that is closed when the agent is closed, allowing goroutines that are blocked on the channel to unblock and exit. The closed field is a flag that indicates whether the agent has been closed.
type Agent struct {
	mu     sync.Mutex
	subs   map[string][]chan string
	quit   chan struct{}
	closed bool
}
Now let’s create a NewAgent function that creates a new Agent and returns a pointer to it. This function initializes the subs field as a new empty map and the quit field as a new channel. The Publish method allows a message to be published to a topic. This method acquires a lock on the mu mutex, and then sends the message to all channels in the subs map for the specified topic. The Subscribe method allows a goroutine to subscribe to a topic. This method acquires a lock on the mu mutex, then creates a new channel and adds it to the list of channels for the specified topic in the subs map. This method returns the newly created channel, allowing the subscriber to receive messages published to the topic.
func NewAgent() *Agent {
	return &Agent{
		subs: make(map[string][]chan string),
		quit: make(chan struct{}),
	}
}
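Now the Publish method takes a topic and a message as arguments and sends the message to all subscribers of that topic. It acquires a lock on the agent’s mutex (mu) before reading the subscriptions map so that the access is thread-safe, and it returns immediately if the agent has already been closed. Because the subscriber channels are unbuffered, each send blocks until that subscriber receives the message, and the mutex stays locked for the duration.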
func (b *Agent) Publish(topic string, msg string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.closed {
		return
	}
	for _, ch := range b.subs[topic] {
		ch <- msg
	}
}
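Publish sends on every subscriber channel while holding the mutex, so with unbuffered channels a single slow subscriber delays every other subscriber and every other caller of the agent. One possible alternative, shown here only as a sketch and not as part of the agent above, is to give each subscriber a buffered channel and send without blocking using select with a default case. The trySend helper is a hypothetical name, and dropping a message when the buffer is full is a design choice that may not suit every application.

```go
package main

import "fmt"

// trySend (hypothetical helper) attempts to deliver msg on ch without
// blocking and reports whether the message was delivered.
func trySend(ch chan string, msg string) bool {
	select {
	case ch <- msg:
		return true
	default:
		// Buffer is full (or no receiver is ready): drop the message.
		return false
	}
}

func main() {
	ch := make(chan string, 1) // buffered, with room for one message

	fmt.Println(trySend(ch, "first"))  // prints true
	fmt.Println(trySend(ch, "second")) // prints false, buffer is full
	fmt.Println(<-ch)                  // prints first
}
```

Inside a Publish loop, a call such as trySend(ch, msg) would replace the plain send, trading guaranteed delivery for a publisher that never stalls.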
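The Subscribe method takes a topic as an argument and returns a channel on which the subscriber can receive messages for that topic. It also acquires a lock on the agent’s mutex before modifying the subscriptions map. If the agent has already been closed, Subscribe returns nil; receiving from a nil channel blocks forever, so callers should check for that case.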
func (b *Agent) Subscribe(topic string) <-chan string {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.closed {
		return nil
	}
	ch := make(chan string)
	b.subs[topic] = append(b.subs[topic], ch)
	return ch
}
The Close method closes the agent and all channels in the subs map. This method acquires a lock on the mu mutex, sets the closed flag to true, closes the quit channel, and then closes all channels in the subs map. This allows goroutines that are blocked on these channels to unblock and exit.
func (b *Agent) Close() {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.closed {
		return
	}
	b.closed = true
	close(b.quit)
	for _, ch := range b.subs {
		for _, sub := range ch {
			close(sub)
		}
	}
}
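The reason closing a channel unblocks its readers is a property of Go channels: a range loop over a channel ends once the channel is closed and drained, and a plain receive from a closed channel returns the zero value with ok set to false. A small sketch of that behavior, using a hypothetical drain helper:

```go
package main

import "fmt"

// drain (hypothetical helper) reads from ch until it is closed and
// returns everything it received.
func drain(ch <-chan string) []string {
	var got []string
	for msg := range ch { // loop ends when ch is closed and empty
		got = append(got, msg)
	}
	return got
}

func main() {
	ch := make(chan string, 2)
	ch <- "a"
	ch <- "b"
	close(ch)

	fmt.Println(drain(ch)) // prints [a b]

	// Receiving from a closed, empty channel does not block.
	msg, ok := <-ch
	fmt.Printf("%q %v\n", msg, ok) // prints "" false
}
```

A subscriber that reads with a range loop therefore exits on its own once the agent closes its channel.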
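The main function creates a new agent, subscribes to the “foo” topic, publishes a message to the “foo” topic, receives and prints the message, and then closes the agent. Publish is called in its own goroutine because the send on the unbuffered subscriber channel blocks until main receives from it.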
func main() {
	// Create a new agent
	agent := NewAgent()
	// Subscribe to a topic
	sub := agent.Subscribe("foo")
	// Publish a message to the topic
	go agent.Publish("foo", "hello world")
	// Print the message
	fmt.Println(<-sub)
	// Close the agent
	agent.Close()
}
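The main function above uses a single subscriber. To illustrate that every subscriber of a topic receives its own copy of each message, here is a self-contained sketch with several subscribers. It repeats a trimmed version of the Agent so that it runs on its own; the quit channel is left out for brevity, and the fanOut helper is a hypothetical name introduced for this illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// Agent is a trimmed copy of the agent from this post (no quit channel).
type Agent struct {
	mu     sync.Mutex
	subs   map[string][]chan string
	closed bool
}

func NewAgent() *Agent {
	return &Agent{subs: make(map[string][]chan string)}
}

func (b *Agent) Subscribe(topic string) <-chan string {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.closed {
		return nil
	}
	ch := make(chan string)
	b.subs[topic] = append(b.subs[topic], ch)
	return ch
}

func (b *Agent) Publish(topic string, msg string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.closed {
		return
	}
	for _, ch := range b.subs[topic] {
		ch <- msg
	}
}

func (b *Agent) Close() {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.closed {
		return
	}
	b.closed = true
	for _, chans := range b.subs {
		for _, ch := range chans {
			close(ch)
		}
	}
}

// fanOut (hypothetical helper) subscribes n goroutines to the "foo" topic,
// publishes the messages, closes the agent, and returns how many messages
// each subscriber received.
func fanOut(n int, messages []string) []int {
	agent := NewAgent()
	counts := make([]int, n)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		sub := agent.Subscribe("foo")
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			for range sub { // ends when Close closes the channel
				counts[i]++
			}
		}(i)
	}
	for _, m := range messages {
		agent.Publish("foo", m)
	}
	agent.Close()
	wg.Wait()
	return counts
}

func main() {
	fmt.Println(fanOut(3, []string{"hello", "world"})) // prints [2 2 2]
}
```

Each of the three subscribers sees both messages, in contrast to several goroutines reading from one shared channel, where each message reaches only one of them.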
You can access the entire code from the GitHub repo.
Conclusion
The pub/sub pattern is useful in Golang because it allows different parts of an application to communicate without being tightly coupled, which makes it easier to write modular, scalable, and maintainable code.
As the examples above show, it can be implemented with little more than channels, goroutines, and a mutex, which makes it a valuable pattern in the Golang developer’s toolkit.