Reactive programming with Java 8 and simple-react : zipping streams

John McClean
1 min readMar 25, 2015

--

zipping allows two Streams to be merged element by element. simple-react supports a number of different ways to zip Streams

zip based on results

zip based on futures (with results populated asynchronously)

zip with index (on either results or futures)

combine latest (results)

with latest (results)

zip based on results

a = LazyFutureStream.parallelBuilder().react(()->slowest(),()->fast(),()->slow());
b = LazyFutureStream.sequentialBuilder().of(1,2,3,4,5,6);

zipping Streams a and b

a.zip(b);

Should result in

[(fastResult,1),(slowResult,2),(slowestResult,3)]

zip based on futures

zipping by future will result in a new Stream that is populated rapidly with placeholders for the actual Stream data. Ordering will be maintained from the previous phase of the Stream (rather than being determined by the completion of results).

a = EagerFutureStream.parallelBuilder().react(()->slowest(),()->fast(),()->slow());
b = EagerFutureStream.sequentialBuilder().of(1,2,3,4,5,6);

zipping Streams a and b

a.zipFutures(b);

Should result in

[(slowestResult,1),(fastResult,2),(slowResult,3)]

zip with index (on either results or futures)

LazyFutureStream.sequentialBuilder().react(()->slowest(),()->fast(),()->slow()).zipWithIndex()

Should result in

[(fastResult,0),(slowResult,1),(slowestResult,2)]

Likewise zipFuturesWithIndex

EagerFutureStream.parallelBuilder().react(()->slowest(),()->fast(),()->slow()).zipFuturesWithIndex()

Should result in

[(slowestResult,0),(fastResult,1),(slowResult,2)]
* order not guaranteed but pairings are

combine latest

a = LazyFutureStream.sequentialBuilder().react(()->slowest(),()->fast(),()->slow());
b = LazyFutureStream.sequentialBuilder().of(1,2,3,4,5,6);

zipping these streams with combineLatest should result in the most recent entry from each stream in the result set.

a.combineLatest(b).collect(Collectors.toList());;

Expected result

[(null,1),(null,2),(null,3),(null,4),(null,5),(null,6),(fastResult,6),(slowResult,6),(slowestResult,6)]

with latest

a = LazyFutureStream.sequentialBuilder().react(()->slowest(),()->fast(),()->slow());
b = LazyFutureStream.sequentialBuilder().of(1,2,3,4,5,6);

zipping these streams with withLatest should result in the most recent entry from each stream in the result set.

a.withLatest(b).collect(Collectors.toList());

Expected result

[(fastResult,6),(slowResult,6),(slowestResult,6)]

The Tutorial : Reactive programming with Java 8

--

--

John McClean

Architecture @ Verizon Media. Maintainer of Cyclops. Twitter @johnmcclean_ie