Introducing the Coherence Python client

Dhiru Pandey
Published in Oracle Coherence
Jun 20, 2023

We are excited to announce that a Release Candidate of the new Coherence Python client is now publicly available, bringing the performance, scalability, reliability and availability of the Coherence Data Grid to Python applications!

Overview

The Coherence Python client allows Python applications to act as cache clients to an Oracle Coherence cluster using the Google gRPC framework as the network transport.

The Coherence Python client supports the familiar Coherence “map-like” operations, including:

  • Map APIs: put, put_if_absent, put_all, get, get_all, remove, clear, get_or_default, replace, replace_mapping, size, is_empty, contains_key, contains_value
  • Server-side Processing: Cluster-side manipulation of map entries using EntryProcessors
  • Querying / Aggregation: Cluster-side querying, aggregation and filtering of map entries
  • Events: Registration of listeners to be notified of mutations such as insert, update and delete on maps; map lifecycle events such as truncated, released or destroyed; and session lifecycle events such as connected, disconnected, reconnected and closed
  • Data Serialization: Support for storing Python objects as JSON, as well as the ability to serialize to Java objects on the server for access from other Coherence language APIs
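
For readers less familiar with these operation names, the sketch below models a few of the less obvious ones with a plain in-memory dict. It assumes the return-value conventions of Java's java.util.Map, which is where the names come from. LocalMap is a hypothetical stand-in and not part of the client, whose real methods are asynchronous and run against the cluster.

```python
from typing import Dict, Generic, Optional, TypeVar

K = TypeVar("K")
V = TypeVar("V")


class LocalMap(Generic[K, V]):
    """Hypothetical in-memory stand-in; NOT the Coherence client."""

    def __init__(self) -> None:
        self._data: Dict[K, V] = {}

    def put(self, key: K, value: V) -> Optional[V]:
        # Store the value and return the previous one (None if absent).
        previous = self._data.get(key)
        self._data[key] = value
        return previous

    def put_if_absent(self, key: K, value: V) -> Optional[V]:
        # Store only when the key is missing; otherwise return the existing value.
        if key in self._data:
            return self._data[key]
        self._data[key] = value
        return None

    def get_or_default(self, key: K, default: V) -> V:
        # Fall back to the supplied default when the key is not mapped.
        return self._data.get(key, default)

    def replace(self, key: K, value: V) -> Optional[V]:
        # Overwrite only when the key is already mapped.
        if key not in self._data:
            return None
        return self.put(key, value)

    def replace_mapping(self, key: K, old: V, new: V) -> bool:
        # Compare-and-set: overwrite only if the current value equals `old`.
        if key in self._data and self._data[key] == old:
            self._data[key] = new
            return True
        return False


m: LocalMap[int, str] = LocalMap()
```

The difference between replace and replace_mapping is the one most worth remembering: the former is conditional on the key existing, the latter on the current value matching.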

Requirements

  • Coherence CE 22.06.4+ or Coherence 14.1.1.2206.4+ Commercial edition, with a configured gRPC proxy
  • Python 3.11

Documentation and Examples

If you would like to jump directly to the documentation or the comprehensive examples, see the Coherence Python client project on GitHub; otherwise, read on to learn how to get started.

Getting Started

To install the Coherence Python client, use pip to install the coherence-client package:

python3 -m pip install coherence-client
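
If you want to confirm from Python that the install succeeded, a small check along these lines may help. It uses only the standard library; the helper name installed_version is our own invention for this sketch.

```python
from importlib import metadata
from typing import Optional


def installed_version(distribution: str) -> Optional[str]:
    """Return the installed version of a distribution, or None if it is missing."""
    try:
        return metadata.version(distribution)
    except metadata.PackageNotFoundError:
        return None


# The distribution name matches the one passed to pip above.
print("coherence-client:", installed_version("coherence-client") or "not installed")
```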

Start a Coherence gRPC-enabled proxy server. You can use our pre-built Docker image, which has everything you need to get started.

docker run -d -p 1408:1408 ghcr.io/oracle/coherence-ce:22.06.4
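
Before running the examples, it can be handy to check that something is listening on the published port. The sketch below only tests TCP reachability; it does not verify that the listener is a working gRPC proxy. The helper name port_is_open is hypothetical.

```python
import socket


def port_is_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port can be established."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and name-resolution failures.
        return False


if __name__ == "__main__":
    # 1408 is the port published by the docker command above.
    print("Proxy port reachable:", port_is_open("localhost", 1408))
```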

Basic CRUD Example

The following sample Python program:

  • Connects to the Coherence gRPC proxy on port 1408 using plain text
  • Creates a new NamedMap with key and value types of int and str
    Note: A NamedMap is equivalent to a cache that doesn’t expire. You can use a NamedCache if you want expiry.
  • Uses put, get and remove to manipulate NamedMap entries

import asyncio

from coherence import NamedMap, Session


async def do_run() -> None:
    """
    Demonstrates basic CRUD operations against a NamedMap using
    `int` keys and `str` values.

    :return: None
    """
    session: Session = await Session.create()
    try:
        namedMap: NamedMap[int, str] = await session.get_map("my-map")

        print("Put key 1; value one")
        await namedMap.put(1, "one")

        print("Value for key 1 is :", await namedMap.get(1))

        print("NamedMap size is :", await namedMap.size())

        print("Updating value of key 1 to ONE from ", await namedMap.put(1, "ONE"))

        print("Value for key 1 is :", await namedMap.get(1))

        print("Removing key 1, current value :", await namedMap.remove(1))

        print("NamedMap size is :", await namedMap.size())
    finally:
        await session.close()


asyncio.run(do_run())

Use standard Python objects for Keys or Values

Here, instances of the AccountKey and Account dataclasses serve as the key and value, respectively, of entries stored in a NamedMap:

import asyncio
from dataclasses import dataclass

from coherence import NamedMap, Processors, Session


@dataclass
class AccountKey:
    accountId: int
    accountType: str


@dataclass
class Account:
    accountId: int
    accountType: str
    name: str
    balance: float


async def do_run() -> None:
    """
    Demonstrates basic CRUD operations against a NamedMap using
    `AccountKey` keys with `Account` values.

    :return: None
    """
    session: Session = await Session.create()
    try:
        namedMap: NamedMap[AccountKey, Account] = await session.get_map("accounts")

        await namedMap.clear()

        new_account_key: AccountKey = AccountKey(100, "savings")
        new_account: Account = Account(new_account_key.accountId, new_account_key.accountType, "John Doe", 100000.00)

        print(f"Add new account {new_account} with key {new_account_key}")
        await namedMap.put(new_account_key, new_account)

        print("NamedMap size is :", await namedMap.size())

        print("Account from get() :", await namedMap.get(new_account_key))

        print("Update account balance using processor ...")
        await namedMap.invoke(new_account_key, Processors.update("balance", new_account.balance + 1000))

        print("Updated account is :", await namedMap.get(new_account_key))

        await namedMap.remove(new_account_key)

        print("NamedMap size is :", await namedMap.size())
    finally:
        await session.close()


asyncio.run(do_run())

By default, data is stored as JSON on the Coherence cluster but, as mentioned earlier, you can configure the cluster to store it as Java objects instead. See the documentation for more information.
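
To give a feel for what “stored as JSON” means for a dataclass like Account, here is a minimal standard-library sketch of the round trip. It is only an illustration of the idea: the client's own serializer is not shown here and may well add type metadata or otherwise differ from this output.

```python
import json
from dataclasses import asdict, dataclass


@dataclass
class Account:
    accountId: int
    accountType: str
    name: str
    balance: float


account = Account(100, "savings", "John Doe", 100000.00)

# asdict() turns the dataclass into a plain dict that the json module can encode.
payload = json.dumps(asdict(account), sort_keys=True)
print(payload)

# Decoding yields a plain dict; rebuilding the dataclass is an explicit step here.
restored = Account(**json.loads(payload))
```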

Querying Cache Data

Coherence allows you to query on any data attribute present in your data model, and even allows you to define your own query filter if one of the built-in ones doesn’t fit the bill.

See the utility class Filters for the filters supported out-of-the-box by this client.

The following example demonstrates various querying operations against a NamedMap:

import asyncio
from dataclasses import dataclass
from typing import List

from coherence import Filters, NamedMap, Session
from coherence.filter import Filter


@dataclass
class Hobbit:
    """
    A simple class representing a Hobbit.
    """

    id: int
    name: str
    age: int
    home: str


async def do_run() -> None:
    """
    Demonstrates various Filter operations against a NamedMap.

    :return: None
    """
    session: Session = await Session.create()
    try:
        homes: List[str] = ["Hobbiton", "Buckland", "Frogmorton", "Stock"]
        namedMap: NamedMap[int, Hobbit] = await session.get_map("hobbits")

        await namedMap.clear()

        num_hobbits: int = 20
        print("Adding", num_hobbits, "random Hobbits ...")
        for i in range(num_hobbits):
            await namedMap.put(i, Hobbit(i, "Hobbit-" + str(i), 15 + i, homes[i % 4]))

        print("NamedMap size is :", await namedMap.size())

        print("Retrieve the Hobbits between the ages of 17 and 21 ...")
        async for entry in namedMap.entries(Filters.between("age", 17, 21)):
            print("Key :", entry.key, ", Value :", entry.value)

        print("Retrieve the Hobbits between the ages of 17 and 30 and live in Hobbiton ...")
        query_filter: Filter = Filters.between("age", 17, 30).And(Filters.equals("home", "Hobbiton"))
        async for entry in namedMap.entries(query_filter):
            print("Key :", entry.key, ", Value :", entry.value)

        print("Retrieve the Hobbits between the ages of 17 and 25 who live in Hobbiton or Frogmorton")
        query_filter = Filters.between("age", 17, 25).And(Filters.is_in("home", {"Hobbiton", "Frogmorton"}))
        async for entry in namedMap.entries(query_filter):
            print("Key :", entry.key, ", Value :", entry.value)

    finally:
        await session.close()

asyncio.run(do_run())
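
To reason about which entries the three queries above should match, the same conditions can be expressed as plain Python predicates over the same generated data. This runs locally and does not touch the cluster, where filters are evaluated server-side. The helpers between, equals, is_in and both are hypothetical, and the sketch assumes both bounds of a "between" are inclusive (SQL-style); check the Filters documentation for the client's actual bound defaults.

```python
from dataclasses import dataclass
from typing import Any, Callable, Collection, List


@dataclass
class Hobbit:
    id: int
    name: str
    age: int
    home: str


Predicate = Callable[[Hobbit], bool]


def between(attr: str, low: int, high: int) -> Predicate:
    # ASSUMPTION: both bounds are inclusive.
    return lambda h: low <= getattr(h, attr) <= high


def equals(attr: str, value: Any) -> Predicate:
    return lambda h: getattr(h, attr) == value


def is_in(attr: str, values: Collection[Any]) -> Predicate:
    return lambda h: getattr(h, attr) in values


def both(first: Predicate, second: Predicate) -> Predicate:
    # Local analogue of combining two filters with a logical AND.
    return lambda h: first(h) and second(h)


homes = ["Hobbiton", "Buckland", "Frogmorton", "Stock"]
hobbits = [Hobbit(i, f"Hobbit-{i}", 15 + i, homes[i % 4]) for i in range(20)]


def query(predicate: Predicate) -> List[int]:
    """Return the ids of all Hobbits that satisfy the predicate."""
    return [h.id for h in hobbits if predicate(h)]


print(query(between("age", 17, 21)))
print(query(both(between("age", 17, 30), equals("home", "Hobbiton"))))
print(query(both(between("age", 17, 25), is_in("home", {"Hobbiton", "Frogmorton"}))))
```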

Aggregating Results

Sometimes you don’t need the actual data objects that are stored within the data grid, but a derived, calculated result based on them. This is where Coherence aggregation features come in handy.

Aggregations can be executed against the whole data set, or they can be limited to a subset of the data using a query or a key set.

See the utility class Aggregators for the aggregators supported out-of-the-box by this client.

The following example demonstrates various aggregation operations against a NamedMap:

import asyncio
from dataclasses import dataclass
from decimal import Decimal
from typing import List

from coherence import Aggregators, Filters, NamedMap, Session


@dataclass
class Hobbit:
    """
    A simple class representing a Hobbit.
    """

    id: int
    name: str
    age: int
    hobbies: str


async def do_run() -> None:
    """
    Demonstrates various Aggregator operations against a NamedMap.

    :return: None
    """
    # Each Hobbit's id matches its map key.
    person_data = {
        1: Hobbit(1, "Bilbo", 111, "Burglaring"),
        2: Hobbit(2, "Frodo", 50, "Bearing"),
        3: Hobbit(3, "Sam", 38, "Side Kick"),
        4: Hobbit(4, "Meriadoc", 36, "Side Kick"),
        5: Hobbit(5, "Peregrin", 28, "Side Kick"),
    }

    session: Session = await Session.create()
    try:
        namedMap: NamedMap[int, Hobbit] = await session.get_map("aggregation-test")

        await namedMap.clear()

        await namedMap.put_all(person_data)

        distinct_hobbies: List[str] = await namedMap.aggregate(Aggregators.distinct("hobbies"))
        print("Distinct hobbies :", distinct_hobbies)

        count: int = await namedMap.aggregate(Aggregators.count())
        print("Number of Hobbits :", count)

        over_forty: int = await namedMap.aggregate(Aggregators.count(), filter=Filters.greater("age", 40))
        print("Number of Hobbits older than 40 :", over_forty)

        avg_under_forty: Decimal = await namedMap.aggregate(Aggregators.average("age"), filter=Filters.less("age", 40))
        print("Average age of Hobbits under 40 :", int(avg_under_forty))

        print("The oldest Hobbit for each hobby ...")
        results: dict[str, int] = await namedMap.aggregate(Aggregators.group_by("hobbies", Aggregators.max("age")))
        for hobby, age in results.items():
            print("Hobby: ", hobby, "Max age: ", age)
    finally:
        await session.close()


asyncio.run(do_run())
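
To make the expected results concrete, the same calculations can be reproduced locally over the five sample Hobbits with plain Python. This is only a cross-check of the arithmetic; on a cluster the aggregation runs where the data lives, and the types and ordering of the returned values may differ from this sketch.

```python
from dataclasses import dataclass
from statistics import mean
from typing import Dict, List


@dataclass
class Hobbit:
    id: int
    name: str
    age: int
    hobbies: str


hobbits: List[Hobbit] = [
    Hobbit(1, "Bilbo", 111, "Burglaring"),
    Hobbit(2, "Frodo", 50, "Bearing"),
    Hobbit(3, "Sam", 38, "Side Kick"),
    Hobbit(4, "Meriadoc", 36, "Side Kick"),
    Hobbit(5, "Peregrin", 28, "Side Kick"),
]

# Distinct values of one attribute (sorted here only to get a stable order).
distinct_hobbies = sorted({h.hobbies for h in hobbits})

# Count over the whole data set, then over a filtered subset.
count = len(hobbits)
over_forty = sum(1 for h in hobbits if h.age > 40)

# Average restricted to a filtered subset.
avg_under_forty = mean(h.age for h in hobbits if h.age < 40)

# Group by hobby and keep the maximum age within each group.
oldest_by_hobby: Dict[str, int] = {}
for h in hobbits:
    oldest_by_hobby[h.hobbies] = max(h.age, oldest_by_hobby.get(h.hobbies, h.age))

print(distinct_hobbies, count, over_forty, avg_under_forty, oldest_by_hobby)
```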

In-Place Processing using Entry Processors

Entry processors are a powerful feature of Coherence where instead of retrieving data from the cluster, processing that data and then returning it to the cluster, you send the processing to the cluster and it is carried out in-place where the data resides.

This is extremely efficient. It uses a “lock-free” programming model, in the sense that you never acquire locks yourself: Coherence holds an automatic, implicit lock on each entry while your entry processor runs against it, which ensures data integrity and consistency. It also scales well when you have multiple entries to update, and dramatically reduces the network traffic required for updates.
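
The idea of sending the function to the data can be illustrated with a small single-process sketch. LocalGrid below is a hypothetical stand-in, not the client or the cluster: its invoke method applies a caller-supplied function to one entry while holding that entry's lock, so concurrent callers never need to lock anything themselves.

```python
import threading
from typing import Callable, Dict


class LocalGrid:
    """Hypothetical single-process stand-in for a map that runs processors in place."""

    def __init__(self) -> None:
        self._data: Dict[str, int] = {}
        self._locks: Dict[str, threading.Lock] = {}
        self._guard = threading.Lock()  # protects the lock table itself

    def put(self, key: str, value: int) -> None:
        with self._guard:
            self._data[key] = value
            self._locks.setdefault(key, threading.Lock())

    def get(self, key: str) -> int:
        return self._data[key]

    def invoke(self, key: str, processor: Callable[[int], int]) -> int:
        # The entry is locked only while the processor runs against it.
        with self._guard:
            lock = self._locks[key]
        with lock:
            self._data[key] = processor(self._data[key])
            return self._data[key]


def birthday(age: int) -> int:
    return age + 1


grid = LocalGrid()
grid.put("bilbo-age", 111)

# Fifty concurrent callers each send the same small function to the entry.
threads = [threading.Thread(target=grid.invoke, args=("bilbo-age", birthday)) for _ in range(50)]
for t in threads:
    t.start()
for t in threads:
    t.join()

final_age = grid.get("bilbo-age")
print("Age after 50 birthdays:", final_age)
```

With a get-modify-put approach, each caller would instead fetch the value, change it locally and write it back, which costs two round trips per update and needs explicit coordination to avoid lost updates.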

See the utility class Processors for the entry processors supported out-of-the-box by this client.

The following example demonstrates various entry processing operations against a NamedMap:

import asyncio
from dataclasses import dataclass
from typing import List

from coherence import NamedMap, Processors, Session


@dataclass
class Hobbit:
    """
    A simple class representing a Hobbit.
    """

    id: int
    name: str
    age: int


async def do_run() -> None:
    """
    Demonstrates various EntryProcessor operations against a NamedMap.

    :return: None
    """
    session: Session = await Session.create()
    try:
        namedMap: NamedMap[int, Hobbit] = await session.get_map("hobbits")

        await namedMap.clear()

        hobbit: Hobbit = Hobbit(1, "Bilbo", 111)
        print("Add new hobbit :", hobbit)
        await namedMap.put(hobbit.id, hobbit)

        print("NamedMap size is :", await namedMap.size())

        print("Hobbit from get() :", await namedMap.get(hobbit.id))

        print("Update Hobbit using processor ...")
        await namedMap.invoke(hobbit.id, Processors.update("age", 112))

        print("Updated Hobbit is :", await namedMap.get(hobbit.id))

        hobbit2: Hobbit = Hobbit(2, "Frodo", 50)

        print("Add new hobbit :", hobbit2)
        await namedMap.put(hobbit2.id, hobbit2)

        print("NamedMap size is :", await namedMap.size())

        print("Sending all Hobbits ten years into the future!")
        keys: List[int] = []
        async for entry in namedMap.invoke_all(Processors.increment("age", 10)):
            keys.append(entry.key)
            print("Updated age of Hobbit with id ", entry.key, "to", entry.value)

        print("Displaying all updated Hobbits ...")
        async for result in namedMap.get_all(set(keys)):
            print(result.value)

        await namedMap.remove(hobbit.id)
        await namedMap.remove(hobbit2.id)
    finally:
        await session.close()


asyncio.run(do_run())

Events

Coherence provides the ability to subscribe to notifications pertaining to a particular map/cache.

You can register event listeners to be notified of the following events:

  • Cache entries: insert, update and delete events
  • Cache lifecycle: truncated, released and destroyed events
  • Session lifecycle: connected, disconnected, reconnected and closed events

In addition to listening for specific events, it is possible to listen to events for changes made to a specific key, or when using a Filter it’s possible to limit the events raised to be for a subset of the map entries.
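
Conceptually, an event filter of this kind combines a mask of event types with a predicate on the entry. The sketch below models that locally with an IntFlag. EventType, Event and make_dispatcher are hypothetical names, the numeric flag values are arbitrary, and which value (old or new) the real filter inspects is not covered here.

```python
from dataclasses import dataclass
from enum import IntFlag
from typing import Callable, List, Optional


class EventType(IntFlag):
    # Arbitrary bit values chosen for this sketch only.
    INSERTED = 1
    UPDATED = 2
    DELETED = 4


@dataclass
class Event:
    type: EventType
    key: int
    old: Optional[str]
    new: Optional[str]


def make_dispatcher(
    mask: EventType,
    predicate: Callable[[Event], bool],
    listener: Callable[[Event], None],
) -> Callable[[Event], None]:
    """Deliver an event only if its type is in the mask and the predicate accepts it."""

    def dispatch(event: Event) -> None:
        if event.type & mask and predicate(event):
            listener(event)

    return dispatch


received: List[Event] = []

# Only updates and deletes, and only for values longer than one character.
dispatch = make_dispatcher(
    EventType.UPDATED | EventType.DELETED,
    lambda e: len(e.new or e.old or "") > 1,
    received.append,
)

for i in (1, 10):
    dispatch(Event(EventType.INSERTED, i, None, str(i)))
    dispatch(Event(EventType.UPDATED, i, str(i), str(i + 1)))
    dispatch(Event(EventType.DELETED, i, str(i + 1), None))

print([(e.type.name, e.key) for e in received])
```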

The following example demonstrates using lifecycle and map events:

import asyncio

from coherence import Filters, NamedMap, Session
from coherence.event import MapLifecycleEvent, MapListener
from coherence.filter import MapEventFilter


async def do_run() -> None:
    """
    Demonstrates listeners for entry events and cache lifecycle.

    :return: None
    """
    session: Session = await Session.create()
    try:
        namedMap: NamedMap[int, str] = await session.get_map("listeners-map")
        await namedMap.put(1, "1")

        print("NamedMap lifecycle events")

        namedMap.on(MapLifecycleEvent.RELEASED, lambda x: print("RELEASED", x))
        namedMap.on(MapLifecycleEvent.TRUNCATED, lambda x: print("TRUNCATE", x))
        namedMap.on(MapLifecycleEvent.DESTROYED, lambda x: print("DESTROYED", x))

        print("Truncating the NamedMap; this should generate an event ...")
        await namedMap.truncate()
        await asyncio.sleep(1)

        print("Releasing the NamedMap; this should generate an event ...")
        namedMap.release()
        await asyncio.sleep(1)

        print("Destroying the NamedMap; this should generate an event ...")
        await namedMap.destroy()
        await asyncio.sleep(1)

        print("\n\nNamedMap entry events")

        namedMap = await session.get_map("listeners-map")

        listener1: MapListener[int, str] = MapListener()
        listener1.on_any(lambda e: print(e))

        print("Added listener for all events")
        print("Events will be generated when an entry is inserted, updated, and removed")
        await namedMap.add_map_listener(listener1)

        await namedMap.put(1, "1")
        await namedMap.put(1, "2")
        await namedMap.remove(1)
        await asyncio.sleep(1)

        await namedMap.remove_map_listener(listener1)

        print("\nAdded listener for all entries, but only when they are inserted")
        filter = Filters.event(Filters.always(), MapEventFilter.INSERTED)
        await namedMap.add_map_listener(listener1, filter)

        await namedMap.put(1, "1")
        await namedMap.put(1, "2")
        await namedMap.remove(1)
        await asyncio.sleep(1)

        await namedMap.remove_map_listener(listener1, filter)

        print("\nAdded listener for entries with a length larger than one, but only when they are updated or removed")
        filter = Filters.event(Filters.greater("length()", 1), MapEventFilter.UPDATED | MapEventFilter.DELETED)
        await namedMap.add_map_listener(listener1, filter)

        for i in range(12):
            await namedMap.put(i, str(i))
            await namedMap.put(i, str(i + 1))
            await namedMap.remove(i)

        await asyncio.sleep(1)

        await namedMap.remove_map_listener(listener1, filter)
        await namedMap.clear()
    finally:
        await session.close()


asyncio.run(do_run())
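
The one-second sleeps in the example simply give events time to arrive. In your own code you may prefer to wait until the expected events have actually been received. The sketch below shows one way to do that with asyncio.Event; the events are simulated locally with call_later rather than coming from a cluster. It assumes the listener callback runs on the event loop thread; if callbacks arrive on another thread, the flag would need to be set via loop.call_soon_threadsafe instead.

```python
import asyncio
from typing import List


async def wait_for_events(expected: int, timeout: float = 1.0) -> List[str]:
    """Collect events until `expected` have arrived, or time out."""
    received: List[str] = []
    done = asyncio.Event()

    def listener(event: str) -> None:
        received.append(event)
        if len(received) >= expected:
            done.set()

    # Stand-in for the cluster: deliver three events a few milliseconds apart.
    loop = asyncio.get_running_loop()
    for position, name in enumerate(("inserted", "updated", "deleted")):
        loop.call_later(0.01 * (position + 1), listener, name)

    # Returns as soon as the last expected event arrives; raises on timeout.
    await asyncio.wait_for(done.wait(), timeout)
    return received


events = asyncio.run(wait_for_events(3))
print(events)
```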

Conclusion

The Coherence Python client brings the performance, scalability, reliability and availability of the Coherence Data Grid to Python applications.

Visit the Coherence Python client on GitHub for more information.
