Redis and Golang: Making Message Brokers Easy and Powerful!

Abinav Ghimire
Published in readytowork, Inc.
6 min read · Mar 1, 2024

In the ever-evolving realm of software architecture, establishing efficient communication channels between components is a foundational necessity. Enter Redis and Golang — a dynamic duo poised to revolutionize the way we approach messaging systems. Redis, renowned for its speed and versatility as an in-memory data store, seamlessly collaborates with Golang, a language celebrated for its simplicity and high performance, offering a compelling solution for crafting resilient and scalable messaging infrastructures.

In this article, we’ll use Redis as a message broker to implement the publish-subscribe pattern in a Go application. The pattern helps with scalability, lets heavy tasks run asynchronously across nodes, and supports event-driven architecture, data transformation, and more. We’ll use Docker for easy management and deployment. This article assumes that you have Docker and Go installed on your system and a Go project already set up.

Setting up Docker and config files

The first step is to add the go-redis library to our main Dockerfile by including the command RUN go get github.com/go-redis/redis. The updated Dockerfile looks like this:

FROM golang:1.18-alpine

WORKDIR /redis_docker

# Copy everything from this project into the filesystem of the container.
COPY . .

# Obtain the package needed to run redis commands.
RUN go get github.com/go-redis/redis

RUN go mod tidy

# Compile the binary exe for our app.
RUN go build -o main .
# Start the application.
CMD ["./main"]

Since Redis runs as a standalone service, we need to define a Redis service in our docker-compose file. Within the services section, we add the following entries to run both the Redis service and our Go application.

services:
  redis:
    container_name: "redis"
    image: redis:alpine
    # Specify the redis.conf file to use
    command: redis-server /usr/local/etc/redis/redis.conf
    # Mount the config into the container (assumes redis.conf sits next to this compose file)
    volumes:
      - ./redis.conf:/usr/local/etc/redis/redis.conf
    ports:
      - "6379:6379"
  web:
    container_name: "redisapi"
    build:
      # Build the image using the Dockerfile we have in this project. Can use an image instead.
      context: .
    ports:
      - "8080:8080"

Our redis.conf file sets a memory limit of 41943040 bytes (40 MB):

# Required
##########

# Set a memory usage limit to the specified amount of bytes.
# When the memory limit is reached Redis will try to remove keys
# according to the eviction policy selected (see maxmemory-policy).
maxmemory 41943040
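
The maxmemory value is given in bytes, which is hard to read at a glance. As a quick sanity check, the small sketch below converts it to mebibytes; the helper name bytesToMiB is only illustrative.

```go
package main

import "fmt"

// bytesToMiB converts a byte count, as used by the maxmemory directive,
// to mebibytes (1 MiB = 1024 * 1024 bytes).
func bytesToMiB(n int64) float64 {
	return float64(n) / (1024 * 1024)
}

func main() {
	fmt.Printf("%.0f MiB\n", bytesToMiB(41943040)) // prints "40 MiB"
}
```

redis.conf also accepts unit suffixes, so the same limit can be written as maxmemory 40mb.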

Redis Pub-sub services

Redis Pub-Sub, short for Publish-Subscribe, is a messaging pattern in which senders (publishers) send messages to named channels rather than directly to recipients. Every recipient (subscriber) listening on a channel receives the messages published to it, so publishers and subscribers never need to know about each other.
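
To make the pattern concrete before bringing Redis in, here is a minimal in-memory sketch built on Go channels. The broker type and its methods are hypothetical stand-ins written for this illustration, not part of Redis or go-redis. It shows two properties that Redis Pub-Sub shares: every current subscriber gets its own copy of a message, and a message published while nobody is subscribed is simply lost.

```go
package main

import (
	"fmt"
	"sync"
)

// broker is a hypothetical in-memory stand-in for a pub-sub server.
type broker struct {
	mu   sync.Mutex
	subs map[string][]chan string // channel name -> subscriber inboxes
}

func newBroker() *broker {
	return &broker{subs: make(map[string][]chan string)}
}

// subscribe registers a buffered inbox for the named channel.
func (b *broker) subscribe(name string) <-chan string {
	inbox := make(chan string, 8)
	b.mu.Lock()
	b.subs[name] = append(b.subs[name], inbox)
	b.mu.Unlock()
	return inbox
}

// publish copies msg to every current subscriber and reports how many
// received it. Nothing is stored for subscribers that join later.
func (b *broker) publish(name, msg string) int {
	b.mu.Lock()
	defer b.mu.Unlock()
	delivered := 0
	for _, inbox := range b.subs[name] {
		select {
		case inbox <- msg:
			delivered++
		default: // inbox full: drop rather than block the publisher
		}
	}
	return delivered
}

func main() {
	b := newBroker()
	fmt.Println(b.publish("Test", "nobody is listening")) // 0

	first := b.subscribe("Test")
	second := b.subscribe("Test")
	fmt.Println(b.publish("Test", "hello")) // 2
	fmt.Println(<-first, <-second)          // hello hello
}
```

Because delivery is fire-and-forget, subscribers should be running before publishers start sending anything that matters.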

Our Go application needs a Redis client before it can use the service. The following code snippet initializes it:

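import (
	"log"

	"github.com/go-redis/redis"
)

// Redis wraps the go-redis client so it can be injected into other services.
type Redis struct {
	RedisClient redis.Client
}

// NewRedis creates the client and checks that the Redis container is reachable.
func NewRedis(env Env) Redis {
	client := redis.NewClient(&redis.Options{
		// Container name + port since we are using docker
		Addr:     "redis:6379",
		Password: env.RedisPassword,
	})

	// NewClient never returns nil and does not connect eagerly,
	// so ping the server to find out whether Redis is reachable.
	if err := client.Ping().Err(); err != nil {
		log.Printf("Cannot run redis: %v", err)
	}

	return Redis{
		RedisClient: *client,
	}
}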

Building upon the Redis service introduced earlier, we will now construct a publish-subscribe mechanism. To begin, we’ll create a publisher function that accepts a new context, the message to be processed, and the channel/queue name as arguments. This function will be responsible for dispatching events to the specified channel or queue.

// Requires the "context", "encoding/json", and "log" packages,
// plus this project's infrastructure package.

// MessagePublisher is a generic publisher for different message types.
type MessagePublisher struct {
	redisClient infrastructure.Redis
}

func NewMessagePublisher(redisClient infrastructure.Redis) *MessagePublisher {
	return &MessagePublisher{redisClient}
}

// PublishMessages serializes a message to JSON and publishes it to a channel.
func (p *MessagePublisher) PublishMessages(ctx context.Context, message interface{}, queueName string) {
	serializedMessage, err := json.Marshal(message)
	if err != nil {
		log.Printf("[%s] Failed to serialize message: %v", queueName, err)
		return // nothing valid to publish
	}

	// Publish takes no context in this go-redis version, so check for cancellation first.
	if err := ctx.Err(); err != nil {
		log.Printf("[%s] Skipped publishing: %v", queueName, err)
		return
	}

	err = p.redisClient.RedisClient.Publish(queueName, serializedMessage).Err()
	if err != nil {
		log.Printf("[%s] Failed to publish message: %v", queueName, err)
	}
}

The above publisher can be used anywhere in your project as follows:

/*
 * Create a new context for the Redis publisher.
 * We need a new context because the application's current context is
 * cancelled when the application closes it, and we don't want that to
 * stop our Redis pub-sub service.
 */
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Publish the message (context, message, queue name)
redisPublisher.PublishMessages(ctx, "Test Message", "Test")
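
The publisher above only logs failures, so callers cannot react to them. One possible variation, sketched below under assumptions, returns the error and depends on a small interface instead of the concrete client, which makes it testable without a running Redis. The names channelPublisher, publishJSON, and fakePublisher are hypothetical and are not part of the article's code or of go-redis.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// channelPublisher is a hypothetical one-method abstraction over the Redis client.
type channelPublisher interface {
	Publish(channel string, payload []byte) error
}

// publishJSON serializes message and hands it to pub, returning any error
// to the caller instead of only logging it.
func publishJSON(pub channelPublisher, channel string, message interface{}) error {
	payload, err := json.Marshal(message)
	if err != nil {
		return fmt.Errorf("serialize message for %q: %w", channel, err)
	}
	if err := pub.Publish(channel, payload); err != nil {
		return fmt.Errorf("publish to %q: %w", channel, err)
	}
	return nil
}

// fakePublisher records payloads in memory so publishJSON can be exercised
// without Redis.
type fakePublisher struct {
	sent []string
	fail bool
}

func (f *fakePublisher) Publish(channel string, payload []byte) error {
	if f.fail {
		return errors.New("connection refused")
	}
	f.sent = append(f.sent, channel+" "+string(payload))
	return nil
}

func main() {
	fake := &fakePublisher{}
	err := publishJSON(fake, "Test", "Test Message")
	fmt.Println(fake.sent, err) // [Test "Test Message"] <nil>
}
```

Note that the payload is the JSON encoding of the message, so a plain string travels with its surrounding quotes and the subscriber has to unmarshal it.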

Next, let’s create a subscriber using the Redis service. This subscriber listens for events on the “Test” channel and processes the data sent through the PublishMessages function.

// MessageConsumer is a generic consumer for different message types.
type MessageConsumer struct {
	redisClient infrastructure.Redis
}

// NewMessageConsumer creates a new instance of MessageConsumer.
func NewMessageConsumer(redis infrastructure.Redis) *MessageConsumer {
	return &MessageConsumer{
		redisClient: redis,
	}
}

// ConsumerMessages takes a slice of queue names and starts the matching handler for each one.
func (c *MessageConsumer) ConsumerMessages(ctx context.Context, queueNames []string) {
	for _, queueName := range queueNames {
		switch queueName {
		case "Test":
			// The handler runs in its own goroutine so this loop does not block
			go c.handleCustomType1Logic(ctx, queueName)
		default:
			log.Printf("[%s] Unsupported queue\n", queueName)
		}
	}
}

// handleCustomType1Logic listens for messages on the specified queue until it is stopped.
func (c *MessageConsumer) handleCustomType1Logic(ctx context.Context, queueName string) {
	// Use a fresh context rather than ctx so the consumer keeps running after the
	// caller's context ends. Nothing cancels it yet: hand cancel to your shutdown
	// logic if the consumer must stop gracefully.
	consumerCtx, cancel := context.WithCancel(context.Background())
	defer cancel()

	log.Printf("[%s] Consumer started listening...\n", queueName)

	// Subscribe to the specified Redis channel. The subscription is local to this
	// goroutine so handlers for different queues do not overwrite each other.
	subscription := c.redisClient.RedisClient.Subscribe(queueName)
	defer subscription.Close()

	// Obtain the channel for receiving messages
	channel := subscription.Channel()

	for {
		select {
		// Stop the goroutine when the consumer context is cancelled
		case <-consumerCtx.Done():
			log.Printf("[%s] Consumer stopped listening...\n", queueName)
			return
		// Listen for incoming messages on the channel
		case msg, ok := <-channel:
			if !ok {
				// The subscription was closed, so no more messages will arrive
				log.Printf("[%s] Subscription closed\n", queueName)
				return
			}
			var messageObj interface{}
			// Deserialize the message payload
			err := json.Unmarshal([]byte(msg.Payload), &messageObj)
			if err != nil {
				log.Printf("[%s] Failed to deserialize message: %v", queueName, err)
				continue
			}

			// Continue with your logic here:

			fmt.Printf("[%s] Received message: %+v\n", queueName, messageObj)
		}
	}
}

The MessageConsumer structure handles messages of various types. It connects to Redis and subscribes to specific queues. The ConsumerMessages method takes a slice of queue names and runs the logic tailored to each queue. For instance, the "Test" queue starts a goroutine that handles the custom logic for that message type.
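
As the number of queues grows, the switch statement in ConsumerMessages grows with it. One alternative, sketched here with hypothetical names (handlerFunc, registry, resolve) that do not appear in the original code, is to keep handlers in a map keyed by queue name, so adding a queue means adding one map entry.

```go
package main

import "fmt"

// handlerFunc processes one raw payload from a queue.
type handlerFunc func(payload string) string

// registry maps queue names to handlers; the names are illustrative.
type registry map[string]handlerFunc

// resolve splits the requested queues into those that have a handler
// and those that do not.
func (r registry) resolve(queueNames []string) (supported, unsupported []string) {
	for _, name := range queueNames {
		if _, ok := r[name]; ok {
			supported = append(supported, name)
		} else {
			unsupported = append(unsupported, name)
		}
	}
	return supported, unsupported
}

func main() {
	r := registry{
		"Test": func(p string) string { return "handled " + p },
	}
	ok, bad := r.resolve([]string{"Test", "Orders"})
	fmt.Println(ok, bad)            // [Test] [Orders]
	fmt.Println(r["Test"]("hello")) // handled hello
}
```

In a real consumer, each supported queue would get its own goroutine and each unsupported one a log line, just as the switch does.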

The handleCustomType1Logic method subscribes to the "Test" queue, deserializes each incoming message, and runs custom logic on the data. Feel free to add your own logic. 😃

Structuring the subscriber this way keeps dispatching and message handling separate and logs failures instead of crashing, which gives us a solid base for a scalable messaging system in our Go application. When creating new logic handlers, be sure to use a select statement that also watches the context's Done channel so that goroutines can exit and do not leak.
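
The leak-prevention advice can be shown without Redis. The sketch below is a simplified stand-in for a handler loop: it reads raw payloads from a plain Go channel, assumes a hypothetical testEvent payload shape, and returns as soon as the context is cancelled or the channel is closed.

```go
package main

import (
	"context"
	"encoding/json"
	"fmt"
)

// testEvent is a hypothetical payload shape for the "Test" queue.
type testEvent struct {
	ID   int    `json:"id"`
	Body string `json:"body"`
}

// consume decodes payloads until ctx is cancelled or the channel is closed
// and returns the events it decoded. Malformed payloads are skipped.
func consume(ctx context.Context, payloads <-chan string) []testEvent {
	var events []testEvent
	for {
		select {
		case <-ctx.Done():
			return events
		case raw, ok := <-payloads:
			if !ok {
				return events // channel closed, nothing more to read
			}
			var ev testEvent
			if err := json.Unmarshal([]byte(raw), &ev); err != nil {
				continue
			}
			events = append(events, ev)
		}
	}
}

// demo feeds two valid payloads and one malformed one, then closes the channel.
func demo() []testEvent {
	payloads := make(chan string, 3)
	payloads <- `{"id":1,"body":"first"}`
	payloads <- `not json`
	payloads <- `{"id":2,"body":"second"}`
	close(payloads)
	return consume(context.Background(), payloads)
}

// demoCancelled shows the loop returning on cancellation even though no
// message ever arrives and the channel is never closed.
func demoCancelled() int {
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	return len(consume(ctx, make(chan string)))
}

func main() {
	fmt.Printf("%+v\n", demo()) // [{ID:1 Body:first} {ID:2 Body:second}]
	fmt.Println(demoCancelled()) // 0
}
```

Without the ctx.Done() case, the second demo would block forever on the empty channel, which is exactly the kind of goroutine leak to avoid.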

Finally, we want our subscribers to listen continuously for events sent to the channels. To make this happen, we’ll start the consumers when our Go application starts.

lifecycle.Append(fx.Hook{
	OnStart: func(ctx context.Context) error {

		go func() {
			// Add your queues here
			consumer.ConsumerMessages(ctx, []string{"Test"})

			if env.ServerPort == "" {
				_ = handler.Gin.Run()
			} else {
				_ = handler.Gin.Run(":" + env.ServerPort)
			}
		}()

		return nil
	},
})

Don’t forget to add new queues to the slice passed to the ConsumerMessages method inside the OnStart function.
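
A context handed to a startup hook usually covers startup only, so long-running consumers are better tied to a context that lives from start to stop. The sketch below shows that idea with the standard library alone. The worker type is hypothetical, and in an fx application its start and stop methods would presumably be called from the OnStart and OnStop hooks.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// worker owns a context that lives from start to stop, independent of any
// short-lived context passed to a startup hook.
type worker struct {
	cancel context.CancelFunc
	wg     sync.WaitGroup
	mu     sync.Mutex
	exited []string // queues whose goroutine has returned
}

// start launches one goroutine per queue name.
func (w *worker) start(queueNames []string) {
	ctx, cancel := context.WithCancel(context.Background())
	w.cancel = cancel
	for _, name := range queueNames {
		w.wg.Add(1)
		go func(name string) {
			defer w.wg.Done()
			<-ctx.Done() // a real consumer would also select on its message channel
			w.mu.Lock()
			w.exited = append(w.exited, name)
			w.mu.Unlock()
		}(name)
	}
}

// stop cancels the shared context, waits for every goroutine to return,
// and reports how many exited.
func (w *worker) stop() int {
	w.cancel()
	w.wg.Wait()
	return len(w.exited)
}

func main() {
	w := &worker{}
	w.start([]string{"Test", "Orders"})
	fmt.Println(w.stop()) // 2
}
```

Waiting on the WaitGroup in stop gives the application a clear point at which all consumers are known to have finished.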

Conclusion

To sum up, this article explored how Redis and Golang can work together for messaging. We used Docker to make the setup easy, built a publisher and a subscriber on top of go-redis, and started the consumers with our Go application so that it keeps listening for events. Together, Redis and Golang offer a simple way to solve communication challenges and build robust messaging systems.
