Published in CodeX

Java Stream Custom Collector

Without a doubt, the Stream API was one of the major features added in Java 8. If you are not familiar with streams: a stream is a sequence of objects that supports functional-style operations on its elements, such as map-reduce transformations on collections, using a pipeline approach. You can find much more information in the Oracle documentation.

In this article, we will focus on how to use the Collector interface and create our own custom implementation. It may look a little complex at first, but as soon as you get the idea, you will find it very useful.

Requirements:

Knowledge of Java 8, Stream, and functional programming.

When to use it?

The best way to learn this topic is by example, but first let me share a couple of use cases; then we will implement an example step by step.

Aggregation is one of the most common scenarios for custom collectors. Although the Stream API provides predefined aggregation methods out of the box, it is very common to need a more complex aggregation driven by your own business requirements. The output of the aggregation can be a single object or a list of objects grouped according to your requirements.
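For comparison, here is a minimal sketch of what the predefined collectors already cover: grouping amounts by user and summing them with `Collectors.groupingBy` and `Collectors.reducing`. The `Payment` type and the `BuiltInAggregation` class are hypothetical, introduced only for this illustration.

```java
import java.math.BigDecimal;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

class BuiltInAggregation {
    // Hypothetical minimal type standing in for a transaction
    static final class Payment {
        private final Long userId;
        private final BigDecimal amount;

        Payment(long userId, String amount) {
            this.userId = userId;
            this.amount = new BigDecimal(amount);
        }

        Long getUserId() { return userId; }
        BigDecimal getAmount() { return amount; }
    }

    // Total amount per user, built only from predefined collectors
    static Map<Long, BigDecimal> totalPerUser(List<Payment> payments) {
        return payments.stream().collect(Collectors.groupingBy(
                Payment::getUserId,
                TreeMap::new, // sorted keys, for predictable output
                Collectors.reducing(BigDecimal.ZERO, Payment::getAmount, BigDecimal::add)));
    }

    public static void main(String[] args) {
        List<Payment> payments = Arrays.asList(
                new Payment(1, "10.50"), new Payment(2, "4.00"), new Payment(1, "2.25"));
        System.out.println(totalPerUser(payments)); // {1=12.75, 2=4.00}
    }
}
```

Once the result needs several related figures at once (total, maximum, latest date, the list itself), chaining built-in collectors gets awkward, which is where a custom Collector starts to pay off.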

Transformation and enrichment is another use case where a custom Collector is worth considering. Of course, there are many other tools in the Collections Framework that can help with this, but I think you will find custom collectors a strong solution for transformation and enrichment processes.
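As a hypothetical illustration of enrichment that needs state carried across elements, the sketch below uses the `Collector.of` factory to turn a stream of amounts into their running totals. Plain `Integer` values stand in for real transactions, and the `RunningTotals` name is invented for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collector;
import java.util.stream.IntStream;

class RunningTotals {
    // Hypothetical enrichment collector: replaces each amount with the cumulative sum so far
    static Collector<Integer, List<Integer>, List<Integer>> runningTotals() {
        return Collector.of(
                ArrayList::new,
                (totals, value) -> {
                    int previous = totals.isEmpty() ? 0 : totals.get(totals.size() - 1);
                    totals.add(previous + value);
                },
                (left, right) -> {
                    // Totals in the right-hand chunk started from zero, so shift them
                    // by the last total of the left-hand chunk
                    int offset = left.isEmpty() ? 0 : left.get(left.size() - 1);
                    for (int total : right) {
                        left.add(total + offset);
                    }
                    return left;
                });
    }

    public static void main(String[] args) {
        System.out.println(Arrays.asList(5, 10, 20).stream().collect(runningTotals())); // [5, 15, 35]

        // The combiner keeps the result correct on a parallel stream as well
        List<Integer> parallel = IntStream.rangeClosed(1, 100).boxed().parallel().collect(runningTotals());
        System.out.println(parallel.get(99)); // 5050
    }
}
```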

You may say that all of this can be done in SQL, and that is true, but you will find yourself in situations where complex and expensive aggregation and enrichment queries can easily be avoided by using this approach instead.

Collector Interface

All of this is possible thanks to the Collector interface, which was introduced in Java 8 as part of the new Stream API.

Interface Collector<T, A, R>
  • T: the type of input elements to the reduction operation
  • A: the mutable accumulation type of the reduction operation (often hidden as an implementation detail)
  • R: the result type of the reduction operation
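The three type parameters are easy to spot on collectors the JDK already ships. In this small sketch (the `CollectorTypes` class is just a holder for the example), the `?` in the declared types is the hidden accumulation type A:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collector;
import java.util.stream.Collectors;

class CollectorTypes {
    // T = String (input), A = hidden (the "?"), R = List<String> (result)
    static final Collector<String, ?, List<String>> TO_LIST = Collectors.toList();

    // T = CharSequence, R = String; the mutable accumulator type stays hidden
    static final Collector<CharSequence, ?, String> JOINING = Collectors.joining(", ");

    public static void main(String[] args) {
        List<String> names = Arrays.asList("alice", "bob");
        List<String> copy = names.stream().collect(TO_LIST);
        String joined = names.stream().collect(JOINING);
        System.out.println(copy);   // [alice, bob]
        System.out.println(joined); // alice, bob
    }
}
```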

This interface has four main methods that work together to accumulate entries into a mutable result container (a fifth method, characteristics(), describes properties of the collector). Their responsibilities are as follows:

  • supplier(): creating a new result container.
  • accumulator(): incorporating a new data element into a result container.
  • combiner(): combining two result containers into one.
  • finisher(): performing an optional final transform on the container.
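To see how the four functions cooperate, here is a simplified model of a collect() call, written by hand. It is a sketch under an assumption: the input is always split into exactly two halves, whereas real streams choose their own splits (and a sequential stream uses a single container without calling the combiner). The `ManualCollect` class and `collectInTwoHalves` method are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.stream.Collector;
import java.util.stream.Collectors;

class ManualCollect {
    // Each half gets a fresh container from supplier() and is filled by accumulator();
    // combiner() merges the two containers and finisher() produces the result.
    static <T, A, R> R collectInTwoHalves(List<T> input, Collector<? super T, A, R> collector) {
        BiConsumer<A, ? super T> accumulator = collector.accumulator();
        int middle = input.size() / 2;

        A left = collector.supplier().get();
        for (T element : input.subList(0, middle)) {
            accumulator.accept(left, element);
        }

        A right = collector.supplier().get();
        for (T element : input.subList(middle, input.size())) {
            accumulator.accept(right, element);
        }

        A merged = collector.combiner().apply(left, right);
        return collector.finisher().apply(merged);
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("stream", "collector", "combiner");
        System.out.println(collectInTwoHalves(words, Collectors.joining("-"))); // stream-collector-combiner
        System.out.println(collectInTwoHalves(words, Collectors.toList()));     // [stream, collector, combiner]
    }
}
```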

Custom Collector Example

Aggregation Example

Imagine you have a list of user transactions, and you would like to use a stream and your own custom collector to create an object that wraps the list and adds some additional information, such as the average amount, the maximum amount, and the latest transaction date. (We use the @Data and @ToString annotations from the Lombok library for simplicity.)

import java.math.BigDecimal;
import java.util.Date;

import lombok.Data;
import lombok.ToString;

@Data
@ToString
public class Transaction {
    private Long id;
    private Long userId;
    private BigDecimal amount = BigDecimal.ZERO;
    private Date dateTime;
}

This is the aggregated object we want to produce as output:

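import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;

import lombok.Data;
import lombok.ToString;

@Data
@ToString
public class TransactionAggregation {
    private BigDecimal average = BigDecimal.ZERO;
    private BigDecimal total = BigDecimal.ZERO;
    private BigDecimal max = BigDecimal.ZERO;
    private Date lastTransactionDate;
    private List<Transaction> transactionList = new ArrayList<>();
}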

We can now start creating our own implementation of the Collector interface, based on what we have covered so far.

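import java.util.stream.Collector;

public class TransactionAggregationCollector implements Collector<Transaction, TransactionAggregation, TransactionAggregation> {
    // supplier(), accumulator(), combiner(), finisher() and characteristics() are implemented below
}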

As you can see, Transaction is our T, so our implementation expects a stream of Transaction objects. For both A and R we use TransactionAggregation, which means the intermediate container and the final result share this type.

@Override
public Supplier<TransactionAggregation> supplier() {
    return () -> new TransactionAggregation();
}

The first method we need to implement is supplier(), which creates a new result container; in our case, a new TransactionAggregation object.

Note: the () -> lambda expression implements the Supplier functional interface, which is the return type of the supplier() method. For more information, take a look at lambda expressions and functional interfaces.
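The same Supplier can be written in several equivalent forms; in our collector, () -> new TransactionAggregation() could just as well be the constructor reference TransactionAggregation::new. The sketch below shows the forms side by side with a plain ArrayList so that it runs on its own (the `SupplierForms` class is a hypothetical holder). What matters for a collector is that every get() call returns a fresh container, because a parallel stream asks for one container per chunk of work.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

class SupplierForms {
    // Three equivalent implementations of Supplier<List<String>>
    static final Supplier<List<String>> LAMBDA = () -> new ArrayList<>();
    static final Supplier<List<String>> CONSTRUCTOR_REF = ArrayList::new;
    static final Supplier<List<String>> ANONYMOUS = new Supplier<List<String>>() {
        @Override
        public List<String> get() {
            return new ArrayList<>();
        }
    };

    public static void main(String[] args) {
        List<String> first = LAMBDA.get();
        List<String> second = LAMBDA.get();
        first.add("only in first");
        // Each get() call creates a new, independent container
        System.out.println(second.isEmpty()); // true
        System.out.println(CONSTRUCTOR_REF.get().size() + ANONYMOUS.get().size()); // 0
    }
}
```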

@Override
public BiConsumer<TransactionAggregation, Transaction> accumulator() {
    // ta is the TransactionAggregation container created by supplier()
    // t is the current transaction from the stream
    return (ta, t) -> {
        // Add the transaction to the result list
        ta.getTransactionList().add(t);

        // Keep the maximum amount
        if (ta.getMax().compareTo(t.getAmount()) < 0) {
            ta.setMax(t.getAmount());
        }

        // Track the latest transaction date, skipping transactions without a date
        if (t.getDateTime() != null
                && (ta.getLastTransactionDate() == null
                        || ta.getLastTransactionDate().compareTo(t.getDateTime()) < 0)) {
            ta.setLastTransactionDate(t.getDateTime());
        }

        // Update the total amount
        ta.setTotal(ta.getTotal().add(t.getAmount()));

        // Recompute the average; an explicit scale (two decimal places, an assumption) and
        // java.math.RoundingMode avoid an ArithmeticException for quotients such as 10 / 3
        ta.setAverage(ta.getTotal().divide(
                BigDecimal.valueOf(ta.getTransactionList().size()), 2, RoundingMode.HALF_UP));
    };
}

The accumulator() method performs the aggregation: the function it returns is called once for each element of the stream, folding that element into the result container.

BiConsumer is a functional interface; it takes two arguments and returns nothing.
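The average in the accumulator is computed with BigDecimal.divide, and the overload matters. The single-argument divide(BigDecimal) is exact and throws an ArithmeticException when the quotient has a non-terminating decimal expansion, so an average over three transactions totalling 10 would fail. A minimal sketch of the difference, where two decimal places and HALF_UP rounding are assumptions rather than requirements (`AverageDivision` is a hypothetical name):

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

class AverageDivision {
    // Exact division: throws ArithmeticException if the quotient does not terminate
    static BigDecimal exactAverage(BigDecimal total, int count) {
        return total.divide(BigDecimal.valueOf(count));
    }

    // Division with an explicit scale and rounding mode always produces a result
    static BigDecimal roundedAverage(BigDecimal total, int count) {
        return total.divide(BigDecimal.valueOf(count), 2, RoundingMode.HALF_UP);
    }

    public static void main(String[] args) {
        BigDecimal total = new BigDecimal("10");
        System.out.println(roundedAverage(total, 3)); // 3.33
        try {
            exactAverage(total, 3);
        } catch (ArithmeticException e) {
            System.out.println("exact division failed: " + e.getMessage());
        }
    }
}
```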

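@Override
public BinaryOperator<TransactionAggregation> combiner() {
    // ta1 and ta2 are partial results built from different parts of the stream
    return (ta1, ta2) -> {
        // Keep the larger maximum
        if (ta1.getMax().compareTo(ta2.getMax()) < 0) {
            ta1.setMax(ta2.getMax());
        }

        // Merge the transaction lists and the totals
        ta1.getTransactionList().addAll(ta2.getTransactionList());
        ta1.setTotal(ta1.getTotal().add(ta2.getTotal()));

        // Recompute the average from the merged total and count: the mean of two
        // partial averages is only correct when both partitions have the same size
        if (!ta1.getTransactionList().isEmpty()) {
            ta1.setAverage(ta1.getTotal().divide(
                    BigDecimal.valueOf(ta1.getTransactionList().size()), 2, RoundingMode.HALF_UP));
        }

        // Keep the later date; either side may be null if its partition was empty
        if (ta2.getLastTransactionDate() != null
                && (ta1.getLastTransactionDate() == null
                        || ta1.getLastTransactionDate().compareTo(ta2.getLastTransactionDate()) < 0)) {
            ta1.setLastTransactionDate(ta2.getLastTransactionDate());
        }

        return ta1;
    };
}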

If you are familiar with the Stream API, you have most likely heard of or used parallelStream(). It processes the stream on multiple threads, but at the end the partial results of each thread must be combined. The combiner() method merges two partial results into one object, so its implementation contains the logic for combining two TransactionAggregation objects.
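One detail to watch when combining partial results: the mean of two partial averages equals the overall average only when both partitions hold the same number of elements. Merging by total and count is correct in general. The sketch below (class and method names are hypothetical, and two-decimal rounding is an assumption) contrasts the two approaches on uneven partitions.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.Arrays;
import java.util.List;

class CombineAverages {
    static BigDecimal sum(List<BigDecimal> values) {
        return values.stream().reduce(BigDecimal.ZERO, BigDecimal::add);
    }

    static BigDecimal average(List<BigDecimal> values) {
        return sum(values).divide(BigDecimal.valueOf(values.size()), 2, RoundingMode.HALF_UP);
    }

    // Naive merge: the mean of the two partial averages
    static BigDecimal meanOfAverages(List<BigDecimal> left, List<BigDecimal> right) {
        return average(left).add(average(right))
                .divide(BigDecimal.valueOf(2), 2, RoundingMode.HALF_UP);
    }

    // Correct merge: combined total divided by combined count
    static BigDecimal mergedAverage(List<BigDecimal> left, List<BigDecimal> right) {
        BigDecimal total = sum(left).add(sum(right));
        int count = left.size() + right.size();
        return total.divide(BigDecimal.valueOf(count), 2, RoundingMode.HALF_UP);
    }

    public static void main(String[] args) {
        List<BigDecimal> left = Arrays.asList(new BigDecimal("10"));
        List<BigDecimal> right = Arrays.asList(new BigDecimal("20"), new BigDecimal("30"));
        System.out.println(meanOfAverages(left, right)); // 17.50
        System.out.println(mergedAverage(left, right));  // 20.00
    }
}
```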

BinaryOperator represents an operation upon two operands of the same type, producing a result of the same type as the operands. This is a specialization of BiFunction for the case where the operands and the result are all of the same type.
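Two small BinaryOperator instances over BigDecimal illustrate the "same type in, same type out" shape; a max operator like the second one could also serve the max-merging step of a combiner. `BinaryOperator.maxBy` is a static factory on the interface itself; the `BinaryOperatorExample` name is hypothetical.

```java
import java.math.BigDecimal;
import java.util.Comparator;
import java.util.function.BinaryOperator;

class BinaryOperatorExample {
    // Both operands and the result share the type BigDecimal
    static final BinaryOperator<BigDecimal> SUM = BigDecimal::add;
    static final BinaryOperator<BigDecimal> MAX = BinaryOperator.maxBy(Comparator.naturalOrder());

    public static void main(String[] args) {
        BigDecimal a = new BigDecimal("12.50");
        BigDecimal b = new BigDecimal("7.25");
        System.out.println(SUM.apply(a, b)); // 19.75
        System.out.println(MAX.apply(a, b)); // 12.50
    }
}
```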

@Override
public Function<TransactionAggregation, TransactionAggregation> finisher() {
    // No final transformation is needed, so the container itself is the result
    return Function.identity();
}

finisher() returns the function that produces the final result, which is handed back to the caller at the end of the stream pipeline. In our case no extra transformation is needed, so the container itself is returned.

Function represents a function that accepts one argument and produces a result.
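Because our finisher returns its input unchanged, its role is easy to miss. A finisher earns its place when the accumulation type A differs from the result type R. As a minimal sketch (the `CONCAT` name is hypothetical), the collector below is built with the five-argument `Collector.of` factory: it accumulates into a mutable StringBuilder and uses its finisher to return an immutable String.

```java
import java.util.Arrays;
import java.util.stream.Collector;

class FinisherExample {
    // A = StringBuilder (mutable container), R = String (final result)
    static final Collector<String, StringBuilder, String> CONCAT = Collector.of(
            StringBuilder::new,                  // supplier
            (sb, s) -> sb.append(s),             // accumulator
            (left, right) -> left.append(right), // combiner
            StringBuilder::toString);            // finisher: StringBuilder -> String

    public static void main(String[] args) {
        String word = Arrays.asList("fin", "isher").stream().collect(CONCAT);
        System.out.println(word); // finisher
    }
}
```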

@Override
public Set<Characteristics> characteristics() {
    return Collections.emptySet();
}

This method declares the characteristics of the collector. In our case, we return an empty set.

Characteristics indicate properties of a Collector that can be used to optimize reduction implementations:

  • CONCURRENT: indicates that this collector is concurrent, meaning that the result container can support the accumulator function being called concurrently with the same result container from multiple threads.
  • IDENTITY_FINISH: indicates that the finisher function is the identity function and can be elided.
  • UNORDERED: indicates that the collection operation does not commit to preserving the encounter order of input elements.
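Since our finisher is an identity function, characteristics() could return a set containing IDENTITY_FINISH instead of an empty set. The sketch below shows how such a set can be built with EnumSet, and that `Collector.of` without a finisher adds IDENTITY_FINISH on its own, with any extra characteristics passed as trailing arguments. The class, method, and field names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collector;
import java.util.stream.Collector.Characteristics;

class CharacteristicsExample {
    // What a hand-written characteristics() could return when finisher() is the identity
    static Set<Characteristics> identityFinishOnly() {
        return Collections.unmodifiableSet(EnumSet.of(Characteristics.IDENTITY_FINISH));
    }

    // Collector.of without a finisher marks the collector IDENTITY_FINISH automatically
    static final Collector<String, List<String>, List<String>> TO_LIST = Collector.of(
            ArrayList::new,
            List::add,
            (left, right) -> { left.addAll(right); return left; });

    // Additional characteristics are passed as trailing varargs
    static final Collector<String, List<String>, List<String>> TO_LIST_UNORDERED = Collector.of(
            ArrayList::new,
            List::add,
            (left, right) -> { left.addAll(right); return left; },
            Characteristics.UNORDERED);

    public static void main(String[] args) {
        System.out.println(TO_LIST.characteristics().contains(Characteristics.IDENTITY_FINISH));           // true
        System.out.println(TO_LIST_UNORDERED.characteristics().contains(Characteristics.UNORDERED));       // true
        System.out.println(TO_LIST_UNORDERED.characteristics().contains(Characteristics.IDENTITY_FINISH)); // true
    }
}
```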

TransactionAggregation transactionAggregation = transactionList.stream().collect(new TransactionAggregationCollector());

As you can see, using the collector is really simple: create a stream from the transaction list and call collect() with an instance of the custom collector.
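For readers who want to try the whole flow without Lombok, here is a condensed, self-contained sketch of the same aggregation. It is an adaptation rather than the author's code: the `Tx` and `Agg` classes are hypothetical stand-ins for Transaction and TransactionAggregation, LocalDate replaces Date, the average is rounded to two decimal places, and `Collector.of` wires the same supplier, accumulator and combiner roles together. Running it with both stream() and parallelStream() is a quick check that the combiner agrees with the sequential result.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collector;

class TransactionAggregationDemo {
    // Lombok-free stand-in for Transaction (LocalDate replaces Date here)
    static final class Tx {
        final BigDecimal amount;
        final LocalDate date;

        Tx(String amount, LocalDate date) {
            this.amount = new BigDecimal(amount);
            this.date = date;
        }
    }

    // Mutable container playing the role of TransactionAggregation
    static final class Agg {
        BigDecimal total = BigDecimal.ZERO;
        BigDecimal max = BigDecimal.ZERO;
        BigDecimal average = BigDecimal.ZERO;
        LocalDate lastDate;
        final List<Tx> transactions = new ArrayList<>();

        // Accumulator role: fold one transaction into this container
        void add(Tx t) {
            transactions.add(t);
            total = total.add(t.amount);
            max = max.max(t.amount);
            if (t.date != null && (lastDate == null || lastDate.isBefore(t.date))) {
                lastDate = t.date;
            }
            recomputeAverage();
        }

        // Combiner role: merge another partial result into this one
        Agg merge(Agg other) {
            transactions.addAll(other.transactions);
            total = total.add(other.total);
            max = max.max(other.max);
            if (other.lastDate != null && (lastDate == null || lastDate.isBefore(other.lastDate))) {
                lastDate = other.lastDate;
            }
            recomputeAverage();
            return this;
        }

        private void recomputeAverage() {
            if (!transactions.isEmpty()) {
                average = total.divide(BigDecimal.valueOf(transactions.size()), 2, RoundingMode.HALF_UP);
            }
        }
    }

    // Supplier, accumulator and combiner; the finisher is the identity
    static Collector<Tx, Agg, Agg> aggregating() {
        return Collector.of(Agg::new, Agg::add, Agg::merge);
    }

    public static void main(String[] args) {
        List<Tx> txs = Arrays.asList(
                new Tx("10.00", LocalDate.of(2022, 1, 5)),
                new Tx("25.00", LocalDate.of(2022, 3, 1)),
                new Tx("5.00", LocalDate.of(2022, 2, 10)));

        Agg sequential = txs.stream().collect(aggregating());
        Agg parallel = txs.parallelStream().collect(aggregating());

        System.out.println(sequential.total + " " + sequential.max + " "
                + sequential.average + " " + sequential.lastDate); // 40.00 25.00 13.33 2022-03-01
        System.out.println(parallel.average.equals(sequential.average)); // true
    }
}
```

Like the article's version, this sketch starts max at zero, so a list containing only negative amounts would report a maximum of zero.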

Conclusion

As you can see, the whole process is pretty straightforward. As you start using it, you will notice how useful this option can be in different use cases.

You can find the sample source code on GitHub.

Farzin Pashaee

Software Engineer at Maybank, AI and ML enthusiastic
