Java 8 Streams : 10 missing features

Java 8 Streams were exceptionally well designed. The API itself offers a concise (a polite way of saying frustratingly limited) feature set. Perhaps this is because the team behind it were focused on adding data parallelism, which arguably limited their options in terms of features for sequential Streaming.

However, it turns out that this conciseness is an advantage, not a weakness. By creating a simple API that does one thing really well (that is, represent a totally lazy, potentially infinite Stream of data), they created something that was also infinitely extensible by the community.

In this article I hope to show you 10 important features that weren’t included within the Stream API, along with 10 extension points that offer you those features. If you are frustrated with the verbosity of the core API in solving a particular problem, never fear: there is very likely an extended API which will make the solution much simpler and more elegant (we’ll use examples from the cyclops-react Streaming APIs).

Most developers seem to use Streams to simply transform collections, but they are capable of so much more.

Most developers use Streams as a collections API. If this is you, congratulations! Our article on lazy extended collections may just make your day: https://medium.com/@johnmcclean/faster-lazy-extended-powerful-jdk-collections-5a519ab582ae#.8jjimdasd
A totally lazy pipeline for processing a potentially infinite stream of data is an incredibly powerful data structure. To see just how powerful, let’s do what everyone seems to do in blogs these days and build a microservice.

What we need our microservice to do

  1. Read data from Amazon’s S3 storage service
  2. Transform it to another form
  3. Write the transformed data back into AWS S3

We could start by defining a Stream like so..
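As a rough illustration using only the plain JDK, such a pipeline might look like the sketch below. The names pollS3 and transform, and the in-memory list standing in for the write back to S3, are assumptions made for this example rather than real AWS calls. The limit is only there so the example terminates; without it the Stream keeps polling forever.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

class PollingPipeline {
    private static final AtomicInteger COUNTER = new AtomicInteger();

    // Hypothetical stand-in for reading from S3: returns a new record on each call.
    static String pollS3() {
        return "record-" + COUNTER.incrementAndGet();
    }

    // Hypothetical transformation step.
    static String transform(String data) {
        return data.toUpperCase();
    }

    // Runs the pipeline for 'limit' elements and returns what was "written".
    static List<String> run(int limit) {
        List<String> written = new ArrayList<>(); // stand-in for writing back to S3
        Stream.generate(PollingPipeline::pollS3)  // lazy, potentially infinite source
              .map(PollingPipeline::transform)
              .limit(limit)                       // remove this and the Stream never ends
              .forEach(written::add);
        return written;
    }

    public static void main(String[] args) {
        System.out.println(run(3));
    }
}
```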

Now we have a Stream that will run infinitely and continuously, but there are many cases when this is not what we want. Do we want to hammer our data service continuously, or use some time-based function to control how often it is queried?

Let’s schedule the Stream to run every 60 seconds. We can use StreamUtils in cyclops-react to do this.

Missing Feature 1 : Scheduling

Other useful operators here include onePer, xPer and scheduling based on cron expressions.
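To make the idea concrete without relying on any particular library signature, here is a minimal JDK-only sketch of fixed-delay scheduling. It is not the cyclops-react implementation. The helper name pollWithFixedDelay and its parameters are made up for this example: it pulls one element from a Stream per tick using a ScheduledExecutorService and stops after a set number of ticks so that it terminates.

```java
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

class ScheduledPolling {
    // Hypothetical helper: emits one Stream element per tick, with a fixed delay between ticks.
    static <T> List<T> pollWithFixedDelay(Stream<T> stream, long delayMillis, int ticks) {
        Iterator<T> it = stream.iterator(); // only touched by the single scheduler thread
        List<T> results = new CopyOnWriteArrayList<>();
        CountDownLatch remaining = new CountDownLatch(ticks);
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleWithFixedDelay(() -> {
            if (remaining.getCount() == 0) return;       // already done, ignore late ticks
            if (it.hasNext()) {
                results.add(it.next());
                remaining.countDown();
            } else {
                // Source exhausted early: release the waiting caller.
                while (remaining.getCount() > 0) remaining.countDown();
            }
        }, 0, delayMillis, TimeUnit.MILLISECONDS);
        try {
            remaining.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for ticks", e);
        } finally {
            scheduler.shutdownNow();
        }
        return results;
    }

    static List<String> demo() {
        // 20 ms here instead of 60 seconds, purely to keep the example fast.
        return pollWithFixedDelay(Stream.generate(() -> "polled"), 20, 3);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Note that in this sketch an exception thrown by the source cancels all further ticks, which is exactly the kind of problem the next section deals with.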

A problem we will quickly run into, though, is the behaviour when an exception occurs. The method signature of pollS3 probably looks something like this:

public Data pollS3() throws AmazonServiceException

If a Stream throws an exception the execution of the entire Stream fails, and in this case our scheduled Stream will stop running.

A Stream that throws an Exception? This won’t end well!
Kaboom! When we execute the Stream that throws an Exception the whole Stream dies.

Missing Feature 2 : Error Recovery!

Again, we can use the recover operator in StreamUtils. It allows us to catch an in-Stream exception and handle it.
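For readers who want to see the underlying idea, the following is a small JDK-only sketch of in-Stream recovery. It is not the StreamUtils API: the helper recoverWith is a hypothetical name, and it simply wraps a mapping function so that a RuntimeException becomes a fallback value instead of killing the Stream.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

class RecoverSketch {
    // Hypothetical helper: turns a failing function into one that falls back on error.
    static <T, R> Function<T, R> recoverWith(Function<T, R> fn,
                                             Function<RuntimeException, R> fallback) {
        return input -> {
            try {
                return fn.apply(input);
            } catch (RuntimeException e) {
                return fallback.apply(e); // recover instead of terminating the Stream
            }
        };
    }

    static List<Integer> parseAll(List<String> raw) {
        // Integer.parseInt throws NumberFormatException on bad input; we map that to -1.
        Function<String, Integer> safeParse = recoverWith(Integer::parseInt, e -> -1);
        return raw.stream().map(safeParse).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(parseAll(Arrays.asList("1", "oops", "3"))); // prints [1, -1, 3]
    }
}
```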

Using static extension methods soon gets unreadable (unless you make use of Lombok’s extension method macro, which can work well in Eclipse but not so well in other IDEs). A better alternative is to make use of a dedicated extension class. In cyclops-react we’ve created one called ReactiveSeq.

Note that most of the operators discussed in this article are available for direct use on JDK 8 Streams (even those that make use of reactive-streams functionality on the StreamUtils class).

Using a dedicated extension class makes your code neater!

ReactiveSeq extends the awesomely powerful Seq type from jooλ. In this instance Seq is short for Sequential. By abandoning the data parallelism of Java 8 Streams it is possible to build a super powerful Sequential Streaming API. jooλ’s Seq type has a lot of very powerful extension methods in addition to the 10 missing features we cover today in this article (which are mostly additions by cyclops-react)!

Seq examples

jooλ makes manipulating data in memory with Streams much easier, and many of its more advanced functions are inspired by the more powerful operators available in SQL (perhaps unsurprising, as it comes from Data Geekery, the people behind jOOQ).

ReactiveSeq extends Seq and also implements the reactive-streams API. Its goal is to add further extensions so we can work with data that may arrive asynchronously, apply functions that may succeed or fail to that data, connect different processing streams seamlessly, handle processing mismatches between data producers and consumers, and provide an alternative mechanism for data parallelism while retaining the same rich operator set.

ReactiveSeq extends Seq extends Stream (and reactive-streams Publisher)

Missing Feature 3 : Retries

We have already seen the recover operator, which allows us to catch an Exception and recover from it. In our microservice, once we have downloaded and processed the data, it would be a real shame to drop it just because we couldn’t save it at the final stage.

We can make sure that we retry on failure using the retry operator.

The retry operator will retry an operation if it fails

Note that we could use a more detailed overloaded retry operator to specify backoff parameters and the total number of retries we would like.
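The mechanics of retrying with backoff can be sketched in plain Java as follows. This is an illustration under stated assumptions, not the cyclops-react retry operator: withRetry, maxAttempts and backoffMillis are names invented for the example, and the "save" step is a fake that fails twice before succeeding.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class RetrySketch {
    // Hypothetical helper: re-applies fn up to maxAttempts times with a linear backoff.
    static <T, R> Function<T, R> withRetry(Function<T, R> fn, int maxAttempts, long backoffMillis) {
        if (maxAttempts < 1) throw new IllegalArgumentException("maxAttempts must be >= 1");
        return input -> {
            RuntimeException last = null;
            for (int attempt = 1; attempt <= maxAttempts; attempt++) {
                try {
                    return fn.apply(input);
                } catch (RuntimeException e) {
                    last = e;
                    if (attempt < maxAttempts) sleep(backoffMillis * attempt);
                }
            }
            throw last; // all attempts failed: surface the final error
        };
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted during backoff", e);
        }
    }

    static List<String> demo() {
        AtomicInteger calls = new AtomicInteger();
        // Fake save step: fails on the first two calls, succeeds on the third.
        Function<String, String> flakySave = data -> {
            if (calls.incrementAndGet() < 3) throw new IllegalStateException("save failed");
            return "saved " + data + " after " + calls.get() + " attempts";
        };
        Function<String, String> reliableSave = withRetry(flakySave, 5, 1);
        return Stream.of("a").map(reliableSave).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [saved a after 3 attempts]
    }
}
```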

Missing Feature 4 : Error events

The reactive-streams API allows consumers to listen to the various events generated during processing: the Stream of element data, the Stream of errors, and a completion event. With standard Java 8 Streams we only get the Stream of element data, and any error causes the termination of the Stream. With ReactiveSeq we can use forEachWithError to subscribe to both data and error events (see StreamUtils#forEachWithError to subscribe to these events with standard JDK 8 Streams).

What does connect do here? Read on to find out!

Missing Feature 5 : Hot Streams

Hot Streams are Streams that are currently executing that we can connect to (that’s what the connect operator does!). scheduleFixedDelay creates a HotStream: every 60 seconds in our example that Stream will emit data, and we can listen to those emissions by connecting other Streams (as many as we like!). In the midst of heavy processing within a service we may like to further process a small sample of records, perhaps to log some useful metrics or to feed a downstream process. Hot Streams are perfect for this.

In this example we trigger a HotStream executing on a separate thread via the hotStream operator, and create a new connected Stream via connect().

The debounce operator says that we only want to process x amount of emitted items within the specified time period. In this case we will print out one data point per day.

The hotStream operator allows us to create an asynchronously executing Stream — but it is not the only way to do this.

Missing Feature 6 : Asynchronous execution

The futureOperations operator on ReactiveSeq allows users to specify an Executor (perhaps an ExecutorService that manages a thread pool) on which the Stream will be executed. Once a terminal operation is called, the user receives a CompletableFuture as the result. The current thread can continue operating unhindered.
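The same effect can be approximated with the plain JDK by running the terminal operation inside CompletableFuture.supplyAsync. The sketch below is not the futureOperations API, and the method name doubledAsync is an assumption for the example. It only shows the shape of the idea: the caller picks the Executor and gets a future back immediately.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

class AsyncTerminalOp {
    // Hypothetical helper: runs the whole pipeline, including collect, on the given Executor.
    static CompletableFuture<List<Integer>> doubledAsync(List<Integer> input, Executor executor) {
        return CompletableFuture.supplyAsync(
                () -> input.stream().map(i -> i * 2).collect(Collectors.toList()),
                executor);
    }

    static List<Integer> demo() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            CompletableFuture<List<Integer>> future = doubledAsync(Arrays.asList(1, 2, 3), pool);
            // The calling thread is free to do other work here.
            return future.join(); // block only when the result is actually needed
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [2, 4, 6]
    }
}
```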

With a more powerful Sequential Streaming API we have an alternative mechanism for parallel Streaming. For CPU-bound tasks we can distribute Sequential Streams across threads (ideally one per CPU core).

With ReactiveSeq we can define a Stream that executes asynchronously like so..

And can distribute it across threads in a for loop

Executing our parallelized Streams will look something like this
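A JDK-only approximation of this pattern is sketched below: one sequential Stream per core, each started in a for loop on a fixed thread pool, with the partial results combined at the end. The class and method names are illustrative, and the work (a sum of squares, split by remainder) is just a stand-in for a CPU-bound task.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;

class PerCoreStreams {
    static long sumOfSquares(int upTo) {
        int workers = Math.max(1, Runtime.getRuntime().availableProcessors());
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        try {
            List<CompletableFuture<Long>> parts = new ArrayList<>();
            for (int w = 0; w < workers; w++) {
                final int offset = w;
                // Each worker runs its own sequential Stream over a disjoint slice of the input.
                parts.add(CompletableFuture.supplyAsync(
                        () -> IntStream.rangeClosed(1, upTo)
                                       .filter(i -> i % workers == offset)
                                       .mapToLong(i -> (long) i * i)
                                       .sum(),
                        pool));
            }
            // Combine the partial sums once every worker has finished.
            return parts.stream().mapToLong(CompletableFuture::join).sum();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sumOfSquares(10)); // prints 385
    }
}
```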

We are not limited to executing our Streams asynchronously on a thread pool; we could also pass them to a Vert.x event loop for execution (as long as we can conform to the java.util.concurrent.Executor interface, we are good).

Missing Feature 8 : Lazy terminal operations

While most of the Java 8 Stream pipeline is defined lazily, terminal operations typically are not. In fact they are the trigger to execute the pipeline. There are cases, typically when the Stream pipeline executes some expensive function, where it would be better to trigger the evaluation after we know the data is needed.

Because Streams can be traversed only once, it may not be a good idea to pass the Stream itself around, either as a method return type or wrapped inside a Supplier. The lazyOperations operator in cyclops-react allows us to define lazily executed terminal operations. The return type is a memoized (cached) Eval, which represents a lazy evaluation.

We can call get on Eval as many times as we like; the Stream will only ever be executed once!
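The memoization idea itself is easy to sketch with a plain Supplier. The code below is not the Eval type from cyclops-react, and the helper memoize is a hypothetical name. It shows a terminal operation that does not run until the first get, and then runs only once however often get is called.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Stream;

class LazyTerminalOp {
    // Hypothetical helper: evaluates the delegate on first use and caches the result.
    static <T> Supplier<T> memoize(Supplier<T> delegate) {
        return new Supplier<T>() {
            private boolean evaluated;
            private T value;

            @Override
            public synchronized T get() {
                if (!evaluated) {
                    value = delegate.get();
                    evaluated = true;
                }
                return value;
            }
        };
    }

    // Returns {runs before first get, first result, second result, total runs}.
    static int[] demo() {
        AtomicInteger runs = new AtomicInteger();
        Supplier<Integer> lazySum = memoize(() -> {
            runs.incrementAndGet(); // counts how often the pipeline really executes
            return Stream.of(1, 2, 3).mapToInt(Integer::intValue).sum();
        });
        int before = runs.get();    // nothing has run yet
        int first = lazySum.get();  // triggers the Stream
        int second = lazySum.get(); // served from the cache
        return new int[] {before, first, second, runs.get()};
    }

    public static void main(String[] args) {
        int[] r = demo();
        System.out.println(r[0] + " " + r[1] + " " + r[2] + " " + r[3]); // prints 0 6 6 1
    }
}
```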

Missing Feature 9 : Pushing data into Streams

Java 8 Streams are great for working with data we have already, but they would be so much more useful if there was a way we could asynchronously supply them with data. In cyclops-react we have created StreamSource for this very purpose. StreamSource allows us to push data into a Stream on separate data producing threads from the thread on which the Stream is being executed.

We can use StreamSource to generate and manage pushable JDK Streams, ReactiveSeqs and even powerful FutureStreams (see Missing Feature 10). Much like distributed messaging services, StreamSource can behave as either a queue of information (pushing data to exactly one Stream) or a topic (pushing data to multiple connected listening Streams).
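One way to picture a pushable Stream with nothing but the JDK is a Stream backed by a BlockingQueue, as in the sketch below. This is an assumption-laden illustration of the queue behaviour only, not how StreamSource is implemented. The helper streamFrom and the "END" marker used to close the Stream are invented for the example.

```java
import java.util.List;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class PushableStream {
    // Hypothetical helper: a sequential Stream that blocks on the queue until endMarker arrives.
    static <T> Stream<T> streamFrom(BlockingQueue<T> queue, T endMarker) {
        Spliterator<T> source =
                new Spliterators.AbstractSpliterator<T>(Long.MAX_VALUE, Spliterator.ORDERED) {
                    @Override
                    public boolean tryAdvance(Consumer<? super T> action) {
                        try {
                            T next = queue.take();                    // wait for pushed data
                            if (next.equals(endMarker)) return false; // producer closed the Stream
                            action.accept(next);
                            return true;
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            return false;
                        }
                    }
                };
        return StreamSupport.stream(source, false);
    }

    static List<String> demo() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        // The producer pushes data from a different thread than the one running the Stream.
        Thread producer = new Thread(() -> {
            for (int i = 1; i <= 3; i++) queue.add("event-" + i);
            queue.add("END");
        });
        producer.start();
        return streamFrom(queue, "END").map(String::toUpperCase).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [EVENT-1, EVENT-2, EVENT-3]
    }
}
```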

Missing Feature 10 : Powerful parallelism

Java 8 Streams offer a reduced set of operators in order to support data parallelism, and we have seen that it is possible to build a very rich and powerful sequential API. There are also many ways that we could build parallel Streams, and having built an awesomely powerful sequential API it would be great to be able to keep that power in a parallel Stream. In cyclops-react we introduce the concept of a FutureStream.

FutureStreams are an API that manages a Stream of Future tasks. Our LazyFutureStream type extends ReactiveSeq, and so has all the same operators (and even adds some too), but layers in parallelism on top of that.

A FutureStream manages a Stream of Futures

Building a Stream of Futures is pretty straightforward to start with but can soon get gnarly. Inside our map and flatMap methods we can just delegate to the equivalent method on the Future we are managing, but things start to get harder when we try to implement filtering (never mind some of the more powerful operators available on ReactiveSeq).

Building your own Stream of Futures (Don’t do this!)
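A hand-rolled version might look roughly like the JDK-only sketch below (the class name and values are illustrative). Mapping is easy because it delegates to thenApply on each CompletableFuture. Filtering is where it gets awkward: the predicate needs the value, so this sketch simply waits for each result with join before filtering.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class StreamOfFutures {
    static List<Integer> demo() {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<CompletableFuture<Integer>> futures = Stream.of(1, 2, 3, 4)
                    // Start one asynchronous task per element.
                    .map(i -> CompletableFuture.supplyAsync(() -> i * 10, pool))
                    // map: just delegate to the Future's own thenApply.
                    .map(f -> f.thenApply(x -> x + 1))
                    // Collect first so every task is submitted before we start waiting.
                    .collect(Collectors.toList());

            return futures.stream()
                    .map(CompletableFuture::join) // filtering needs the value, so we block here
                    .filter(x -> x > 20)
                    .collect(Collectors.toList());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [21, 31, 41]
    }
}
```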

We can define a FutureStream as if it were a normal JDK Stream (with a lot of additional powerful operators)

and have it execute in parallel.

Summary

We could go on and add more missing features: we haven’t mentioned batching / sliding windows, covered insertion and deletion, advanced take / drop operators, zipping or subscriptions, and much more besides. But I hope that I’ve been able to convince you that the design of the Stream API, although limited, opens up a world of extension opportunities that libraries such as cyclops-react (extending jooλ) and, independently, StreamEx have tackled.

In cyclops-react we’ve gone deep on Stream processing because it allows us to leverage an API and coding style familiar to Java 8 developers more consistently throughout our applications. The functional style introduced in the Stream API typically results in apps that are easier to test, with fewer moving parts, and that are also inherently more parallelizable.

We’ve created data structures such as async.Queues and Topics specifically to help plumb Streams together to realize that aim.

Queues & Topics can be used to connect Streams

Something that might seem relatively simple compared to other Stream implementations ultimately turns out to be incredibly useful and powerful.

For more info, check out cyclops-react.io and the user guide.