Parallel and asynchronous programming with Java

Shivanshu Goyal
Published in Nerd For Tech
5 min read, Aug 3, 2020

Java supports sequential, parallel, and asynchronous programming through threads, lightweight units of execution that a program can create and manage programmatically. Used well, they let us write programs that finish the same work in less time.

Let’s first understand these 3 styles of program.

  1. Sequential program: Given a list of N tasks, the program picks up each task in turn and performs some action on it. If each task takes 1 second, the sequential program takes N seconds to complete.
  2. Parallel program: To improve on the sequential version, a parallel program uses threads to divide the list of tasks and work on them at the same time, so the whole list completes in less time. The program still waits for every thread to finish its share.
  3. Asynchronous program: Like a parallel program, it uses threads to complete the tasks in less time. The difference is that it does not wait for the task list to finish; the tasks complete in the background while the program carries on with other work.

Java 1 shipped with threads for parallel programming. The Thread class has a method void run() that takes no arguments and returns nothing, so a task can only hand back a result by writing to shared state, which pushes mutability into the code. The developer also has to create and manage the threads that divide up the work. Java 5 introduced ExecutorService, which manages a thread pool internally and offers a higher-level way to write a parallel program. Submitting a task to it returns a Future object. But there is a catch: as soon as we call get() on the Future, it is no longer much of a "future", because the call blocks the current thread right there until the result is ready. So we need something else to write a truly asynchronous program in Java, and Java 8 provides it with CompletableFuture. From Java 8 onward, parallel and asynchronous programming can be achieved using parallelStream() and CompletableFuture.
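To make the Future catch concrete, here is a minimal sketch of the Java 5 style. The class name and the slowSquare() task are hypothetical and used only for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class FutureBlockingSketch {

    // Hypothetical task: sleeps briefly to look slow, then returns n squared.
    static int slowSquare(int n) {
        try {
            Thread.sleep(200);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return n * n;
    }

    // Submits the task to a pool and blocks on get() until it finishes.
    static int squareViaFuture(int n) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Future<Integer> future = pool.submit(() -> slowSquare(n));
            // The calling thread is parked here until the result is ready.
            return future.get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(squareViaFuture(7)); // prints 49
    }
}
```

The work runs on a pool thread, but the caller still cannot do anything useful between submit() and the return of get().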

These concepts are based on functional programming. If you are not familiar with functional programming, I would recommend reading this article first.

Under the hood, both use the same thread pool by default, the common ForkJoinPool, whose size is derived from Runtime.getRuntime().availableProcessors(). Let’s understand the difference between parallelStream and CompletableFuture:

The best way to determine the number of threads to create for the best performance:

If the task is CPU intensive, the # of threads should be less than or equal to the # of cores.

If the task is I/O intensive, the # of threads should be less than or equal to # of cores / (1 - blocking factor), where 0 ≤ blocking factor < 1.

Number of cores of a machine: Runtime.getRuntime().availableProcessors()

ForkJoinPool.commonPool().getParallelism() returns the parallelism of the common pool, which by default is # of cores - 1, since the calling thread (often the main thread) also takes part in the work.
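The sizing rule above can be sketched as a small helper. The class and the maxThreads() method are hypothetical names; only availableProcessors() and getCommonPoolParallelism() are JDK calls.

```java
import java.util.concurrent.ForkJoinPool;

class PoolSizingSketch {

    // Upper bound on threads: cores / (1 - blockingFactor).
    // blockingFactor is the fraction of time a task spends waiting, 0 <= bf < 1.
    // A CPU-intensive task has a blocking factor of 0, which gives back the core count.
    static int maxThreads(int cores, double blockingFactor) {
        if (blockingFactor < 0 || blockingFactor >= 1) {
            throw new IllegalArgumentException("blockingFactor must be in [0, 1)");
        }
        return (int) (cores / (1 - blockingFactor));
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("cores = " + cores);
        System.out.println("common pool parallelism = "
                + ForkJoinPool.getCommonPoolParallelism());
        System.out.println("CPU-bound limit = " + maxThreads(cores, 0.0));
        System.out.println("I/O-bound limit (bf 0.5) = " + maxThreads(cores, 0.5));
    }
}
```

For example, with 8 cores and tasks that block half of the time, the rule allows up to 16 threads.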

How to use parallelStream in Java?
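A minimal sketch, assuming a hypothetical transform() task that sleeps briefly in place of real work. It runs the same list of tasks through stream() and parallelStream() so the timings can be compared.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class ParallelStreamSketch {

    // Hypothetical task: sleeps briefly to stand in for real work, then doubles n.
    static int transform(int n) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return n * 2;
    }

    // Processes the tasks one after another on the calling thread.
    static List<Integer> sequential(List<Integer> tasks) {
        return tasks.stream()
                .map(ParallelStreamSketch::transform)
                .collect(Collectors.toList());
    }

    // Splits the tasks across the common ForkJoinPool.
    // The call still waits until every task has finished.
    static List<Integer> parallel(List<Integer> tasks) {
        return tasks.parallelStream()
                .map(ParallelStreamSketch::transform)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> tasks = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);

        long start = System.currentTimeMillis();
        System.out.println(sequential(tasks));
        System.out.println("sequential ms: " + (System.currentTimeMillis() - start));

        start = System.currentTimeMillis();
        System.out.println(parallel(tasks));
        System.out.println("parallel ms: " + (System.currentTimeMillis() - start));
    }
}
```

Both versions return the results in the same order; how much faster the parallel one is depends on the number of cores available.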

How to use CompletableFuture in Java?
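A minimal sketch, assuming a hypothetical compute() supplier. supplyAsync() starts the work on the common pool, and the follow-up steps are attached as callbacks instead of blocking for the result.

```java
import java.util.concurrent.CompletableFuture;

class CompletableFutureBasics {

    // Hypothetical computation, used only for illustration.
    static int compute() {
        return 21;
    }

    // Starts compute() asynchronously and doubles its result when it arrives.
    static CompletableFuture<Integer> pipeline() {
        return CompletableFuture.supplyAsync(CompletableFutureBasics::compute)
                .thenApply(n -> n * 2);
    }

    public static void main(String[] args) {
        // join() is here only to keep this short demo alive until the
        // pipeline has printed; a real program would keep doing other work.
        pipeline().thenAccept(System.out::println).join(); // prints 42
    }
}
```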

A CompletableFuture can be in one of these stages:

  1. Resolved (a final stage; once reached, it cannot be changed later)
  2. Rejected or errored out (also a final stage)
  3. Pending (still waiting to move into one of the first 2 stages)

Calling get() on a CompletableFuture is usually a bad idea, as it blocks the calling thread until the future completes.

thenApply(), thenAccept(), thenRun(), etc. are the methods that get executed once the CompletableFuture reaches the resolved stage.

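CompletableFuture<T> is immortal: the pipeline never dies. Each of these methods returns a new CompletableFuture, so the pipeline can always be extended with another step.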

If the CompletableFuture reaches the rejected stage, the next exceptionally() method downstream in the flow gets invoked. An exception can be handled in 2 ways:

  1. Blow up (rethrow) the exception: the pipeline stays in the error channel.
  2. Log the exception and return a default value: the pipeline comes back to the data channel.
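The two options can be sketched as follows. The failing() supplier and the method names are hypothetical; the point is only where the pipeline ends up after exceptionally().

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class ExceptionallySketch {

    // Hypothetical supplier that always fails.
    static int failing() {
        throw new IllegalStateException("boom");
    }

    // Option 1: rethrow, so the pipeline stays in the error channel.
    static CompletableFuture<Integer> rethrow() {
        return CompletableFuture.supplyAsync(ExceptionallySketch::failing)
                .exceptionally(t -> {
                    throw new RuntimeException("still failing", t);
                })
                .thenApply(n -> n + 1); // skipped, the future is still rejected
    }

    // Option 2: log and return a default, so the pipeline is back in the data channel.
    static CompletableFuture<Integer> recover() {
        return CompletableFuture.supplyAsync(ExceptionallySketch::failing)
                .exceptionally(t -> {
                    System.out.println("logged: " + t);
                    return 0;
                })
                .thenApply(n -> n + 1); // runs with the default value 0
    }

    public static void main(String[] args) {
        System.out.println(recover().join()); // prints 1 after the log line
        try {
            rethrow().join();
        } catch (CompletionException e) {
            System.out.println("pipeline ended in the error channel");
        }
    }
}
```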

The complete() method sets the value returned by get() and related methods to the given value, if the future is not already completed. It returns true if this invocation caused the CompletableFuture to transition to a completed state, and false otherwise.

Let’s understand complete() using the below examples:

What is the difference between future1 and future2?

With the statement future1.complete(20); we put future1 into the resolved stage, so that future1.get() returns 20. Then we apply future1.thenAccept(System.out::println); which prints 20 in the terminal.

With the statement future2.complete(20); we put future2 into the resolved stage, so that future2.get() returns 20. Then we apply future2.thenApply(data -> data * 2), which takes 20 as input and maps it to 40. That value is passed on to thenAccept(System.out::println); which prints 40 to the terminal.
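A sketch consistent with the description of future1 and future2 above. The class and method names are assumptions; the statements inside follow the ones quoted in the text.

```java
import java.util.concurrent.CompletableFuture;

class CompleteSketch {

    // future1: completed manually with 20, then consumed as-is.
    static int future1Result() {
        CompletableFuture<Integer> future1 = new CompletableFuture<>();
        future1.complete(20);
        future1.thenAccept(System.out::println); // prints 20
        return future1.join();
    }

    // future2: completed with 20, mapped to 40, then consumed.
    static int future2Result() {
        CompletableFuture<Integer> future2 = new CompletableFuture<>();
        future2.complete(20);
        CompletableFuture<Integer> doubled = future2.thenApply(data -> data * 2);
        doubled.thenAccept(System.out::println); // prints 40
        return doubled.join();
    }

    // Only the first complete() call takes effect.
    static boolean secondCompleteSucceeds() {
        CompletableFuture<Integer> future = new CompletableFuture<>();
        future.complete(20);        // returns true, the value is now 20
        return future.complete(99); // returns false, the value stays 20
    }

    public static void main(String[] args) {
        future1Result();
        future2Result();
        System.out.println(secondCompleteSucceeds()); // prints false
    }
}
```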

We also have completeExceptionally(Throwable throwable), which can be used when an exception occurs in the pipeline. In the example below, the future completes in the rejected stage with the statement future.completeExceptionally(new RuntimeException("We are throwing exception")); The pipeline looks for the first exceptionally() block and skips the resolved-stage blocks, such as thenApply(), on the way. Programming::exceptionHandler handles the exception and recovers from it by returning 0 to the pipeline. After that, thenAccept(System.out::println) consumes the value and prints 0 to the terminal.
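A sketch of such a pipeline. The body of exceptionHandler() and the run() wrapper are assumptions; the text only states that the handler recovers by returning 0.

```java
import java.util.concurrent.CompletableFuture;

class Programming {

    // Assumed body: logs the failure and recovers with the default value 0.
    static int exceptionHandler(Throwable throwable) {
        System.out.println("Recovered from: " + throwable.getMessage());
        return 0;
    }

    // Builds the pipeline, rejects the source future, and returns the final value.
    static int run() {
        CompletableFuture<Integer> future = new CompletableFuture<>();

        CompletableFuture<Integer> recovered = future
                .thenApply(data -> data * 2)                    // skipped on rejection
                .exceptionally(Programming::exceptionHandler);  // recovers with 0

        recovered.thenAccept(System.out::println);              // prints 0

        future.completeExceptionally(new RuntimeException("We are throwing exception"));
        return recovered.join();
    }

    public static void main(String[] args) {
        run();
    }
}
```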

We have talked about the RESOLVED and REJECTED stages so far. What about the PENDING stage? There is a rule that says “we should never execute something in code without a timeout”. Unfortunately, Java 8 had no timeout API for this, but Java 9 introduced completeOnTimeout(). Let’s see how it works.

The statement future.completeOnTimeout(0, 2, TimeUnit.SECONDS); gives the future 2 seconds to complete. If the statement future.complete(compute(10)); has not completed it by then, the future moves to the resolved stage with the value 0, so future.get() returns 0.
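A minimal sketch of this behavior (Java 9 or later). The compute() body and the configurable delays are assumptions, added so that both outcomes can be observed quickly.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

class TimeoutSketch {

    // Hypothetical computation: sleeps for delayMillis, then returns n * 2.
    static int compute(int n, long delayMillis) {
        try {
            Thread.sleep(delayMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return n * 2;
    }

    // Returns compute(10) if it finishes within timeoutMillis, otherwise the default 0.
    static int resultOrDefault(long timeoutMillis, long computeMillis) {
        CompletableFuture<Integer> future = new CompletableFuture<>();
        future.completeOnTimeout(0, timeoutMillis, TimeUnit.MILLISECONDS);

        // Complete the future from another thread once the computation is done.
        CompletableFuture.runAsync(() -> future.complete(compute(10, computeMillis)));

        // join() is used only so that this demo can return the value.
        return future.join();
    }

    public static void main(String[] args) {
        System.out.println(resultOrDefault(2000, 100)); // prints 20
        System.out.println(resultOrDefault(200, 1000)); // prints 0
    }
}
```

In the second call the late complete() simply returns false, because the timeout has already completed the future.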

Now, we have 2 other important things to discuss in CompletableFuture.

  1. Compose (kind of flatMap in streams)
  2. Combine (there is no corresponding API in streams)

Let’s try to understand why these are available here. I will take an example from JavaScript to demystify it.

then(e => function(e)); it always returns a promise

JavaScript is a dynamically typed language, whereas Java is statically typed. In JavaScript, function(e) could return anything: either plain data or a promise. If it is data, it is wrapped into a promise and returned; otherwise the promise is returned directly. This implicit behavior does not carry over to Java, because the return type matters in Java. Let’s see how we make it work in Java.

In streams
When function returns data, we use map()
When function returns stream, we use flatMap()

In CompletableFuture
When function returns data, we use thenApply()
When function returns CompletableFuture<T>, we use thenCompose()

thenCompose() waits for the CompletableFuture returned by the function (add() in this case) to complete, and then its result is passed to the thenAccept() method.
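A minimal sketch, assuming a hypothetical add() that returns a CompletableFuture<Integer>; the signature is a guess made for illustration.

```java
import java.util.concurrent.CompletableFuture;

class ComposeSketch {

    // Hypothetical async helper: adds two numbers on the common pool.
    static CompletableFuture<Integer> add(int a, int b) {
        return CompletableFuture.supplyAsync(() -> a + b);
    }

    // thenCompose() flattens the CompletableFuture returned by add().
    // With thenApply() the result would be a nested
    // CompletableFuture<CompletableFuture<Integer>> instead.
    static CompletableFuture<Integer> pipeline() {
        return CompletableFuture.supplyAsync(() -> 2)
                .thenCompose(n -> add(n, 3));
    }

    public static void main(String[] args) {
        // join() only keeps this short demo alive until the value is printed.
        pipeline().thenAccept(System.out::println).join(); // prints 5
    }
}
```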

thenCombine() is used to combine the results of 2 CompletableFutures. It takes 2 parameters: the first is a CompletionStage and the second is a BiFunction.
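A minimal sketch with two independent futures; the price and tax values are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;

class CombineSketch {

    // Runs two independent computations and adds their results once both are done.
    static CompletableFuture<Integer> total() {
        CompletableFuture<Integer> price = CompletableFuture.supplyAsync(() -> 40);
        CompletableFuture<Integer> tax = CompletableFuture.supplyAsync(() -> 2);

        // First argument: the other stage. Second argument: a BiFunction over both results.
        return price.thenCombine(tax, Integer::sum);
    }

    public static void main(String[] args) {
        // join() only keeps this short demo alive until the value is printed.
        total().thenAccept(System.out::println).join(); // prints 42
    }
}
```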

I hope it helps in understanding parallel and asynchronous programming with Java. Thanks for reading!
