Working with Google Pub/Sub in Go: Example Code for Creating Topics and Consuming Messages in JSON Format
I have had the opportunity to work with Google Pub/Sub on occasion.
In Go, you can use the Google Cloud Pub/Sub API to create a topic if it does not already exist, and then subscribe to the existing topic “topic A” to consume messages in a simple JSON format. To achieve this, first, create a Pub/Sub client and check if the specified topic exists. If it does not exist, create it using the CreateTopic
method. Then, create a subscription to the topic "topic A" using the CreateSubscription
method. If the subscription does not exist, it will be created, and the messages can be consumed by calling the Receive
method on the subscription. Inside the callback function, the message data can be unmarshalled into a struct, and the data can be printed to the console. Finally, the message can be acknowledged using the Ack
method.
Here is an example code snippet in Go that creates a topic if it does not already exist, and then subscribes to the topic “topic A” and consumes the messages in a simple JSON format using Google Cloud Pub/Sub API:
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"os"
"cloud.google.com/go/pubsub"
)
const (
projectID = "your-project-id"
topicID = "your-topic-id"
subName = "your-subscription-name"
)
type Message struct {
Data string `json:"data"`
}
func main() {
ctx := context.Background()
// Create a Pub/Sub client
client, err := pubsub.NewClient(ctx, projectID)
if err != nil {
log.Fatalf("Failed to create client: %v", err)
}
// Create a topic if it doesn't already exist
topic := client.Topic(topicID)
ok, err := topic.Exists(ctx)
if err != nil {
log.Fatalf("Failed to check if topic exists: %v", err)
}
if !ok {
if _, err := client.CreateTopic(ctx, topicID); err != nil {
log.Fatalf("Failed to create topic: %v", err)
}
log.Printf("Topic %s created.\n", topicID)
}
// Create a subscription to the topic "topic A"
sub := client.Subscription(subName)
ok, err = sub.Exists(ctx)
if err != nil {
log.Fatalf("Failed to check if subscription exists: %v", err)
}
if !ok {
if _, err := client.CreateSubscription(ctx, subName, pubsub.SubscriptionConfig{
Topic: topic,
}); err != nil {
log.Fatalf("Failed to create subscription: %v", err)
}
log.Printf("Subscription %s created.\n", subName)
}
// Start consuming messages from the subscription
err = sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
// Unmarshal the message data into a struct
var m Message
if err := json.Unmarshal(msg.Data, &m); err != nil {
log.Printf("Failed to unmarshal message data: %v", err)
msg.Nack()
return
}
// Print the message data
fmt.Printf("Received message: %s\n", m.Data)
// Acknowledge the message
msg.Ack()
})
if err != nil {
log.Fatalf("Failed to receive messages: %v", err)
}
// Gracefully shutdown the Pub/Sub client
if err := client.Close(); err != nil {
log.Fatalf("Failed to close client: %v", err)
}
}
In this example, the code first creates a Pub/Sub client and checks if the topic specified by topicID
exists. If the topic does not exist, it is created using the CreateTopic
method.
Next, a subscription to the topic “topic A” is created using the CreateSubscription
method. The subscription name is specified by subName
, and the Topic
field in SubscriptionConfig
is set to the topic created earlier.
Finally, the code calls Receive
on the subscription to start consuming messages. The Receive
method takes a context and a callback function that is called for each message received. Inside the callback function, the message data is unmarshalled into a Message
struct, and the data is printed to the console. The message is acknowledged using the Ack
method, which marks the message as processed and removes it from the subscription queue.
If you find this information useful, please consider supporting and following me for further updates.🙂