What Is Structured Concurrency in Java 21?

Phoenix Rising
Feb 4, 2024

Photo credit Elif Dörtdoğan pexels.com

Along with virtual threads, there has been some buzz around the structured concurrency preview feature in Java 21. I believe that to understand structured concurrency, we first need to get our heads wrapped around unstructured concurrency.

What is Unstructured Concurrency?

When we think of concurrent programming, the first thing to realise is that in most cases we have the concept of tasks and sub tasks. The main thread could be the parent task, which can for example create sub tasks that each execute a part of the bigger task. Maybe it is better to visualise this with an example. Consider the class below:


import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class UnstructuredConcurrencySimulation {

    private ExecutorService executorService;

    public UnstructuredConcurrencySimulation() {
    }

    // Orchestrator method that executes the sub tasks
    public String simulatePartialThreadLeak() throws ExecutionException, InterruptedException {
        Future<String> taskA = executorService.submit(this::taskThatErrorsOut);
        Future<String> taskB = executorService.submit(this::justAnotherLongRunningTask);
        var responseOfTaskA = taskA.get(); // blocks until taskA finishes or fails
        var responseOfTaskB = taskB.get();
        return STR."\{responseOfTaskA} \{responseOfTaskB}";
    }

    // A sub task that always fails
    private String taskThatErrorsOut() throws InterruptedException {
        Thread.sleep(5000);
        throw new RuntimeException("task fails and we do not know why!");
    }

    // A long-running task that eventually succeeds
    private String justAnotherLongRunningTask() throws InterruptedException {
        int i = 0;
        // Hope your production code does not look like this
        while (i < 10) {
            Thread.sleep(2000);
            System.out.println("Doing fancy stuff ...... ");
            i++;
        }
        System.out.println("Where is mama?");
        return "Task completed successfully";
    }

    public void setExecutorService(ExecutorService executorService) {
        this.executorService = executorService;
    }

    // The trigger for the concurrent tasks
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        var executorService = Executors.newFixedThreadPool(2);
        UnstructuredConcurrencySimulation unstructuredConcurrencySimulation =
                new UnstructuredConcurrencySimulation();
        unstructuredConcurrencySimulation.setExecutorService(executorService);
        System.out.println(unstructuredConcurrencySimulation.simulatePartialThreadLeak());
        // Don't do this! The line below is skipped when a sub task throws,
        // and shutdown() would not cancel the running sub task anyway.
        executorService.shutdown();
    }
}

As the name suggests, this class simulates unstructured concurrency. But how?

What happens when we execute this code? We know that one of the tasks will always fail (taskThatErrorsOut) and the other one keeps running for quite some time (justAnotherLongRunningTask). So let's give it a shot, shall we?

Unstructured Concurrency In Action

Hmm, so what happened here? The taskThatErrorsOut threw an exception and basically rendered the whole execution invalid. However, justAnotherLongRunningTask kept on doing its job even though its result could no longer be used. This situation is better known as a "thread leak", which is one of the well-known consequences of unstructured concurrency.
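If you want to observe the leak without reading console output, the experiment can be compressed into a small check. The sketch below is not part of the original class: the name ThreadLeakCheck and the shortened sleep times are assumptions made for illustration. It simply asks whether the slow sub task is still running at the moment the parent learns about the failure.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical helper class, not part of the article's original code.
public class ThreadLeakCheck {

    // Returns true if the slow sub task is still running at the moment
    // the parent learns that the other sub task has failed.
    public static boolean slowTaskOutlivesFailure() throws InterruptedException {
        Callable<String> failingTask = () -> {
            Thread.sleep(100);
            throw new IllegalStateException("sub task failed");
        };
        Callable<String> slowTask = () -> {
            Thread.sleep(2_000);
            return "done";
        };

        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            Future<String> failing = executor.submit(failingTask);
            Future<String> slow = executor.submit(slowTask);
            try {
                failing.get();
                return false; // not expected: the failing task always throws
            } catch (ExecutionException e) {
                // The parent has already failed, yet nobody cancelled the sibling.
                return !slow.isDone();
            }
        } finally {
            // Interrupt the leftover task so that this demo exits promptly.
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Slow task still running after failure: " + slowTaskOutlivesFailure());
    }
}
```

Note that the cleanup in the finally block is exactly the kind of bookkeeping that the developer has to remember in the unstructured world.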

If you think about it, the main thread can be considered the parent task (via the simulatePartialThreadLeak handler method), and the taskThatErrorsOut and justAnotherLongRunningTask methods are sub tasks of it. However, this relationship only exists in our heads. In reality, as we have already seen from the execution output, nothing enforces this relationship, especially if one of the tasks fails. In other words, we have no guarantee that the sub tasks return to their parent task once they are done: a sub task can keep running long after its parent has given up. This, my friends, is what unstructured concurrency means in a nutshell.

Unstructured concurrency means potential wastage of resources (because of thread leaks, for example), but more importantly it shifts the burden of cancelling tasks and sub tasks onto the developer. Would it not be nice if we could enforce parent-child relationships automatically? This is where structured concurrency comes in. ✨

Structured Concurrency FTW

As the smart people at OpenJDK have stated, the best way to think of structured concurrency is to think about single-threaded applications. Interesting! But how?

When we think about single-threaded Java applications, we know that the parent-child relationship is strictly enforced.

public String parent() {
    String s = child1();
    int i = child2();
    return s + " " + i;
}

private String child1() {
    // do string-y stuff
    return "some string"; // placeholder result
}

private int child2() {
    // do int-y stuff
    return 42; // placeholder result
}

In the above example, child1 (or child2) can either return a value of the expected return type or throw an exception, but either way control always comes back to the parent() method. There is no scenario in which child1 or child2 outlives the scope of parent(), and this is the key concept that the Structured Concurrency API in Java 21 extends to concurrent code.

If a task splits into concurrent subtasks then they all return to the same place, namely the task’s code block.

It is time to take another code example:


import java.util.concurrent.ExecutionException;
import java.util.concurrent.StructuredTaskScope;
import java.util.function.Supplier;

public class StructuredConcurrencySimulation {

    public String handleUsingStructuredConcurrencyFailFast() throws InterruptedException, ExecutionException {
        try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
            Supplier<String> respA = scope.fork(this::taskThatErrorsOut);
            Supplier<String> respB = scope.fork(this::justAnotherLongRunningTask);
            scope.join().throwIfFailed();
            return STR."\{respA.get()} \{respB.get()}";
        }
    }

    private String taskThatErrorsOut() throws InterruptedException {
        Thread.sleep(5000);
        throw new RuntimeException("task fails and we do not know why!");
    }

    private String justAnotherLongRunningTask() throws InterruptedException {
        int i = 0;
        while (i < 10) {
            Thread.sleep(2000);
            System.out.println("Doing fancy stuff ...... ");
            i++;
        }
        System.out.println("Where is mama?");
        return "Task completed successfully , I win";
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        StructuredConcurrencySimulation structuredConcurrencySimulation =
                new StructuredConcurrencySimulation();
        System.out.println(structuredConcurrencySimulation.handleUsingStructuredConcurrencyFailFast());
    }

}

We have the same sub tasks as before, namely taskThatErrorsOut and justAnotherLongRunningTask. The interesting part, however, is the handler code:

public String handleUsingStructuredConcurrencyFailFast()
        throws InterruptedException, ExecutionException {
    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
        Supplier<String> respA = scope.fork(this::taskThatErrorsOut);
        Supplier<String> respB = scope.fork(this::justAnotherLongRunningTask);
        scope.join().throwIfFailed();
        return STR."\{respA.get()} \{respB.get()}";
    }
}

A few things are happening here, but first of all it is important to know that structured concurrency is a preview feature in Java 21 (and so are the string templates used in the return statement). This means that you have to enable preview features explicitly: on the command line, pass --release 21 --enable-preview to javac and --enable-preview to java. If you are using IntelliJ, you can do the same through the Java compiler settings.

Now the core class that we need to be aware of is the StructuredTaskScope class. It ships with two built-in policies:

  • ShutdownOnFailure: This policy, which we also used in our example code, means that as soon as any subtask within the scope fails, the subtasks that are still running are cancelled and control returns to the parent. You can think of it as a short-circuit policy.
  • ShutdownOnSuccess: This policy is the counterpart to ShutdownOnFailure, meaning that as soon as any subtask succeeds, the subtasks that are still running are cancelled and control returns to the parent with that first result. Whether this is desirable depends on your use case.

Back to our example: what happens if we execute the code? Since we know that taskThatErrorsOut will fail, what will happen to justAnotherLongRunningTask? Let us find out, shall we?

Structured Concurrency In Action

Now this is interesting and confirms what we expect from the ShutdownOnFailure policy. When taskThatErrorsOut failed, the scope cancelled justAnotherLongRunningTask, and the main thread exited knowing full well what had happened. In effect, the parent task now has more control and knows what happened to its subtasks in case of a failure.

What will happen when we use the ShutdownOnSuccess policy? Let us introduce that in our example code as shown below:


import java.util.concurrent.ExecutionException;
import java.util.concurrent.StructuredTaskScope;
import java.util.function.Supplier;

public class StructuredConcurrencySimulation {

    ...

    public String handleUsingStructuredConcurrencyReturnAny()
            throws InterruptedException, ExecutionException {
        try (var scope = new StructuredTaskScope.ShutdownOnSuccess<>()) {
            scope.fork(this::taskThatErrorsOut);
            scope.fork(this::justAnotherLongRunningTask);
            return STR."\{scope.join().result()}";
        }
    }

    ...

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        StructuredConcurrencySimulation structuredConcurrencySimulation = new StructuredConcurrencySimulation();

        System.out.println(STR."result : \{structuredConcurrencySimulation.handleUsingStructuredConcurrencyReturnAny()}");
    }

What happens when we execute our code now? Will the application fail or produce some result? Let us check:

Structured Concurrency With ShutdownOnSuccess Policy

Nice! 👍 So now we get the result from the successful task (justAnotherLongRunningTask) even though taskThatErrorsOut is still failing. The parent task is satisfied as soon as any one of the subtasks has succeeded, and it processes that result.
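For comparison, the long-standing ExecutorService.invokeAny method follows a similar "first success wins" idea: it returns the result of one task that completed successfully and cancels the tasks that have not completed. The sketch below is only an analogy and not how ShutdownOnSuccess is implemented; the class name InvokeAnyComparison and the shortened sleep times are assumptions made for illustration.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical comparison class, not part of the article's original code.
public class InvokeAnyComparison {

    // Returns the result of a task that completed successfully.
    // A failing task is ignored as long as at least one task succeeds.
    public static String firstSuccessfulResult() throws InterruptedException, ExecutionException {
        Callable<String> failing = () -> {
            Thread.sleep(50);
            throw new IllegalStateException("task fails and we do not know why!");
        };
        Callable<String> slowButSuccessful = () -> {
            Thread.sleep(300);
            return "Task completed successfully";
        };

        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            return executor.invokeAny(List.of(failing, slowButSuccessful));
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        System.out.println("result : " + firstSuccessfulResult());
    }
}
```

What invokeAny does not give you is the scope itself: there is no block that owns the subtasks, which is the part that StructuredTaskScope adds.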

So to explain simply: when the parent task leaves the scope, there is a guarantee that the child tasks are either complete or cancelled. It is nice to work with such assurances, don't you think?

Why can’t we just use Future.cancel?

It is important to understand that my idea with this post is not to sell structured concurrency but to make ourselves aware of a feature that is still evolving: it remains a preview API in Java 22 as well (JEP 462).

Having said that, it is indeed possible, but tricky, to implement shutdown policies using Future.cancel, for example with a paradigm like:

try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
    try {
        // call sub tasks here
    } finally {
        // get list of futures and call cancel on each of them
    }
}

However, most experienced Java developers agree that this paradigm is tricky and hard to get right. Therefore, it would be nice if cancellation could be coupled with the concept of a scope (as we have seen earlier).
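To get a feeling for the amount of bookkeeping involved, here is a minimal sketch of a hand-rolled fail-fast policy built on Future.cancel. It is one possible way to fill in the paradigm above, not a reference implementation: the class ManualFailFast and the method invokeAllFailFast are hypothetical names, and using an ExecutorCompletionService to notice failures in completion order is my own choice.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical helper class, not part of the article's original code.
public class ManualFailFast {

    // Runs all tasks concurrently and returns their results in submission order.
    // If any task fails, the unfinished ones are cancelled and the failure is rethrown.
    public static <T> List<T> invokeAllFailFast(List<Callable<T>> tasks)
            throws InterruptedException, ExecutionException {
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            var completion = new ExecutorCompletionService<T>(executor);
            List<Future<T>> futures = new ArrayList<>();
            try {
                for (Callable<T> task : tasks) {
                    futures.add(completion.submit(task));
                }
                // Wait in completion order so that a failure is noticed as soon as it happens.
                for (int i = 0; i < tasks.size(); i++) {
                    completion.take().get(); // throws ExecutionException if that task failed
                }
                List<T> results = new ArrayList<>();
                for (Future<T> future : futures) {
                    results.add(future.get());
                }
                return results;
            } finally {
                // Cancelling a finished task is a no-op; unfinished tasks get interrupted.
                for (Future<T> future : futures) {
                    future.cancel(true);
                }
            }
        } // close() waits until every submitted task has terminated
    }

    public static void main(String[] args) throws InterruptedException {
        Callable<String> failing = () -> {
            Thread.sleep(100);
            throw new IllegalStateException("task fails and we do not know why!");
        };
        Callable<String> slow = () -> {
            Thread.sleep(5_000);
            return "Task completed successfully";
        };
        try {
            System.out.println(invokeAllFailFast(List.of(failing, slow)));
        } catch (ExecutionException e) {
            System.out.println("Failed fast: " + e.getCause().getMessage());
        }
    }
}
```

Even this small version has to juggle two nested try blocks, a list of futures and a completion queue, and it only covers one policy. That is the boilerplate which a scope takes off our hands.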

What about ForkJoinPool?

ForkJoinPool does indeed provide the means to enforce structured concurrency. However, according to the JDK developers, its API was designed for compute-intensive tasks rather than the I/O-based tasks that structured concurrency is targeting. This is also a good time to say that the examples in this post illustrate the concept via simulated tasks, but your real-life use case would probably be more interesting (duh!).

How does structured concurrency work in conjunction with virtual threads?

Ah, virtual threads, the crown jewel of Java 21! I have spoken about some of the issues of virtual threads in this post:

However, structured concurrency works very well with virtual threads. How is that? Well, imagine spinning off millions of these lightweight threads and then having the framework handle the scope for you in a clean manner, so that all these subtasks return to their parents. This becomes even more valuable in complex fan-out scenarios (like when sub tasks fork other sub tasks).
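The fan-out shape itself can be sketched without the preview API. The example below deliberately uses plain virtual-thread executors instead of StructuredTaskScope, so it only shows the nesting (close() on an executor waits for its tasks) and none of the shutdown policies. The class name VirtualThreadFanOut, the task counts and the sleep that stands in for blocking I/O are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class, not part of the article's original code.
public class VirtualThreadFanOut {

    // Forks `count` copies of the task, each on its own virtual thread,
    // and returns the sum of their results once all of them have finished.
    private static int forkAndSum(int count, Callable<Integer> task)
            throws InterruptedException, ExecutionException {
        List<Future<Integer>> results = new ArrayList<>();
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            for (int i = 0; i < count; i++) {
                results.add(executor.submit(task));
            }
        } // close() waits here until every forked task has completed
        int sum = 0;
        for (Future<Integer> result : results) {
            sum += result.get();
        }
        return sum;
    }

    // Every parent task forks `children` child tasks, so the total number of
    // leaf tasks (and the returned value) is parents * children.
    public static int fanOut(int parents, int children)
            throws InterruptedException, ExecutionException {
        Callable<Integer> child = () -> {
            Thread.sleep(10); // stand-in for blocking I/O
            return 1;
        };
        Callable<Integer> parent = () -> forkAndSum(children, child);
        return forkAndSum(parents, parent);
    }

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        System.out.println("Leaf tasks completed: " + fanOut(100, 100));
    }
}
```

No child here can outlive its parent, because each parent blocks in close() until its children are done. What is still missing compared to a real scope is the automatic cancellation of siblings when one of them fails.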

What about observability?

I feel this is an important value add, because in the pre structured concurrency age it was hard to correlate parent tasks and sub tasks while troubleshooting (think thread dumps). With structured concurrency, however, the JSON thread dump can group threads by the scope that forked them, like in the example shown here. All you need to do is:

jcmd <pid> Thread.dump_to_file -format=json <output_file>

or whatever is your favorite method to get thread dumps.
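If you prefer to trigger the dump from inside the application, Java 21 also exposes the same JSON format through the HotSpotDiagnosticMXBean. The sketch below assumes a HotSpot-based JDK 21 or later; the class name JsonThreadDump and the temporary file location are hypothetical choices for illustration.

```java
import com.sun.management.HotSpotDiagnosticMXBean;
import com.sun.management.HotSpotDiagnosticMXBean.ThreadDumpFormat;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper class, not part of the article's original code.
public class JsonThreadDump {

    // Writes a JSON thread dump of the current JVM and returns the file it was written to.
    public static Path dumpToTempFile() throws IOException {
        // The target must be an absolute path to a file that does not exist yet.
        Path target = Files.createTempDirectory("thread-dump")
                .resolve("threads.json")
                .toAbsolutePath();
        HotSpotDiagnosticMXBean bean =
                ManagementFactory.getPlatformMXBean(HotSpotDiagnosticMXBean.class);
        bean.dumpThreads(target.toString(), ThreadDumpFormat.JSON);
        return target;
    }

    public static void main(String[] args) throws IOException {
        System.out.println("Thread dump written to " + dumpToTempFile());
    }
}
```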

What are the current drawbacks of Structured Concurrency?

Well, we can think of a couple:

  • It is still in preview, so it is not recommended for use in live production systems.
  • It currently ships with only two policies, ShutdownOnFailure and ShutdownOnSuccess, though it is possible to create custom policies.

Conclusion

Structured concurrency is an interesting concept that is inspired by languages like Erlang. Whether or not it will find popularity in the Java space, only time will tell, but as far as I can see, the prospects are good.

P.S.: Did I miss something? Do let me know your suggestions. 🙏 🙌🚀
