Plumbing Java 8 Streams with Queues, Topics and Signals

John McClean
Feb 11, 2015

Joining two or more streams

Imagine a Java 8 application with multiple Streams. How do those Streams interact?

Dried up Stream By Mick Garratt : Creative Commons License

It’s pretty easy to create Streams in isolation, but not so straightforward to have data flow smoothly from one Stream to another. The typical Java 8 application with multiple Streams will probably have several disconnected Streams, and where Streams are indirectly connected it will almost always be through the creation of fully populated collections.

As if, in nature, a massive truck sucks up all the water in a Stream and moves it to a new location to act as a source there!

Disconnected Streams within an Application

Imagine again a Java 8 application, this time with two modules and related Streams. One module processes raw, streaming text input (from a file, or from a remote server via a REST client) and converts it into application-specific Entity classes. Imagine that the incoming data is large: large enough that materialising it all in memory at once may cause problems.

Elsewhere, in our application, we have a Stream that is dedicated to processing those application Entities (e.g. entities that are something like a PurchaseOrder or other kind of User request). This Stream applies some custom processing before passing the result to an external Message Queue system (or database).

How can we transfer data between these Streams? If we do that by aggregating data into large Collections, we may lose many of the inherent advantages of Stream-based processing: small memory overhead and continuous processing, error handling and recovery.

Reusable Interconnected Streams

It would be nice if the Streams that generate the user Entity classes from text-based sources could reuse the Entity processing Stream. It would also be nice if we could scale the number of input handling and processing Streams up or down in accordance with the scaling needs of our application.

We need a Stream-friendly intermediary

All problems in computer science can be solved by another level of indirection

To join the Streams, and control how they are joined, we need to put a joining data structure in between them.

At first glance a Queue looks like it could be a good solution. Each of the input generating Streams could write to the Queue, and any Processing Streams could read from it.

Introducing SimpleReact Queue

SimpleReact is a new, very small library from AOL that makes working with Java 8 Streams a little easier. One of the data structures it provides is com.aol.simple.react.async.Queue (with special thanks to Gary Coady who suggested it!).

We can pass data to the Queue from the two input handling Streams and have the processing Stream receive data from the Queue, e.g. :-

[Figure: Joining two streams]

Using Java 8 semantics, this might look something like this :-

Queue queue = new Queue();

//thread 1 : manage file stream
fileStream.map(next -> toEntity(next))
          .forEach(entity -> queue.add(entity));

//thread 2: manage rest stream
queue.fromStream(restStream);

//thread 3: manage processing stream
queue.stream()
     .map(entity -> process(entity))
     .forEach(processed -> save(processed));

Note that calling forEach will result in Thread 3 'infinitely' processing the Stream from the Queue.
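
To see the mechanics without depending on SimpleReact, the same join can be sketched with nothing but the JDK. This is an illustrative stand-in, not SimpleReact's implementation: the class JoinSketch, its method names and the sample data are all hypothetical, and a plain LinkedBlockingQueue plays the role of the Queue.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical stand-in for a Stream-friendly queue, built only on the JDK.
class JoinSketch {

    // Wraps a BlockingQueue in an infinite Stream; each pull blocks until data arrives.
    static <T> Stream<T> streamFrom(BlockingQueue<T> queue) {
        return Stream.generate(() -> {
            try {
                return queue.take();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        });
    }

    // Two producer Streams, each on its own thread, feed one consumer Stream.
    static List<String> joinTwoProducers() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();

        // thread 1: stands in for the file Stream
        new Thread(() -> Stream.of("a", "b").map(String::toUpperCase).forEach(queue::add)).start();
        // thread 2: stands in for the REST Stream
        new Thread(() -> Stream.of("c", "d").map(String::toUpperCase).forEach(queue::add)).start();

        // consumer: limit(4) ends the otherwise infinite Stream once all four items arrive
        List<String> received =
                new ArrayList<>(streamFrom(queue).limit(4).collect(Collectors.toList()));
        Collections.sort(received); // arrival order depends on thread scheduling
        return received;
    }

    public static void main(String[] args) {
        System.out.println(joinTwoProducers()); // prints [A, B, C, D]
    }
}
```

The consumer never sees a fully populated collection from either producer; it pulls each element as soon as one is available.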

Dealing with infinite Streams

The simplest way to deal with the infinite nature of the Stream from the Queue is to use it in such a manner that it is continuously processing incoming data (like in the example above). If, on the other hand, you would like to start and stop the queue, you have two options available.

Option 1: place a limit on the number of items to read.

e.g.

Stream processingStream = queue.stream()
                               .limit(10)
                               .map(entity -> process(entity));

Option 2: Close the queue

Alternatively, closing the Queue (via queue.close()) will cause the Stream to finish with a Queue.ClosedQueueException e.g.

Thread t = new Thread(() -> {
    try {
        Thread.sleep(100000);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
    queue.close();
});
t.start();

try {
    queue.stream()
         .map(entity -> process(entity))
         .forEach(processed -> save(processed));
} catch (Queue.ClosedQueueException e) {}
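
The close-and-terminate idea can also be sketched with just the JDK. The version below is a stand-in built on assumptions rather than SimpleReact's mechanism: instead of throwing an exception, it treats an empty Optional as a hypothetical "closed" marker, and the names CloseSketch and untilClosed are invented for illustration.

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Optional;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical sketch: a queue-backed Stream that ends when a marker arrives.
class CloseSketch {

    // Streams values from the queue until an empty Optional (the "closed" marker) is taken.
    static <T> Stream<T> untilClosed(BlockingQueue<Optional<T>> queue) {
        Iterator<T> iterator = new Iterator<T>() {
            private Optional<T> pending; // null means nothing has been fetched yet

            @Override
            public boolean hasNext() {
                if (pending == null) {
                    try {
                        pending = queue.take(); // blocks until data or the marker arrives
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        pending = Optional.empty(); // treat interruption as a close
                    }
                }
                return pending.isPresent();
            }

            @Override
            public T next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                T value = pending.get();
                pending = null;
                return value;
            }
        };
        return StreamSupport.stream(
                Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false);
    }

    static List<String> demo() {
        BlockingQueue<Optional<String>> queue = new LinkedBlockingQueue<>();
        new Thread(() -> {
            queue.add(Optional.of("x"));
            queue.add(Optional.of("y"));
            queue.add(Optional.empty()); // "close" the queue
        }).start();
        return untilClosed(queue).map(String::toUpperCase).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [X, Y]
    }
}
```

A marker keeps the consumer free of try/catch blocks, at the cost of wrapping every element. An exception-based close, like the one described above, avoids the wrapper.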

Splitting two Streams

[Figure: Splitting a Stream into two processing Streams]

So, what if we want to go the other way and split a single Stream into two processing Streams? For example, we may have a single Stream for handling input, but want to distribute the processing work over multiple threads :-

Queue queue = new Queue();

//thread 1: manage input
queue.fromStream(restStream);

//thread 2: manage processing stream A
queue.stream()
     .map(entity -> process(entity))
     .forEach(processed -> save(processed));

//thread 3: manage processing stream B
queue.stream()
     .map(entity -> process(entity))
     .forEach(processed -> save(processed));

Because processingStreamA and processingStreamB have the same functions we can refactor into a common method to build the processing Stream. E.g.

Stream processingStreamA = buildProcessingStream(queue.stream());
processingStreamA.forEach(processed -> save(processed));

private Stream buildProcessingStream(Stream input) {
    return input.map(entity -> process(entity));
}
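
For readers who want to see the competing-consumer behaviour in isolation, here is a minimal JDK-only sketch. It is an assumption-based illustration, not SimpleReact code: the workers use plain loops rather than Streams to keep the example short, and CompetingConsumersSketch, startWorker and the STOP marker are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.IntStream;

// Hypothetical sketch: one input Stream, two workers competing for its elements.
class CompetingConsumersSketch {
    static final int STOP = -1; // hypothetical marker telling a worker to finish

    // Starts a worker thread that drains the shared queue into its own result list.
    static Thread startWorker(BlockingQueue<Integer> queue, List<Integer> handled) {
        Thread worker = new Thread(() -> {
            try {
                for (int next = queue.take(); next != STOP; next = queue.take()) {
                    handled.add(next * 10); // stands in for process(entity) and save(processed)
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        return worker;
    }

    // Pushes 1..100 through one shared queue and returns what each worker handled.
    static List<List<Integer>> demo() {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        List<Integer> handledByA = new ArrayList<>();
        List<Integer> handledByB = new ArrayList<>();
        Thread a = startWorker(queue, handledByA);
        Thread b = startWorker(queue, handledByB);

        IntStream.rangeClosed(1, 100).forEach(queue::add); // the single input Stream
        queue.add(STOP); // one marker per worker
        queue.add(STOP);
        try {
            a.join();
            b.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return Arrays.asList(handledByA, handledByB);
    }

    public static void main(String[] args) {
        List<List<Integer>> split = demo();
        // How the 100 items are divided varies from run to run; the total does not.
        System.out.println(split.get(0).size() + split.get(1).size()); // prints 100
    }
}
```

Each element is taken by exactly one worker, so the two result lists never overlap.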

Handling back pressure

A Queue can be used to manage ‘back pressure’ between producers and consumers. If producing Streams generate more data than consumers can handle, ‘back pressure’ can slow down the producer so that data production doesn’t overwhelm the system.

In this case, a SimpleReact Queue backed by a bounded BlockingQueue can apply back pressure by blocking the producing Stream when the Queue is full.

e.g.

Queue<Integer> q = new Queue<>(new LinkedBlockingQueue<>(2));

new SimpleReact().react(() -> { q.offer(1); return found++; },
                        () -> { q.offer(1); return found++; },
                        () -> { q.offer(6); return found++; },
                        () -> { q.offer(5); return found++; });
sleep(10);
assertThat(found, is(2));

The SimpleReact Stream will be blocked until data is drained from the Queue by a subscribing Stream. For example, by streaming 2 records out of the Queue we create space for additional data.

q.stream().limit(2).collect(Collectors.toList());
sleep(2);
assertThat(found, is(4));

And the same rule holds for standard Java 8 Streams.

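//thread 1: create the bounded Queue
Queue<String> q = new Queue<>(new LinkedBlockingQueue<>(2));

//thread 2: produce data
Stream.of("1", "2", "3", "4").forEach(it -> { q.offer(it); found++; });

//thread 1: only two records fit until a consumer drains the Queue
assertThat(found, is(2));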
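
The blocking that drives back pressure comes from the bounded BlockingQueue itself, which the JDK lets us observe directly. The sketch below is a single-threaded illustration under stated assumptions: BackPressureSketch and offerAll are hypothetical names, and a timed offer is used instead of an indefinitely blocking put() so that the example terminates on its own.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: a full bounded queue makes the producer wait.
class BackPressureSketch {

    // Offers each value, waiting up to 50 ms for space; returns how many were accepted.
    static int offerAll(BlockingQueue<Integer> queue, int... values) {
        int accepted = 0;
        for (int value : values) {
            try {
                // put() would wait indefinitely here; the timed offer gives up instead
                if (queue.offer(value, 50, TimeUnit.MILLISECONDS)) {
                    accepted++;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(2); // capacity of two

        System.out.println(offerAll(queue, 1, 1, 6, 5)); // prints 2: full after two records

        queue.poll(); // a consumer drains two records...
        queue.poll();

        System.out.println(offerAll(queue, 6, 5)); // prints 2: ...and space is available again
    }
}
```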

Splitting Streams so they receive the same data

When splitting a Stream with a Queue, each subscribing Stream competes to read data from the Queue. This means each piece of data added to the Queue is passed on as output to only one Stream.

If you would like every subscribing Stream to receive every piece of data added to a data structure, you need to use a Topic.

A Topic will guarantee every message is delivered to all subscribing Streams. The use case for Topics is likely to be different than for Queues.

Queues can be used to spread load across processing threads; Topics can be used to provide the same data as input to Streams that process that data concurrently in different ways.

For example, receiving user data that should be processed differently for two different sub-systems would be a good candidate for a Topic.

Topic topic = new Topic();

//thread 1: manage input
topic.fromStream(restStream);

//thread 2: manage processing stream A
Stream processingStreamA = topic.stream();
processingStreamA.map(entity -> processA(entity))
                 .forEach(processed -> saveA(processed));

//thread 3: manage processing stream B
Stream processingStreamB = topic.stream();
processingStreamB.map(entity -> processB(entity))
                 .forEach(processed -> saveB(processed));

As each Stream connected to a Topic is guaranteed to receive its messages, it’s a good idea to explicitly disconnect when you are done.

e.g.

topic.disconnect(processingStreamB);
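
One way to picture the difference from a Queue is to give every subscriber its own private queue and copy each message into all of them. The sketch below does exactly that with JDK classes. It is a guess at the general shape of a topic, not a description of how SimpleReact implements Topic, and TopicSketch with its subscribe, disconnect and publish methods is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch: every subscriber receives every message.
class TopicSketch<T> {
    // One private queue per subscriber, so nobody competes for messages.
    private final List<BlockingQueue<T>> subscribers = new CopyOnWriteArrayList<>();

    BlockingQueue<T> subscribe() {
        BlockingQueue<T> inbox = new LinkedBlockingQueue<>();
        subscribers.add(inbox);
        return inbox;
    }

    // Stops delivery to a subscriber so its inbox no longer grows.
    void disconnect(BlockingQueue<T> inbox) {
        subscribers.remove(inbox);
    }

    void publish(T message) {
        for (BlockingQueue<T> inbox : subscribers) {
            inbox.add(message);
        }
    }

    public static void main(String[] args) {
        TopicSketch<String> topic = new TopicSketch<>();
        BlockingQueue<String> a = topic.subscribe();
        BlockingQueue<String> b = topic.subscribe();

        topic.publish("order-1");
        topic.disconnect(b);
        topic.publish("order-2");

        System.out.println(new ArrayList<>(a)); // prints [order-1, order-2]
        System.out.println(new ArrayList<>(b)); // prints [order-1]
    }
}
```

The sketch also shows why disconnecting matters: an inbox that is still subscribed but no longer read keeps accumulating messages.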

Capturing the Stream of changes

What if, instead of Streaming every data point, we’d like to respond only to changes in some critical metric? We can use a Signal class, and provide it with Streaming input data. The discrete output Stream will only contain the changes.

[Figure: Streaming changes in data]

A Signal can be backed by either a Queue or a Topic (and the rules as to whether all messages will be seen by Processing Stream A or B will depend on which is used).

Signal signal = Signal.queueBackedSignal();

//thread 1 : populate signal with health data
signal.fromStream(Stream.generate(() -> checkHealthStatus())
                        .peek(status -> sleep(500)));

//thread 2 : read changes in health from Signal
signal.stream().forEach(health -> respond(health));
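
The "only the changes" behaviour can be approximated on a plain Java 8 Stream with a stateful filter. The sketch below is an illustration under assumptions rather than Signal's actual implementation: ChangeSketch, changed and changesIn are hypothetical names, the first value is assumed to count as a change, and the filter is only safe on sequential Streams.

```java
import java.util.List;
import java.util.Objects;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical sketch: keep a value only when it differs from the one before it.
class ChangeSketch {

    // A stateful predicate; it remembers the previous value, so use it on sequential Streams only.
    static <T> Predicate<T> changed() {
        Object[] previous = { new Object() }; // initial sentinel never equals real data
        return value -> {
            boolean isChange = !Objects.equals(previous[0], value);
            previous[0] = value;
            return isChange;
        };
    }

    static List<String> changesIn(Stream<String> statuses) {
        return statuses.filter(changed()).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Stream<String> health = Stream.of("UP", "UP", "DOWN", "DOWN", "UP");
        System.out.println(changesIn(health)); // prints [UP, DOWN, UP]
    }
}
```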

Putting it all together

With SimpleReact and Streams, you can plumb together as many different flows within your application as you need.

You can load balance processing across cores on multicore machines (with Queues), or concurrently execute unrelated Streams with Topics. You can keep your applications highly responsive to changes with Signal.

For heavy processing you can take full advantage of all your cores by using a SimpleReact flow as output (or input) to any Queue, Topic or Signal!

John McClean

Architecture @ Verizon Media. Maintainer of Cyclops. Twitter @johnmcclean_ie