Reactive programming with Java 8 and simple-react : flow control

John McClean
Mar 23, 2015 · 1 min read

Generate an event every second

LazyFutureStream.sequentialCommonBuilder()
.iterateInfinitely(0, it -> it+1)
.onePer(1, TimeUnit.SECONDS)

In the above Stream, one event will emerge per second — starting with 0,1,2,3,4,5,6,7,8,9,10 etc. We can print out the results using peek.

LazyFutureStream.sequentialCommonBuilder()
.iterateInfinitely(0, it -> it+1)
.onePer(1, TimeUnit.SECONDS)
.peek(System.out::println);

With a LazyFutureStream we should use run or block to start the Stream. block() will block the current thread and will aggregate all results into a collection (memory leak alert for an infinite Stream!).

Now we can do some work

LazyFutureStream.sequentialCommonBuilder()
.iterateInfinitely(0, it -> it + 1)
.onePer(1, TimeUnit.SECONDS)
.map(seconds -> readStatus())
.map(this::saveStatus)
.runOnCurrent();

But what if saveStatus is unreliable?

private String saveStatus(Status s){
if(count++%2==0)
throw new RuntimeException();

return “Status saved:”+count;
}

retry!

By swapping out a map command for retry, we can handle the unreliability of the saveStatus method.

onePer emits all values at a slower pace, what if we want to emit one value per second, but drop values from the stream?

debounce!

LazyFutureStream.sequentialCommonBuilder()
.iterateInfinitely(0, it -> it + 1)
.debounce(100, TimeUnit.MILLISECONDS)
.peek(System.out::println)
.runOnCurrent();

This will let one value through every 100 ms, but the iterator will continue generating values, even when they are excluded from the Stream.

jitter!

LazyFutureStream.sequentialCommonBuilder()
.fromPrimitiveStream(IntStream.range(0, 1000000))
.map(it -> it*100)
.jitter(1000000000l)
.peek(System.out::println)
.runOnCurrent();

jitter will apply a random ‘jitter’ between each event.

fixedDelay

fixed delay introduces a fixed time delay between elements passing through a stage.

LazyFutureStream.sequentialCommonBuilder()
.fromPrimitiveStream(IntStream.range(0, 1000000))
.fixedDelay(1l,TimeUnit.SECONDS)
.peek(System.out::println)
.runOnCurrent();

The Tutorial : Reactive programming with Java 8

John McClean

Written by

Architecture @ Verizon Media. Maintainer of Cyclops. Twitter @johnmcclean_ie