#4 Asynchronous Programming in Java: Launching Several Tasks with CompletionStage APIs

anil gola
4 min read · Jun 24, 2023

This blog continues from the previous one in the series:

https://medium.com/@anil.java.story/approach-to-non-blocking-code-part-3-78cb1671db49

Here we will see how we can launch several tasks with the help of CompletionStage APIs.

Let’s consider our tasks now.

Supplier<Quotation> fetchQuotationA = () -> {
    try {
        Thread.sleep(RandomUtils.nextInt(450, 500));
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
    return new Quotation(RandomUtils.nextInt(1, 100), "server-A");
};

Supplier<Quotation> fetchQuotationB = () -> {
    try {
        Thread.sleep(RandomUtils.nextInt(450, 500));
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
    return new Quotation(RandomUtils.nextInt(1, 100), "server-B");
};

Supplier<Quotation> fetchQuotationC = () -> {
    try {
        Thread.sleep(RandomUtils.nextInt(450, 500));
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
    return new Quotation(RandomUtils.nextInt(1, 100), "server-C");
};

Supplier<Quotation> fetchQuotationD = () -> {
    try {
        Thread.sleep(RandomUtils.nextInt(450, 500));
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
    return new Quotation(RandomUtils.nextInt(1, 100), "server-D");
};

Supplier<Quotation> fetchQuotationE = () -> {
    try {
        Thread.sleep(RandomUtils.nextInt(450, 500));
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
    return new Quotation(RandomUtils.nextInt(1, 100), "server-E");
};

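Instead of Callables, we now have Suppliers. A Supplier plays the same role as a Callable, with one difference: Supplier.get() declares no checked exceptions, whereas Callable.call() declares throws Exception. That is why each lambda above catches InterruptedException and rethrows it as a RuntimeException. In return, the code that consumes the result needs no try/catch for checked exceptions, which keeps asynchronous code cleaner.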
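To make the difference concrete, here is a minimal sketch. The class name SupplierVsCallable and the riskyFetch helper are hypothetical, invented only for this illustration; they are not part of the quotation example.

```java
import java.util.concurrent.Callable;
import java.util.function.Supplier;

public class SupplierVsCallable {

    // Hypothetical helper that declares a checked exception,
    // standing in for a remote call.
    static int riskyFetch() throws Exception {
        return 42;
    }

    // Callable.call() is declared with "throws Exception", so the lambda
    // may simply let the checked exception escape.
    static final Callable<Integer> callable = () -> riskyFetch();

    // Supplier.get() declares no exceptions, so the checked exception has
    // to be caught and rethrown as an unchecked one inside the lambda.
    static final Supplier<Integer> supplier = () -> {
        try {
            return riskyFetch();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    };

    public static void main(String[] args) throws Exception {
        System.out.println(callable.call()); // the caller must deal with Exception
        System.out.println(supplier.get());  // nothing checked to deal with here
    }
}
```

The price of the Supplier version is the try/catch inside the lambda; the benefit is that every caller downstream is free of checked exceptions.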

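As in my previous blogs, we will collect these suppliers in a list, just like below. Note that List.of returns an immutable List rather than an ArrayList, which is fine here because we only read from it.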

var taskList = List.of(fetchQuotationA,
        fetchQuotationB,
        fetchQuotationC,
        fetchQuotationD,
        fetchQuotationE);

Now we can hand each of these Suppliers to CompletableFuture.supplyAsync, much as we submitted Callables to an ExecutorService before.

var futures = new ArrayList<CompletableFuture<Quotation>>();
for (Supplier<Quotation> task : taskList) {
    var future = CompletableFuture.supplyAsync(task);
    futures.add(future);
}

CompletableFuture.supplyAsync returns a CompletableFuture immediately, without waiting for the task to finish. A CompletableFuture can be used like a Future, but it has a lot more features. In fact, CompletableFuture implements both the Future and CompletionStage interfaces.
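A small sketch of that dual nature follows. The class name FutureAndStage, the launch method, and the 200 ms delay are assumptions made for the illustration only.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Future;

public class FutureAndStage {

    // Starts a slow task and returns at once; the 200 ms sleep is arbitrary.
    static CompletableFuture<String> launch() {
        return CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(200);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return "done";
        });
    }

    public static void main(String[] args) {
        CompletableFuture<String> cf = launch();

        // The same object can be viewed through either interface.
        Future<String> asFuture = cf;
        CompletionStage<String> asStage = cf;

        // Future view: poll for completion.
        System.out.println("isDone right after supplyAsync: " + asFuture.isDone());

        // CompletionStage view: register what should happen with the result,
        // then wait for that follow-up step so the output is not lost.
        asStage.thenAccept(v -> System.out.println("stage saw: " + v))
               .toCompletableFuture()
               .join();
    }
}
```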

Now, we need to collect the results of all the futures in a list, something like below.

List<Quotation> quotations = new ArrayList<>();

To do that, we need to iterate over the list of futures.

List<CompletableFuture<Void>> voids = new ArrayList<>();
for (CompletableFuture<Quotation> completableFuture : futures) {
    CompletableFuture<Void> result =
            completableFuture.thenAccept(quotation -> quotations.add(quotation));
    voids.add(result);
}

CompletableFuture has a method called thenAccept, which accepts a Consumer. This Consumer is executed with the result of the previous task once that task has completed. By default, supplyAsync runs the Supplier on a thread from the common ForkJoinPool. The Consumer passed to thenAccept typically runs on the thread that completed the previous task, or on the calling thread if that task had already finished when thenAccept was called; thenAcceptAsync always hands it to the pool. Also, thenAccept returns immediately with a CompletableFuture. Since a Consumer returns nothing, that future is a CompletableFuture<Void>.
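Here is a minimal sketch of thenAccept in isolation. The class name ThenAcceptSketch, the produceAndConsume method, and the String payload are hypothetical; the thread names it prints depend on timing and on the machine, so none are shown.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

public class ThenAcceptSketch {

    // Runs a Supplier asynchronously, feeds its result to a Consumer via
    // thenAccept, and returns what the resulting CompletableFuture<Void> yields.
    static Void produceAndConsume(List<String> sink) {
        CompletableFuture<String> produced = CompletableFuture.supplyAsync(
                () -> "produced on " + Thread.currentThread().getName());

        CompletableFuture<Void> consumed = produced.thenAccept(
                value -> sink.add(value + ", consumed on "
                        + Thread.currentThread().getName()));

        // A Void future carries no value; join() returns null once the
        // Consumer has run.
        return consumed.join();
    }

    public static void main(String[] args) {
        List<String> sink = new CopyOnWriteArrayList<>(); // thread-safe list
        Void result = produceAndConsume(sink);
        System.out.println(sink);
        System.out.println("value of the Void future: " + result);
    }
}
```

Running it a few times is a quick way to see on which thread the Consumer actually ends up.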

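Now, we need to make sure that our main thread doesn’t exit too early. The threads of the common ForkJoinPool are daemon threads, so if main exits first, the JVM shuts down, our asynchronous tasks never get a chance to complete, and we see nothing in our console. To prevent that, we call join on each future. join is similar to the get method of Future; the difference is that it doesn’t throw any checked exceptions, and a failure surfaces as an unchecked CompletionException instead.

Also, the blocking loop below is not really part of my application; it is only there to keep the demo alive until the results arrive.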

for (CompletableFuture<Void> r : voids) {
    r.join(); // blocking, but this is not a part of my application
}
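The difference between join and get shows up most clearly when a task fails. The sketch below assumes a hypothetical failing task (the class JoinVsGet and the "server unavailable" message are made up for the illustration) and reports which exception each method surfaces.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

public class JoinVsGet {

    // Hypothetical task that always fails.
    static CompletableFuture<Integer> failing() {
        return CompletableFuture.supplyAsync(() -> {
            throw new IllegalStateException("server unavailable");
        });
    }

    // join() wraps the failure in an unchecked CompletionException,
    // so the try/catch here is optional as far as the compiler is concerned.
    static String viaJoin() {
        try {
            failing().join();
            return "no exception";
        } catch (CompletionException e) {
            return e.getCause().getClass().getSimpleName();
        }
    }

    // get() declares the checked ExecutionException and InterruptedException,
    // so the compiler forces us to handle both.
    static String viaGet() {
        try {
            failing().get();
            return "no exception";
        } catch (ExecutionException e) {
            return e.getCause().getClass().getSimpleName();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println("join cause: " + viaJoin());
        System.out.println("get cause:  " + viaGet());
    }
}
```

In both cases the original IllegalStateException is available through getCause(); only the wrapper type and the checked/unchecked distinction differ.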

One more thing I need to point out: since the Consumers run asynchronously on other threads, several of them may call add at the same time, so we have a race condition on the list declared below.

List<Quotation> quotations = new ArrayList<Quotation>();

ArrayList is not thread-safe. So let’s make the above line thread-safe by switching to a concurrent collection.

Collection<Quotation> quotations = new ConcurrentLinkedDeque<>();

Now, we are thread-safe. We have a quotation collection, and now we can find the best quotation, as we did in our previous blogs, with the following code:

var bestQuotation = quotations.stream()
        .min(Comparator.comparing(Quotation::getValue))
        .orElseThrow();
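As an aside, the shared collection can be avoided altogether by joining each future and streaming the results directly. This is an alternative sketch, not the approach used in this blog; the nested Quotation class is a hypothetical stand-in for the real one, and the fixed values in main are made up.

```java
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import java.util.stream.Collectors;

public class BestQuotationSketch {

    // Hypothetical stand-in for the blog's Quotation class.
    static final class Quotation {
        final int value;
        final String serverDescription;

        Quotation(int value, String serverDescription) {
            this.value = value;
            this.serverDescription = serverDescription;
        }

        int getValue() {
            return value;
        }
    }

    static Quotation best(List<Supplier<Quotation>> tasks) {
        // Launch every task first. Collecting into a list matters: it makes
        // sure all tasks are started before we block on any of them.
        List<CompletableFuture<Quotation>> futures = tasks.stream()
                .map(task -> CompletableFuture.supplyAsync(task))
                .collect(Collectors.toList());

        // join() blocks until each result is ready. Only the calling thread
        // touches the stream, so no concurrent collection is needed.
        return futures.stream()
                .map(CompletableFuture::join)
                .min(Comparator.comparingInt(Quotation::getValue))
                .orElseThrow();
    }

    public static void main(String[] args) {
        List<Supplier<Quotation>> tasks = List.of(
                () -> new Quotation(30, "server-A"),
                () -> new Quotation(10, "server-B"),
                () -> new Quotation(20, "server-C"));
        Quotation best = best(tasks);
        System.out.println(best.value + " from " + best.serverDescription);
    }
}
```

The trade-off is that this version blocks the calling thread by design, whereas the thenAccept version keeps the door open for non-blocking pipelines.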

Let’s also wrap this in start and end instants, just like below.

var startInstant = Instant.now();
var futures = new ArrayList<CompletableFuture<Quotation>>();
for (Supplier<Quotation> task : taskList) {
    var future = CompletableFuture.supplyAsync(task);
    futures.add(future);
}

Collection<Quotation> quotations = new ConcurrentLinkedDeque<>();
Collection<CompletableFuture<Void>> voids = new ConcurrentLinkedDeque<>();
for (CompletableFuture<Quotation> completableFuture : futures) {
    var result =
            completableFuture.thenAccept(quotation -> quotations.add(quotation));
    voids.add(result);
}

for (CompletableFuture<Void> r : voids) {
    r.join(); // blocking, but this is not a part of my application
}

var bestQuotation = quotations.stream()
        .min(Comparator.comparing(Quotation::getValue))
        .orElseThrow();


var endInstant = Instant.now();
System.out.println("Asynchronously - Best Quotation is [" + bestQuotation.value + "]"
        + " Best server is [" + bestQuotation.serverDescription + "]"
        + " (millis) " + Duration.between(startInstant, endInstant).toMillis());

Let’s run the programme several times and analyse the results.

Asynchronously - Best Quotation is [4] Best server is [server-B] (millis) 508

Asynchronously - Best Quotation is [8] Best server is [server-C] (millis) 497

Asynchronously - Best Quotation is [5] Best server is [server-A] (millis) 508

Asynchronously - Best Quotation is [6] Best server is [server-A] (millis) 487

This code takes roughly the same amount of time as the ExecutorService version. At this point, you may wonder what the point is, since the pattern looks exactly the same. Bear with me; in the coming blogs we will discuss how to chain our tasks and create asynchronous pipelines with the CompletionStage APIs. Stay tuned to this space.

My youtube channel:

Asynchronous Programming : https://youtu.be/AHL2zuZ_5_k
