Reactive programming with Java 8 and simple-react : stream creation

There are a number of ways to create a simple-react stream

FutureStream static factory methods

of

Create a Sequential FutureStream from an array of raw objects (not Futures)

LazyFutureStream.of(1,2,3,4,5);
EagerFutureStream.of(“hello”,”world”);
SimpleReactStream.of(new Car(),new House());of

parallelOf

Create a Parallel FutureStream from an array of raw objects (not Futures)

LazyFutureStream.parallelOf(1,2,3,4,5);
EagerFutureStream.parallelOf(“hello”,”world”);
SimpleReactStream.parallelOf(new Car(),new House());

sequentialCommonBuilder

This instantiates a Reactor for this Stream type using a shared sequential ExecutorService (i.e. single threaded, non-work steeling ExecutorService).

LazyReact lr = LazyFutureStream.sequentialCommonBuilder();
EagerReact er = EagerFutureStream.sequentialCommonBuilder();
SimpleReact sr = SimpleReactStream.sequentialCommonBuilder();

sequentialBuilder

This instantiates a Reactor for this Stream type using a new sequential ExecutorService (i.e. single threaded, non-work steeling ExecutorService).

LazyReact lr = LazyFutureStream.sequentialBuilder();
EagerReact er = EagerFutureStream.sequentialBuilder();
SimpleReact sr = SimpleReactStream.sequentialBuilder();

parallelCommonBuilder

This instantiates a Reactor for this Stream type using a shared parallel ExecutorService ( the common work-steeling ForkJoinPool is used).

LazyReact lr = LazyFutureStream.parallelCommonBuilder();
EagerReact er = EagerFutureStream.parallelCommonBuilder();
SimpleReact sr = SimpleReactStream.parallelCommonBuilder();

parallelBuilder

This instantiates a Reactor for this Stream type using a new parallel ExecutorService ( the common work-steeling ForkJoinPool is used — this will have parallelism set to the number of available processors). arallelism level can be optionally supplied (number of threads for the pool).

LazyReact lr = LazyFutureStream.parallelBuilder();
EagerReact er = EagerFutureStream.parallelBuilder();
SimpleReact sr = SimpleReactStream.parallelBuilder();

Or to create a Reactor with a set number of threads

LazyReact lr = LazyFutureStream.parallelBuilder(5);
EagerReact er = EagerFutureStream.parallelBuilder(5);
SimpleReact sr = SimpleReactStream.parallelBuilder(5);

Instantiating Reactors

Reactors can be instantiated via

  1. Convenience methods on FutureStreams (described above)
  2. ReactPools (see : — https://medium.com/@johnmcclean/reactive-programming-with-java-8-and-simple-react-pooling-reactors-bf6ae2c0a23b )

Direct

SimpleReact simpleReact = new SimpleReact();
EagerReact eagerReact = new EagerReact();
LazyReact lazyReact = new LazyReact();

This will instantiate a Reactor using the the common ForkJoinPool ( shared ForkJoinPool with parallelism equal to the number of processors on the host machine). We can also specify an ExecutorService to use -

SimpleReact simpleReact = new SimpleReact(new ForkJoinPool(4));
EagerReact eagerReact = new EagerReact(new ForkJoinPool(4));
LazyReact lazyReact = new LazyReact(new ForkJoinPool(4));

Once we have a Reactor instance we can use it to create Streams

Instantiating Streams

Reactors (SimpleReact, EagerReact, LazyReact) provide the following Stream creation methods -

of

Create a FutureStream from an array of raw objects (not Futures)

lazyReact.of(1,2,3,4,5);
eagerReact.of(“hello”,”world”);
simpleReact.of(new Car(),new House());

react

Create a FutureStream from an array of Suppliers. The Stream will be asynchronously populated

lazyReact.react(()->1,()->2,()->3,()->4,()->5);
eagerReact.react(()->“hello”,()->”world”);
simpleReact.react(()->new Car(),()->new House());

reactToCollection

Create a FutureStream from an existing collection

Collection<Integer> numbers = Arrays.asList(1,2,3,4,5,6);
lazyReact.reactToCollection(numbers);
eagerReact.reactToCollection(numbers);
simpleReact.reactToCollection(numbers);

fromStream

Create a FutureStream from an existing Stream (or Seq)

Collection<Integer> numbers = Arrays.asList(1,2,3,4,5,6);
lazyReact.fromStream(numbers.stream());
eagerReact.fromStream(numbers.stream());
simpleReact.fromStream(numbers.stream());

fromPrimitiveStream

Create a FutureStream from an existing Primitive Stream

IntStream numbers = IntStream.range(0, 1000000));
lazyReact.fromPrimitiveStream(numbers);
eagerReact.fromPrimitiveStream(numbers);
simpleReact.fromPrimitiveStream(numbers);

fromStreamCompletableFutures

Create a FutureStream from an existing jdk Stream containing CompletableFutures

Stream<CompletableFuture> stream = futures.stream();
lazyReact.fromStreamCompletableFutures(stream);
eagerReact.fromStreamCompletableFutures(stream);
simpleReact.fromStreamCompletableFutures(stream);

reactInfinitely

Create an infinite FutureStream from a synchronously executed supplier. (The supplier is executed synchronously to seed the next chain of asynchronous CompletableFutures).

lazyReact.reactInfintely(()->loadNewData());
eagerReact.reactInfintely(()->loadNewData());
simpleReact.reactInfintely(()->loadNewData());

reactInfinitelyAsync

Create an infinite FutureStream from an asynchronously executed supplier

lazyReact.reactInfintely(()->loadNewData());
eagerReact.reactInfintely(()->loadNewData());
simpleReact.reactInfintely(()->loadNewData());

iterateInfinitely

Create an infinite FutureStream from a synchronously executed iteration function. (The function is executed synchronously to seed the next chain of asynchronous CompletableFutures).

lazyReact.iterateInfintely(0, next -> next+1);
eagerReact.iterateInfintely(0, next -> next+1);
simpleReact.iterateInfintely(0, next -> next+1);

The Tutorial : Reactive programming with Java 8

Show your support

Clapping shows how much you appreciated John McClean’s story.