Working with Google Pub/Sub in Go: Example Code for Creating Topics and Consuming Messages in JSON Format

Brian
Mar 24, 2023

I have had the opportunity to work with Google Pub/Sub on occasion.

In Go, you can use the Google Cloud Pub/Sub client library (cloud.google.com/go/pubsub) to create a topic if it does not already exist, subscribe to it, and consume messages in a simple JSON format. First, create a Pub/Sub client and check whether the topic exists; if it does not, create it with the CreateTopic method. Next, check whether the subscription exists; if it does not, create it with the CreateSubscription method and attach it to the topic. Then call the Receive method on the subscription to consume messages. Inside the callback function, unmarshal the message data into a struct and print it to the console. Finally, acknowledge the message with the Ack method, or call Nack if the data cannot be unmarshalled.

Here is an example in Go that creates the topic and the subscription if they do not already exist, and then consumes messages in a simple JSON format:

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"

	"cloud.google.com/go/pubsub"
)

const (
	projectID = "your-project-id"
	topicID   = "your-topic-id"
	subName   = "your-subscription-name"
)

// Message is the JSON payload expected in each Pub/Sub message.
type Message struct {
	Data string `json:"data"`
}

func main() {
	ctx := context.Background()

	// Create a Pub/Sub client
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		log.Fatalf("Failed to create client: %v", err)
	}

	// Create the topic if it doesn't already exist
	topic := client.Topic(topicID)
	ok, err := topic.Exists(ctx)
	if err != nil {
		log.Fatalf("Failed to check if topic exists: %v", err)
	}
	if !ok {
		if _, err := client.CreateTopic(ctx, topicID); err != nil {
			log.Fatalf("Failed to create topic: %v", err)
		}
		log.Printf("Topic %s created.\n", topicID)
	}

	// Create the subscription to the topic if it doesn't already exist
	sub := client.Subscription(subName)
	ok, err = sub.Exists(ctx)
	if err != nil {
		log.Fatalf("Failed to check if subscription exists: %v", err)
	}
	if !ok {
		if _, err := client.CreateSubscription(ctx, subName, pubsub.SubscriptionConfig{
			Topic: topic,
		}); err != nil {
			log.Fatalf("Failed to create subscription: %v", err)
		}
		log.Printf("Subscription %s created.\n", subName)
	}

	// Start consuming messages from the subscription.
	// Receive blocks until ctx is cancelled or a non-retryable error occurs.
	err = sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
		// Unmarshal the message data into a struct
		var m Message
		if err := json.Unmarshal(msg.Data, &m); err != nil {
			log.Printf("Failed to unmarshal message data: %v", err)
			// Nack so that Pub/Sub redelivers the message
			msg.Nack()
			return
		}

		// Print the message data
		fmt.Printf("Received message: %s\n", m.Data)

		// Acknowledge the message
		msg.Ack()
	})
	if err != nil {
		log.Fatalf("Failed to receive messages: %v", err)
	}

	// Gracefully shut down the Pub/Sub client (reached only after Receive returns without error)
	if err := client.Close(); err != nil {
		log.Fatalf("Failed to close client: %v", err)
	}
}

In this example, the code first creates a Pub/Sub client and checks if the topic specified by topicID exists. If the topic does not exist, it is created using the CreateTopic method.

Next, the code checks whether the subscription named by subName exists. If it does not, the subscription is created using the CreateSubscription method, with the Topic field in SubscriptionConfig set to the topic from the previous step.

Finally, the code calls Receive on the subscription to start consuming messages. The Receive method takes a context and a callback function that is called for each message received, and it blocks until the context is cancelled or a non-retryable error occurs. Inside the callback function, the message data is unmarshalled into a Message struct, and the data is printed to the console. The message is then acknowledged using the Ack method, which tells Pub/Sub that the message has been processed so that it is not redelivered. If unmarshalling fails, the callback calls Nack instead, which asks Pub/Sub to redeliver the message.
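
To make the decoding step concrete, here is a small sketch that runs without a Pub/Sub connection. It uses the same Message struct as above. The decodeMessage helper and the sample payloads are assumptions added for illustration; in the real callback the bytes would come from msg.Data.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Message mirrors the struct used in the example above.
type Message struct {
	Data string `json:"data"`
}

// decodeMessage is a hypothetical helper that unmarshals a raw payload
// (what msg.Data would hold) into a Message.
func decodeMessage(payload []byte) (Message, error) {
	var m Message
	if err := json.Unmarshal(payload, &m); err != nil {
		return Message{}, fmt.Errorf("unmarshal message: %w", err)
	}
	return m, nil
}

func main() {
	// One valid payload and one malformed payload.
	payloads := []string{`{"data":"hello"}`, `not json`}

	for _, p := range payloads {
		m, err := decodeMessage([]byte(p))
		if err != nil {
			// In the Receive callback this is the branch that calls msg.Nack().
			fmt.Println("nack:", err)
			continue
		}
		// In the Receive callback this is the branch that calls msg.Ack().
		fmt.Println("ack:", m.Data)
	}
}
```

A publisher would therefore need to send a JSON object with a string "data" field, such as {"data":"hello"}, for the consumer to acknowledge it.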

If you find this information useful, please consider supporting and following me for further updates.🙂
