Dysfunctional programming in Java 3 : Functional Composition

A tutorial with examples from dysfunctional to functional

In parts 1 and 2 of Dysfunctional programming in Java we looked at the benefits of laziness and immutability respectively. In this article we will take a deep dive into the benefits of functional composition for our application.

The current state of our DataFileMetadata class, a core entity in our sample application that represents a customer file stored somewhere on disk, is as follows :

It is entirely immutable and the contents are lazily loaded from disk only when the getContents method is first called. Contents, once loaded, are cached, so we only load from disk once.
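As a rough illustration of that behaviour, here is a minimal JDK-only sketch. It does not use the Cyclops Eval type from the earlier articles; the Memo helper, the field names and the Supplier-based constructor are all assumptions made for the example.

```java
import java.util.function.Supplier;

// JDK-only sketch: a memoizing Supplier stands in for the lazy, cached contents.
class LazyContentsSketch {

    // Hypothetical helper: computes the wrapped value at most once.
    static final class Memo<T> implements Supplier<T> {
        private final Supplier<T> loader;
        private T value;
        private boolean loaded;

        Memo(Supplier<T> loader) { this.loader = loader; }

        @Override
        public synchronized T get() {
            if (!loaded) {
                value = loader.get();
                loaded = true;
            }
            return value;
        }
    }

    // Hypothetical shape of the entity: all fields final, contents loaded on demand.
    static final class DataFileMetadata {
        private final long customerId;
        private final String type;
        private final Supplier<String> contents;

        DataFileMetadata(long customerId, String type, Supplier<String> loader) {
            this.customerId = customerId;
            this.type = type;
            this.contents = new Memo<>(loader);
        }

        long getCustomerId() { return customerId; }
        String getType() { return type; }
        String getContents() { return contents.get(); } // forces the load on first call
    }

    public static void main(String[] args) {
        int[] loads = {0};
        DataFileMetadata meta = new DataFileMetadata(10L, "type", () -> {
            loads[0]++;                  // counts simulated disk reads
            return "10:hello,20:world";
        });
        System.out.println(loads[0]);           // 0: nothing loaded yet
        System.out.println(meta.getContents()); // 10:hello,20:world
        meta.getContents();                     // served from the cache
        System.out.println(loads[0]);           // 1
    }
}
```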

Something we haven’t changed from the original implementation is the return type of the getContents method.

It’s still a String. This forces us to evaluate our lazy Eval instance when getContents is called. When I was starting out with functional programming in Java, my gut instinct was to do the same: to deal in the comfort of the familiar Java types and not these strange new structures.

Let’s look at the consequences of this type of decision making :

Transforming data loaded from File

Our application should be able to load the raw text data on file and transform it into domain specific entity Objects. The entities we wish to transform our file contents into look like this :-

Great! We have made them immutable as well (article 2 was not wasted🤞💪🏽)! The code that does the transformation is a little more complex though and still eager and imperative (what could possibly go wrong-😮).

imperative data transformation

Nested for loops

Nested for loops always take a little more effort to understand, especially as the code gets complex. Let’s be thankful there are no indexes or conditional branches in there, yet (😱)!

Mixed abstraction levels

Although our method is only 16 lines long, it is starting to get hard to follow and discern exactly what the business logic is.

The local mutable state may not seem so bad but it does limit our options.
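To make these points concrete, here is a minimal sketch of the kind of eager, imperative transformation being described. The entity classes (DataFileMetadata, CustomerRecord, Data) are simplified stand-ins invented for the example, and the "id:data,id:data" contents format is the one discussed later in the article.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class ImperativeLoadSketch {

    // Simplified, hypothetical stand-ins for the article's immutable entities.
    static final class DataFileMetadata {
        final long customerId;
        final String contents;
        DataFileMetadata(long customerId, String contents) {
            this.customerId = customerId;
            this.contents = contents;
        }
    }

    static final class CustomerRecord {
        final long customerId;
        final String data;
        CustomerRecord(long customerId, String data) {
            this.customerId = customerId;
            this.data = data;
        }
    }

    static final class Data {
        final long customerId;
        final List<CustomerRecord> records;
        Data(long customerId, List<CustomerRecord> records) {
            this.customerId = customerId;
            this.records = records;
        }
    }

    static List<Data> loadData(List<DataFileMetadata> fileInfo) {
        List<Data> result = new ArrayList<>();               // mutable accumulator
        for (DataFileMetadata meta : fileInfo) {             // outer loop: one pass per file
            List<CustomerRecord> records = new ArrayList<>(); // more local mutable state
            for (String line : meta.contents.split(",")) {   // inner loop: one pass per record
                String[] parts = line.split(":");
                records.add(new CustomerRecord(Long.parseLong(parts[0]), parts[1]));
            }
            result.add(new Data(meta.customerId, Collections.unmodifiableList(records)));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Data> loaded = loadData(Arrays.asList(new DataFileMetadata(10L, "10:hello,20:world")));
        System.out.println(loaded.get(0).records.size()); // 2
    }
}
```

Parsing details, list bookkeeping and the business-level step (build the Data for each file) all sit in the same method body, which is the mixing of abstraction levels referred to above.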

Can we parallelize our I/O?

Loading the contents from disk is an I/O bound operation, but the shared mutable state in our imperative algorithm (highlighted below) makes parallelization difficult.

How about sequential performance?

The loadData method accepts eager data types — a JDK List. Ultimately this means we are loading all of the files from disk into memory inside the loadData method, regardless of whether we will actually process them all.

Let’s refactor to a more functional style

Step 1: Replace the existing control structures with Java 8 Streams.

Yuck (🤮)! It is, at best, a start.

How did we score against the problems listed above?

  • Nested for loops ✅ [removed]
  • Mixed abstraction levels 🛑 [still there]
  • Parallelization of I/O tasks ✅ [better]
  • Lazy loading of data 🛑 [nope]

This implementation removes the nested for loops, and partially helps with parallelization (we could use parallel Streams to load data concurrently — by default on the common ForkJoin Pool, which is not recommended for I/O). Overall the readability is not much improved.
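A first-pass Stream version along these lines might look like the sketch below. It uses only JDK Streams; the entity classes are simplified, hypothetical stand-ins, and the inline parsing is deliberately left in place to show why readability has not improved much.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class StreamLoadSketch {

    // Simplified, hypothetical stand-ins for the article's immutable entities.
    static final class DataFileMetadata {
        final long customerId;
        final String contents;
        DataFileMetadata(long customerId, String contents) {
            this.customerId = customerId;
            this.contents = contents;
        }
    }

    static final class CustomerRecord {
        final long customerId;
        final String data;
        CustomerRecord(long customerId, String data) {
            this.customerId = customerId;
            this.data = data;
        }
    }

    static final class Data {
        final long customerId;
        final List<CustomerRecord> records;
        Data(long customerId, List<CustomerRecord> records) {
            this.customerId = customerId;
            this.records = records;
        }
    }

    // The nested loops are gone, but parsing details still sit next to business logic,
    // and the input is still an eager List.
    static List<Data> loadData(List<DataFileMetadata> fileInfo) {
        return fileInfo.stream()
                .map(meta -> new Data(meta.customerId,
                        Arrays.stream(meta.contents.split(","))
                              .map(line -> line.split(":"))
                              .map(parts -> new CustomerRecord(Long.parseLong(parts[0]), parts[1]))
                              .collect(Collectors.toList())))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Data> loaded = loadData(Arrays.asList(new DataFileMetadata(10L, "10:hello,20:world")));
        System.out.println(loaded.get(0).records.get(1).data); // world
    }
}
```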

Fixing the challenge of mixed abstraction levels

Step 2 : factor out some of the lower level tasks into their own Functions that we can reference in our flow. We can create two helper functions. The first should split the contents String on commas. If we remember the structure of the Data Objects we are trying to create :-

We can reverse engineer what the contents String might look like

"10:hello,20:world"

Or in a more general form

"<customer_id>:<record_data>,<customer_id>:<record_data>..."

The second method should take the Stream of individual lines and build the Record Objects, splitting each line on ‘:’ to separate the customer id from the record data.
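The two helpers could be sketched as follows, using JDK Streams only. The method names splitContentsByLine and buildRecords come from the text; the CustomerRecord class and its field names are assumptions made for the example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class RecordParsingSketch {

    // Hypothetical immutable record entity: a customer id plus its record data.
    static final class CustomerRecord {
        final long customerId;
        final String data;
        CustomerRecord(long customerId, String data) {
            this.customerId = customerId;
            this.data = data;
        }
    }

    // "10:hello,20:world" becomes a Stream of "10:hello" and "20:world".
    static Stream<String> splitContentsByLine(String contents) {
        return Arrays.stream(contents.split(","));
    }

    // Each "<customer_id>:<record_data>" line becomes one CustomerRecord.
    // The limit of 2 keeps any further ':' characters inside the record data.
    static List<CustomerRecord> buildRecords(Stream<String> lines) {
        return lines.map(line -> line.split(":", 2))
                    .map(parts -> new CustomerRecord(Long.parseLong(parts[0]), parts[1]))
                    .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<CustomerRecord> records = buildRecords(splitContentsByLine("10:hello,20:world"));
        System.out.println(records.size());            // 2
        System.out.println(records.get(0).customerId); // 10
        System.out.println(records.get(1).data);       // world
    }
}
```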

We can refactor our Stream code to use these functions :

Even though the resulting method is shorter and its intent is more clear, we are still mixing abstraction levels. Let’s strip away the Stream infrastructure code to examine the sequence of verbs (functions) we apply :-

Building Customer Records and Customer related Data Objects, from contents, do seem to be abstractions of a similar level. splitContentsByLine looks like it could be a sub-function inside buildRecords. Let’s refactor buildRecords (and give it a better name while we are at it) :-

And re-work loadData :

The business logic (right side) is much cleaner, but the Stream related boilerplate (left side) still obscures things!

We can do better than this (clean up the boilerplate), and attempting to solve the remaining two design issues with the original method will improve it further still.

Let’s make it lazy!

We can start by refactoring the method signature of loadData. Cyclops has two data types that look like potential candidates for use, LazySeq and ReactiveSeq.

LazySeq is a lazy LinkedList. It’s very similar to Stream in Scala or Vavr (with the exception that LazySeq in Cyclops is truly fully lazy), or List in Haskell.

ReactiveSeq represents a dataflow. The dataflow can be iterative (data is pulled along it) or reactive (data is pushed along it asynchronously), either following the reactive-streams spec with backpressure or in an Observable-style fashion with no backpressure applied. The flow can be sequential or parallel. Implementations are provided that integrate Project Reactor and RxJava 2, but a native reactive-streams implementation is bundled in the core library.

Given that we are doing I/O operations that we may like to parallelize inside the loadData method, the more powerful ReactiveSeq type is a better fit in this case.

Getting Cyclops

If you are following along, you can install the cyclops library for functional programming in Java by adding it to your Maven or Gradle class path

Maven

<dependency>
<groupId>com.oath.cyclops</groupId>
<artifactId>cyclops</artifactId>
<version>10.0.1</version>
</dependency>

Gradle

compile group: 'com.oath.cyclops', name: 'cyclops', version: '10.0.1'

Refactoring the helper method

We can refactor buildCustomerRecords to return a LazySeq instead of an eager List :-

If our Records are stored in a Lazy data-structure we also need to update our entities to use LazySeq

Refactoring DataFileMetadata

Now that we are processing everything lazily, we could perhaps consider returning not the raw eagerly evaluated String for our contents in DataFileMetadata but rather the lazy reference to Eval — which we can chain together inside our functionally composed data flows more cleanly.

This refactoring actually involves removing the override to the Lombok provided getContents() method. By removing it Lombok will automatically create a getter for us that return the Eval reference.

We get a reference to our lazy Eval and then compose further operations on the contents, without ever actually evaluating it (we can defer evaluation entirely until the contents absolutely must be present).

We can use the getContents() method that returns an Eval to tee up a set of lazy operations. These won’t be executed until a ‘terminal’ operator on the Eval instance (such as ‘get’) is called.
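The idea of chaining operations onto a lazy value without evaluating it can be sketched with plain JDK types. The Lazy class below is a hypothetical stand-in written for this example, not the Cyclops Eval implementation; it only shows the shape of the technique (deferred evaluation, map for composition, get as the terminal step).

```java
import java.util.function.Function;
import java.util.function.Supplier;

class LazyComposeSketch {

    // Hypothetical stand-in for a lazy, cached value.
    static final class Lazy<T> {
        private final Supplier<? extends T> supplier;
        private T value;
        private boolean evaluated;

        private Lazy(Supplier<? extends T> supplier) { this.supplier = supplier; }

        static <T> Lazy<T> later(Supplier<? extends T> supplier) {
            return new Lazy<T>(supplier);
        }

        // Composes a further step; nothing runs until get() is called on the result.
        <R> Lazy<R> map(Function<? super T, ? extends R> fn) {
            return new Lazy<R>(() -> fn.apply(this.get()));
        }

        // Terminal step: evaluates once, then serves the cached value.
        synchronized T get() {
            if (!evaluated) {
                value = supplier.get();
                evaluated = true;
            }
            return value;
        }
    }

    public static void main(String[] args) {
        int[] loads = {0};
        Lazy<String> contents = Lazy.later(() -> {
            loads[0]++;                          // counts simulated disk reads
            return "10:hello,20:world";
        });
        Lazy<Integer> recordCount = contents.map(c -> c.split(",").length);

        System.out.println(loads[0]);            // 0: the chain is only described so far
        System.out.println(recordCount.get());   // 2
        System.out.println(loads[0]);            // 1
    }
}
```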

Putting it all together: lazy data loading

When we complete the refactoring our loadData method drops to just 3 lines of code (implemented on a single line). The more judicious choice of functional data structures removes the boilerplate of creating Streams (from the fileInfo input parameter, from the split contents String) and converting back to Lists. Ultimately this makes the business logic much clearer.

The Declarative Advantage

We have refactored the loadData method into a declarative form, where we say what we would like to happen — not how it should happen. With ReactiveSeq we can easily replace the execution model to run the code in different ways (sequentially on a single thread, asynchronously or in parallel for example).
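The same separation of "what" from "how" can be shown in miniature with JDK Streams. This is only an analogy for the ReactiveSeq behaviour described above: the pipeline below (a hypothetical COUNT_RECORDS function) is defined once and then handed either a sequential or a parallel source.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class DeclarativeSketch {

    // The "what": count the records in each file's contents.
    // Nothing here says whether the work runs on one thread or many.
    static final Function<Stream<String>, Stream<Integer>> COUNT_RECORDS =
            files -> files.map(contents -> contents.split(",").length);

    public static void main(String[] args) {
        List<String> contents = Arrays.asList("10:hello,20:world", "30:foo");

        // The "how" is chosen by the caller when supplying the source Stream.
        List<Integer> sequential =
                COUNT_RECORDS.apply(contents.stream()).collect(Collectors.toList());
        List<Integer> parallel =
                COUNT_RECORDS.apply(contents.parallelStream()).collect(Collectors.toList());

        System.out.println(sequential); // [2, 1]
        System.out.println(parallel);   // [2, 1] (encounter order is preserved)
    }
}
```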

Running the code

If we set up a test DataFileMetadata instance


// "10:hello,20:world"
meta = new DataFileMetadata(10L,"type",file);

Running The Original Imperative / Eager version

And run a transform against the original Eager implementation

System.out.println(transformer.loadData(Arrays.asList(meta)));

The output, printed to the console is the fully resolved and loaded set of files.

Running Lazily with ReactiveSeq version

If we run with the Lazy version instead, switching our input data type to a ReactiveSeq instance rather than a JDK List

System.out.println(transformer.loadData(ReactiveSeq.of(meta)));

The output is instead just a reference to the Lazy ReactiveSeq implementation (in this case the implementation is the Cyclops internal type StreamX indicating that it is a standard extended iterative Stream).

If we refactor our execution code slightly to eagerly resolve the ReactiveSeq dataflow to a List

System.out.println(transformer.loadData(ReactiveSeq.of(meta))
.toList());

The output resolves to something more similar to the eager output.
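The same effect can be observed with a plain JDK Stream, which is offered here only as an analogy for the ReactiveSeq behaviour above. In this sketch the load method and the LOADS counter are hypothetical helpers; no file is "loaded" until a terminal operation asks for the results.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class LazyPipelineSketch {

    // Counts how many simulated loads have actually happened.
    static final AtomicInteger LOADS = new AtomicInteger();

    // Hypothetical stand-in for reading a file's contents from disk.
    static String load(String fileName) {
        LOADS.incrementAndGet();
        return fileName + ":contents";
    }

    // Describes the work without performing it.
    static Stream<String> loadData(Stream<String> fileNames) {
        return fileNames.map(LazyPipelineSketch::load);
    }

    public static void main(String[] args) {
        Stream<String> pending = loadData(Stream.of("a", "b"));
        System.out.println(pending);     // prints an object reference, not the data
        System.out.println(LOADS.get()); // 0: nothing has been loaded yet

        List<String> resolved = pending.collect(Collectors.toList());
        System.out.println(LOADS.get()); // 2
        System.out.println(resolved);    // [a:contents, b:contents]
    }
}
```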

From Functional to Reactive

One of the nice things about the ReactiveSeq interface is that there are lots of different implementations available that we can plug in. For example, if we would like to push data into the ReactiveSeq asynchronously, with back pressure via the reactive-streams API, we can create our ReactiveSeq instance via the Spouts factory class. Spouts.from allows us to construct a ReactiveSeq instance from a reactive-streams Publisher. Let’s create one that connects to a Project Reactor Flux.

Flux<DataFileMetadata> flux =  Flux.just(meta)
.subscribeOn(Schedulers.elastic());
ReactiveSeq<DataFileMetadata> seq = Spouts.from(flux);
System.out.println(transformer.loadData(seq)
.toList());

This time data will be pushed asynchronously from the Reactor Flux instance to the Cyclops ReactiveSeq instance when demand is signalled (triggered by the toList() method).

With exactly the same code — we have entirely flipped our Runtime paradigm from standard pull based iterative execution to push based reactive-streams with back pressure!

When we run this version, converting the result to a List (blocking code) we see the following output.

We can refactor the Flux implementation to be completely Non-Blocking by replacing toList (which returns a JDK List) with a method that returns a List embedded inside the asynchronous ReactiveSeq type (in this case we will use collectAll, a non-blocking, non-terminal operation).

Running this code will result in the main thread printing out the Stream type, while continuing execution. Note that the Stream type has changed from StreamX (an Iterative type) to ReactiveStreamX (a push based — asynchronous reactive-stream).

Cyclops-Reactor Deep Integration

We can take this further also. If you want to tightly integrate Reactor into your Cyclops projects you can make use of the cyclops-reactor-integration module to get really deep integration between the two libraries.

Getting cyclops-reactor-integration

If you are following along, you can install the cyclops-reactor-integration module by adding it to your Maven or Gradle class path

Maven

<dependency>
<groupId>com.oath.cyclops</groupId>
<artifactId>cyclops-reactor-integration</artifactId>
<version>10.0.1</version>
</dependency>

Gradle

compile group: 'com.oath.cyclops', name: 'cyclops-reactor-integration', version: '10.0.1'

Deep Flux Integration

If we replace the Spouts.from connector in our Reactor example with FluxReactiveSeq.reactiveSeq instead, we no longer connect Reactor’s Flux with Cyclops’ ReactiveSeq, but replace the reactive-streams operators used by Cyclops’ ReactiveSeq with those implemented by Reactor.

Running the code this time, we still don’t block, we print out the Stream type and continue executing. The Stream implementation has changed again, it is no longer ReactiveStreamX but FluxReactiveSeqImpl (an implementation that delegates to the Reactor operators).

Parallel Execution

But there are other Stream types we can connect or embed (for example, from RxJava2 we can work with Flowable or Observable types in a connected or deeply embedded manner). Cyclops itself provides another Stream type called a FutureStream.

FutureStreams

A FutureStream in Cyclops is a Stream of Futures (each an asynchronous autonomous / isolated task). You can scale them out to execute in parallel / concurrently by increasing the number of threads or active tasks.
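The underlying idea, one asynchronous task per element on a dedicated pool, can be approximated with JDK CompletableFutures. This is a hand-rolled sketch of the concept rather than the FutureStream implementation, and mapInParallel is a hypothetical helper name. Using a dedicated pool also avoids running blocking I/O on the common ForkJoin pool.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collectors;

class FutureStreamSketch {

    // Runs one independent task per input on a fixed-size pool and gathers the results.
    static <T, R> List<R> mapInParallel(List<T> inputs, Function<T, R> task, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            // Start every future first so the tasks can overlap...
            List<CompletableFuture<R>> futures = inputs.stream()
                    .map(in -> CompletableFuture.supplyAsync(() -> task.apply(in), pool))
                    .collect(Collectors.toList());
            // ...then wait for each result, keeping the original order.
            return futures.stream()
                    .map(CompletableFuture::join)
                    .collect(Collectors.toList());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> contents = Arrays.asList("10:hello,20:world", "30:foo");
        List<Integer> counts = mapInParallel(contents, c -> c.split(",").length, 10);
        System.out.println(counts); // [2, 1]
    }
}
```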

Getting cyclops-futurestream

If you are following along, you can install the cyclops-futurestream module by adding it to your Maven or Gradle class path

Maven

<dependency>
<groupId>com.oath.cyclops</groupId>
<artifactId>cyclops-futurestream</artifactId>
<version>10.0.1</version>
</dependency>

Gradle

compile group: 'com.oath.cyclops', name: 'cyclops-futurestream', version: '10.0.1'

Plugging a parallel futurestream

We can create a parallel futurestream builder that supports the creation of parallel futurestreams with 10 threads and 10 active autonomous tasks as follows

LazyReact builder = new LazyReact(10,10);
System.out.println(transformer.loadData(builder.of(meta)));

Running the code we see yet another Stream type, this time FutureStreamImpl, which indicates that this ReactiveSeq is a Stream of Futures that execute in parallel and asynchronously push their results through the pipeline to the next task.

Non-blocking collections

With another cyclops module we can generate raw JDK (and alternatively immutable / persistent collections) asynchronously and pass them to third party APIs where they will be populated without blocking the calling code until an access attempt is made.

Expanding our Reactor example, rather than using collectAll to collect data into a List inside our reactive-stream, we can externally populate a raw list asynchronously.

[Functional Concept] Functional Composition

In this article we refactored some imperative code to a more functional style using the advanced Stream type ReactiveSeq from the cyclops library. Rather than imperatively tell the computer what to do inside the loadData method we refactored to a declarative implementation where inside the chained operators of ReactiveSeq we said what steps needed to be performed but nothing about how to perform them.

The inherent flexibility of moving to a declarative model is that it allows us to plug in many different implementations. In this case, just by changing a line or two of client code, we were able to switch from traditional functional iterative models, to asynchronously executing reactive-streams, to parallel futurestreams; and from populating JDK Lists in a blocking fashion, to populating them in a non-blocking manner inside the Stream, to returning unembedded Lists again but populating them asynchronously in a non-blocking manner. Such is the power of functional composition!

When we compose functions together into a single flow, state must be managed by the flow of data itself through the chain of functions. Ultimately that means it doesn’t matter in which direction the data flows through the chain (via push or pull) or how (sequential or in parallel) !

This brings great power, flexibility and maintainability to functional style applications. In short, it helps future proof our designs.