Reactive programming with Java 8 and simple-react : stream creation
There are a number of ways to create a simple-react stream
FutureStream static factory methods
of
Create a Sequential FutureStream from an array of raw objects (not Futures)
LazyFutureStream.of(1,2,3,4,5);EagerFutureStream.of(“hello”,”world”);SimpleReactStream.of(new Car(),new House());of
parallelOf
Create a Parallel FutureStream from an array of raw objects (not Futures)
LazyFutureStream.parallelOf(1,2,3,4,5);EagerFutureStream.parallelOf(“hello”,”world”);SimpleReactStream.parallelOf(new Car(),new House());
sequentialCommonBuilder
This instantiates a Reactor for this Stream type using a shared sequential ExecutorService (i.e. single threaded, non-work steeling ExecutorService).
LazyReact lr = LazyFutureStream.sequentialCommonBuilder();EagerReact er = EagerFutureStream.sequentialCommonBuilder();SimpleReact sr = SimpleReactStream.sequentialCommonBuilder();
sequentialBuilder
This instantiates a Reactor for this Stream type using a new sequential ExecutorService (i.e. single threaded, non-work steeling ExecutorService).
LazyReact lr = LazyFutureStream.sequentialBuilder();EagerReact er = EagerFutureStream.sequentialBuilder();SimpleReact sr = SimpleReactStream.sequentialBuilder();
parallelCommonBuilder
This instantiates a Reactor for this Stream type using a shared parallel ExecutorService ( the common work-steeling ForkJoinPool is used).
LazyReact lr = LazyFutureStream.parallelCommonBuilder();EagerReact er = EagerFutureStream.parallelCommonBuilder();SimpleReact sr = SimpleReactStream.parallelCommonBuilder();
parallelBuilder
This instantiates a Reactor for this Stream type using a new parallel ExecutorService ( the common work-steeling ForkJoinPool is used — this will have parallelism set to the number of available processors). arallelism level can be optionally supplied (number of threads for the pool).
LazyReact lr = LazyFutureStream.parallelBuilder();EagerReact er = EagerFutureStream.parallelBuilder();SimpleReact sr = SimpleReactStream.parallelBuilder();
Or to create a Reactor with a set number of threads
LazyReact lr = LazyFutureStream.parallelBuilder(5);EagerReact er = EagerFutureStream.parallelBuilder(5);SimpleReact sr = SimpleReactStream.parallelBuilder(5);
Instantiating Reactors
Reactors can be instantiated via
- Convenience methods on FutureStreams (described above)
- ReactPools (see : — https://medium.com/@johnmcclean/reactive-programming-with-java-8-and-simple-react-pooling-reactors-bf6ae2c0a23b )
Direct
SimpleReact simpleReact = new SimpleReact();
EagerReact eagerReact = new EagerReact();
LazyReact lazyReact = new LazyReact();
This will instantiate a Reactor using the the common ForkJoinPool ( shared ForkJoinPool with parallelism equal to the number of processors on the host machine). We can also specify an ExecutorService to use -
SimpleReact simpleReact = new SimpleReact(new ForkJoinPool(4));
EagerReact eagerReact = new EagerReact(new ForkJoinPool(4));
LazyReact lazyReact = new LazyReact(new ForkJoinPool(4));
Once we have a Reactor instance we can use it to create Streams
Instantiating Streams
Reactors (SimpleReact, EagerReact, LazyReact) provide the following Stream creation methods -
of
Create a FutureStream from an array of raw objects (not Futures)
lazyReact.of(1,2,3,4,5);eagerReact.of(“hello”,”world”);simpleReact.of(new Car(),new House());
react
Create a FutureStream from an array of Suppliers. The Stream will be asynchronously populated
lazyReact.react(()->1,()->2,()->3,()->4,()->5);eagerReact.react(()->“hello”,()->”world”);simpleReact.react(()->new Car(),()->new House());
reactToCollection
Create a FutureStream from an existing collection
Collection<Integer> numbers = Arrays.asList(1,2,3,4,5,6);lazyReact.reactToCollection(numbers);eagerReact.reactToCollection(numbers);simpleReact.reactToCollection(numbers);
fromStream
Create a FutureStream from an existing Stream (or Seq)
Collection<Integer> numbers = Arrays.asList(1,2,3,4,5,6);lazyReact.fromStream(numbers.stream());eagerReact.fromStream(numbers.stream());simpleReact.fromStream(numbers.stream());
fromPrimitiveStream
Create a FutureStream from an existing Primitive Stream
IntStream numbers = IntStream.range(0, 1000000));lazyReact.fromPrimitiveStream(numbers);eagerReact.fromPrimitiveStream(numbers);simpleReact.fromPrimitiveStream(numbers);
fromStreamCompletableFutures
Create a FutureStream from an existing jdk Stream containing CompletableFutures
Stream<CompletableFuture> stream = futures.stream();lazyReact.fromStreamCompletableFutures(stream);eagerReact.fromStreamCompletableFutures(stream);simpleReact.fromStreamCompletableFutures(stream);
reactInfinitely
Create an infinite FutureStream from a synchronously executed supplier. (The supplier is executed synchronously to seed the next chain of asynchronous CompletableFutures).
lazyReact.reactInfintely(()->loadNewData());eagerReact.reactInfintely(()->loadNewData());simpleReact.reactInfintely(()->loadNewData());
reactInfinitelyAsync
Create an infinite FutureStream from an asynchronously executed supplier
lazyReact.reactInfintely(()->loadNewData());eagerReact.reactInfintely(()->loadNewData());simpleReact.reactInfintely(()->loadNewData());
iterateInfinitely
Create an infinite FutureStream from a synchronously executed iteration function. (The function is executed synchronously to seed the next chain of asynchronous CompletableFutures).
lazyReact.iterateInfintely(0, next -> next+1);eagerReact.iterateInfintely(0, next -> next+1);simpleReact.iterateInfintely(0, next -> next+1);