Reactive programming with Java 8 and simple-react: batching and chunking
Mar 25, 2015
If we have a Stream of events, it may be more efficient to process them in batches.
LazyFutureStream.parallelCommonBuilder()
    .reactToCollection(files)
    .map(this::readFileToString)
    .map(this::parseJson)
    .batchBySize(10)
    .forEach(System.out::println);
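To make the shape of the result concrete, here is a minimal plain-JDK sketch of what batching by size produces. It does not use simple-react: the class BatchBySizeSketch and its batchBySize helper are hypothetical names, and the sketch works on an in-memory list rather than an asynchronous stream.

```java
import java.util.ArrayList;
import java.util.List;

class BatchBySizeSketch {

    // Hypothetical helper: splits items into consecutive lists of at most `size` elements.
    static <T> List<List<T>> batchBySize(List<T> items, int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < items.size(); start += size) {
            int end = Math.min(start + size, items.size());
            // Copy the sub-list so each batch is independent of the source list.
            batches.add(new ArrayList<>(items.subList(start, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> events = new ArrayList<>();
        for (int i = 1; i <= 25; i++) {
            events.add(i);
        }
        // 25 events in batches of 10 give batches of size 10, 10 and 5.
        for (List<Integer> batch : batchBySize(events, 10)) {
            System.out.println(batch.size() + " -> " + batch);
        }
    }
}
```

Downstream stages then receive one list per batch instead of one element at a time, which is why the stream examples here print or map whole collections.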
We can then process each batch of 10 objects, here limited to one batch per second
LazyFutureStream.parallelCommonBuilder()
    .iterateInfinitely("", last -> nextFile())
    .map(this::readFileToString)
    .map(this::parseJson)
    .batchBySize(10)
    .onePer(1, TimeUnit.SECONDS)
    .peek(batch -> System.out.println("batched : " + batch))
    .map(this::processOrders)
    .flatMap(Collection::stream)
    .peek(individual -> System.out.println("Flattened : " + individual))
    .forEach(this::save);
We can also batch by time
LazyFutureStream.parallelCommonBuilder()
    .iterateInfinitely("", last -> nextFile())
    .map(this::readFileToString)
    .map(this::parseJson)
    .batchByTime(1, TimeUnit.SECONDS)
    .peek(batch -> System.out.println("batched : " + batch))
    .map(this::processOrders)
    .flatMap(Collection::stream)
    .peek(individual -> System.out.println("Flattened : " + individual))
    .forEach(this::save);
Or just take what has completed since our last read
LazyFutureStream.parallelCommonBuilder()
    .iterateInfinitely("", last -> nextFile())
    .map(this::readFileToString)
    .map(this::parseJson)
    .chunkSinceLastRead()
    .peek(batch -> System.out.println("batched : " + batch))
    .map(this::processOrders)
    .flatMap(Collection::stream)
    .peek(individual -> System.out.println("Flattened : " + individual))
    .forEach(this::save);
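The idea of "whatever has completed since the last read" can be illustrated with a plain JDK queue. This is only a sketch of the concept, not how simple-react implements chunkSinceLastRead: the class and helper names are hypothetical, and a LinkedBlockingQueue stands in for the stream's completed results.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class ChunkSinceLastReadSketch {

    // Hypothetical helper: removes and returns everything currently in the queue.
    static <T> List<T> chunkSinceLastRead(BlockingQueue<T> completed) {
        List<T> chunk = new ArrayList<>();
        // drainTo does not block; the chunk is empty if nothing has completed yet.
        completed.drainTo(chunk);
        return chunk;
    }

    public static void main(String[] args) {
        BlockingQueue<String> completed = new LinkedBlockingQueue<>();
        completed.add("order-1");
        completed.add("order-2");
        System.out.println(chunkSinceLastRead(completed)); // [order-1, order-2]
        completed.add("order-3");
        System.out.println(chunkSinceLastRead(completed)); // [order-3]
        System.out.println(chunkSinceLastRead(completed)); // []
    }
}
```

Unlike batching by size or time, the chunk size here depends entirely on how much work finished between two reads.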
We can also create our own batching logic
LazyFutureStream.parallelCommonBuilder()
    .reactToCollection(files)
    .map(this::readFileToString)
    .map(this::parseJson)
    .batch(customBatchingFunction)
    .forEach(System.out::println);
Here customBatchingFunction is a function that takes a Supplier of individual results and returns a Supplier of those results batched into a collection, e.g.
Function<Supplier<U>, Supplier<Collection<U>>> customFn;
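As an illustration of that shape, the sketch below builds a function of this type that batches by a fixed count. It uses only the JDK: the class CustomBatchSketch and the bySize factory are hypothetical names, the source Supplier is a stand-in backed by an iterator, and the sketch ignores the closed-queue handling that a real simple-react source would presumably need.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

class CustomBatchSketch {

    // Hypothetical custom batching function: each get() on the returned Supplier
    // pulls `size` results from the source Supplier and returns them as one list.
    static <U> Function<Supplier<U>, Supplier<Collection<U>>> bySize(int size) {
        return source -> () -> {
            List<U> batch = new ArrayList<>(size);
            for (int i = 0; i < size; i++) {
                batch.add(source.get());
            }
            return batch;
        };
    }

    public static void main(String[] args) {
        // Stand-in source: a Supplier backed by an iterator over six values.
        Iterator<String> it = Arrays.asList("a", "b", "c", "d", "e", "f").iterator();
        Supplier<String> source = it::next;

        Supplier<Collection<String>> batched =
                CustomBatchSketch.<String>bySize(3).apply(source);
        System.out.println(batched.get()); // [a, b, c]
        System.out.println(batched.get()); // [d, e, f]
    }
}
```

The function itself does no work when applied; elements are only pulled from the source when the returned Supplier is asked for the next batch.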
The function to batch by time looks like this
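Function<Supplier<U>, Supplier<Collection<U>>> fn = s -> {
    return () -> {
        // time and unit are free variables here, presumably the batchByTime arguments
        SimpleTimer timer = new SimpleTimer();
        List<U> list = new ArrayList<>();
        try {
            // always take at least one result, then keep collecting until the window closes
            do {
                list.add(s.get());
            } while (timer.getElapsedNanoseconds() < unit.toNanos(time));
        } catch (ClosedQueueException e) {
            // the source closed mid-batch: rethrow with the partial batch attached
            throw new ClosedQueueException(list);
        }
        return list;
    };
};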
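For experimenting with the same loop outside simple-react, here is a JDK-only sketch. It assumes System.nanoTime() as a replacement for SimpleTimer and leaves out the ClosedQueueException handling; the class TimeBatchSketch and the byTime factory are hypothetical names, and the counter-backed Supplier is just a stand-in source.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Supplier;

class TimeBatchSketch {

    // Hypothetical factory: makes the time window explicit as parameters.
    static <U> Function<Supplier<U>, Supplier<Collection<U>>> byTime(long time, TimeUnit unit) {
        return s -> () -> {
            long start = System.nanoTime(); // stands in for SimpleTimer
            List<U> list = new ArrayList<>();
            // do/while: every batch holds at least one result, even with a zero window.
            do {
                list.add(s.get());
            } while (System.nanoTime() - start < unit.toNanos(time));
            return list;
        };
    }

    public static void main(String[] args) {
        // Stand-in source that never blocks: it just counts upwards.
        AtomicInteger counter = new AtomicInteger();
        Supplier<Integer> source = counter::incrementAndGet;

        Supplier<Collection<Integer>> batched =
                TimeBatchSketch.<Integer>byTime(5, TimeUnit.MILLISECONDS).apply(source);
        Collection<Integer> batch = batched.get();
        System.out.println("collected " + batch.size() + " results in one window");
    }
}
```

Note that the window is only checked after each s.get() returns, so a slow or blocking source can stretch a batch well past the nominal time.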