Reactive programming with Java 8 and simple-react : stream creation

John McClean
3 min read · Mar 25, 2015

There are a number of ways to create a simple-react stream.

FutureStream static factory methods

of

Create a Sequential FutureStream from an array of raw objects (not Futures)

LazyFutureStream.of(1, 2, 3, 4, 5);
EagerFutureStream.of("hello", "world");
SimpleReactStream.of(new Car(), new House());

parallelOf

Create a Parallel FutureStream from an array of raw objects (not Futures)

LazyFutureStream.parallelOf(1, 2, 3, 4, 5);
EagerFutureStream.parallelOf("hello", "world");
SimpleReactStream.parallelOf(new Car(), new House());

sequentialCommonBuilder

This instantiates a Reactor for this Stream type using a shared sequential ExecutorService (i.e. a single-threaded, non-work-stealing ExecutorService).

LazyReact lr = LazyFutureStream.sequentialCommonBuilder();
EagerReact er = EagerFutureStream.sequentialCommonBuilder();
SimpleReact sr = SimpleReactStream.sequentialCommonBuilder();

sequentialBuilder

This instantiates a Reactor for this Stream type using a new sequential ExecutorService (i.e. a single-threaded, non-work-stealing ExecutorService).

LazyReact lr = LazyFutureStream.sequentialBuilder();
EagerReact er = EagerFutureStream.sequentialBuilder();
SimpleReact sr = SimpleReactStream.sequentialBuilder();
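To make "sequential" concrete, here is a minimal sketch that uses only JDK classes rather than simple-react itself. It assumes a sequential ExecutorService behaves like the JDK's single-threaded executor, where every submitted task runs on the same worker thread. The class and method names are hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// JDK-only illustration; simple-react's own builders are not used here.
class SequentialExecutorSketch {

    // Submits `tasks` asynchronous tasks to a single-threaded executor and
    // returns the distinct names of the threads that executed them.
    static List<String> threadNamesUsed(int tasks) {
        ExecutorService single = Executors.newSingleThreadExecutor();
        try {
            List<CompletableFuture<String>> futures = new ArrayList<>();
            for (int i = 0; i < tasks; i++) {
                futures.add(CompletableFuture.supplyAsync(
                        () -> Thread.currentThread().getName(), single));
            }
            List<String> names = new ArrayList<>();
            for (CompletableFuture<String> future : futures) {
                String name = future.join();
                if (!names.contains(name)) {
                    names.add(name);
                }
            }
            return names;
        } finally {
            single.shutdown(); // release the worker thread
        }
    }

    public static void main(String[] args) {
        // Only one thread name should be reported, however many tasks run.
        System.out.println(threadNamesUsed(5));
    }
}
```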

parallelCommonBuilder

This instantiates a Reactor for this Stream type using a shared parallel ExecutorService (the common work-stealing ForkJoinPool).

LazyReact lr = LazyFutureStream.parallelCommonBuilder();
EagerReact er = EagerFutureStream.parallelCommonBuilder();
SimpleReact sr = SimpleReactStream.parallelCommonBuilder();

parallelBuilder

This instantiates a Reactor for this Stream type using a new parallel ExecutorService (a new work-stealing ForkJoinPool, with parallelism set to the number of available processors by default). A parallelism level (the number of threads for the pool) can optionally be supplied.

LazyReact lr = LazyFutureStream.parallelBuilder();
EagerReact er = EagerFutureStream.parallelBuilder();
SimpleReact sr = SimpleReactStream.parallelBuilder();

Or to create a Reactor with a set number of threads

LazyReact lr = LazyFutureStream.parallelBuilder(5);
EagerReact er = EagerFutureStream.parallelBuilder(5);
SimpleReact sr = SimpleReactStream.parallelBuilder(5);
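The difference between a shared pool, a new pool with default parallelism, and a new pool with a set number of threads can be sketched with the JDK's ForkJoinPool alone. This is not simple-react code, and the class and method names are hypothetical. It only shows what the underlying JDK pools report.

```java
import java.util.concurrent.ForkJoinPool;

// JDK-only illustration of the pool flavours described above.
class ParallelismSketch {

    // A new ForkJoinPool created with the no-arg constructor uses the
    // number of available processors as its parallelism level.
    static int defaultParallelism() {
        ForkJoinPool pool = new ForkJoinPool();
        try {
            return pool.getParallelism();
        } finally {
            pool.shutdown();
        }
    }

    // A new ForkJoinPool with an explicitly supplied parallelism level.
    static int fixedParallelism(int threads) {
        ForkJoinPool pool = new ForkJoinPool(threads);
        try {
            return pool.getParallelism();
        } finally {
            pool.shutdown();
        }
    }

    // The common pool is one shared instance for the whole JVM.
    static boolean commonPoolIsShared() {
        return ForkJoinPool.commonPool() == ForkJoinPool.commonPool();
    }

    public static void main(String[] args) {
        System.out.println("default parallelism: " + defaultParallelism());
        System.out.println("fixed parallelism:   " + fixedParallelism(5));
        System.out.println("common pool shared:  " + commonPoolIsShared());
    }
}
```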

Instantiating Reactors

Reactors can be instantiated via

  1. Convenience methods on FutureStreams (described above)
  2. ReactPools (see https://medium.com/@johnmcclean/reactive-programming-with-java-8-and-simple-react-pooling-reactors-bf6ae2c0a23b)

Direct

SimpleReact simpleReact = new SimpleReact();
EagerReact eagerReact = new EagerReact();
LazyReact lazyReact = new LazyReact();

This will instantiate a Reactor using the common ForkJoinPool (a shared ForkJoinPool with parallelism equal to the number of processors on the host machine). We can also specify an ExecutorService to use:

SimpleReact simpleReact = new SimpleReact(new ForkJoinPool(4));
EagerReact eagerReact = new EagerReact(new ForkJoinPool(4));
LazyReact lazyReact = new LazyReact(new ForkJoinPool(4));
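As a rough JDK-only analogy for handing a Reactor its own ExecutorService, the sketch below passes a dedicated ForkJoinPool to CompletableFuture.supplyAsync and checks that the task really ran on a worker of that pool. It does not use simple-react, and the class and method names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;

// JDK-only illustration of running asynchronous work on a supplied pool.
class CustomPoolSketch {

    // Returns true if the asynchronous task executed on a worker thread
    // that belongs to the pool we supplied.
    static boolean ranOnSuppliedPool() {
        ForkJoinPool pool = new ForkJoinPool(4);
        try {
            return CompletableFuture.supplyAsync(() -> {
                Thread current = Thread.currentThread();
                return current instanceof ForkJoinWorkerThread
                        && ((ForkJoinWorkerThread) current).getPool() == pool;
            }, pool).join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("ran on supplied pool: " + ranOnSuppliedPool());
    }
}
```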

Once we have a Reactor instance we can use it to create Streams

Instantiating Streams

Reactors (SimpleReact, EagerReact, LazyReact) provide the following Stream creation methods:

of

Create a FutureStream from an array of raw objects (not Futures)

lazyReact.of(1, 2, 3, 4, 5);
eagerReact.of("hello", "world");
simpleReact.of(new Car(), new House());

react

Create a FutureStream from an array of Suppliers. The Stream will be populated asynchronously.

lazyReact.react(() -> 1, () -> 2, () -> 3, () -> 4, () -> 5);
eagerReact.react(() -> "hello", () -> "world");
simpleReact.react(() -> new Car(), () -> new House());
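The idea of populating a stream asynchronously from Suppliers can be approximated with plain CompletableFutures. The following is a minimal JDK-only sketch, not the simple-react implementation: each Supplier is started asynchronously on the common pool, and the results are then gathered in the order the Suppliers were given. The class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import java.util.stream.Collectors;

// JDK-only illustration of turning Suppliers into asynchronous results.
class SupplierFuturesSketch {

    @SafeVarargs
    static <T> List<T> runAll(Supplier<T>... suppliers) {
        // Start every supplier asynchronously before waiting on any of them.
        List<CompletableFuture<T>> futures = Arrays.stream(suppliers)
                .map(supplier -> CompletableFuture.supplyAsync(supplier))
                .collect(Collectors.toList());
        // Join in submission order, so the result order matches the input.
        return futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(runAll(() -> "hello", () -> "world"));
    }
}
```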

reactToCollection

Create a FutureStream from an existing collection

Collection<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6);
lazyReact.reactToCollection(numbers);
eagerReact.reactToCollection(numbers);
simpleReact.reactToCollection(numbers);

fromStream

Create a FutureStream from an existing Stream (or Seq)

Collection<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6);
lazyReact.fromStream(numbers.stream());
eagerReact.fromStream(numbers.stream());
simpleReact.fromStream(numbers.stream());

fromPrimitiveStream

Create a FutureStream from an existing Primitive Stream

IntStream numbers = IntStream.range(0, 1000000);
lazyReact.fromPrimitiveStream(numbers);
eagerReact.fromPrimitiveStream(numbers);
simpleReact.fromPrimitiveStream(numbers);
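One JDK detail is worth keeping in mind when reading the three alternatives above: a java.util.stream stream, primitive or not, can only be consumed once. Assuming a reactor consumes the stream it is given, each call would need its own IntStream if they ran in the same program. The JDK-only sketch below (hypothetical class and method names, no simple-react) shows boxing a primitive stream and the single-use rule.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// JDK-only illustration of primitive stream boxing and single use.
class PrimitiveStreamSketch {

    // Boxes a primitive IntStream into a List<Integer>.
    static List<Integer> boxed(int fromInclusive, int toExclusive) {
        return IntStream.range(fromInclusive, toExclusive)
                .boxed()
                .collect(Collectors.toList());
    }

    // Demonstrates that running a second terminal operation on the same
    // stream instance throws IllegalStateException.
    static boolean secondUseFails() {
        IntStream numbers = IntStream.range(0, 10);
        numbers.sum(); // first terminal operation consumes the stream
        try {
            numbers.sum(); // second use is rejected by the JDK
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(boxed(0, 5));
        System.out.println("second use fails: " + secondUseFails());
    }
}
```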

fromStreamCompletableFutures

Create a FutureStream from an existing JDK Stream containing CompletableFutures

Stream<CompletableFuture> stream = futures.stream();
lazyReact.fromStreamCompletableFutures(stream);
eagerReact.fromStreamCompletableFutures(stream);
simpleReact.fromStreamCompletableFutures(stream);
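For readers unfamiliar with streams of CompletableFutures, here is a small JDK-only sketch of what such a stream carries and how its results can be gathered once every future has completed. It is not how simple-react processes the stream internally, and the class and method names are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// JDK-only illustration of collecting a Stream of CompletableFutures.
class FutureStreamSketch {

    static <T> List<T> collectResults(Stream<CompletableFuture<T>> stream) {
        // Materialise the futures so they can be waited on as a group.
        List<CompletableFuture<T>> futures = stream.collect(Collectors.toList());
        // Block until every future has completed.
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        // All futures are done, so join() returns immediately here.
        return futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

    // Builds a two-element stream: one already-completed future and one
    // that is computed asynchronously on the common pool.
    static List<Integer> demo() {
        Stream<CompletableFuture<Integer>> stream = Stream.of(
                CompletableFuture.completedFuture(1),
                CompletableFuture.supplyAsync(() -> 2));
        return collectResults(stream);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```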

reactInfinitely

Create an infinite FutureStream from a synchronously executed supplier. (The supplier is executed synchronously to seed the next chain of asynchronous CompletableFutures).

lazyReact.reactInfinitely(() -> loadNewData());
eagerReact.reactInfinitely(() -> loadNewData());
simpleReact.reactInfinitely(() -> loadNewData());
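The "synchronous seed, asynchronous chain" idea can be approximated with the JDK alone. In this minimal sketch (hypothetical names, not simple-react code), Stream.generate calls the supplier synchronously on the calling thread, and each seeded value is then handed to an asynchronous CompletableFuture stage. A limit is applied so the otherwise infinite stream terminates.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// JDK-only illustration of a synchronously seeded, asynchronously processed stream.
class InfiniteSeedSketch {

    static List<Integer> firstDoubled(int count) {
        AtomicInteger counter = new AtomicInteger();
        // The supplier runs synchronously; each seed starts an async stage.
        List<CompletableFuture<Integer>> futures = Stream
                .generate(counter::incrementAndGet)
                .map(seed -> CompletableFuture.supplyAsync(() -> seed * 2))
                .limit(count) // bound the infinite stream
                .collect(Collectors.toList());
        return futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(firstDoubled(3));
    }
}
```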

reactInfinitelyAsync

Create an infinite FutureStream from an asynchronously executed supplier

lazyReact.reactInfinitelyAsync(() -> loadNewData());
eagerReact.reactInfinitelyAsync(() -> loadNewData());
simpleReact.reactInfinitelyAsync(() -> loadNewData());

iterateInfinitely

Create an infinite FutureStream from a synchronously executed iteration function. (The function is executed synchronously to seed the next chain of asynchronous CompletableFutures).

lazyReact.iterateInfinitely(0, next -> next + 1);
eagerReact.iterateInfinitely(0, next -> next + 1);
simpleReact.iterateInfinitely(0, next -> next + 1);
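The seed-plus-function shape used here mirrors the JDK's own Stream.iterate. As a JDK-only sketch (hypothetical class and method names, not simple-react code), the same iteration function produces the following values when the infinite stream is bounded with limit:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// JDK-only illustration of an infinite iteration bounded by limit().
class IterateSketch {

    // Applies next -> next + 1 repeatedly, starting from the seed, and
    // keeps only the first `count` values.
    static List<Integer> firstValues(int seed, int count) {
        return Stream.iterate(seed, next -> next + 1)
                .limit(count)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(firstValues(0, 5));
    }
}
```

Without the limit, collecting the stream would never finish, which is why infinite streams are normally paired with a bounding or short-circuiting operation.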

The Tutorial : Reactive programming with Java 8

John McClean

Architecture @ Verizon Media. Maintainer of Cyclops. Twitter @johnmcclean_ie