Java Advanced Concurrency Interview Questions.

Vikas Taank
Jan 6, 2024


We have published detailed articles on CompletableFuture and on the gradual evolution of asynchronous programming in Java, which grew out of the arrival of multi-core processors and parallel processing. We saw that with a correctly sized thread pool we can design a parallel processing framework that uses system resources efficiently. We also saw how to compose CompletableFutures to collect results and build data pipelines. In this article we look at some use cases where CompletableFuture applies.

How do you make concurrent API calls in Java?

Answer:

There are times when we want to call several APIs and combine their results. This is one of the most common use cases for running work in parallel: the calls are independent of each other, so there is no reason to wait for one before starting the next.

CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> 
callApi("get_inventory_from_store1"));
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() ->
callApi("get_inventory_from_store2"));

// Combine results of both API calls
CompletableFuture<String> combinedFuture = future1.thenCombine(future2, (response1, response2) -> response1 + ", " + response2);
combinedFuture.thenAccept(System.out::println);
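
To see the two calls overlap, here is a minimal runnable sketch. The snippet above does not show the body of callApi, so the version below is a made-up stand-in that only sleeps to simulate latency; the extra latency parameter and the delay values are assumptions chosen for illustration.

```java
import java.util.concurrent.CompletableFuture;

class ConcurrentCallsSketch {

    // Hypothetical stand-in for a remote call: sleeps to simulate network latency.
    static String callApi(String endpoint, long latencyMillis) {
        try {
            Thread.sleep(latencyMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return "response:" + endpoint;
    }

    // Starts both calls at once and combines their results when both are done.
    static String fetchBoth() {
        CompletableFuture<String> future1 =
                CompletableFuture.supplyAsync(() -> callApi("get_inventory_from_store1", 300));
        CompletableFuture<String> future2 =
                CompletableFuture.supplyAsync(() -> callApi("get_inventory_from_store2", 500));
        return future1
                .thenCombine(future2, (r1, r2) -> r1 + ", " + r2)
                .join(); // block only here, at the very end
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        String combined = fetchBoth();
        long elapsedMillis = (System.nanoTime() - start) / 1_000_000;
        System.out.println(combined);
        System.out.println("Elapsed: " + elapsedMillis + " ms");
    }
}
```

When both tasks get a thread right away, the elapsed time should be close to the slower call (about 500 ms here) rather than the 800 ms that two sequential calls would take.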

How do you process data in an asynchronous manner in Java?

Answer:

Sometimes you want the main thread to keep working while the data processing is delegated to an asynchronous thread. CompletableFuture handles this well.

CompletableFuture.supplyAsync(() -> fetchDataFromDatabase())
.thenApply(data -> processData(data))
.thenAccept(processedData -> System.out.println("Processed Data: " + processedData));

How do you combine results from multiple sources in Java using CompletableFuture?

Answer: If you are calling two APIs concurrently, you can use the thenCombine method of CompletableFuture to merge their results once both have completed.

CompletableFuture<Double> future1 = CompletableFuture.supplyAsync(() -> getService1Data());
CompletableFuture<Double> future2 = CompletableFuture.supplyAsync(() -> getService2Data());

future1.thenCombine(future2, (result1, result2) -> result1 + result2)
.thenAccept(result -> System.out.println("Combined Result: " + result));

How do you implement a timeout using CompletableFuture?

Answer: Some asynchronous processes are long running, and you will often want to put a time limit on them. The completeOnTimeout method (available since Java 9) completes the future with a default value if the result is not ready in time:

 CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> 
database_fetch())
.completeOnTimeout("Default Value", 1, TimeUnit.SECONDS);

future.thenAccept(System.out::println);
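
Java 9 also added orTimeout, which fails the future with a TimeoutException instead of supplying a default. The sketch below puts the two side by side. The slowFetch method is a hypothetical stand-in for database_fetch(), and the millisecond values are arbitrary.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimeoutSketch {

    // Hypothetical slow operation standing in for database_fetch().
    static String slowFetch(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return "Real Value";
    }

    // Falls back to a default value if the fetch takes longer than the timeout.
    static String fetchWithDefault(long workMillis, long timeoutMillis) {
        return CompletableFuture.supplyAsync(() -> slowFetch(workMillis))
                .completeOnTimeout("Default Value", timeoutMillis, TimeUnit.MILLISECONDS)
                .join();
    }

    // Fails with a TimeoutException if the fetch takes longer than the timeout.
    static String fetchOrFail(long workMillis, long timeoutMillis) {
        try {
            return CompletableFuture.supplyAsync(() -> slowFetch(workMillis))
                    .orTimeout(timeoutMillis, TimeUnit.MILLISECONDS)
                    .join();
        } catch (CompletionException e) {
            // join() wraps the TimeoutException in a CompletionException
            if (e.getCause() instanceof TimeoutException) {
                return "timed out";
            }
            throw e;
        }
    }

    public static void main(String[] args) {
        System.out.println(fetchWithDefault(1000, 100));
        System.out.println(fetchOrFail(1000, 100));
    }
}
```

Keep in mind that neither method interrupts the underlying task. The future completes early, but the supplier keeps running on its thread until it finishes.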

How do you implement non-blocking computation using CompletableFuture?

Answer:

CompletableFuture is designed to run computations on a separate thread so that the main thread is not blocked.

CompletableFuture.supplyAsync(() -> heavyComputation())
.thenAccept(result -> System.out.println("Computation Result: " + result));
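
A small runnable sketch of the same idea follows. The body of heavyComputation is an assumption (a simple summation loop), since the snippet above does not define it. The final join() is there because the default pool normally uses daemon threads, so a main method that returns immediately may exit before the callback prints anything.

```java
import java.util.concurrent.CompletableFuture;

class NonBlockingSketch {

    // Hypothetical CPU-heavy task: sums the integers from 1 to n.
    static long heavyComputation(int n) {
        long sum = 0;
        for (int i = 1; i <= n; i++) {
            sum += i;
        }
        return sum;
    }

    public static void main(String[] args) {
        CompletableFuture<Long> future =
                CompletableFuture.supplyAsync(() -> heavyComputation(1_000_000));

        // The main thread is free to do other work while the computation runs.
        System.out.println("Main thread keeps working...");

        // Register the callback, then wait for it so the program does not exit early.
        future.thenAccept(result -> System.out.println("Computation Result: " + result))
              .join();
    }
}
```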

Could you provide an example of parallel data processing?

Answer: Below is an example. We want to calculate the squares of the numbers in a list. We spawn one CompletableFuture per number, then join the futures and collect their results into a list.

package concurrency;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class ParallelProcessing {

public static void main(String args[]) {
long startTime = System.nanoTime();
List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
List<CompletableFuture<Integer>> futures = data.stream()
.map(num -> CompletableFuture.supplyAsync(() -> {
// Print the thread name here
System.out.println("Thread name: " + Thread.currentThread().getName() + ", processing number: " + num);
return num * num;
}))
.collect(Collectors.toList());

// Once every future has completed, collect the squares into a list and print them
CompletableFuture<Void> printed = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.thenApply(v -> futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList()))
.thenAccept(System.out::println);

// Wait for the whole chain, including the printing step, to complete
printed.join();
long endTime = System.nanoTime();
long duration = (endTime - startTime) / 1_000_000; // Convert to milliseconds
System.out.println("Execution Time: " + duration + " ms");

}
}

Can you rewrite the above parallel processing using thenAcceptAsync and explain the difference?

package concurrency;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

public class ParallelProcessingAcceptAsync {

public static void main(String[] args) {
long startTime = System.nanoTime();
ExecutorService executor = Executors.newFixedThreadPool(5);
List<Integer> data = Arrays.asList(1, 2, 3, 4, 5);
List<CompletableFuture<Integer>> futures = data.stream()
.map(num -> CompletableFuture.supplyAsync(() -> {
System.out.println("Computation Thread: " + Thread.currentThread().getName() + ", processing number: " + num);
return num * num;
}, executor))
.collect(Collectors.toList());

CompletableFuture<Void> allFutures = CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.thenAcceptAsync(v -> {
System.out.println("Result processing Thread: " + Thread.currentThread().getName());
futures.stream()
.map(CompletableFuture::join)
.forEach(System.out::println);
}, executor);

allFutures.join(); // Wait for all futures including the result processing to complete
executor.shutdown(); // Shut down the executor
long endTime = System.nanoTime();
long duration = (endTime - startTime) / 1_000_000; // Convert to milliseconds
System.out.println("Execution Time: " + duration + " ms");
}
}

Using thenAccept

Thread utilization: This version processes the results on the thread that completes the last future (or on the calling thread, if every future has already completed by the time the callback is registered). For quick, light computations this performs well because there is no context switch or additional scheduling overhead.

Execution time: Potentially faster when the processing load is light and the results can be processed immediately after the computation.

Using thenAcceptAsync

Thread utilization: This version explicitly submits the result processing to the thread pool as a separate task, even if the computation threads are idle. This may result in a small overhead due to context switching and scheduling.

Execution time: Might be slightly slower due to the overhead of handling the task asynchronously, especially if the system is under heavy load or the thread pool is busy with other tasks.
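
The difference in thread usage can be observed directly. The sketch below is an illustration under simple assumptions: it starts from an already-completed future so the behavior is predictable, and the executor and the thread name "callback-thread" are invented for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CallbackThreadSketch {

    // Returns the name of the thread that ran a thenAccept callback
    // registered on an already-completed future.
    static String threadForThenAccept() {
        String[] seen = new String[1];
        CompletableFuture.completedFuture(42)
                .thenAccept(v -> seen[0] = Thread.currentThread().getName())
                .join();
        return seen[0];
    }

    // Returns the name of the thread that ran a thenAcceptAsync callback
    // submitted to a dedicated single-thread executor.
    static String threadForThenAcceptAsync() {
        ExecutorService callbackPool = Executors.newSingleThreadExecutor(
                r -> new Thread(r, "callback-thread"));
        try {
            String[] seen = new String[1];
            CompletableFuture.completedFuture(42)
                    .thenAcceptAsync(v -> seen[0] = Thread.currentThread().getName(), callbackPool)
                    .join();
            return seen[0];
        } finally {
            callbackPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("thenAccept ran on:      " + threadForThenAccept());
        System.out.println("thenAcceptAsync ran on: " + threadForThenAcceptAsync());
    }
}
```

With a future that is already complete, thenAccept runs the callback on the calling thread, while thenAcceptAsync always hands it to the executor. When the future is still running, thenAccept instead runs the callback on whichever thread completes it.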

How do you aggregate the responses from different APIs?

Answer:

package org.example;

import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.List;
import java.util.random.RandomGenerator;
import java.util.stream.Collectors;

public class ApiAggregator {


public static void main(String[] ar) {


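// Task is a helper class that is not shown in this article; getId() is expected to return a String id
List<String> taskIds = Arrays.asList(new Task("Admin", RandomGenerator.getDefault().toString()).getId(),
new Task("Commercial", RandomGenerator.getDefault().toString()).getId());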

List<CompletableFuture<String>> futureList = taskIds.stream()
.map(taskId -> CompletableFuture.supplyAsync(() -> fetchResult(taskId)))
.collect(Collectors.toList());

// Create a combined Future using allOf()
CompletableFuture<Void> allFutures = CompletableFuture.allOf(
futureList.toArray(new CompletableFuture[0])
);

// When all the Futures are completed, call `future.join()` to get their results and collect the results in a list
CompletableFuture<List<String>> allPageContentsFuture = allFutures.thenApply(v ->
futureList.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList())
);

// Now you can use `allPageContentsFuture` to get the combined results
try {
List<String> results = allPageContentsFuture.get(); // This gets the aggregated results
// Process the combined results
results.forEach(System.out::println);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}

}

// Dummy method to simulate fetching result
private static String fetchResult(String taskId) {
// Implement fetching logic
return "Result for task " + taskId;
}
}
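
The allOf-then-join pattern above is a good candidate for a small reusable helper. The sketch below is one possible shape for it; the names FutureLists and allAsList are invented for this example and are not part of the JDK.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class FutureLists {

    // Turns a list of futures into one future holding the list of results, in input order.
    static <T> CompletableFuture<List<T>> allAsList(List<CompletableFuture<T>> futures) {
        return CompletableFuture
                .allOf(futures.toArray(new CompletableFuture[0]))
                .thenApply(v -> futures.stream()
                        .map(CompletableFuture::join) // does not block: every future is already complete
                        .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        List<CompletableFuture<String>> futures = List.of("Admin", "Commercial").stream()
                .map(id -> CompletableFuture.supplyAsync(() -> "Result for task " + id))
                .collect(Collectors.toList());
        allAsList(futures).join().forEach(System.out::println);
    }
}
```

If any of the input futures fails, allOf fails too, so the combined future completes exceptionally instead of returning a partial list.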

How do you use a parallel stream with CompletableFuture to make asynchronous API calls?

Answer:

package org.example;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.random.RandomGenerator;
import java.util.stream.Collectors;

public class ApiAggregatorParallel {


public static void main(String[] ar) {


List<String> taskIds = Arrays.asList(new Task("Admin", RandomGenerator.getDefault().toString()).getId(),
new Task("Commercial", RandomGenerator.getDefault().toString()).getId());

List<CompletableFuture<String>> futures = taskIds.parallelStream()
.map(taskId -> CompletableFuture.supplyAsync(() -> fetchResult(taskId)))
.collect(Collectors.toList());

// Convert List<CompletableFuture<String>> to CompletableFuture<List<String>>
CompletableFuture<List<String>> allFutures = CompletableFuture.allOf(
futures.toArray(new CompletableFuture[0])
).thenApply(v ->
futures.stream() // Use a regular stream here to join futures
.map(CompletableFuture::join)
.collect(Collectors.toList())
);
// Block on the combined future to obtain the aggregated results
try {
List<String> results = allFutures.get(); // Aggregated results
results.forEach(System.out::println);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}


}

// Dummy method to simulate fetching result
private static String fetchResult(String taskId) {
// Implement fetching logic
return "Result for task " + taskId;
}
}

Points to consider while using a parallel stream:

  1. Parallel stream: Calling parallelStream() instead of stream() processes the elements in parallel across multiple threads. This is most beneficial when each element involves real work, such as CPU-bound tasks or calls to independent external services. In this example each element only submits a task and returns immediately, so the gain is small.
  2. CompletableFuture.supplyAsync: This method still handles the actual execution of each task asynchronously. When used with a parallel stream, you are effectively distributing the initiation of these tasks across multiple threads.
  3. Joining futures: The futures are joined (CompletableFuture::join) with a regular stream, not a parallel one. By the time the thenApply callback runs, allOf guarantees that every future is complete, so join() returns immediately and parallelizing it would bring no benefit. It could even add unnecessary context switching and degrade performance.
  4. Thread pool consideration: The parallel stream uses the common ForkJoinPool, which is shared across the entire JVM. Other parallel streams and ForkJoin tasks in your application compete for the same threads, and supplyAsync without an explicit executor normally uses that pool too. For heavy I/O-bound operations, consider passing a custom Executor to CompletableFuture.supplyAsync so that blocked tasks do not starve other parts of your application of threads.

How do you use a custom thread pool with CompletableFuture?

Answer: In the example below, the supplyAsync method uses the custom thread pool instead of the default ForkJoinPool, which can improve performance in certain use cases.

package org.example;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.random.RandomGenerator;
import java.util.stream.Collectors;

public class ApiAggregatorCustomerThreadPool {


public static void main(String[] ar) {

ExecutorService customThreadPool = Executors.newFixedThreadPool(10);

List<String> taskIds = Arrays.asList(new Task("Admin", RandomGenerator.getDefault().toString()).getId(),
new Task("Commercial", RandomGenerator.getDefault().toString()).getId());

List<CompletableFuture<String>> futureList = taskIds.stream()
.map(taskId -> CompletableFuture.supplyAsync(() -> fetchResult(taskId),customThreadPool))
.collect(Collectors.toList());

// Create a combined Future using allOf()
CompletableFuture<Void> allFutures = CompletableFuture.allOf(
futureList.toArray(new CompletableFuture[0])
);

// When all the Futures are completed, call `future.join()` to get their results and collect the results in a list
CompletableFuture<List<String>> allPageContentsFuture = allFutures.thenApply(v ->
futureList.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList())
);


// Now you can use `allPageContentsFuture` to get the combined results
try {
List<String> results = allPageContentsFuture.get(); // This gets the aggregated results
// Process the combined results
results.forEach(System.out::println);
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}

customThreadPool.shutdown();

}

// Dummy method to simulate fetching result
private static String fetchResult(String taskId) {
// Implement fetching logic
return "Result for task " + taskId;
}
}

I struggled a lot to understand the different aspects of CompletableFuture, so I started writing code to understand it fully, and that led me to write this article.

  1. In summary, CompletableFuture is a way of composing asynchronous tasks that run on a different thread.
  2. When you call APIs concurrently using CompletableFuture, the total run time is roughly that of the slowest API call rather than the sum of all calls.
  3. You can supply an Executor to the supplyAsync method, which makes the CompletableFuture use the thread pool you provide instead of the common ForkJoinPool.
  4. If your tasks finish quickly, plain thenAccept is usually enough. Use thenAcceptAsync only when there is a real advantage in doing the further processing on a different thread, otherwise you may end up in a quagmire of context switching among threads.
  5. It is advisable to avoid blocking operations except when collecting the final results from the future.
  6. Parallel streams work best when the tasks you want to process are independent of each other.

Please share and clap if you liked my content. Thanks for reading my content.

