Reactive programming with Java 8 and simple-react : stream creation

John McClean
Mar 25, 2015 · 3 min read

There are a number of ways to create a simple-react stream

FutureStream static factory methods

of

Create a Sequential FutureStream from an array of raw objects (not Futures)

LazyFutureStream.of(1,2,3,4,5);

parallelOf

Create a Parallel FutureStream from an array of raw objects (not Futures)

LazyFutureStream.parallelOf(1,2,3,4,5);

sequentialCommonBuilder

This instantiates a Reactor for this Stream type using a shared sequential ExecutorService (i.e. single threaded, non-work steeling ExecutorService).

LazyReact lr = LazyFutureStream.sequentialCommonBuilder();

sequentialBuilder

This instantiates a Reactor for this Stream type using a new sequential ExecutorService (i.e. single threaded, non-work steeling ExecutorService).

LazyReact lr = LazyFutureStream.sequentialBuilder();

parallelCommonBuilder

This instantiates a Reactor for this Stream type using a shared parallel ExecutorService ( the common work-steeling ForkJoinPool is used).

LazyReact lr = LazyFutureStream.parallelCommonBuilder();

parallelBuilder

This instantiates a Reactor for this Stream type using a new parallel ExecutorService ( the common work-steeling ForkJoinPool is used — this will have parallelism set to the number of available processors). arallelism level can be optionally supplied (number of threads for the pool).

LazyReact lr = LazyFutureStream.parallelBuilder();

Or to create a Reactor with a set number of threads

LazyReact lr = LazyFutureStream.parallelBuilder(5);

Instantiating Reactors

Reactors can be instantiated via

  1. Convenience methods on FutureStreams (described above)
  2. ReactPools (see : — https://medium.com/@johnmcclean/reactive-programming-with-java-8-and-simple-react-pooling-reactors-bf6ae2c0a23b )

Direct

SimpleReact simpleReact = new SimpleReact();
EagerReact eagerReact = new EagerReact();
LazyReact lazyReact = new LazyReact();

This will instantiate a Reactor using the the common ForkJoinPool ( shared ForkJoinPool with parallelism equal to the number of processors on the host machine). We can also specify an ExecutorService to use -

SimpleReact simpleReact = new SimpleReact(new ForkJoinPool(4));
EagerReact eagerReact = new EagerReact(new ForkJoinPool(4));
LazyReact lazyReact = new LazyReact(new ForkJoinPool(4));

Once we have a Reactor instance we can use it to create Streams

Instantiating Streams

Reactors (SimpleReact, EagerReact, LazyReact) provide the following Stream creation methods -

of

Create a FutureStream from an array of raw objects (not Futures)

lazyReact.of(1,2,3,4,5);

react

Create a FutureStream from an array of Suppliers. The Stream will be asynchronously populated

lazyReact.react(()->1,()->2,()->3,()->4,()->5);

reactToCollection

Create a FutureStream from an existing collection

Collection<Integer> numbers = Arrays.asList(1,2,3,4,5,6);

fromStream

Create a FutureStream from an existing Stream (or Seq)

Collection<Integer> numbers = Arrays.asList(1,2,3,4,5,6);

fromPrimitiveStream

Create a FutureStream from an existing Primitive Stream

IntStream numbers = IntStream.range(0, 1000000));

fromStreamCompletableFutures

Create a FutureStream from an existing jdk Stream containing CompletableFutures

Stream<CompletableFuture> stream = futures.stream();

reactInfinitely

Create an infinite FutureStream from a synchronously executed supplier. (The supplier is executed synchronously to seed the next chain of asynchronous CompletableFutures).

lazyReact.reactInfintely(()->loadNewData());

reactInfinitelyAsync

Create an infinite FutureStream from an asynchronously executed supplier

lazyReact.reactInfintely(()->loadNewData());

iterateInfinitely

Create an infinite FutureStream from a synchronously executed iteration function. (The function is executed synchronously to seed the next chain of asynchronous CompletableFutures).

lazyReact.iterateInfintely(0, next -> next+1);

The Tutorial : Reactive programming with Java 8

John McClean

Written by

Architecture @ Verizon Media. Maintainer of Cyclops. Twitter @johnmcclean_ie