Using Coherence Queues with Go

Tim Middleton
Oracle Coherence
May 21, 2024 · 7 min read

Overview

Coherence Community Edition (CE) 24.03 introduced Queues as a data structure, which are useful for communication between microservices. Queues can provide benefits such as service decoupling, buffering, rate limiting, data integration, and load balancing of processing, to name a few.

While the Queue API is primarily in Java, in this article we will show how to integrate with Queues via the Coherence Go Client. In a future article we will go into more detail about Queues in Java.

Demo Architecture

The operations on the Queue API we will be using in this example are:

  • Offer() — inserts the specified value to the end of the queue if it is possible to do so
  • Poll() — retrieves and removes the head of this queue. The API allows for blocking and non-blocking queues

You can also Peek() at the head of the queue without removing an entry.
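To make the semantics of these three operations concrete, here is a minimal in-memory sketch. The fifo type and its methods are hypothetical and exist only for illustration. They are not the Coherence Go client API, which works against a distributed queue and reports errors.

```go
package main

import "fmt"

// fifo is a hypothetical in-memory queue used only to illustrate the
// Offer/Poll/Peek semantics; it is not the Coherence client API.
type fifo[V any] struct {
	items []V
}

// Offer appends a value to the tail of the queue.
func (q *fifo[V]) Offer(v V) {
	q.items = append(q.items, v)
}

// Peek returns the head without removing it; ok is false when the queue is empty.
func (q *fifo[V]) Peek() (v V, ok bool) {
	if len(q.items) == 0 {
		return v, false
	}
	return q.items[0], true
}

// Poll removes and returns the head; ok is false when the queue is empty.
func (q *fifo[V]) Poll() (v V, ok bool) {
	if len(q.items) == 0 {
		return v, false
	}
	v = q.items[0]
	q.items = q.items[1:]
	return v, true
}

func main() {
	q := &fifo[string]{}
	q.Offer("order-1")
	q.Offer("order-2")

	head, _ := q.Peek()
	fmt.Println("peek:", head) // the head stays on the queue

	for {
		v, ok := q.Poll()
		if !ok {
			break
		}
		fmt.Println("poll:", v) // values come back in insertion order
	}
}
```

Peek() leaves the head in place, so the first Poll() afterwards returns the same value.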

Note: The complete project is available here if you wish to clone it.

Run the Example

This example uses the Coherence CE Docker image (for ease of use), which exposes a gRPC port for the Go client to connect to. In a real-life scenario you would have multiple Coherence storage members running, over which you can load balance the gRPC connections and provide data reliability and scalability.

Pre-requisites

To run the example, you will need:

  1. A Docker environment such as Docker or Rancher Desktop
  2. Go version 1.19+ installed — https://go.dev/doc/install

Set up your environment

Create a new directory and change to that directory. Create the demo structure and initialize the Go module.

mkdir common publisher consumer
go mod init queue-demo

Start the Coherence Cluster

The following command will create a single node Coherence cluster and expose gRPC on port 1408 as well as Management on port 30000.

docker run -d -p 1408:1408 -p 30000:30000 ghcr.io/oracle/coherence-ce:24.03
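Before starting the Go processes, it can help to confirm that the published gRPC port accepts connections. The following is a rough sketch using only the standard library. The reachable and localListener helpers are hypothetical names, and a successful TCP connect shows only that the port is open, not that Coherence is ready to serve requests.

```go
package main

import (
	"fmt"
	"net"
	"time"
)

// dialTimeout is an arbitrary value chosen for this sketch.
const dialTimeout = 2 * time.Second

// reachable reports whether a TCP connection to addr can be opened within timeout.
func reachable(addr string, timeout time.Duration) bool {
	conn, err := net.DialTimeout("tcp", addr, timeout)
	if err != nil {
		return false
	}
	_ = conn.Close()
	return true
}

// localListener opens a throwaway TCP listener on a free local port so the
// sketch can be run without Docker. It stands in for the cluster.
func localListener() (addr string, stop func(), err error) {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return "", nil, err
	}
	return ln.Addr().String(), func() { _ = ln.Close() }, nil
}

func main() {
	addr, stop, err := localListener()
	if err != nil {
		panic(err)
	}
	fmt.Println("stand-in listener reachable:", reachable(addr, dialTimeout))
	stop()
	fmt.Println("after stopping it:", reachable(addr, dialTimeout))

	// Against the demo cluster you would check the published gRPC port.
	fmt.Println("localhost:1408 reachable:", reachable("localhost:1408", dialTimeout))
}
```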

Create the common data structure

Using your text editor of choice, create the file common/common.go with the following contents. It defines the queue name and the Order structure that both the publisher and the consumer use.

package common

const QueueName = "orders-queue"

// Order represents a fictitious order.
type Order struct {
	OrderNumber  int     `json:"orderNumber"`
	Customer     string  `json:"customer"`
	OrderStatus  string  `json:"orderStatus"`
	OrderTotal   float32 `json:"orderTotal"`
	CreateTime   int64   `json:"createTime"`
	CompleteTime int64   `json:"completeTime"`
}

Note: Code for common.go is available here.
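The json struct tags control the field names used when an Order is encoded as JSON. The sketch below round-trips an Order through the standard encoding/json package to show the resulting shape. It assumes the wire format is comparable to what encoding/json produces, and encodeOrder and decodeOrder are hypothetical helpers, not part of the client.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Order mirrors the struct defined in common/common.go.
type Order struct {
	OrderNumber  int     `json:"orderNumber"`
	Customer     string  `json:"customer"`
	OrderStatus  string  `json:"orderStatus"`
	OrderTotal   float32 `json:"orderTotal"`
	CreateTime   int64   `json:"createTime"`
	CompleteTime int64   `json:"completeTime"`
}

// encodeOrder marshals an Order to a JSON string (hypothetical helper).
func encodeOrder(o Order) (string, error) {
	b, err := json.Marshal(o)
	return string(b), err
}

// decodeOrder unmarshals a JSON string back into an Order (hypothetical helper).
func decodeOrder(s string) (Order, error) {
	var o Order
	err := json.Unmarshal([]byte(s), &o)
	return o, err
}

func main() {
	in := Order{
		OrderNumber: 1,
		Customer:    "Customer 1",
		OrderStatus: "NEW",
		OrderTotal:  12.5,
		CreateTime:  1716188761271,
	}

	s, err := encodeOrder(in)
	if err != nil {
		panic(err)
	}
	fmt.Println(s) // field names follow the json tags, e.g. "orderNumber"

	out, err := decodeOrder(s)
	if err != nil {
		panic(err)
	}
	fmt.Println("round trip preserved the order:", out == in)
}
```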

Create a publisher process

Next create a Go process that will publish Orders to the Queue. Edit the file: publisher/main.go and add the contents below:

package main

import (
	"context"
	"fmt"
	"log"
	"math/rand"
	"os"
	"strconv"
	"time"

	"github.com/oracle/coherence-go-client/coherence"

	"queue-demo/common"
)

func main() {
	var (
		ctx              = context.Background()
		startOrderNumber int
		numOrders        int
		err              error
	)

	// check arguments
	if len(os.Args) != 3 {
		log.Println("provide starting order number and number of orders")
		return
	}

	if startOrderNumber, err = strconv.Atoi(os.Args[1]); err != nil || startOrderNumber < 0 {
		log.Println("invalid value for starting order")
		return
	}

	if numOrders, err = strconv.Atoi(os.Args[2]); err != nil || numOrders < 0 {
		log.Println("invalid value for number of orders")
		return
	}

	// create a new Session to the default gRPC port of 1408 using plain text
	session, err := coherence.NewSession(ctx, coherence.WithPlainText())
	if err != nil {
		panic(err)
	}
	defer session.Close()

	orderQueue, err := coherence.GetNamedQueue[common.Order](ctx, session, common.QueueName)
	if err != nil {
		panic(err)
	}

	defer orderQueue.Close()

	for i := 0; i < numOrders; i++ {
		newOrder := common.Order{
			OrderNumber: startOrderNumber + i,
			Customer:    fmt.Sprintf("Customer %d", i+1),
			OrderStatus: "NEW",
			OrderTotal:  rand.Float32() * 1000, //nolint
			CreateTime:  time.Now().UnixMilli(),
		}

		// check the result of Offer before reporting progress
		if err = orderQueue.Offer(newOrder); err != nil {
			panic(err)
		}

		if i%25 == 0 && i != 0 {
			log.Printf("submitted %d orders so far", i)
		}
	}

	log.Printf("Submitted %d orders", numOrders)
}

Note: Code for publisher.go is available here.

The publisher takes two arguments, a starting order number and the number of orders. It connects to Coherence on the default port of 1408, gets the queue, and then calls Offer() to add the specified number of Orders to the Queue.
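The argument handling described above can also be factored into a small, testable helper. This is only a sketch: parseArgs is a hypothetical function that is not part of the demo project, and its error messages are illustrative.

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
)

// parseArgs validates the two publisher arguments: the starting order number
// and the number of orders. args excludes the program name (os.Args[1:]).
func parseArgs(args []string) (start, count int, err error) {
	if len(args) != 2 {
		return 0, 0, errors.New("provide starting order number and number of orders")
	}
	if start, err = strconv.Atoi(args[0]); err != nil || start < 0 {
		return 0, 0, fmt.Errorf("invalid starting order number %q", args[0])
	}
	if count, err = strconv.Atoi(args[1]); err != nil || count < 0 {
		return 0, 0, fmt.Errorf("invalid number of orders %q", args[1])
	}
	return start, count, nil
}

func main() {
	// A valid invocation followed by three invalid ones.
	for _, args := range [][]string{{"1", "100"}, {"1"}, {"x", "100"}, {"1", "-5"}} {
		start, count, err := parseArgs(args)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Printf("start=%d count=%d\n", start, count)
	}
}
```

Returning an error instead of logging inside the helper keeps the decision about how to exit in main().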

Create a consumer process

Next create a Go process that will dequeue Orders from the Queue. In this example we don’t do anything with the orders, but you can imagine doing some processing on them. Edit the file: consumer/main.go and add the contents below:

package main

import (
	"context"
	"errors"
	"fmt"
	"log"
	"time"

	"github.com/oracle/coherence-go-client/coherence"

	"queue-demo/common"
)

func main() {
	var (
		ctx      = context.Background()
		order    *common.Order
		err      error
		received int64
	)

	// create a new Session to the default gRPC port of 1408 using plain text
	session, err := coherence.NewSession(ctx, coherence.WithPlainText())
	if err != nil {
		panic(err)
	}
	defer session.Close()

	blockingQueue, err := coherence.GetBlockingNamedQueue[common.Order](ctx, session, common.QueueName)
	if err != nil {
		panic(err)
	}

	defer blockingQueue.Close()

	timeout := time.Duration(5) * time.Second
	log.Println("Waiting for orders")
	for {
		// wait up to the timeout for the next order
		order, err = blockingQueue.Poll(timeout)
		if errors.Is(err, coherence.ErrQueueTimedOut) {
			fmt.Printf("timed out waiting for message after %v\n", timeout)
			continue
		}
		if err != nil {
			panic(err)
		}

		order.CompleteTime = time.Now().UnixMilli()
		processingTime := time.UnixMilli(order.CompleteTime).Sub(time.UnixMilli(order.CreateTime))

		received++

		fmt.Printf("Order=%d (%s) created on %v, processing time=%v orders received=%d\n",
			order.OrderNumber, order.Customer, time.UnixMilli(order.CreateTime), processingTime, received)
	}
}

Note: Code for consumer.go is available here.

The consumer uses the blocking Queue API to dequeue Orders from the queue. It waits up to the timeout (in this case 5s) to retrieve an Order from the head of the queue.

If a new Order arrives on the queue during that time, the consumer is notified immediately and attempts to dequeue it; otherwise it displays a timeout message and tries again. It then calculates the time from Order creation to (simulated) processing.
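The wait-or-time-out behaviour can be illustrated locally with a Go channel standing in for the queue. This is only an analogy under stated assumptions: pollWithTimeout, errTimedOut and demoTimeout are hypothetical names, and nothing here reflects how the Coherence client implements blocking.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errTimedOut is a hypothetical sentinel standing in for the client's timeout error.
var errTimedOut = errors.New("timed out waiting for message")

// demoTimeout is an arbitrary short timeout chosen for this sketch.
const demoTimeout = 50 * time.Millisecond

// pollWithTimeout waits up to timeout for a value on ch and returns
// errTimedOut if nothing arrives in that time.
func pollWithTimeout(ch <-chan int, timeout time.Duration) (int, error) {
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	select {
	case v := <-ch:
		return v, nil
	case <-timer.C:
		return 0, errTimedOut
	}
}

func main() {
	orders := make(chan int, 1)

	// Nothing is queued yet, so the poll gives up after the timeout.
	if _, err := pollWithTimeout(orders, demoTimeout); errors.Is(err, errTimedOut) {
		fmt.Println("timed out waiting for message after", demoTimeout)
	}

	// An order arrives while the consumer is waiting, so the poll returns
	// as soon as the value is available rather than after the full timeout.
	go func() {
		time.Sleep(10 * time.Millisecond)
		orders <- 42
	}()
	v, err := pollWithTimeout(orders, time.Second)
	fmt.Println("received order", v, "error:", err)
}
```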

Build the Go Binaries

You can run Go programs directly using go run, but we will build the publisher and consumer binaries using the following:

cd publisher
go get github.com/oracle/coherence-go-client@latest
go mod tidy
go build -o publisher .
cd ../consumer
go build -o consumer .

Change back to the top-level directory and issue the following to show the binaries in each directory:

ls -l publisher/ consumer/
consumer/:
total 30952
-rwxr-xr-x 1 user staff 15839344 20 May 15:11 consumer
-rw-r--r-- 1 user staff 1232 20 May 11:09 main.go

publisher/:
total 30912
-rw-r--r-- 1 user staff 1544 20 May 15:05 main.go
-rwxr-xr-x 1 user staff 15820032 20 May 15:05 publisher

Run the Consumer Process

Open a new terminal, change to the consumer directory and run the following:

./consumer 
2024/05/20 11:22:23 session: 546a78c3-2e85-40c7-88bd-0aa9fdfd9550 connected to address localhost:1408
2024/05/20 11:22:24 Waiting for orders
timed out waiting for message after 5s

Run a Publisher

Open another terminal, change to the publisher directory and run the following to add 100 orders starting at order 1.

./publisher 1 100
2024/05/20 11:24:14 session: 8bf7ec9e-2eae-4d8c-a08e-b632d17ab21d connected to address localhost:1408
2024/05/20 11:24:15 submitted 25 orders so far
2024/05/20 11:24:15 submitted 50 orders so far
2024/05/20 11:24:15 submitted 75 orders so far
2024/05/20 11:24:16 Submitted 100 orders
2024/05/20 11:24:16 closed session 8bf7ec9e-2eae-4d8c-a08e-b632d17ab21d

You will see the orders being received on the consumer terminal. They are received in order because queues are always first-in, first-out (FIFO) and we only have one consumer.

Order=1 (Customer 1) created on 1716188761271, processing time=10ms orders received=1
...
Order=99 (Customer 99) created on 1716188762186, processing time=158ms orders received=99
Order=100 (Customer 100) created on 1716188762193, processing time=156ms orders received=100
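The processing time figure is the difference between two Unix-millisecond timestamps. A minimal sketch of that calculation follows. The processingTime helper is a hypothetical name, and the completion timestamp is an assumed value chosen to reproduce the 156ms shown for order 100.

```go
package main

import (
	"fmt"
	"time"
)

// processingTime returns the elapsed time between two Unix-millisecond timestamps.
func processingTime(createMillis, completeMillis int64) time.Duration {
	return time.UnixMilli(completeMillis).Sub(time.UnixMilli(createMillis))
}

func main() {
	created := int64(1716188762193) // CreateTime of order 100 in the output above
	completed := created + 156      // assumed completion timestamp for illustration

	fmt.Println("processing time:", processingTime(created, completed))
}
```

Because CreateTime is set by the publisher and CompleteTime by the consumer, the figure is only meaningful when both clocks agree, as they do when both processes run on the same machine.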

Add a second Consumer

We can process more orders in a given time by increasing the number of consumer processes. Each process will try to dequeue the next message from the queue. Under the Queue API contract, a single Order is only ever returned by Poll() to one of the consumers.

Open another terminal and start the second consumer process. Once it's started, re-run the publisher to publish another 100 orders.

The overall time for the processing of the orders actually reduces as we now have two consumers processing in parallel. You can see that the simulated processing time of the Orders is around 90ms as compared to around 156ms with only one consumer.

Note: Performance of the Queue operations is much faster when we are not using Docker and are using Coherence natively.

# first consumer
...
Order=97 (Customer 97) created on 1716188812727, processing time=95ms orders received=149
Order=99 (Customer 99) created on 1716188812746, processing time=83ms orders received=150
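The competing-consumer behaviour, where each order is delivered to exactly one consumer, can be sketched locally with goroutines sharing a channel. This is an analogy only: consumeAll is a hypothetical function, and the channel stands in for the Coherence queue.

```go
package main

import (
	"fmt"
	"sync"
)

// consumeAll queues `total` orders on a shared channel, starts `consumers`
// goroutines that drain it, and returns how many orders each one received.
func consumeAll(total, consumers int) []int {
	queue := make(chan int, total)
	for i := 1; i <= total; i++ {
		queue <- i
	}
	close(queue)

	counts := make([]int, consumers)
	var wg sync.WaitGroup
	for c := 0; c < consumers; c++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			// Each receive hands a given order to exactly one goroutine.
			for range queue {
				counts[id]++ // each goroutine writes only its own slot
			}
		}(c)
	}
	wg.Wait()
	return counts
}

func main() {
	counts := consumeAll(100, 2)

	sum := 0
	for id, n := range counts {
		fmt.Printf("consumer %d received %d orders\n", id+1, n)
		sum += n
	}
	fmt.Println("total received:", sum)
}
```

The split between the two consumers varies from run to run, but the total always equals the number of orders queued.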

Tips

Try some of the following:

  1. Restart the 2 consumers and send 10,000 Orders. At the end of the processing, the number of orders each consumer has received will always add up to 10,000.
  2. Start additional publishers to add more messages to the queue in parallel.
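For the second tip, giving each publisher a distinct starting order number keeps order numbers unique across publishers, for example ./publisher 1 25 and ./publisher 26 25. The sketch below models that locally with goroutines and a channel. publishAll is a hypothetical function, and the channel again stands in for the Coherence queue.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// publishAll starts `publishers` goroutines. Publisher p offers perPublisher
// order numbers starting at p*perPublisher+1, so the ranges never overlap.
// It returns every order number that reached the shared queue.
func publishAll(publishers, perPublisher int) []int {
	queue := make(chan int, publishers*perPublisher)

	var wg sync.WaitGroup
	for p := 0; p < publishers; p++ {
		wg.Add(1)
		go func(start int) {
			defer wg.Done()
			for i := 0; i < perPublisher; i++ {
				queue <- start + i
			}
		}(p*perPublisher + 1)
	}
	wg.Wait()
	close(queue)

	received := make([]int, 0, publishers*perPublisher)
	for n := range queue {
		received = append(received, n)
	}
	return received
}

func main() {
	got := publishAll(4, 25)

	// Arrival order interleaves between publishers, so sort before inspecting.
	sort.Ints(got)
	fmt.Printf("received %d orders, numbered %d to %d\n", len(got), got[0], got[len(got)-1])
}
```

With several publishers the arrival order across publishers is not deterministic, although each publisher's own orders are queued in the order it offered them.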

Conclusion

In this article, we demonstrated how to integrate Coherence Queues with Go using Coherence CE by creating publishers and consumers in Go.

By running the example, we showed how Coherence Queues can:

  • Decouple services
  • Buffer messages
  • Balance workloads

This guide provides a foundation for utilizing Coherence Queues to enhance the scalability and efficiency of your systems.
