Introducing EagerFutureStream & LazyFutureStream

John McClean
Feb 24, 2015

SimpleReact v0.4 introduces two new JDK 8 Stream implementations, EagerFutureStream and LazyFutureStream. Both streams can operate sequentially or in parallel, and both extend the Java 8 Stream capabilities in a number of ways:

  1. They can operate entirely asynchronously (sequentially or in parallel without blocking the current thread).
  2. They offer sophisticated retry capabilities, failure recovery and error capturing.
  3. Concurrency is highly configurable (target task executors, control active concurrency levels).
  4. Ability to short circuit collection & asynchronous collection.
  5. Streams can be replayed.
  6. Native ability to join streams asynchronously (merge/zip/concat).
  7. They implement the jOOλ Seq interface.

Asynchronous operation

Three example streams:

Stream.of(1,2,3,4)
.map(it->it+1)
.forEach(System.out::println)

EagerFutureStream.of(1,2,3,4)
.map(it->it+1)
.peek(System.out::println)

LazyFutureStream.of(1,2,3,4)
.map(it->it+1)
.peek(System.out::println)
.run()

Each of the Streams above behaves differently, but each will print

2
3
4
5

The first Stream, which is a standard JDK 8 Stream, will synchronously apply the map function and print each result to the console, blocking the current thread.

The EagerFutureStream will asynchronously run on a separate thread immediately. Users can block the current thread using block() as a terminal operation, which collects results.

The LazyFutureStream will also execute asynchronously on a separate thread, but won't start until run() is called. Users can block the current thread using block() as a terminal operation, which collects results, or using runOnCurrent(), which does not collect them.
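The eager/lazy distinction can be illustrated with nothing but the JDK. The sketch below is not the SimpleReact API: the class and method names (EagerVsLazySketch, eager, lazy, block) are hypothetical, and it only assumes that each element is wrapped in a CompletableFuture running on the common ForkJoinPool.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class EagerVsLazySketch {

    // Eager: every element is submitted for asynchronous execution as soon as
    // this method is called; the caller gets the futures back without waiting.
    static List<CompletableFuture<Integer>> eager(Integer... values) {
        return Stream.of(values)
                .map(v -> CompletableFuture.supplyAsync(() -> v + 1))
                .collect(Collectors.toList());
    }

    // Lazy: nothing is submitted until get() is called on the returned Supplier.
    static Supplier<List<CompletableFuture<Integer>>> lazy(Integer... values) {
        return () -> eager(values);
    }

    // Blocking terminal step, loosely analogous to block(): wait for every result.
    static List<Integer> block(List<CompletableFuture<Integer>> futures) {
        return futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(block(eager(1, 2, 3, 4)));      // work started inside eager(...)
        System.out.println(block(lazy(1, 2, 3, 4).get())); // work started at get()
    }
}
```

In both cases the calling thread only waits when it explicitly asks for the results.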

Sophisticated error handling

An example EagerFutureStream:

EagerFutureStream.of(data)
.retry(d -> confirmSaved(d))
.onFail(e -> writeToPending(e.getValue()))
.capture(e -> logFailure(e.getValue()));

In this case our stream will repeatedly try to save data, up to a configurable number of times or for a configurable length of time. If it has still failed after retrying, it will attempt to write to a pending operations queue, to indicate that the save failed and should be retried later. If that also fails, we log the failure to an error log for manual intervention and correction later.
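The retry, fall back, then log sequence described above can be written out by hand as follows. This is a minimal synchronous sketch in plain Java, not how SimpleReact implements retry, onFail or capture; saveWithRecovery and its parameters are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

class RetryFallbackSketch {

    // Tries `save` up to maxAttempts times. If every attempt fails, the item is
    // handed to `fallback` (the pending queue); if the fallback itself throws,
    // the item is handed to `lastResort` (the error log).
    // Returns true only if one of the save attempts succeeded.
    static <T> boolean saveWithRecovery(T item, int maxAttempts, Predicate<T> save,
                                        Consumer<T> fallback, Consumer<T> lastResort) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                if (save.test(item)) {
                    return true;           // saved, nothing more to do
                }
            } catch (RuntimeException e) {
                // an exception counts as a failed attempt; try again
            }
        }
        try {
            fallback.accept(item);         // plays the role of onFail
        } catch (RuntimeException e) {
            lastResort.accept(item);       // plays the role of capture
        }
        return false;
    }

    public static void main(String[] args) {
        List<String> pending = new ArrayList<>();
        List<String> errorLog = new ArrayList<>();
        boolean saved = saveWithRecovery("order-42", 3,
                item -> false,             // a save that never succeeds
                pending::add, errorLog::add);
        System.out.println(saved + " pending=" + pending + " errors=" + errorLog);
    }
}
```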

Configurable Concurrency

Example streams

Stream.of(1,2,3,4)
.parallel()
.map(it->it+1)
.collect(Collectors.toList())

LazyFutureStream.parallel(1,2,3,4)
.map(it->it+1)
.collect(Collectors.toList())

Both the JDK Parallel Stream and the LazyFutureStream will attempt to execute in parallel on the common ForkJoinPool. But the ExecutorService is easily configurable for the LazyFutureStream.

LazyFutureStream.parallelBuilder(6)
.of(1,2,3,4)
.map(it->it+1)
.collect(Collectors.toList())

In this case we have instructed the LazyFutureStream to use a new ForkJoinPool with a parallelism of 6 (NB: creating a new ExecutorService for each stream is not recommended).

We can also change the ExecutorService at each stage of the Stream:

LazyFutureStream.parallelBuilder(10)
.react(()-> ioBoundOperation(),
LazyReact.times(10))
.withTaskExecutor(ForkJoinPool.commonPool())
.map(data->cpuBoundOperation(data))
.collect(Collectors.toList())

In this example, we use an ExecutorService with 10 threads to perform 10 parallel I/O operations, then switch to the common ForkJoinPool for the CPU-bound map stage, which reduces the number of threads to roughly the number of cores.
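The same idea of giving each stage its own executor can be expressed directly with CompletableFuture. The sketch below uses only the JDK; the two operations are trivial hypothetical stand-ins for ioBoundOperation and cpuBoundOperation, and StageExecutorsSketch is an illustrative name rather than anything from SimpleReact.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class StageExecutorsSketch {

    // Hypothetical stand-ins for the I/O-bound and CPU-bound work.
    static int ioBoundOperation(int id) { return id; }
    static int cpuBoundOperation(int data) { return data * data; }

    static List<Integer> run(int tasks) {
        // Wide pool for the stage that mostly waits on I/O.
        ExecutorService ioPool = Executors.newFixedThreadPool(10);
        try {
            List<CompletableFuture<Integer>> futures = IntStream.range(0, tasks)
                    .mapToObj(i -> CompletableFuture
                            .supplyAsync(() -> ioBoundOperation(i), ioPool)
                            // CPU-bound stage moves to the common ForkJoinPool.
                            .thenApplyAsync(StageExecutorsSketch::cpuBoundOperation,
                                    ForkJoinPool.commonPool()))
                    .collect(Collectors.toList());
            return futures.stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList());
        } finally {
            ioPool.shutdown();   // done only because this sketch owns the pool
        }
    }

    public static void main(String[] args) {
        System.out.println(run(10));
    }
}
```

In a long-running application the I/O pool would normally be created once and shared, in line with the note above about not creating an ExecutorService per stream.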

Ability to short circuit collection & asynchronous collection

The block method allows results to be collected (in a manner that blocks the current thread), but provides the ability to supply a predicate that can be used to short circuit result collection. E.g.

LazyFutureStream.parallel(1,2,3)
.map(it -> it * 100)
.consume(it -> sleep(it))
.block(status -> status.getAllCompleted() >1 && status.getElapsedMillis()>200);

This Stream will finish collecting results as soon as more than one result has been collected and more than 200 ms have elapsed.
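To make the breakout predicate concrete, here is a rough JDK-only sketch of a blocking collect that stops early. It is an assumption about the general technique, not SimpleReact's implementation: the names are hypothetical, the status object is reduced to two numbers (completed count and elapsed milliseconds), and completion is detected by naive polling.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.BiPredicate;
import java.util.stream.Collectors;

class ShortCircuitBlockSketch {

    // Results of every future that has already completed normally.
    static <T> List<T> completedResults(List<CompletableFuture<T>> futures) {
        return futures.stream()
                .filter(f -> f.isDone() && !f.isCompletedExceptionally())
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

    // Blocks the calling thread until all futures are done, or until
    // breakout.test(completedCount, elapsedMillis) returns true.
    static <T> List<T> block(List<CompletableFuture<T>> futures,
                             BiPredicate<Long, Long> breakout) {
        long start = System.currentTimeMillis();
        while (true) {
            long completed = futures.stream().filter(CompletableFuture::isDone).count();
            long elapsed = System.currentTimeMillis() - start;
            if (completed == futures.size() || breakout.test(completed, elapsed)) {
                break;
            }
            try {
                Thread.sleep(5);                      // naive polling, fine for a sketch
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();   // preserve the interrupt and stop waiting
                break;
            }
        }
        return completedResults(futures);
    }
}
```

A call such as block(futures, (done, ms) -> done > 1 && ms > 200) mirrors the predicate in the example above. Because futures keep completing while the results are gathered, the returned list may contain more elements than the count the predicate saw.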

Results can also be collected asynchronously using allOf.
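The article does not show the signature of SimpleReact's allOf, so the sketch below uses the JDK's own CompletableFuture.allOf to show what asynchronous collection looks like in principle. AllOfSketch and collectAsync are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class AllOfSketch {

    // Returns immediately with a future that completes once every input future
    // has completed; the calling thread is never blocked.
    static <T> CompletableFuture<List<T>> collectAsync(List<CompletableFuture<T>> futures) {
        return CompletableFuture
                .allOf(futures.toArray(new CompletableFuture[0]))
                .thenApply(ignored -> futures.stream()
                        .map(CompletableFuture::join)   // safe: all are complete here
                        .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        List<CompletableFuture<Integer>> futures = Arrays.asList(
                CompletableFuture.supplyAsync(() -> 100),
                CompletableFuture.supplyAsync(() -> 200));
        collectAsync(futures)
                .thenAccept(System.out::println)   // runs when the results are ready
                .join();                           // only so the demo does not exit early
    }
}
```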

EagerFutureStreams can be replayed

We can split an EagerFutureStream at any point and play forward again from there. This means that, unlike JDK 8 Stream implementations, an EagerFutureStream can be used to implement caching.

For example:

EagerFutureStream<Integer> stream = EagerFutureStream.of(1,2,3,4)
.map(it->it*1000000);
List<String> toStrings = stream.map(it-> it+"!")
.collect(Collectors.toList());
List<Date> toDates = stream.map(Date::new)
.collect(Collectors.toList());
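For contrast, the sketch below shows why the same pattern fails with a plain JDK Stream, and why a collection of futures can be reused. It uses only the JDK and assumes, as a simplification, that a replayable stream behaves like a list of CompletableFutures; ReplaySketch and its methods are hypothetical names.

```java
import java.util.Date;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class ReplaySketch {

    // A JDK Stream accepts a single terminal operation; a second one throws.
    static boolean jdkStreamIsSingleUse() {
        Stream<Integer> stream = Stream.of(1, 2, 3, 4).map(it -> it * 1_000_000);
        stream.collect(Collectors.toList());          // first terminal operation
        try {
            stream.collect(Collectors.toList());      // second one is rejected
            return false;
        } catch (IllegalStateException alreadyConsumed) {
            return true;
        }
    }

    // Futures keep their results, so the list can feed many pipelines.
    static List<CompletableFuture<Integer>> cached() {
        return Stream.of(1, 2, 3, 4)
                .map(it -> CompletableFuture.supplyAsync(() -> it * 1_000_000))
                .collect(Collectors.toList());
    }

    static List<String> toStrings(List<CompletableFuture<Integer>> source) {
        return source.stream()
                .map(f -> f.thenApply(it -> it + "!").join())
                .collect(Collectors.toList());
    }

    static List<Date> toDates(List<CompletableFuture<Integer>> source) {
        return source.stream()
                .map(f -> f.thenApply(it -> new Date(it.longValue())).join())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<CompletableFuture<Integer>> shared = cached();
        System.out.println(toStrings(shared));   // first pass over the cached futures
        System.out.println(toDates(shared));     // second pass, no recomputation
    }
}
```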

Native ability to join streams asynchronously (merge/concat)

EagerFutureStream.sequentialBuilder() 
.react(()->loadData(),EagerReact.times(50))
.merge(EagerFutureStream.of(1,2,3,4,5,6,7))

In this example the merge command will merge the underlying Streams of CompletableFutures immediately, but the results will be populated asynchronously on a separate thread.
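The point that merging happens immediately while results arrive later can be shown with plain CompletableFutures. This is a JDK-only sketch under the assumption that each stream is backed by a list of futures; it is not SimpleReact's merge, and MergeSketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class MergeSketch {

    // Concatenates the two lists of futures without waiting for any of them.
    static <T> List<CompletableFuture<T>> merge(List<CompletableFuture<T>> first,
                                                List<CompletableFuture<T>> second) {
        List<CompletableFuture<T>> merged = new ArrayList<>(first);
        merged.addAll(second);
        return merged;
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> slow = new CompletableFuture<>();   // not completed yet
        List<CompletableFuture<Integer>> merged = merge(
                Arrays.asList(slow),
                Arrays.asList(CompletableFuture.completedFuture(2)));

        System.out.println(merged.size() + " futures merged, first done? " + merged.get(0).isDone());
        slow.complete(1);                                              // result arrives later
        System.out.println("first result: " + merged.get(0).join());
    }
}
```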

Implement the jOOλ Seq interface

The Seq interface defined by the excellent jOOλ library augments the Java 8 Streams API with a large number of additional methods, such as:

foldLeft / foldRight, scanLeft / scanRight, zip, concat, duplicate, cycle, limitUntil, skipUntil and much more.

jOOλ also adds useful support for Tuple data structures.

See https://github.com/jOOQ/jOOL for more details.

The good news is both EagerFutureStream and LazyFutureStream implement this more powerful interface as well as the JDK 8 java.util.stream.Stream interface.

EagerFutureStream.sequentialBuilder() 
.react(()->loadData(),EagerReact.times(50))
.zip(EagerFutureStream.of(1,2,3,4,5,6,7,8,9,10))
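As a rough illustration of what an asynchronous zip involves, the sketch below pairs two lists of CompletableFutures position by position using thenCombine. It relies only on the JDK and uses Map.Entry where jOOλ would use a Tuple; ZipSketch is a hypothetical name and this is not the jOOλ or SimpleReact implementation.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

class ZipSketch {

    // Pairs the i-th future of each list; each pair completes once both of its
    // inputs have completed. Extra elements of the longer list are dropped.
    static <A, B> List<CompletableFuture<Map.Entry<A, B>>> zip(
            List<CompletableFuture<A>> left, List<CompletableFuture<B>> right) {
        int size = Math.min(left.size(), right.size());
        List<CompletableFuture<Map.Entry<A, B>>> zipped = new ArrayList<>();
        for (int i = 0; i < size; i++) {
            CompletableFuture<Map.Entry<A, B>> pair = left.get(i).thenCombine(
                    right.get(i), (a, b) -> new AbstractMap.SimpleImmutableEntry<>(a, b));
            zipped.add(pair);
        }
        return zipped;
    }

    public static void main(String[] args) {
        List<CompletableFuture<String>> names = Arrays.asList(
                CompletableFuture.supplyAsync(() -> "a"),
                CompletableFuture.supplyAsync(() -> "b"));
        List<CompletableFuture<Integer>> numbers = Arrays.asList(
                CompletableFuture.completedFuture(1),
                CompletableFuture.completedFuture(2),
                CompletableFuture.completedFuture(3));
        zip(names, numbers).forEach(pair -> System.out.println(pair.join()));
    }
}
```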

John McClean

Architecture @ Verizon Media. Maintainer of Cyclops. Twitter @johnmcclean_ie