Deep Dive: Aggregators

Gunnar Hillert
Published in Oracle Coherence · Feb 7, 2023

[Photo: 2022 Mauna Loa eruption on the Big Island of Hawai‘i]

In this blog post we will take a deep dive into Coherence Aggregators. We will describe how aggregators work, go over the built-in aggregators and provide a sample where we create a custom aggregator.

Concepts

In the previous article Deep Dive: Queries we took a closer look at how to query for data in a Coherence cluster, including the use of Extractors and Filters. In many use-cases you may also need to process the queried data and aggregate the results.

A naïve approach would be to perform the aggregation on the client side, which can mean moving a lot of data across the wire just to produce a single value. In addition, the aggregation then runs single-threaded on the client, which can become a performance bottleneck as well.
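
For illustration, a naïve client-side version of such an aggregation might look like the following sketch (it uses the customers map and Customer class that are introduced later in this article):

// Naïve client-side aggregation (sketch): every Customer value is shipped
// across the wire and the maximum is computed in a single thread on the client.
double maxOutstandingBalance = customers.values().stream()
        .mapToDouble(Customer::getOutstandingBalance)
        .max()
        .orElse(0.0);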

A better solution is to use Aggregators.

Coherence supports entry aggregators that perform operations against all entries, or a subset of them, to obtain a single result. The aggregation is carried out in parallel across the cluster; it is a map-reduce type of operation that can be performed efficiently across large amounts of data.

Built-in aggregators

Oracle Coherence provides many built-in aggregators which allow you to process data stored in Coherence in parallel:

  • Count
  • DistinctValues
  • Average
  • Min
  • Max
  • Sum
  • TopN
  • GroupBy

Many of the aggregators are available for various numeric data types, such as BigDecimal, Double and Long. Please see the API documentation for a complete list of available aggregators.

Using Aggregators

Using aggregators is straightforward. The relevant interface is com.tangosol.util.InvocableMap, which both NamedMap and NamedCache extend.

It has three aggregate methods:

public default <R> R aggregate(EntryAggregator<? super K, ? super V, R> aggregator)
public <R> R aggregate(Collection<? extends K> collKeys, EntryAggregator<? super K, ? super V, R> aggregator);
public <R> R aggregate(Filter filter, EntryAggregator<? super K, ? super V, R> aggregator);

These methods allow you to either plug in one of the built-in aggregators or to provide a custom aggregator.

Let’s put aggregators to use. We will re-use the domain model from the previous article Deep Dive: Queries where we have a Customer object with an Address:

public class Customer {
    private int id;
    private String customerName;
    private Address officeAddress;
    private Address homeAddress;
    private double outstandingBalance;
    ...
}

public class Address {
    private String addressLine1;
    private String addressLine2;
    private String suburb;
    private String city;
    private String state;
    private int postCode;
    ...
}

First, we will use a built-in aggregator to retrieve the Customer with the highest outstanding balance. As this property is of type double, we will use the DoubleMax aggregator. In order to do so, we need access to our Coherence NamedMap that contains all the customers:

Session session = Session.create();
NamedMap<Integer, Customer> customers = session.getMap("customers");

As we are evaluating all customer entries in the map, we don’t need to apply any filtering yet. Thus, we call the customers map’s aggregate() method that takes only an EntryAggregator as a parameter and, as mentioned previously, pass in the DoubleMax aggregator.

double maxOutstandingBalance =
        customers.aggregate(Aggregators.doubleMax(Customer::getOutstandingBalance));

The DoubleMax aggregator in turn lets us specify the property we are interested in. We can either pass in a ValueExtractor (for example a method reference) or a String that names the property we want to extract.

Since the ValueExtractor interface is a functional interface, we can directly use the method reference Customer::getOutstandingBalance. But keep in mind, other ValueExtractor implementations do exist. For instance, in situations where the value of your map is a primitive value and not a complex object with properties, you may use the IdentityExtractor, which does nothing but return the value itself.
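
As a sketch, here is how the IdentityExtractor could be used with a map whose values are plain numbers. The map name balances is made up for this illustration, and the example assumes the generified IdentityExtractor from com.tangosol.util.extractor, as found in recent Coherence versions:

// Sketch: the values in this map are plain Doubles, not domain objects
NamedMap<String, Double> balances = session.getMap("balances");

// IdentityExtractor simply returns the value itself, so the aggregator
// operates directly on the Double values stored in the map.
ValueExtractor<Double, Double> identity = new IdentityExtractor<>();
double maxBalance = balances.aggregate(Aggregators.doubleMax(identity));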

Anyway, in the example above we used a method reference to specify the outstandingBalance property. You could also use a simple property name to specify the property you would like to extract:

double maxOutstandingBalance =
        customers.aggregate(Aggregators.doubleMax("outstandingBalance"));

Tip: Method references are the recommended way to use ValueExtractors due to compile-time type validation.

But what can we do if we are only interested in customers from Hawai‘i? In that case we will take advantage of the previously discussed queries and combine them with aggregators:

double maxOutstandingBalance = customers.aggregate(
        Filters.equal(ValueExtractor.of(Customer::getHomeAddress)
                .andThen(Address::getState), "HI"),
        Aggregators.doubleMax(Customer::getOutstandingBalance));

First we pass in a Filter which uses a ValueExtractor that gets the customer’s home address first and then extracts the state. The filter only matches when the extracted state matches “HI”.
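
The other built-in aggregators listed above follow the same pattern. A few hedged sketches, using factory methods of the com.tangosol.util.Aggregators helper class as found in recent Coherence versions:

// Number of customers with a home address in Hawai‘i
int hawaiiCustomers = customers.aggregate(
        Filters.equal(ValueExtractor.of(Customer::getHomeAddress)
                .andThen(Address::getState), "HI"),
        Aggregators.count());

// The distinct home-address states across all customers
Collection<String> states = customers.aggregate(
        Aggregators.distinctValues(ValueExtractor.of(Customer::getHomeAddress)
                .andThen(Address::getState)));

// Average outstanding balance across all customers
double averageOutstandingBalance = customers.aggregate(
        Aggregators.doubleAverage(Customer::getOutstandingBalance));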

Java Streams and the StreamingAggregator interface

Aggregation can also be performed using the Java Streams API which, when combined with Java’s lambda expressions, can provide a simpler programming model.

OptionalDouble maxOutstandingBalance = customers.stream()
        .mapToDouble(entry -> entry.extract(Customer::getOutstandingBalance))
        .max();

The above retrieves the highest outstanding balance across all customers. If we want to aggregate the outstanding balance for Hawai‘i’s customers only (with the same result as before, but using the Java Streams API), we just pass the filter as an argument to stream():

OptionalDouble maxOutstandingBalance = customers
        .stream(Filters.equal(ValueExtractor.of(Customer::getHomeAddress)
                .andThen(Address::getState), "HI"))
        .mapToDouble(entry -> entry.extract(Customer::getOutstandingBalance))
        .max();

IMPORTANT: If possible, please provide filters as an argument to the stream() method instead of making a call to .filter() afterwards. By providing the filter early, Coherence can take advantage of indices as well as minimize any deserialization.
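
For comparison, here is a sketch of the less efficient variant that the note above warns about, where the filtering happens on the stream itself rather than being passed to stream():

// Less efficient (sketch): because the state check happens in .filter(),
// Coherence cannot use an index on the home-address state and typically
// has to deserialize every entry to evaluate the predicate.
OptionalDouble maxOutstandingBalance = customers.stream()
        .filter(entry -> "HI".equals(entry.extract(
                ValueExtractor.of(Customer::getHomeAddress).andThen(Address::getState))))
        .mapToDouble(entry -> entry.extract(Customer::getOutstandingBalance))
        .max();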

In addition to letting you perform map and cache operations through the Java Streams API, Coherence provides an extension to it: the StreamingAggregator interface. It allows you to aggregate results across the entire Coherence cluster in parallel, and all built-in aggregators implement it.

Therefore, if you have custom aggregation requirements, you would also implement that interface.

Creating Custom Aggregators

In order to create a custom aggregator you would need to implement the following methods:

  • supply() — Creates an instance we can accumulate into in parallel
  • accumulate() — Adds single entry to partial result when executing on storage members
  • getPartialResult() — Returns the partial result
  • combine() — Merges a single partial result into the final result
  • finalizeResult() — Applies finishing transformation to the final result and returns it

Aggregation in Coherence is executed using a map-reduce pattern across all cluster members storing the data. Thus, aggregation (accumulation) will occur on each storage member in parallel, and we will then get the partial results and combine them before returning the finalized result back to the user.

Let’s take a look at an example where we have a NamedMap documents populated with numerous Document objects and we need an aggregator that counts the number of occurrences of a provided collection of words across all documents.

public class Document implements Serializable {
    private String id;
    private String contents;
    ...
}

The custom aggregator, named WordCount, needs to accept a set of words (Strings) as an input parameter. As a result, the aggregator will return a Map where the key of each entry is the word we were looking for and the value is the number of occurrences. Ultimately, we want to invoke the aggregator on the documents NamedMap like this:

Map<String, Integer> results = documents.aggregate(new WordCount<>(setWords));
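
Before looking at the individual methods, here is a sketch of what the surrounding class could look like. The field names setWords and mapResults are taken from the method snippets below; the bound V extends Document is an assumption so that accumulate() compiles as shown:

public class WordCount<K, V extends Document>
        implements InvocableMap.StreamingAggregator<K, V, Map<String, Integer>, Map<String, Integer>> {

    // the words we are counting occurrences of
    private Set<String> setWords;

    // word -> number of occurrences (partial result on storage members,
    // combined result on the coordinating member)
    private Map<String, Integer> mapResults = new HashMap<>();

    public WordCount(Set<String> setWords) {
        this.setWords = setWords;
    }

    // supply(), accumulate(), getPartialResult(), combine() and
    // finalizeResult() are shown below
}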

The WordCount aggregator implements the aforementioned StreamingAggregator interface and its 5 methods:

supply()

@Override
public InvocableMap.StreamingAggregator<K, V, Map<String, Integer>, Map<String, Integer>> supply() {
    return new WordCount<>(this.setWords);
}

accumulate()

This method counts how many times the words occur in the provided Document and adds the results to the mapResults property.

@Override
public boolean accumulate(InvocableMap.Entry<? extends K, ? extends V> entry) {
    Document document = entry.getValue();

    for (String word : setWords) {
        // count the occurrences of the word by splitting on its word boundaries
        int count = document.getContents().split("\\b" + word + "\\b", -1).length - 1;
        this.mapResults.compute(word, (k, v) -> v == null ? count : v + count);
    }

    // returning true indicates that the aggregation should continue
    return true;
}

getPartialResult()

In our simple use-case we just return mapResults containing the partial results.

@Override
public Map<String, Integer> getPartialResult() {
    return mapResults;
}

combine()

Combines the results passed in with the current set of results.

@Override
public boolean combine(Map<String, Integer> mapPartialResult) {
    if (!mapPartialResult.isEmpty()) {
        mapPartialResult.forEach((k, v) ->
                mapResults.compute(k, (key, value) -> value == null ? v : value + v));
    }
    return true;
}

finalizeResult()

For our use-case, this method simply returns the final mapResults. It is identical to getPartialResult(); however, in this instance of WordCount, the mapResults property represents the already combined result.

@Override
public Map<String, Integer> finalizeResult() {
    return mapResults;
}

We cover this example in much more detail as a dedicated guide in our collection of examples: Custom Aggregators.

Aggregators with the Repository API

As of Coherence 21.06, the Coherence Repository API provides a higher-level data access layer for interacting with Coherence caches. Part of this API is first-class data aggregation support, with built-in methods such as average, groupBy, max, min, sum and top.

The Repository API is also the basis for the Coherence integrations with Micronaut Data and Spring Data, which of course include aggregation support as well. For more information, see the respective reference documentation.

The Coherence Repository API, however, can also be used independently and may provide a nicer interaction model for your application use-cases.

We already have the domain model in place for our Customer. In order to create a repository, we need to create a class that extends AbstractRepository, e.g.:

public class CustomerRepository extends AbstractRepository<Integer, Customer> {

    private NamedMap<Integer, Customer> customers;

    public CustomerRepository(NamedMap<Integer, Customer> customers) {
        this.customers = customers;
    }

    @Override
    protected Integer getId(Customer customer) {
        return customer.getId();
    }

    @Override
    protected Class<? extends Customer> getEntityType() {
        return Customer.class;
    }

    @Override
    protected NamedMap<Integer, Customer> getMap() {
        return this.customers;
    }
}
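
The repository can then be constructed with the NamedMap obtained from the Coherence Session, just like in the earlier examples:

Session session = Session.create();
CustomerRepository customerRepository =
        new CustomerRepository(session.getMap("customers"));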

Of course, you are encouraged to add your own custom repository methods depending on your specific business needs. For example, if you want to have a repository method that finds customers by state, you may add the following:

protected static Filter<Customer> filterByState(String state) {
    return Filters.like(
            ValueExtractor.of(Customer::getHomeAddress).andThen(Address::getState), state);
}

public Collection<Customer> findByState(String state) {
    return getAll(filterByState(state));
}

Since we used a like filter, we can also search for customers by state using wildcards. The following example searches for customers in states starting with the letter H, which in this case returns the customers from Hawai‘i (HI):

Collection<Customer> customersByState = customerRepository.findByState("H%");

As mentioned above, we can also use the Coherence Repository API for aggregation queries. As an example, we will rewrite the query that returns the maximum outstanding balance across all customers that are located in Hawai‘i:

double maxOutstandingBalance = customerRepository.max(
        filterByState("HI"),
        Customer::getOutstandingBalance);
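
The other aggregation methods mentioned above (average, sum, and so on) follow the same pattern. A hedged sketch, assuming overloads analogous to the max() call used here:

// Average outstanding balance across all customers (sketch)
double averageBalance =
        customerRepository.average(Customer::getOutstandingBalance);

// Total outstanding balance of the customers from Hawai‘i (sketch)
double totalHawaiiBalance = customerRepository.sum(
        filterByState("HI"),
        Customer::getOutstandingBalance);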

Important: The Coherence Repository API also provides simplified support for indices using the @Indexed annotation. Indices allow you to avoid deserializing entities stored across the cluster. See chapter Using Declarative Acceleration and Index Creation in the Coherence reference guide for more details.
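
A sketch of what that could look like on our Customer class, assuming the @Indexed annotation from the Coherence repository package (com.oracle.coherence.repository):

public class Customer {
    ...
    // Ask Coherence to create an index for this property (sketch); filters and
    // aggregations on outstandingBalance can then avoid deserializing every entry.
    @Indexed
    public double getOutstandingBalance() {
        return outstandingBalance;
    }
    ...
}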

Conclusion and further resources

In this article we covered how you can use Oracle Coherence to perform aggregated queries and the different ways of executing them. Also check out the aggregator-specific hands-on guides in our collection of examples.

Furthermore, check out the relevant reference documentation.

If you would like more information on Coherence in general, please see the Coherence documentation.

As always, if you have questions, please join our Slack channel or ask questions on Stack Overflow. And lastly, if you see missing features or if you have any other suggestions for improvement please contact us, e.g. via Twitter @OracleCoherence or feel free to file a GitHub issue.

Gunnar Hillert is a Consulting Member of Technical Staff at Oracle on the Coherence team, a Java Champion, former Spring team member, OSS committer, and DevNexus co-founder.