Reactive programming with Java 8 and simple-react: flow control
Generate an event every second
LazyFutureStream.sequentialCommonBuilder()
.iterateInfinitely(0, it -> it+1)
.onePer(1, TimeUnit.SECONDS)
In the Stream above, one event emerges per second: 0, 1, 2, 3 and so on. We can print each value as it passes through using peek.
LazyFutureStream.sequentialCommonBuilder()
.iterateInfinitely(0, it -> it+1)
.onePer(1, TimeUnit.SECONDS)
.peek(System.out::println);
A LazyFutureStream is lazy, so we have to start it with run or block before anything is emitted. block() blocks the current thread and aggregates all results into a collection (memory leak alert for an infinite Stream!). The examples that follow start the Stream with runOnCurrent().
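The memory warning can be seen with a plain JDK Stream, used here only as a stand-in. This is a minimal sketch that does not use simple-react, and the class name BoundedCollect and the method firstN are made up for illustration. Collecting an infinite Stream never returns and keeps accumulating results, so the Stream has to be bounded before any collecting call:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical illustration using java.util.stream, not simple-react.
class BoundedCollect {

    // Collects the first n values of the otherwise infinite sequence 0, 1, 2, ...
    static List<Integer> firstN(int n) {
        return Stream.iterate(0, it -> it + 1)
                .limit(n) // without this bound, collect would never return
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(firstN(5)); // prints [0, 1, 2, 3, 4]
    }
}
```

For a Stream that is meant to run forever, a run-style call that discards results is the safer choice.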
Now we can do some work
LazyFutureStream.sequentialCommonBuilder()
.iterateInfinitely(0, it -> it + 1)
.onePer(1, TimeUnit.SECONDS)
.map(seconds -> readStatus())
.map(this::saveStatus)
.runOnCurrent();
But what if saveStatus is unreliable?
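private int count = 0; // call counter, assumed to be a field of the enclosing class

// Fails on every other call to simulate an unreliable save.
private String saveStatus(Status s) {
    if (count++ % 2 == 0)
        throw new RuntimeException();
    return "Status saved:" + count;
}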
retry!
By replacing the map stage that calls saveStatus with retry, we can handle the unreliability of the saveStatus method: a failed call is attempted again instead of ending with the exception.
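The idea behind retry can be sketched in plain Java. This is not simple-react's implementation: the helper withRetry, the class RetrySketch, and the limit of three attempts are assumptions made for illustration, and a String stands in for the Status type, which is not shown here.

```java
import java.util.function.Function;

// Hypothetical helper, not part of simple-react: retries a function a bounded number of times.
class RetrySketch {

    // Wraps fn so that a RuntimeException triggers another attempt, up to maxAttempts in total.
    static <T, R> Function<T, R> withRetry(Function<T, R> fn, int maxAttempts) {
        if (maxAttempts < 1) throw new IllegalArgumentException("maxAttempts must be at least 1");
        return input -> {
            RuntimeException last = null;
            for (int attempt = 1; attempt <= maxAttempts; attempt++) {
                try {
                    return fn.apply(input);
                } catch (RuntimeException e) {
                    last = e; // remember the failure and try again
                }
            }
            throw last; // every attempt failed
        };
    }

    private static int count = 0;

    // Mirrors the unreliable saveStatus above: fails on every other call.
    static String saveStatus(String status) {
        if (count++ % 2 == 0)
            throw new RuntimeException();
        return "Status saved:" + count;
    }

    public static void main(String[] args) {
        Function<String, String> reliableSave = withRetry(RetrySketch::saveStatus, 3);
        System.out.println(reliableSave.apply("ok")); // first attempt fails, second succeeds
    }
}
```

A bounded attempt count is a design choice in this sketch: it keeps a permanently failing call from looping forever.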
onePer emits every value, only at a slower pace. What if we want to emit at most one value per time period and drop the rest from the Stream?
debounce!
LazyFutureStream.sequentialCommonBuilder()
.iterateInfinitely(0, it -> it + 1)
.debounce(100, TimeUnit.MILLISECONDS)
.peek(System.out::println)
.runOnCurrent();
This will let one value through every 100 ms, but the iterator will continue generating values, even when they are excluded from the Stream.
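The dropping behaviour described above can be sketched as a small time-window gate in plain Java. This is an assumption about the general technique, not simple-react's code: the class DebounceSketch, its accept method, and the injected clock are hypothetical names. A fake clock is used so the output is deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongSupplier;

// Hypothetical sketch, not simple-react's implementation: lets through at most one value per window.
class DebounceSketch {
    private final long windowNanos;
    private final LongSupplier clock; // source of the current time in nanoseconds
    private boolean seenFirst = false;
    private long lastAccepted;

    DebounceSketch(long windowNanos, LongSupplier clock) {
        this.windowNanos = windowNanos;
        this.clock = clock;
    }

    // Returns true if the value arriving now should pass, false if it should be dropped.
    boolean accept() {
        long now = clock.getAsLong();
        if (!seenFirst || now - lastAccepted >= windowNanos) {
            seenFirst = true;
            lastAccepted = now;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        // A fake clock that advances 40 ms per value keeps the example deterministic.
        long[] fakeNow = {0};
        DebounceSketch debounce = new DebounceSketch(100_000_000L, () -> fakeNow[0]);
        List<Integer> passed = new ArrayList<>();
        for (int value = 0; value < 10; value++) {
            if (debounce.accept()) passed.add(value);
            fakeNow[0] += 40_000_000L;
        }
        System.out.println(passed); // prints [0, 3, 6, 9]
    }
}
```

Note that the producer still generates all ten values; the gate only decides which of them continue downstream.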
jitter!
LazyFutureStream.sequentialCommonBuilder()
.fromPrimitiveStream(IntStream.range(0, 1000000))
.map(it -> it*100)
.jitter(1000000000L)
.peek(System.out::println)
.runOnCurrent();
jitter applies a random delay (a ‘jitter’) between events.
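A random delay of this kind can be sketched in plain Java as follows. This is an illustration only, not simple-react's implementation: JitterSketch and nextDelayNanos are hypothetical names, and treating the argument as an upper bound in nanoseconds is an assumption of the sketch.

```java
import java.util.Random;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch, not simple-react's implementation: a random pause bounded by maxNanos.
class JitterSketch {

    // Picks a delay in the range [0, maxNanos); the nanosecond unit is an assumption of this sketch.
    static long nextDelayNanos(Random random, long maxNanos) {
        return (long) (random.nextDouble() * maxNanos);
    }

    public static void main(String[] args) throws InterruptedException {
        Random random = new Random();
        long maxNanos = TimeUnit.MILLISECONDS.toNanos(50); // small bound so the demo finishes quickly
        for (int value = 0; value < 5; value++) {
            long delay = nextDelayNanos(random, maxNanos);
            TimeUnit.NANOSECONDS.sleep(delay); // pause before emitting the next value
            System.out.println(value * 100 + " after " + delay + " ns");
        }
    }
}
```

Randomising the gap between events is a common way to stop several periodic tasks from falling into lockstep.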
fixedDelay
fixedDelay introduces a fixed time delay between elements passing through a stage.
LazyFutureStream.sequentialCommonBuilder()
.fromPrimitiveStream(IntStream.range(0, 1000000))
.fixedDelay(1L, TimeUnit.SECONDS)
.peek(System.out::println)
.runOnCurrent();
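For comparison with jitter, a fixed delay can be sketched in plain Java as a loop that pauses for the same amount of time before each element. This is a rough illustration, not simple-react's implementation: FixedDelaySketch and forEachWithFixedDelay are hypothetical names, and pausing before the first element (rather than only between elements) is a choice made by this sketch.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical sketch, not simple-react's implementation.
class FixedDelaySketch {

    // Passes each value to sink in order, pausing for the same fixed delay before each one.
    static <T> void forEachWithFixedDelay(List<T> values, long delay, TimeUnit unit, Consumer<T> sink)
            throws InterruptedException {
        for (T value : values) {
            unit.sleep(delay);
            sink.accept(value);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Prints 0, 1 and 2, one per second.
        forEachWithFixedDelay(Arrays.asList(0, 1, 2), 1L, TimeUnit.SECONDS, System.out::println);
    }
}
```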