Introducing the Coherence Go Client

Tim Middleton
Oracle Coherence
Published in
7 min readMay 3, 2023

We are excited to announce that a Release Candidate of the new Coherence Go client is now publicly available, bringing the performance, scalability, reliability and availability of the Coherence Data Grid to Go applications!

The coherence-go-client allows native Go applications to act as cache clients to a Coherence cluster using gRPC as the network transport via a Coherence gRPC Proxy Server. The client API fully supports Go generics and is available using Go 1.19 and above.

Overview

The Coherence Go Client supports all of the familiar Coherence “map-like” operations which include:

  • Standard API’sPut, PutWithExpiry, PutIfAbsent, PutAll, Get, GetAll, Remove, Clear, GetOrDefault, Replace, ReplaceMapping, Size, IsEmpty, ContainsKey, ContainsValue andContainsEntry
  • Querying / Aggregation — Cluster-side querying, aggregation and filtering of cache entries via preconfigured aggregators
  • Server-Side Processing — Cluster-side in-place manipulation of cache entires using entry processors
  • Events — Registration of event listeners to be notified of cache mutations such as insert, delete, and update, cache lifecycle events such as truncated, released and destroyed, and session lifecycle events such as connected, disconnected, reconnected and closed
  • Data Serialization — Support for Go primitives and structs and keys and values to be stored by default in JSON on the server, as well as the ability to serialize to Java to support access of data from other Coherence language API’s
  • Generics — Full support for Generics in all Coherence API’s.

Requirements

  • Coherence CE 22.06.4+ and Coherence 14.1.1.2206.4+ Commercial edition with a configured gRPCProxy.
  • Go 1.19.+

Documentation and Examples

If you would like to jump directly to the documentation or the very comprehensive examples see the following, otherwise read-on to learn to to get started.

Getting Started

To add the coherence-go-client to your Go project, issue the following:

go get github.com/oracle/coherence-go-client@latest

Start a Coherence gRPC enabled proxy server. You can use our pre-built Docker image which has everything to get you started.

docker run -d -p 1408:1408 ghcr.io/oracle/coherence-ce:22.06.4

Basic CRUD Example

Below is an example of a sample Go program that carries out the following:

  • Connects to Coherence gRPC proxy on port 1408 using plain text
  • Creates a new NamedMap with key / value types of int and string
    Note: A NamedMap is equivalent to a cache that doesn’t expire. You can use a NamedCache if you want expiry.
  • Uses Put, Get and Remove to manipulate NamedMap entries.

Note: Keys and values can be structs as shown in the example here.

package main

import (
"context"
"fmt"
"github.com/oracle/coherence-go-client/coherence"
)

func main() {
var value *string
ctx := context.Background()

// create a new Session to the default gRPC port of 1408
session, err := coherence.NewSession(ctx, coherence.WithPlainText())
if err != nil {
panic(err)
}
defer session.Close()

// get a new NamedMap with key of int and value of string
namedMap, err := coherence.GetNamedMap[int, string](session, "my-map")
if err != nil {
panic(err)
}

// put a new key / value
if _, err = namedMap.Put(ctx, 1, "one"); err != nil {
panic(err)
}

// get the value for key 1
if value, err = namedMap.Get(ctx, 1); err != nil {
panic(err)
}
fmt.Println("Value for key 1 is", *value)

// update the value for key 1
if _, err = namedMap.Put(ctx, 1, "ONE"); err != nil {
panic(err)
}

// get the updated value for key 1
if value, err = namedMap.Get(ctx, 1); err != nil {
panic(err)
}
fmt.Println("Updated value is", *value)

// remove the entry
if _, err = namedMap.Remove(ctx, 1); err != nil {
panic(err)
}
}

Using Structs for Keys or Values

As mentioned above you can use structs for keys and/or values. In the following example, we have the following Person struct:

type Person struct {
ID int `json:"id"`
Name string `json:"name"`
Age int `json:"age"`
}

We then create a new NamedMap using the type parameters int as key and Person as value:

 namedMap, err := coherence.GetNamedMap[int, Person](session, "people")

We can then use this in our application:

newPerson := Person{ID: 1, Name: "Tim", Age: 21}
if _, err = namedMap.Put(ctx, newPerson.ID, newPerson); err != nil {
panic(err)
}

...

if person, err = namedMap.Get(ctx, 1); err != nil {
panic(err)
}
fmt.Println("Person from Get() is", *person)

By default, data is stored as JSON on the Coherence cluster, but as mentioned before, you can configure the cluster to store as actual Java objects. See the documentation for more information.

Querying Cache Data using Channels

Channels are used to deal with keys, values or entries streamed from the Coherence cluster using filter based or open queries. In the example below, we are querying all people in a NamedMap where the age is greater than 30.

namedMap, err := coherence.GetNamedMap[int, Person](session, "people")
if err != nil {
...
}

// extractor to extract age from the value
age := extractors.Extract[int]("age")

// retrieve all people aged > 30
ch := namedMap.EntrySetFilter(ctx, filters.Greater(age, 20))
for result := range ch {
if result.Err != nil {
log.Fatal(err)
}
fmt.Println("Key:", result.Key, "Value:", result.Value)
}

