Reactive programming with Java 8 and simple-react: batching and chunking

If we have a Stream of events, it may be more efficient to process them in batches rather than one at a time. For example, batchBySize groups the parsed results into batches of 10:

    LazyFutureStream.parallelCommonBuilder()
        .reactToCollection(files)
        .map(this::readFileToString)
        .map(this::parseJson)
        .batchBySize(10)
        .forEach(System.out::println);
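To make the grouping concrete, here is a minimal sketch of size-based batching in plain Java, without simple-react. The class name BatchBySizeSketch and its batchBySize helper are hypothetical and only illustrate the idea; they are not the library's implementation, and in this sketch the final batch simply holds whatever is left over.

```java
import java.util.ArrayList;
import java.util.List;

public class BatchBySizeSketch {

    // Hypothetical helper: splits items into consecutive lists of at most `size` elements.
    static <T> List<List<T>> batchBySize(List<T> items, int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int i = 0; i < items.size(); i += size) {
            int end = Math.min(i + size, items.size());
            // Copy the sub-list so each batch is independent of the source list.
            batches.add(new ArrayList<>(items.subList(i, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> parsed = new ArrayList<>();
        for (int i = 1; i <= 25; i++) {
            parsed.add(i);
        }
        // Prints three batches holding 10, 10 and 5 elements.
        batchBySize(parsed, 10).forEach(System.out::println);
    }
}
```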

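We can then process each group of 10 objects as a unit. In the example below, onePer slows the flow to one batch per second, and flatMap turns the processed batches back into individual results: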

    LazyFutureStream.parallelCommonBuilder()
        .iterateInfinitely("", last -> nextFile())
        .map(this::readFileToString)
        .map(this::parseJson)
        .batchBySize(10)
        .onePer(1, TimeUnit.SECONDS)
        .peek(batch -> System.out.println("batched : " + batch))
        .map(this::processOrders)
        .flatMap(Collection::stream)
        .peek(individual -> System.out.println("Flattened : " + individual))
        .forEach(this::save);

We can also batch by time, collecting whatever arrives within each one-second window:

    LazyFutureStream.parallelCommonBuilder()
        .iterateInfinitely("", last -> nextFile())
        .map(this::readFileToString)
        .map(this::parseJson)
        .batchByTime(1, TimeUnit.SECONDS)
        .peek(batch -> System.out.println("batched : " + batch))
        .map(this::processOrders)
        .flatMap(Collection::stream)
        .peek(individual -> System.out.println("Flattened : " + individual))
        .forEach(this::save);

Or we can simply take whatever has completed since our last read:

    LazyFutureStream.parallelCommonBuilder()
        .iterateInfinitely("", last -> nextFile())
        .map(this::readFileToString)
        .map(this::parseJson)
        .chunkSinceLastRead()
        .peek(batch -> System.out.println("batched : " + batch))
        .map(this::processOrders)
        .flatMap(Collection::stream)
        .peek(individual -> System.out.println("Flattened : " + individual))
        .forEach(this::save);
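The idea of "everything completed since the last read" can be pictured as draining a queue of finished results. The sketch below uses a plain java.util.concurrent.BlockingQueue and a hypothetical chunkSinceLastRead helper; it is an illustration under that assumption, not simple-react's actual implementation, and it returns an empty list when nothing new has arrived.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class ChunkSinceLastReadSketch {

    // Hypothetical helper: removes and returns everything currently in the queue, without blocking.
    static <T> List<T> chunkSinceLastRead(BlockingQueue<T> completed) {
        List<T> chunk = new ArrayList<>();
        completed.drainTo(chunk);
        return chunk;
    }

    public static void main(String[] args) {
        BlockingQueue<String> completed = new LinkedBlockingQueue<>();

        completed.add("order-1");
        completed.add("order-2");
        System.out.println(chunkSinceLastRead(completed)); // [order-1, order-2]

        completed.add("order-3");
        System.out.println(chunkSinceLastRead(completed)); // [order-3]

        // Nothing has completed since the previous read.
        System.out.println(chunkSinceLastRead(completed)); // []
    }
}
```

Unlike batching by size or time, the chunk size here depends only on how much work finished between reads, so a slow consumer naturally receives larger chunks.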

We can also supply our own batching logic:

    LazyFutureStream.parallelCommonBuilder()
        .reactToCollection(files)
        .map(this::readFileToString)
        .map(this::parseJson)
        .batch(customBatchingFunction)
        .forEach(System.out::println);

Here customBatchingFunction is a function that takes a Supplier of individual results and converts it into a Supplier of results batched in a collection, for example:

    Function<Supplier<U>, Supplier<Collection<U>>> customFn;
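As an illustration of a function with that shape, the following sketch builds a size-based batcher over a plain Supplier. The class CustomBatchingSketch and the bySize factory are hypothetical names, and the sketch assumes the source always has another result available; it does not handle a closed queue.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

public class CustomBatchingSketch {

    // Hypothetical factory: returns a batching function that pulls `size` results per batch.
    static <U> Function<Supplier<U>, Supplier<Collection<U>>> bySize(int size) {
        return source -> () -> {
            List<U> batch = new ArrayList<>(size);
            for (int i = 0; i < size; i++) {
                // Each call to the batched Supplier consumes `size` individual results.
                batch.add(source.get());
            }
            return batch;
        };
    }

    public static void main(String[] args) {
        Iterator<String> results = Arrays.asList("a", "b", "c", "d").iterator();
        Supplier<String> individual = results::next;

        Supplier<Collection<String>> batched =
                CustomBatchingSketch.<String>bySize(2).apply(individual);

        System.out.println(batched.get()); // [a, b]
        System.out.println(batched.get()); // [c, d]
    }
}
```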

The function that batches by time looks like this, where time and unit define the batching window and are captured from the enclosing scope:

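    Function<Supplier<U>, Supplier<Collection<U>>> fn = s -> {
        return () -> {
            // Start timing this batch.
            SimpleTimer timer = new SimpleTimer();
            List<U> list = new ArrayList<>();
            try {
                // Always take at least one result, then keep pulling until the window has elapsed.
                do {
                    list.add(s.get());
                } while (timer.getElapsedNanoseconds() < unit.toNanos(time));
            } catch (ClosedQueueException e) {
                // The source closed mid-batch: rethrow, carrying the partial batch.
                throw new ClosedQueueException(list);
            }
            return list;
        };
    };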
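The same pattern can be tried without simple-react. The sketch below is an approximation under stated assumptions: it swaps SimpleTimer for System.nanoTime(), leaves out the ClosedQueueException handling, and uses the hypothetical names BatchByTimeSketch and byTime.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Supplier;

public class BatchByTimeSketch {

    // Hypothetical factory: batches results pulled from the source within a time window.
    static <U> Function<Supplier<U>, Supplier<Collection<U>>> byTime(long time, TimeUnit unit) {
        return source -> () -> {
            long start = System.nanoTime(); // stands in for SimpleTimer
            List<U> batch = new ArrayList<>();
            // do/while guarantees at least one result per batch, even for a zero-length window.
            do {
                batch.add(source.get());
            } while (System.nanoTime() - start < unit.toNanos(time));
            return batch;
        };
    }

    public static void main(String[] args) {
        AtomicInteger counter = new AtomicInteger();
        // A slow source: each result takes roughly 10 ms to produce.
        Supplier<Integer> slow = () -> {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return counter.incrementAndGet();
        };

        Collection<Integer> batch =
                BatchByTimeSketch.<Integer>byTime(50, TimeUnit.MILLISECONDS).apply(slow).get();
        // The batch size depends on timing, so it varies from run to run.
        System.out.println("batched : " + batch);
    }
}
```

Note that the window is only checked after each call to the source returns, so a source that blocks for a long time can stretch a batch well past the nominal window.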
