Dysfunctional programming in Java 6 : No Locks or Synchronized Blocks

From dysfunctional to functional

John McClean
12 min read · Nov 11, 2018

It's time to see how adopting principles from functional programming can help us simplify the challenge of implementing concurrent and parallel code in Java. So far we've explored how laziness, immutability, functional composition, and banning null assignments and thrown exceptions can bring calmness and serenity to our code base. Concurrency, though, produces the most hair-pullingly frustrating bugs of all.

Let’s start with some really dysfunctional imperative code

Mutable non-volatile Fields

Given a number of non-volatile, non-final fields

Imperative data processing

We then have a multi-threaded processData method that populates the List of Results and sets the boolean completion flag.
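The original listing isn't reproduced here, but a JDK-only reconstruction consistent with the six numbered problems below might look like the sketch that follows (the names Locking, Result, loadResults and processResults are assumptions, not the article's actual code):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class Locking {

    static class Result {
        final int id;
        Result(int id) { this.id = id; }
    }

    // Mutable, non-volatile, non-final state shared across threads
    private final Lock lock = new ReentrantLock();
    private final List<Result> resultList = new ArrayList<>();
    private int expect;
    private int complete;
    private boolean completed;

    public void processData(Map<Integer, String> in) {
        for (Map.Entry<Integer, String> e : in.entrySet()) {
            expect++;                               // bug 6: non-atomic increment, read on other threads
            new Thread(() -> {
                List<Result> results = loadResults(e.getKey(), e.getValue());
                lock.lock();                        // bug 1: no try/finally around the critical section
                resultList.addAll(results);
                lock.unlock();
                processResults(results);            // bug 2: mutating Results already published via the field
                complete++;                         // bug 3: race + visibility
                if (expect == complete)             // bug 4: race + visibility
                    completed = true;               // bug 5: visibility (non-volatile flag)
            }).start();
        }
    }

    public boolean isCompleted() { return completed; }

    private List<Result> loadResults(int id, String data) {
        return new ArrayList<>(List.of(new Result(id)));  // stand-in for a disk load
    }

    private void processResults(List<Result> results) { /* mutates the Result objects */ }
}
```

Because the completion flag is non-volatile, a caller spinning on isCompleted() may never observe the write made by a worker thread, which is exactly the infinite loop described below.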

There are at least 6 areas where this code has serious bugs now (or could develop them in the future). In fact, calling this method with the code below results in an infinite loop on my MacBook Pro.

locking.processData(in);
while(!locking.isCompleted()){}

Let’s investigate why.

  1. If an Exception is thrown while we are working with the lock, the lock will never be released
  2. After we have exposed the local Results by adding them into the field resultList we continue to process them — opening up the possibility of a race condition should another thread also try to read or write to those same Result objects
  3. Incrementing complete may suffer from thread visibility problems and / or race conditions with other threads
  4. Comparing expect and complete may suffer from thread visibility problems and / or race conditions with other threads
  5. Setting complete to true may suffer from thread visibility problems
  6. Incrementing expect may suffer from thread visibility problems and / or race conditions with other threads (where expect is read on a separate thread before being incremented).
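The first bug in that list has a standard, purely local fix: always release the lock in a finally block so an exception cannot leave it held forever. A minimal JDK sketch (the helper name addSafely is my own):

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class SafeLocking {
    private final Lock lock = new ReentrantLock();

    void addSafely(Runnable criticalSection) {
        lock.lock();
        try {
            criticalSection.run();   // may throw
        } finally {
            lock.unlock();           // always released, even on exception
        }
    }
}
```

Even with this fix applied, the other five problems remain, which is why the article abandons locking entirely rather than patching it.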

Growing imperative, mutable, multi-threaded code is hard

All of these problem areas stem from our decision to share mutable state across threads and to mediate access to, and changes to, that state via locking. ⛔️Don't do this⛔️ We could exert some mental energy and plug the holes in this code, but that isn't the right approach. The challenge is that even if we fix this small part of the code (for now), we will likely have client code, now or in the future, that reads this data and perhaps performs further processing on it. The growth in interactions, across threads, on mutable data structures will exponentially increase the complexity and the difficulty of getting locking correct. And even if we get it right, with reasonable performance, the code will have to be maintained into the future. New team members with less expertise in our code base and in the nature of those interactions (or ourselves, after working on something else for a couple of months) will find it easy to introduce bugs and/or performance problems.


Avoiding mutating state

We should refactor our application so that we are not updating a mutable List as results arrive. Instead, each task should return its own list containing partial results. We can then aggregate those lists safely and performantly (using some functional APIs). All of our state will become local, and our methods will return immutable values.

This means we can remove all our current fields, perhaps replacing them only with an Executor on which our new asynchronous tasks should run. Let's see why:

  1. We don’t need List<Result> resultList as we are not going to have shared mutable state
  2. We don’t need to track expected or complete threads
  3. We don’t need a boolean value to track if the entire process is complete
  4. With no mutable state, we have no need for a Lock.
Refactor our fields so there is no mutable state
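A sketch of the refactored fields (the pool size of 4 is an arbitrary assumption):

```java
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

class Refactored {
    // The only field we still need: somewhere to run our asynchronous tasks.
    // No result List, no counters, no completion flag, no Lock.
    private final Executor ex = Executors.newFixedThreadPool(4);

    Executor executor() { return ex; }
}
```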

Welcome to the Future

The method signature of our processData method can look something like this :-

That is, at some point in the future a List of Result objects will be returned. It will be returned asynchronously, so it won't block any code that calls processData.

Future in this case is Future from cyclops (cyclops.control.Future) which provides a more standard functional API over CompletableFuture. It includes methods such as isDone (which we could use to check if the asynchronous task is complete) and options for chaining additional tasks functionally such as map.

Implementing processData

We can refactor our for loop; in particular, we can replace the Thread creation code with Future creation.

Thread::start and Thread::join have void return types, which means that neither offers a mechanism to return results back to the original calling thread of execution. The ability to work with and share results across threads is a core feature and benefit of Future.

We can create a Future that loads Results from disk asynchronously

Executor ex;
Future.of(() -> loadResults(id, data.get(id)), ex);

That will give us a skeleton implementation to work with :-
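cyclops' Future.of(supplier, executor) is closely analogous to the JDK's CompletableFuture.supplyAsync(supplier, executor), so a runnable JDK-only sketch of that skeleton (Result and loadResults are stand-ins for the article's types) might be:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

class Skeleton {
    static class Result {
        final int id;
        final String data;
        Result(int id, String data) { this.id = id; this.data = data; }
    }

    private final Executor ex = Executors.newFixedThreadPool(4);

    // One asynchronous task per entry, each returning its own partial List<Result>:
    // no shared mutable state between the tasks.
    public List<CompletableFuture<List<Result>>> processData(Map<Integer, String> in) {
        List<CompletableFuture<List<Result>>> allTasks = new ArrayList<>();
        for (Map.Entry<Integer, String> e : in.entrySet()) {
            // cyclops equivalent: Future.of(() -> loadResults(id, data), ex)
            allTasks.add(CompletableFuture.supplyAsync(
                    () -> loadResults(e.getKey(), e.getValue()), ex));
        }
        return allTasks;
    }

    private List<Result> loadResults(int id, String data) {
        return List.of(new Result(id, data)); // stand-in for the disk load
    }
}
```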

Doing additional work on the same thread

In the original imperative version, once we had loaded the data on a new Thread, we continued to process it (on the same Thread), making some changes to the mutable Result objects. We can use the map operator to continue processing our data inside the Future on the same thread.

Using map to continue processing

In this case we are following lambda best practice by using a single-line expression: a method reference to a processResults method. The implementation (making use of the Lombok @Wither annotation instead of @Setter) should look something like this :-
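Since that listing isn't reproduced here, the sketch below is a JDK-only approximation: withProcessed is a hand-written stand-in for the copy method Lombok's @Wither would generate, and thenApply plays the role of cyclops' map (it typically runs on the same worker thread that completed the future):

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;

class WitherExample {
    // An immutable Result: "setting" a field returns a modified copy,
    // which is what @Wither generates for us.
    static class Result {
        final int id;
        final boolean processed;
        Result(int id, boolean processed) { this.id = id; this.processed = processed; }
        Result withProcessed(boolean processed) { return new Result(id, processed); }
    }

    // processResults builds a new list of new Result values; nothing is mutated.
    static List<Result> processResults(List<Result> results) {
        return results.stream()
                      .map(r -> r.withProcessed(true))
                      .collect(Collectors.toList());
    }

    static CompletableFuture<List<Result>> task(Executor ex, int id) {
        return CompletableFuture
                .supplyAsync(() -> List.of(new Result(id, false)), ex) // the "load"
                .thenApply(WitherExample::processResults);             // map: same pool, no sharing
    }
}
```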

Sequence : the Swiss army knife of functional programming

We can collect all of our Future instances inside a List with a scary verbose type signature 🙀 : List<Future<List<Result>>>, a list of futures each with a list of results.

The return type of our method really should be something more like a single Future value with a List of Results (not a List of Futures each with their own result)

It turns out that this is a very common problem in functional programming, and luckily there is a very handy pattern that can be used to solve it : sequence.

Sequence is a method that takes a List of Futures and converts it into a Future with a List (the pattern generalizes to a wide range of functional container types). Sequence operates asynchronously: the new Future will collate the results of each of the sequenced Futures for us, in an entirely non-blocking manner. So the code to convert our list of Futures (allTasks) into a single Future with a list of Results should look something like this

NB: We perform a concatMap operation to flatten out the List of Lists that sequence will return in this case (as each individual Future returns a List).
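In JDK-only terms (with CompletableFuture standing in for cyclops' Future), sequence plus the flattening concatMap step can be sketched like this:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class Sequence {
    // A JDK analogue of cyclops' Future.sequence: turn a List of futures
    // into a single future of a List, without blocking, via allOf.
    static <T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> futures) {
        return CompletableFuture
                .allOf(futures.toArray(new CompletableFuture[0]))
                .thenApply(v -> futures.stream()
                                       .map(CompletableFuture::join) // all complete by now
                                       .collect(Collectors.toList()));
    }

    // And the concatMap step: flatten the List<List<T>> into a List<T>.
    static <T> CompletableFuture<List<T>> sequenceFlatten(
            List<CompletableFuture<List<T>>> futures) {
        return sequence(futures).thenApply(lists -> lists.stream()
                                                         .flatMap(List::stream)
                                                         .collect(Collectors.toList()));
    }
}
```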

Putting it all together our code is still a little too imperative and more verbose than it needs to be, but the problem of shared mutable state (and associated locks and potential bugs) has been eradicated.

If we move the call to loadResults into the processResults method and change the return types from List to cyclops' powerful Stream type, ReactiveSeq, we can reduce the code required.

Future creation can now be simplified to just a call to the process method

Future.of(() -> this.process(e.getKey(), e.getValue()), ex)

We can replace the imperative for loop with ReactiveSeq and a map operation

If we plop this inside our Future::sequence call, we could express the whole method in a single functional expression

This is certainly a lot simpler, cleaner and more concise than the imperative mess we started out with. You may prefer to break it out a little, and if you do I highly recommend making use of Java 10's local-variable type inference via var to stay out of generics hell!

var can simplify highly generic code!
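For example (a small illustration of my own, not the article's listing), var spares us from spelling out the nested generic type on the left-hand side:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class VarDemo {
    static List<String> demo() {
        // Without var this would read:
        // List<CompletableFuture<List<String>>> allTasks = new ArrayList<>();
        var allTasks = new ArrayList<CompletableFuture<List<String>>>();
        allTasks.add(CompletableFuture.completedFuture(List.of("result")));
        var first = allTasks.get(0).join(); // inferred as List<String>
        return first;
    }
}
```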

Continuing processing and the SOLID Principles

A reasonable ask from a performance point of view would be to continue processing results as they come in. Using the map method on the Future returned from processData won’t help, as map will not get called until all the results are available.

One way to implement support for this would be to dig deep into our existing process method and add any additional steps required to that method. ⛔️Stop — don't do this⛔️. Adding code to an existing method like this is not the best way forward; at a minimum it violates the Open/Closed Principle from the SOLID principles. Wikipedia states

In object-oriented programming, the open/closed principle states “software entities (classes, modules, functions, etc.) should be open for extension, but closed for modification”; that is, such an entity can allow its behaviour to be extended without modifying its source code.

Modifying the process method means we are relying on future modification (not closing it off), with all the attendant risk of introducing bugs. Another risk is that by continually adding to process we will, out of necessity, wind up mixing high-level abstractions with low-level abstractions in our code, potentially violating the Dependency Inversion Principle. Adding code in such a fashion also risks violating the Single Responsibility Principle. None of these things are desirable.

Asynchronous Streaming

There is a better way: if we require the ability to continue processing the data asynchronously as it arrives, we can make that data available as a continuous, asynchronous (reactive!) Stream rather than a discrete scalar result. We can refactor (simplify!) the return type of processData to be just ReactiveSeq<Data>

We have a number of options for doing this in cyclops. Currently processData is implemented by creating and then sequencing a Stream of Futures. The module cyclops-futurestream provides an implementation of ReactiveSeq that is exactly that (using a custom FastFuture implementation under the hood). FutureStream will allow us to tee up additional operations to be applied to each Result as it arrives asynchronously.

FutureStream : a simpler & faster implementation

The implementation with FutureStream is both more powerful and much simpler!
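The FutureStream listing itself isn't reproduced here; as a purely JDK-based illustration of the same idea (consume each future's result in completion order rather than submission order), an ExecutorCompletionService sketch might look like this (names and the stand-in load are assumptions):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class AsResultsArrive {
    static class Result {
        final int id;
        Result(int id) { this.id = id; }
    }

    // Each partial List<Result> is handed to the caller as soon as its task
    // finishes, so downstream processing starts while other loads are running.
    static List<Result> processData(Map<Integer, String> in) {
        ExecutorService ex = Executors.newFixedThreadPool(4);
        try {
            CompletionService<List<Result>> cs = new ExecutorCompletionService<>(ex);
            for (Map.Entry<Integer, String> e : in.entrySet()) {
                cs.submit(() -> List.of(new Result(e.getKey()))); // stand-in for the disk load
            }
            List<Result> collected = new ArrayList<>();
            for (int i = 0; i < in.size(); i++) {
                collected.addAll(cs.take().get()); // first finished, first processed
            }
            return collected;
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            ex.shutdown();
        }
    }
}
```

FutureStream goes further than this sketch: it lets us chain map/peek/flatMap declaratively instead of draining a queue by hand.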

Now, after calling processData, we can tee up additional operations via map, peek, flatMap etc. and they will be applied appropriately as the data streams in.

If we write some simple example code that calls processData we can get a feel for future-streams in action.

Executing the code

  1. Create a test fixture Map with 10 entries to process asynchronously
  2. Work with the returned FutureStream to capture the current thread processing is happening on
  3. Asynchronously printout the results
example code for working with our processData method

If we run the code above, we will get output something like this :-

Each of the records will be loaded asynchronously on a separate thread (Threads 15–24 above), but once loaded we can continue processing on that same thread!

The map operator tees up additional transformations that are performed by each future in the stream autonomously on its own thread.

reactive-streams

It is possible to configure reactive-streams implementations to execute concurrently (typically via a flatMap-style operator for Publishers). By executing the user-supplied input function multiple times we can spawn into action multiple asynchronous publishers at the same time. In the cyclops world we do this via the mergeMap operator

In the example above we are telling cyclops to call our Future-generating function (each Future is a reactive-streams Publisher!) a maximum of 10 times before any of the returned Futures fire their onComplete call. In practice this means that up to 10 Future instances would be created to load and process the data asynchronously.

The complete method using cyclops’ reactive-streams support would look like this :-

When demand is signalled, mergeMap will execute the supplied Function, which creates a new Future that runs asynchronously on a Thread pool. Future is also a reactive-streams Publisher, and mergeMap will subscribe to it; when the Future has loaded the data and done its initial processing, it will push those results down into our connected Stream.

A different kind of concurrency when executing the code

If we run the example code above, we will get a very different result (😳)! This may come as a surprise (or not, to some): when we continue to process our Stream and print out the current thread, we will tend to see the same single thread doing the additional processing. While mergeMap triggers concurrent processing, continued processing beyond that is single-threaded.

Our output will look something like this :-

We are printing out the thread within an operator that is asynchronously processing the results of the concurrent futures (in a single-threaded way).

Our process diagram for the map operator when using reactive-streams looks more like this :-

If we add a peek operator call to our Future that prints out the current thread we can see this in action.

When we run our example code, we see the Futures operating concurrently using different threads, with additional processing happening on a single thread.

Working with Reactor

If you would prefer to plug in the operators from Project Reactor, we can switch ReactiveSeq implementations to FluxReactiveSeq

In fact, if you decide you just want to use Flux, cyclops types support excellent conversion and integration with reactive-streams Publishers (and Reactor / RxJava 2 in particular).

How to choose between FutureStreams and ReactiveStreams?

We refactored from Futures to a form of Stream in this example because we had largish result sets and wanted to process them as soon as they were available (Futures are perfect when you have a single result, or at least don't want to break it apart and immediately process chunks of it). Which Stream type we'd use in practice would depend on what additional work was required.

If you are doing additional I/O-bound work, particularly when working with typical blocking Java APIs, future-streams offer good simplicity of implementation mixed with decent performance. On the other hand, if most additional processing is CPU-bound, using Spouts / reactive-streams for that processing on a single core may give you a performance boost (at the cost of a slightly more complex implementation).

The code

The code below is for the future-stream version, which is the shortest and simplest implementation that offers good performance for tasks that involve blocking I/O. All of the functional approaches greatly simplify the challenge of building highly concurrent applications; this is the true advantage of reactive!

There is also a gist with an implementation of all of the approaches included as separate methods.

Summary

Once you are aware of the knack required to refactor to the functional style of concurrency, it becomes second nature with practice. The key is to use data structures (Future ftw ✅) or dataflows (future-streams and reactive-streams ✅) that receive results asynchronously and manage the passing of data across threads for you. 🚫Never share mutable state across threads yourself🚫. Start by working with Futures (or CompletableFutures) and then work your way up to some form of Stream. FutureStreams are conceptually Streams of Futures and are extremely useful for working with typical Java APIs, which still block when performing I/O. With advanced modern APIs such as RDBC and reactive HTTP clients, plug in a reactive-streams implementation to get better responsiveness and performance.


John McClean

Architecture @ Verizon Media. Maintainer of Cyclops. Twitter @johnmcclean_ie