Deep Dive: Queries

Learn how to retrieve data stored in Coherence clusters as efficiently as possible

Tim Middleton
Oracle Coherence
11 min read · Dec 12, 2022

Introduction

This article explains in depth how distributed queries work within a Coherence cluster. We will cover the following: querying by keys, using extractors, streams, applying filters, retrieving data in batches, adding indexes, data affinity and query re-execution.

Note: When we refer to maps in the examples below, we assume that Coherence distributed (partitioned) maps, including federated maps, are being used, where data is partitioned across all storage members for maximum reliability and scalability.

Querying Overview

No matter which method you use to query data, Coherence runs these queries in parallel across all storage-enabled members for distributed or partitioned maps. This parallel execution is automatic and takes advantage of the multiple members across machines in your cluster to make the querying as efficient as possible. Whether you have 10 or 500 storage nodes, Coherence will automatically manage the running of queries in parallel for you.
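As a rough mental model of this scatter-gather behaviour, the plain-Java sketch below treats each storage member as a local map, filters every member's slice in parallel and merges the partial results. It uses only the JDK, not the Coherence API, and the class name, sample data and use of a parallel stream are our own assumptions rather than a description of how Coherence is implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class ScatterGatherDemo {
    // Each "member" holds its own slice of the data: customer id -> outstanding balance.
    static List<Integer> query(List<Map<Integer, Double>> members, double minBalance) {
        // Scatter: every member filters only its local entries, in parallel.
        // Gather: the partial results are merged into one sorted list of ids.
        return members.parallelStream()
                .flatMap(local -> local.entrySet().stream()
                        .filter(e -> e.getValue() > minBalance)
                        .map(Map.Entry::getKey))
                .sorted()
                .collect(Collectors.toList());
    }

    // Three hypothetical members, each owning a different subset of customers.
    static List<Map<Integer, Double>> sampleMembers() {
        List<Map<Integer, Double>> members = new ArrayList<>();
        members.add(Map.of(1, 500d, 2, 12_000d));
        members.add(Map.of(3, 25_000d, 4, 9_999d));
        members.add(Map.of(5, 10_001d));
        return members;
    }

    public static void main(String[] args) {
        System.out.println(query(sampleMembers(), 10_000d)); // [2, 3, 5]
    }
}
```

The caller sees a single merged result and never needs to know which member held which entry.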

Entries can of course be retrieved for specific keys, just as with a java.util.Map, using NamedMap.get(K key). This is sufficient for basic applications, but often we need to retrieve entries for multiple keys, or select a subset of entries based on query criteria applied to keys or values.

Domain Model

Throughout this article, the examples use the following Java classes and attributes to demonstrate querying. We will assume that they are complete classes with getters, setters, constructors, equals and hashCode.

Customer
private int id;
private String customerName;
private Address officeAddress;
private Address homeAddress;
private double outstandingBalance;

Address
private String addressLine1;
private String addressLine2;
private String suburb;
private String city;
private String state;
private int postCode;

We also have the following NamedMap populated with data:

Session session = Session.create();
NamedMap<Integer, Customer> customers = session.getMap("customers");

Querying By Keys

The simplest way to query data in Coherence is using the key to the entry.

Customer customer = customers.get(1);

You can also return entries based upon a Set of keys. In the example below we return the customers with keys 1, 2, and 3:

Map<Integer, Customer> mapCustomers = customers.getAll(Set.of(1, 2, 3)); 

You can also return a Set of all keys or a filtered Set of keys. As is the contract for Map.keySet(), if you remove an entry from the retrieved key Set, it will be removed from the underlying Map, or NamedMap in our case.

Set<Integer> setKeys = customers.keySet();
System.out.println("Keys=" + setKeys.size()); // Keys=10
setKeys.remove(1);
System.out.println("Map Size=" + customers.size()); // Map Size=9
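This write-through behaviour comes from the java.util.Map contract itself, so it can be seen with an ordinary HashMap. The sketch below is plain JDK code, not Coherence, and the class and method names are only illustrative. It also shows that copying the key set first gives you a local snapshot that can be modified safely.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class KeySetViewDemo {
    // Removes a key through the live keySet() view and returns the map size afterwards.
    static int sizeAfterViewRemove(Map<Integer, String> map, int key) {
        Set<Integer> keys = map.keySet(); // live view backed by the map
        keys.remove(key);                 // also removes the entry from the map
        return map.size();
    }

    // Removes a key from a defensive copy; the map itself is untouched.
    static int sizeAfterCopyRemove(Map<Integer, String> map, int key) {
        Set<Integer> copy = new HashSet<>(map.keySet());
        copy.remove(key);
        return map.size();
    }

    static Map<Integer, String> sample() {
        Map<Integer, String> map = new HashMap<>();
        map.put(1, "Customer 1");
        map.put(2, "Customer 2");
        map.put(3, "Customer 3");
        return map;
    }

    public static void main(String[] args) {
        System.out.println(sizeAfterViewRemove(sample(), 1)); // 2
        System.out.println(sizeAfterCopyRemove(sample(), 1)); // 3
    }
}
```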

You can also return a key set based upon a Filter. For example, to return the keys for all customers with a balance greater than 10,000 use the following:

Set<Integer> setKeys = customers.keySet(Filters.greater(Customer::getOutstandingBalance, 10_000d));

Note: We will explain filters in more detail below.

Using Extractors and Filters

Often you want to be able to query map entries via the attributes of the stored values, say the balance or any other attribute of the Customer object. To be able to do this we need to introduce ValueExtractors and Filters.

The ValueExtractor is used to extract an attribute value from a given object for querying and for indexing, which we explain later in this article.

The Filter allows us, as the name suggests, to filter entries based upon some criteria. Because filters and extractors execute on each of the storage nodes where the data resides, the work is carried out in parallel. This also requires the Java classes being queried to be on the server's class path.

One ValueExtractor implementation is the UniversalExtractor, which can be used in several ways as part of a filter.

The following example shows how to retrieve all customers with a balance greater than $5,000 by specifying an attribute name. The appropriate getter for the attribute will be used:

Collection<Customer> colCustomers = customers.values(
Filters.greater(new UniversalExtractor<>("outstandingBalance"), 5_000d));

Specifying attributes by name in this way works, but it is prone to runtime errors if the attribute name is misspelled. Since the ValueExtractor interface is a functional interface, we can use method references, which are validated at compile time. So, we can rewrite the above as:

Collection<Customer> colCustomers = customers.values(
Filters.greater(Customer::getOutstandingBalance, 5_000d));

Tip: Method references are the recommended way to use ValueExtractors because of the compile-time type validation.

As a further example, if we wanted to return all customers whose home address city is “Perth”, this would require the following, which extracts the home address and then the city:

Collection<Customer> colCustomers = customers.values(
Filters.equal(ValueExtractor.of(Customer::getHomeAddress)
.andThen(Address::getCity), "Perth"));

You will notice that we are using the Filters.equal factory method to create an EqualsFilter. There are many other static methods on the Filters helper class that provide a Filter DSL to simplify the creation of filters. You can also do a static import of these methods, which helps readability. In the following example we use the .stream() method of NamedMap to stream all customers who have a balance greater than $10,000 and GOLD status. (The example assumes that Customer also exposes a customerType attribute and a GOLD constant, which are not shown in the domain model above.)

customers.stream(greater(Customer::getOutstandingBalance, 10_000d)
.and(equal(Customer::getCustomerType, Customer.GOLD)))
.forEach(System.out::println);
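To make the composition in the example above concrete, here is a plain-Java model in which an extractor is a java.util.function.Function and a filter is a java.util.function.Predicate built from an extractor and a constant. This is only a sketch of the idea using JDK types. The helper names mirror the Filters DSL but are our own, and representing the customer type as the string "GOLD" is an assumption.

```java
import java.util.function.Function;
import java.util.function.Predicate;

class FilterCompositionDemo {
    // Minimal stand-in for the Customer class (hypothetical fields only).
    static class Customer {
        final double outstandingBalance;
        final String customerType;

        Customer(double outstandingBalance, String customerType) {
            this.outstandingBalance = outstandingBalance;
            this.customerType = customerType;
        }

        double getOutstandingBalance() { return outstandingBalance; }
        String getCustomerType() { return customerType; }
    }

    // A "filter" pairs an extractor with a comparison against a constant.
    static <T> Predicate<T> greater(Function<T, Double> extractor, double value) {
        return t -> extractor.apply(t) > value;
    }

    static <T, V> Predicate<T> equal(Function<T, V> extractor, V value) {
        return t -> value.equals(extractor.apply(t));
    }

    // Both criteria must hold, as with greater(...).and(equal(...)).
    static boolean isLargeGoldCustomer(Customer customer) {
        Predicate<Customer> largeBalance = greater(Customer::getOutstandingBalance, 10_000d);
        Predicate<Customer> gold = equal(Customer::getCustomerType, "GOLD");
        return largeBalance.and(gold).test(customer);
    }

    public static void main(String[] args) {
        System.out.println(isLargeGoldCustomer(new Customer(15_000d, "GOLD")));   // true
        System.out.println(isLargeGoldCustomer(new Customer(15_000d, "SILVER"))); // false
    }
}
```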

There are many types of filters available in Coherence covering the most common criteria required by applications. These are available on the com.tangosol.util.Filters helper class.

  • Comparison — equal, less, lessEqual, greater, greaterEqual, between, like, never, isTrue, isFalse, isNull
  • Set/Array based — in, contains, arrayContainsAny, arrayContainsAll
  • Combining — and (all), or (any), not
  • Entry based — present
  • Others — predicate, regex

For more information see the Filters class in our Coherence JavaDoc Reference.

Applying Filters Early

When using streaming, it is more efficient to apply a filter within the .stream(Filter …) method rather than using the no-args stream() and applying a filter afterwards. The reason for this is that the former allows us to apply indexes during evaluation to avoid deserialization and limit the set of entries to process, while the latter processes all entries, most likely deserializing them in the process.

Recommended:

customers.stream(greater(Customer::getOutstandingBalance, 10_000d))
.map(Map.Entry::getValue)
.forEach(System.out::println);

The following is not recommended, as indexes are not leveraged:

customers.stream()
.map(Map.Entry::getValue)
.filter(c -> c.getOutstandingBalance() > 10_000d)
.forEach(System.out::println);

Retrieving Data in Batches

Sometimes you want to retrieve data in pages rather than in a single request, because returning everything at once can cause memory issues for large data sets. The LimitFilter can be useful in this scenario, but it relies on the following assumptions to work correctly:

  1. There are no concurrent modifications to the data set in question.
  2. Data is evenly distributed across all storage nodes of the cluster, such that each node has a fair sample of the entire set of data.

The following code shows how we could retrieve pages of data for all customers who have an outstanding balance greater than $10,000.

int pageSize = 25;

LimitFilter<Customer> limitFilter = new LimitFilter<>(greater(Customer::getOutstandingBalance, 10_000d), pageSize);

// retrieve entries 1-25
Set<Map.Entry<Integer, Customer>> setCustomers = customers.entrySet(limitFilter);

// retrieve entries 26-50
limitFilter.nextPage();
setCustomers = customers.entrySet(limitFilter);
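One way to walk through all pages is to keep advancing to the next page until an empty page comes back. The following plain-Java sketch models that loop over an in-memory list of matching ids. It does not use LimitFilter itself, and the class and method names are hypothetical. It also relies on the first assumption above, because the list does not change between page requests.

```java
import java.util.ArrayList;
import java.util.List;

class PagingDemo {
    // Returns the requested page (0-based) of the matches, or an empty list past the end.
    static List<Integer> page(List<Integer> matches, int pageSize, int pageNumber) {
        int from = pageNumber * pageSize;
        if (from >= matches.size()) {
            return new ArrayList<>();
        }
        int to = Math.min(from + pageSize, matches.size());
        return new ArrayList<>(matches.subList(from, to));
    }

    // Walks every page until an empty one is returned and counts the non-empty pages.
    static int countPages(List<Integer> matches, int pageSize) {
        int pageNumber = 0;
        while (!page(matches, pageSize, pageNumber).isEmpty()) {
            pageNumber++; // plays the role of limitFilter.nextPage()
        }
        return pageNumber;
    }

    // Builds the ids 1..n as stand-ins for the entries matching the filter.
    static List<Integer> ids(int n) {
        List<Integer> ids = new ArrayList<>();
        for (int i = 1; i <= n; i++) {
            ids.add(i);
        }
        return ids;
    }

    public static void main(String[] args) {
        System.out.println(countPages(ids(60), 25));     // 3
        System.out.println(page(ids(60), 25, 2).size()); // 10
    }
}
```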

Using Indexes

When Coherence stores objects on storage-enabled servers, the keys and values are serialized using the configured serializer. This means when we use filters to query attributes of objects stored in the map, or use any other operation such as entry processors or aggregations that require filters, every entry needs to be deserialized and checked to see if it satisfies the filter. This can slow down queries and entry processors considerably and result in unnecessary GC activity.

Indexes allow us to store a deserialized version of the attributes which can be used to satisfy filters for queries, entry processors and aggregators in a more efficient manner. Indexes are stored in data structures on the storage nodes that own the data and consist of a deserialized value and the binary key that this value maps to.
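The idea of mapping a deserialized attribute value to the keys that hold it can be sketched with a sorted map. The following is a simplified plain-Java model of an ordered index, not Coherence's actual data structure. The class name is hypothetical, and integer ids stand in for the binary keys.

```java
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

class OrderedIndexDemo {
    // Inverse index: extracted attribute value -> keys of the entries holding that value.
    private final TreeMap<Double, Set<Integer>> index = new TreeMap<>();

    void insert(int key, double balance) {
        index.computeIfAbsent(balance, b -> new TreeSet<>()).add(key);
    }

    // Range lookup answered from the index alone; no stored value is inspected.
    Set<Integer> keysGreaterThan(double threshold) {
        Set<Integer> keys = new TreeSet<>();
        for (Set<Integer> bucket : index.tailMap(threshold, false).values()) {
            keys.addAll(bucket);
        }
        return keys;
    }

    static OrderedIndexDemo sample() {
        OrderedIndexDemo demo = new OrderedIndexDemo();
        demo.insert(1, 500d);
        demo.insert(2, 12_000d);
        demo.insert(3, 25_000d);
        demo.insert(4, 12_000d);
        return demo;
    }

    public static void main(String[] args) {
        System.out.println(sample().keysGreaterThan(10_000d)); // [2, 3, 4]
    }
}
```

Because the map is sorted by value, a "greater than" query only visits the tail of the index rather than every entry.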

To add an index, you use the following method on the NamedMap:

addIndex(ValueExtractor extractor, boolean fOrdered, Comparator comparator)

For example, to add an index on the outstandingBalance attribute, you could issue the following:

customers.addIndex(Customer::getOutstandingBalance, true, null);

In this case we are creating an ordered index, which allows us to efficiently process queries that require range searches, such as an outstanding balance between 20,000 and 30,000. The third argument is the Comparator, which in this case is null, meaning that the natural ordering of the outstandingBalance values will be used.

We will discuss how to determine whether indexes are being used in your queries later, in the section Evaluating Query Cost and Effectiveness.

Note: If the index to be added already exists, then this is a no-op, meaning it is safe to add indexes repeatedly if convenient.

Creating Custom Indexes

Custom indexes are not normally required, but can be useful in some edge cases to help reduce the index memory footprint. I’ve created an example below in which we have the following setup:

  • Person object with address attribute — 1,000,000 entries
  • Only 1% of people have addresses (10,000 out of 1,000,000)
  • We want to find the count of all people where address != null
  • A normal index would take up a lot of space as all entries, even the ones with null addresses, are indexed
  • We will create a custom index that excludes entries with null addresses, reducing both the index footprint and the query time.

There are two classes required: the index class itself, which extends SimpleMapIndex (an implementation of MapIndex), and a value extractor, which implements IndexAwareExtractor and is used to add the index.

ExcludeNullsMapIndex class

The following index class extends SimpleMapIndex and overrides the insert, update and delete methods to only update the index when the extracted value is not null.

We must also ensure the isPartial method returns true, as the index only contains a partial set of values in the NamedMap.

import java.util.Comparator;
import java.util.Map;

import com.tangosol.net.BackingMapContext;
import com.tangosol.util.MapTrigger;
import com.tangosol.util.SimpleMapIndex;
import com.tangosol.util.ValueExtractor;

public class ExcludeNullsMapIndex
        extends SimpleMapIndex {

    public ExcludeNullsMapIndex(ValueExtractor extractor, boolean fOrdered, Comparator comparator, BackingMapContext ctx) {
        super(extractor, fOrdered, comparator, ctx);
    }

    @Override
    public void insert(Map.Entry entry) {
        // only index the entry when the extracted value is not null
        if (extractNewValue(entry) != null) {
            super.insert(entry);
        }
    }

    @Override
    public void update(Map.Entry entry) {
        if (extractNewValue(entry) != null) {
            super.update(entry);
        }
    }

    @Override
    public void delete(Map.Entry entry) {
        // only entries whose old value was not null were ever indexed
        if (extractOldValue((MapTrigger.Entry) entry) != null) {
            super.delete(entry);
        }
    }

    @Override
    public boolean isPartial() {
        return true; // must return true as not all entries are indexed
    }
}

ExcludeNulls class (custom IndexAwareExtractor)

The following implementation allows you to add the index.

import java.io.Serializable;
import java.util.Comparator;
import java.util.Map;

import com.tangosol.net.BackingMapContext;
import com.tangosol.util.MapIndex;
import com.tangosol.util.ValueExtractor;
import com.tangosol.util.extractor.IndexAwareExtractor;

public class ExcludeNulls
        implements IndexAwareExtractor, Serializable {

    public ExcludeNulls(ValueExtractor extractor) {
        m_extractor = extractor;
    }

    @Override
    public MapIndex destroyIndex(Map mapIndex) {
        return (MapIndex) mapIndex.remove(m_extractor);
    }

    @Override
    public Object extract(Object o) {
        return m_extractor.extract(o);
    }

    @Override
    public MapIndex createIndex(boolean fOrdered, Comparator comparator, Map mapIndex, BackingMapContext bmc) {
        ValueExtractor extractor = m_extractor;
        MapIndex index = (MapIndex) mapIndex.get(extractor);

        // reuse the index if it has already been created
        if (index != null) {
            return index;
        }

        index = new ExcludeNullsMapIndex(extractor, fOrdered, comparator, bmc);
        mapIndex.put(extractor, index);
        return index;
    }

    private ValueExtractor m_extractor;
}

Adding the Index

To add the index, you would then carry out the following against your cache:

   map.addIndex(new ExcludeNulls(Person::getAddress));

Comparing the Custom Index against a Standard Index

To compare the index footprint and performance, we tested running the following aggregation (on a 2-node cluster on a MacBook laptop) to return the count of people with addresses != null, with the above custom index and a standard index shown below:

map.addIndex(Person::getAddress);
int notNullCount = map.aggregate(isNotNull(Person::getAddress), Aggregators.count());

Index Sizes and Results

Standard Index

map.addIndex(Person::getAddress);
Total Indexing Bytes: 80,872,296 (78,976 KB)
Not null count = 10000, time = 616ms

Custom Index

map.addIndex(new ExcludeNulls(Person::getAddress));
Total Indexing Bytes: 2,640,184 (2,578 KB)
Not null count = 10000, time = 9ms

By using a custom index, the index size was reduced by roughly 97%, from ~77 MB to ~2.5 MB, and the aggregation time was reduced by more than 98%, from 616 ms down to 9 ms.

The reduction in both the index size and the aggregation time is because a (significantly) smaller number of index entries need to be traversed.
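The effect on the number of index entries can be illustrated with a small plain-Java model. This is a sketch only: it counts key references in a HashMap-based inverse index rather than measuring a real Coherence index, it uses 10,000 people instead of 1,000,000 to keep it quick, and all names are hypothetical.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class PartialIndexDemo {
    // Builds an inverse index (address -> keys) and returns how many key references it holds.
    static int indexedEntries(Map<Integer, String> people, boolean excludeNulls) {
        Map<String, Set<Integer>> index = new HashMap<>();
        for (Map.Entry<Integer, String> person : people.entrySet()) {
            String address = person.getValue();
            if (excludeNulls && address == null) {
                continue; // the partial index skips entries with no address
            }
            index.computeIfAbsent(address, a -> new HashSet<>()).add(person.getKey());
        }
        int references = 0;
        for (Set<Integer> keys : index.values()) {
            references += keys.size();
        }
        return references;
    }

    // Every 100th person gets an address (1%); everyone else has a null address.
    static Map<Integer, String> samplePeople(int total) {
        Map<Integer, String> people = new HashMap<>();
        for (int id = 0; id < total; id++) {
            people.put(id, id % 100 == 0 ? "Address " + id : null);
        }
        return people;
    }

    public static void main(String[] args) {
        Map<Integer, String> people = samplePeople(10_000);
        System.out.println(indexedEntries(people, false)); // 10000
        System.out.println(indexedEntries(people, true));  // 100
    }
}
```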

Evaluating Query Cost and Effectiveness

Now that we have covered indexes, we can create query explain plan records to view the estimated and actual cost and efficiency of our queries. Let's use the example from earlier, where we queried all customers who have a balance greater than $10,000 and are GOLD customers. The filter for this would look like:

Filter<Customer> filter = greater(Customer::getOutstandingBalance, 10_000d)
.and(equal(Customer::getCustomerType, Customer.GOLD));

If we wanted to understand the potential impact of this, we can do the following:

QueryRecorder agent  = new QueryRecorder(QueryRecorder.RecordType.EXPLAIN);
Object resultsExplain = customers.aggregate(filter, agent);
System.out.println("\n" + resultsExplain + "\n");

When we execute the above, we will see something like the following:

Explain Plan
Filter Name Index Cost
======================================================================================
AndFilter | ---- | 0
GreaterFilter(.getOutstandingBalance(), 10000.0) | 0 | 10000000
EqualsFilter(.getCustomerType(), GOLD) | 1 | 10000000


Index Lookups
Index Description Extractor Ordered
======================================================================================
0 | No index found | .getOutstandingBalance() | false
1 | No index found | .getCustomerType() | false

Complete filter and index descriptions
N Full Name
======================================================================================

For full information on how to interpret the above see the Coherence Documentation, but effectively it is telling us that there are no indexes for any of the attributes, and it would be a high cost for this query as it needs to deserialize every entry to check against the customer type and balance.

If we add indexes on outstandingBalance and customerType and then re-run the explain plan, we can see that the indexes are now used.

customers.addIndex(Customer::getOutstandingBalance, true, null);
customers.addIndex(Customer::getCustomerType);

Explain Plan
Filter Name Index Cost
======================================================================================
AndFilter | ---- | 0
EqualsFilter(.getCustomerType(), GOLD) | 0 | 1
GreaterFilter(.getOutstandingBalance(), 10000.0) | 1 | 10


Index Lookups
Index Description Extractor Ordered
======================================================================================
0 | Simple: Footprint=781KB, Size=3 | .getCustomerType() | false
1 | Simple: Footprint=782KB, Size=10 | .getOutstandingBalance() | false


Complete filter and index descriptions
N Full Name
======================================================================================

Note: You can see in the above output that both indexes are being used. You can change RecordType.EXPLAIN to RecordType.TRACE to get the actual cost of the query when it is run.

Data Affinity and KeyAssociatedFilter

Data affinity is the concept of ensuring that a group of related cache entries is contained within a single cache partition. This ensures that all relevant data is managed on a single primary cache node.

Affinity may span multiple caches (if they are managed by the same cache service, which generally is the case). For example, in a master-detail pattern such as an Order and its OrderLine objects, the Order object may be co-located with the entire collection of OrderLine objects that are associated with it.

There are two benefits of using data affinity. First, only a single cache node is required to manage queries and transactions against a set of related items. Second, all concurrency operations are managed locally, avoiding the need for clustered synchronization.

For this example, we have the following classes:

Order
private int orderId;
private long orderData;
...

OrderLine
private int orderId;
private int lineId;
...

OrderLine.Key {
private int orderId;
private int lineId;
}

The above Order and OrderLine classes both contain orderId. OrderLine also contains lineId, and its key is a Key class comprising orderId and lineId. We can define OrderLine.Key as implementing KeyAssociation on orderId. This means that when OrderLine objects are inserted into the orderLines map, rather than using the OrderLine key itself, Coherence uses the value returned by getAssociatedKey to determine the partition in which to place the entry. Since this is the same value as the key for the Order, they will be placed in the same partition.

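public class OrderLine implements Serializable {
    private int orderId;
    private int lineId;

    public static class Key
            implements Serializable, KeyAssociation<Integer> {

        private final int orderId;
        private final int lineId;

        public Key(int orderId, int lineId) {
            this.orderId = orderId;
            this.lineId = lineId;
        }

        // entries are placed in the partition that owns this associated key
        @Override
        public Integer getAssociatedKey() {
            return orderId;
        }

        // equals and hashCode omitted for brevity
    }
}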

NamedMap<Integer, Order> orders = session.getMap("orders");
NamedMap<OrderLine.Key, OrderLine> orderLines = session.getMap("orderLines");
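To see why sharing an associated key leads to co-location, consider the following toy model. It is plain Java, not the Coherence API: the partitioning function simply takes the hash of the associated key modulo an illustrative partition count, whereas Coherence derives the partition from the serialized key. The class names and the value 257 are assumptions made for this sketch.

```java
class KeyAssociationDemo {
    static final int PARTITION_COUNT = 257; // illustrative value only

    // Stand-in for OrderLine.Key with the same two fields.
    static class OrderLineKey {
        final int orderId;
        final int lineId;

        OrderLineKey(int orderId, int lineId) {
            this.orderId = orderId;
            this.lineId = lineId;
        }

        // Mirrors KeyAssociation.getAssociatedKey(): partition by the parent order id.
        Integer getAssociatedKey() {
            return orderId;
        }
    }

    // Toy partitioning function based on the hash of the associated key.
    static int partitionFor(Object associatedKey) {
        return Math.floorMod(associatedKey.hashCode(), PARTITION_COUNT);
    }

    // An Order is keyed by its order id, so the key is its own associated key.
    static int partitionForOrder(int orderId) {
        return partitionFor(orderId);
    }

    // An order line is placed using its associated key, not its full key.
    static int partitionForLine(OrderLineKey key) {
        return partitionFor(key.getAssociatedKey());
    }

    public static void main(String[] args) {
        System.out.println(partitionForOrder(42));                      // 42
        System.out.println(partitionForLine(new OrderLineKey(42, 1)));  // 42
        System.out.println(partitionForLine(new OrderLineKey(42, 2)));  // 42
    }
}
```

Every line of order 42 resolves to the same partition as the order itself, regardless of its lineId.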

Once we have KeyAssociation in place, we can add data and use the Filter.associatedWith method to create a KeyAssociatedFilter, which allows us to query all the order lines associated with a given order. This filter specifically targets the partition in which that order and its lines reside.

Filter<OrderLine> filter = Filters.equal(OrderLine::getOrderId, 1)
.associatedWith(1);

orderLines.values(filter).forEach(System.out::println);

Note: A detailed example of Key Association is available on our Coherence Examples page on Coherence Community.

Query Re-execution Under Failure

While running queries in a cluster, if there is any repartitioning during the query execution, due either to new storage-members being added or members leaving, then the query will be re-executed automatically when the partitions are stable. For example, the following is the type of message you may see in these types of cases:

2022-11-18 08:31:44.143/5.536 Oracle Coherence CE 22.09 <Info> (thread=main, member=14): 
Repeating QueryRequest due to the re-distribution of PartitionSet{34}

The above message is an information message only. The query will be automatically retried without the need for any intervention or requirement to catch exceptions.
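Conceptually, this behaves like a retry loop that repeats the query whenever partition ownership changed while it was running. The sketch below is a plain-Java illustration of that idea under our own assumptions (a simple version counter stands in for partition ownership). It is not how Coherence implements re-execution, and all names are hypothetical.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntSupplier;
import java.util.function.Supplier;

class QueryRetryDemo {
    // Runs the query and repeats it if the ownership version changed while it was running.
    static <R> R runUntilStable(IntSupplier ownershipVersion, Supplier<R> query, AtomicInteger attempts) {
        while (true) {
            int before = ownershipVersion.getAsInt();
            R result = query.get();
            attempts.incrementAndGet();
            if (ownershipVersion.getAsInt() == before) {
                return result; // nothing moved during the query, so the result is complete
            }
            // otherwise partitions were redistributed mid-query: run it again
        }
    }

    // Simulates one redistribution during the first attempt and returns the attempts needed.
    static int attemptsWithOneRedistribution() {
        AtomicInteger version = new AtomicInteger();
        AtomicInteger attempts = new AtomicInteger();
        runUntilStable(version::get, () -> {
            if (attempts.get() == 0) {
                version.incrementAndGet(); // partitions move while the first query runs
            }
            return "result";
        }, attempts);
        return attempts.get();
    }

    public static void main(String[] args) {
        System.out.println(attemptsWithOneRedistribution()); // 2
    }
}
```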

Conclusion

Hopefully this article has given you good insight into the power of Coherence querying. If you would like more information on Coherence please see the following.