In the above example the entries are returned in a StreamedEntry which contains a Key, Value and Err. The Err should always be checked and if nil then the entry is valid, otherwise you need to process the error.

In-Place Processing using Entry Processors

Entry processors are a powerful feature of Coherence where instead of retrieving data from the cluster, processing that data and then returning it to the cluster, you send the processing to the cluster and it is carried out in-place where the data resides.

This is extremely efficient as it incorporates a “lock-free” processing model meaning you have an automatic implicit lock on the keys/ values when your entry processors is run to ensure data integrity/ consistency. It also scales well when you have multiple entries to update as well as reduces the amount of traffic required for updates dramatically.

In the following example we have a Person struct where we want to crease and salary of a specific person using the key and also applying a filter.

type Person struct {
Id int `json:"id"`
Name string `json:"name"`
Salary float32 `json:"salary"`
Age int `json:"age"`
City string `json:"city"`
}

...

namedMap, err := coherence.GetNamedMap[int, Person](session, "people")

// 1. Increase the salary of the person with Id = 1
newSalary, err = coherence.Invoke[int, Person, float32](ctx, namedMap, 1,
processors.Multiply("salary", 1.1, true))

city := extractors.Extract[string]("city")

// 2. Increase the salary of all people in Perth
ch2 := coherence.InvokeAllFilter[int, Person, float32](ctx, namedMap,
filters.Equal(city, "Perth"), processors.Multiply("salary", 1.1, true)
for result := range ch {
if result.Err != nil {
log.Fatal(result.Err)
}
}

Note: there are many out of the box implementations of entry processors you can utilize. See the documentation for more information.

Aggregating Results

Aggregators can be used to perform operations against a subset of entries to obtain a single result. Entry aggregation occurs in parallel across the grid to provide map-reduce support when working with large amounts of data.

In the following example we are retrieving the distinct cities from all people in the NamedMap.

namedMap, err := coherence.GetNamedMap[int, Person](session, "people")
if err != nil {
log.Fatal(err)
}

// Retrieve the distinct cities from all people
citiesValues, err := coherence.Aggregate(ctx, namedMap, extractors.Extract[string]("city"))
if err != nil {
log.Fatal(err)
}
fmt.Println(*citiesValues)
// output: [Perth, Melbourne, Brisbane]

The next example returns the minimum age for specific keys.

age := extractors.Extract[int]("age")

// minimum age across keys 3 and 4
ageResult, err = coherence.AggregateKeys(ctx, namedMap, []int{3, 4}, aggregators.Min(age))

The final example is a bit more complex, but will return the top 2 people by salary.

// top 2 people by salary using filter
var salaryResult *[]Person
salaryResult, err = coherence.AggregateFilter[int, Person, []Person](ctx, namedMap,
filters.Greater(age, 40),
aggregators.TopN[float32, Person](extractors.Extract[float32]("salary"), false, 2))

Events

You can add event listeners for the following:

  • Caches — listen to inserts, update and delete
  • Cache lifecycle — truncated, released and destroyed
  • Session lifecycle events such -connected, disconnected, reconnected and closed

In the example below we create a MapEventListener to listen on all events for and then add to the NamedMap.

listener := coherence.NewMapListener[int, Person]().OnAny(func(e coherence.MapEvent[int, Person]) {
var (
newValue *Person
oldValue *Person
)
key, err := e.Key()
if err != nil {
panic("unable to deserialize key")
}

if e.Type() == coherence.EntryInserted || e.Type() == coherence.EntryUpdated {
newValue, err = e.NewValue()
if err != nil {
panic("unable to deserialize new value")
}
}

if e.Type() == coherence.EntryDeleted || e.Type() == coherence.EntryUpdated {
oldValue, err = e.OldValue()
if err != nil {
panic("unable to deserialize old value")
}
}

fmt.Printf("**EVENT=%v: key=%v, oldValue=%v, newValue=%v\n", e.Type(), *key, oldValue, newValue)
})

if err = namedMap.AddListener(ctx, listener); err != nil {
panic(err)
}

defer namedMap.RemoveListener(ctx, listener)

When we insert, update and delete entries we see the following:

**EVENT=insert: key=1, oldValue=<nil>, newValue=Person{id=1, name=Tim, age=21}
**EVENT=update: key=1, oldValue=Person{id=1, name=Tim, age=21}, newValue=Person{id=1, name=Tim, age=22}
**EVENT=delete: key=1, oldValue=Person{id=1, name=Tim, age=22}, newValue=<nil>

We could also add a MapLifecycleListener to a NamedMap to get notified when it is truncated.

listener := coherence.NewMapLifecycleListener[int, Person]().
OnTruncated(func(e coherence.MapLifecycleEvent[int, Person]) {
fmt.Printf("**EVENT=%s: source=%v\n", e.Type(), e.Source())
})

namedMap.AddLifecycleListener(listener)
defer namedMap.RemoveLifecycleListener(listener)

For a full set of events examples see the Coherence Go client examples.

Conclusion

The Coherence Go client brings the performance, scalability, reliability and availability of the Coherence Data Grid to Go applications allowing you microservices or Go based applications to utilize Coherence.

Visit Coherence Go Client on GitHub for more information.

--

--