Asynchronous programming with Java 8
When writing a Java program, more often than not you run into situations where you need to access more than one resource (external APIs, databases, and so on). To make the process faster, the de facto standard is to run the computations in parallel threads and then merge the results. This is what we call asynchronous programming. At times, though, it becomes cumbersome to handle, and you end up spending more time writing boilerplate than the actual logic.
In this blog post, I will cover a few of the scenarios where we need to use the asynchronous power of Java. We will look into different ways of solving the given problem.
Scenario 1: Independent computation with no results:
Suppose we have one process that takes a while to complete its execution. To mimic the behavior in the simplest possible way let’s add sleep to the current thread.
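void executeMe(long timeToSleep) {
    try {
        Thread.sleep(timeToSleep);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // restore the flag so lambdas can call this method
    }
}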
This method does nothing but sleep for the specified amount of time.
Now let's run this method twice and check the time taken to complete the process.
long timeBeforeStart = System.currentTimeMillis();
executeMe(1000);
executeMe(1000);
long timeNow = System.currentTimeMillis();
long totalExecutionTime = timeNow - timeBeforeStart;
System.out.println("Execution time " + totalExecutionTime);
Because the two calls run one after the other, the total is roughly double the time of a single execution (a little over 2000 ms here). Now let's see how this can be reduced.
One of the ways is to run these tasks in different threads.
Thread thread1 = new Thread(() -> executeMe(1000));
Thread thread2 = new Thread(() -> executeMe(1000));
thread1.start();
thread2.start();
To be sure that both of these threads complete their execution let’s join the threads.
thread1.join();
thread2.join();
This time, because the two calls run in different threads, the whole thing takes roughly half the time of the synchronous version (about 1000 ms instead of 2000 ms).
Now we will try to achieve the same thing using a Java 8 parallel stream.
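To see both timings side by side, here is a small self-contained sketch. The class name ThreadTimingSketch and its helper methods are made up for this illustration, and its executeMe is a stand-in that handles InterruptedException itself so that it can be called from a lambda. Exact timings will vary by machine.

```java
import java.util.concurrent.TimeUnit;

class ThreadTimingSketch {

    // Stand-in for executeMe: sleeps, and restores the interrupt flag if interrupted.
    static void executeMe(long timeToSleep) {
        try {
            Thread.sleep(timeToSleep);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Runs the task twice on the calling thread and returns the elapsed milliseconds.
    static long runSequentially(long sleepMillis) {
        long start = System.nanoTime();
        executeMe(sleepMillis);
        executeMe(sleepMillis);
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    // Runs the task on two threads, waits for both, and returns the elapsed milliseconds.
    static long runOnTwoThreads(long sleepMillis) {
        long start = System.nanoTime();
        Thread thread1 = new Thread(() -> executeMe(sleepMillis));
        Thread thread2 = new Thread(() -> executeMe(sleepMillis));
        thread1.start();
        thread2.start();
        try {
            thread1.join();
            thread2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the threads", e);
        }
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) {
        System.out.println("Sequential:  " + runSequentially(1000) + " ms");
        System.out.println("Two threads: " + runOnTwoThreads(1000) + " ms");
    }
}
```

On most machines the second number should come out at roughly half of the first.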
Stream.of(1, 2).parallel().forEach(i -> executeMe(1000));
In this case, we create a stream of two elements and hand the computation to the parallel stream. This hides all the thread-handling boilerplate; the stream parallelizes the work itself, using the calling thread and the common fork-join pool.
There is another way to write the asynchronous execution: CompletableFuture. It runs the computation in a thread other than the calling thread.
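If you are curious where a parallel stream actually runs the work, the following sketch records the name of the thread that handles each element. The class and method names are hypothetical, and the thread names you see depend on your machine and JVM.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.IntStream;

class ParallelStreamThreadsSketch {

    // Runs `tasks` sleeping tasks in a parallel stream and returns the names
    // of the threads that executed them.
    static Set<String> threadsUsed(int tasks, long sleepMillis) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        IntStream.range(0, tasks).parallel().forEach(i -> {
            names.add(Thread.currentThread().getName());
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        return names;
    }

    public static void main(String[] args) {
        // Typically shows the calling thread and/or ForkJoinPool.commonPool worker threads.
        System.out.println(threadsUsed(2, 1000));
    }
}
```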
CompletableFuture<Void> cf1 = CompletableFuture.runAsync(() -> executeMe(milliseconds));
CompletableFuture<Void> cf2 = CompletableFuture.runAsync(() -> executeMe(milliseconds));
CompletableFuture.allOf(cf1, cf2).join();
In the above example, we have passed on the computation to the ‘runAsync’ method of CompletableFuture. It will run the computation asynchronously and will return the CompletableFuture.
CompletableFuture<Void> cf1=CompletableFuture.runAsync(() -> executeMe(milliseconds));
Running the above line does not mean that the execution has completed; it only starts it. Much like a plain Java Future, we call the ‘get’ method to wait for the outcome. If the computation has not completed yet, ‘get’ blocks until it does. On failure, it throws a checked exception that depends on the reason, such as ExecutionException if the task failed or InterruptedException if the waiting thread was interrupted.
If you noticed the return type above, it’s CompletableFuture<Void> (you can compare it with the Future of a Runnable). ‘runAsync’ just finishes the task at hand and doesn’t return anything.
We can also use ‘join’ instead of ‘get’. It throws an unchecked exception (CompletionException) in case of failure.
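The difference between the two is easiest to see with a task that always fails. This is a minimal sketch; the class name, the helper methods, and the "boom" exception are invented for the demonstration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

class GetVersusJoinSketch {

    // A future that always fails; the exception is made up for this demo.
    static CompletableFuture<Void> failingFuture() {
        return CompletableFuture.runAsync(() -> {
            throw new IllegalStateException("boom");
        });
    }

    // get() reports the failure through the checked ExecutionException.
    static String describeGet() {
        try {
            failingFuture().get();
            return "completed";
        } catch (ExecutionException e) {
            return e.getClass().getSimpleName() + " caused by " + e.getCause().getClass().getSimpleName();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    // join() reports the same failure through the unchecked CompletionException.
    static String describeJoin() {
        try {
            failingFuture().join();
            return "completed";
        } catch (CompletionException e) {
            return e.getClass().getSimpleName() + " caused by " + e.getCause().getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(describeGet());  // ExecutionException caused by IllegalStateException
        System.out.println(describeJoin()); // CompletionException caused by IllegalStateException
    }
}
```

Because ‘join’ needs no try/catch for checked exceptions, it is the more convenient choice inside lambdas.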
Instead of calling ‘join’ on each CompletableFuture, we can use the ‘allOf’ method to combine them and then ‘join’ the result.
CompletableFuture.allOf(cf1, cf2).join();
In this case, the total time is the maximum of the times taken by the individual computations.
So if ‘cf1’ takes 1500 milliseconds and ‘cf2’ takes 1000, the total time would be 1500 instead of 2500 milliseconds.
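Here is a small sketch that measures this. The class name and the timeAllOf helper are hypothetical, executeMe is again a sleeping stand-in, and the measured value will vary slightly from run to run.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

class AllOfTimingSketch {

    // Stand-in for executeMe: sleeps, and restores the interrupt flag if interrupted.
    static void executeMe(long timeToSleep) {
        try {
            Thread.sleep(timeToSleep);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Starts both tasks asynchronously, waits for both, and returns the elapsed milliseconds.
    static long timeAllOf(long firstMillis, long secondMillis) {
        long start = System.nanoTime();
        CompletableFuture<Void> cf1 = CompletableFuture.runAsync(() -> executeMe(firstMillis));
        CompletableFuture<Void> cf2 = CompletableFuture.runAsync(() -> executeMe(secondMillis));
        CompletableFuture.allOf(cf1, cf2).join();
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    public static void main(String[] args) {
        // Expect a value close to 1500, not 2500.
        System.out.println("Elapsed: " + timeAllOf(1500, 1000) + " ms");
    }
}
```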
Let's move on to the next problem.
Scenario 2: Independent computation which eventually returns the combined result:
In the example above we did not care about the result. Now let’s take a scenario where we need the results of both computations and use them to perform another operation.
@Override
public Order createOrder(int userId, int productId) {
    Product product = productService.getProduct(productId);
    User user = userService.getUser(userId);
    if (Objects.nonNull(product) && Objects.nonNull(user)) {
        return orderService.addOrder(userId, productId);
    }
    throw new RuntimeException("Failed to create order");
}
Assume that we are trying to create an order. To create an order we need both the user and the product information. If either of these calls fails, we cannot create the order, so the order depends on the success of the previous two calls and cannot run in parallel with them. However, we can make the ‘getUser’ and ‘getProduct’ calls asynchronous because they are independent of each other. Once both calls have completed successfully, we call ‘addOrder’.
The first way of achieving this is as follows:
public Order createOrder(int userId, int productId) throws InterruptedException {
    // Typed as Callable<Object> so that both tasks fit into one list for invokeAll
    Callable<Object> productCallable = () -> productService.getProduct(productId);
    Callable<Object> userCallable = () -> userService.getUser(userId);
    ExecutorService executor = Executors.newFixedThreadPool(2);
    List<Callable<Object>> tasks = new ArrayList<>();
    tasks.add(productCallable);
    tasks.add(userCallable);
    // invokeAll blocks until both tasks have finished
    final List<Future<Object>> list = executor.invokeAll(tasks);
    executor.shutdown();
    try {
        Product product = (Product) list.get(0).get();
        User user = (User) list.get(1).get();
        if (Objects.nonNull(product) && Objects.nonNull(user)) {
            return orderService.addOrder(userId, productId);
        }
    } catch (ExecutionException e) {
        // One of the lookups threw; keep the cause instead of swallowing it
        throw new RuntimeException("Failed to process create order", e);
    }
    throw new RuntimeException("Failed to process create order");
}
We have chosen Callable over Runnable because of its ability to get us results.
The ‘invokeAll’ method on the executor runs the tasks in parallel and blocks until all of them have completed. Calling ‘get’ on each Future then returns that task’s result, or throws if the task failed.
Once we have the user and the product, we can call ‘addOrder’.
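If the casts bother you, one alternative is to submit each task separately, which gives you a typed Future per task. The sketch below is only an illustration: the class name is made up, and plain strings stand in for the User and Product lookups.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class TypedFuturesSketch {

    // Fetches a (fake) user and product in parallel and combines them without any casts.
    static String fetchBoth(int userId, int productId) {
        Callable<String> userCallable = () -> "user-" + userId;          // stand-in for getUser
        Callable<String> productCallable = () -> "product-" + productId; // stand-in for getProduct
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            Future<String> userFuture = executor.submit(userCallable);
            Future<String> productFuture = executor.submit(productCallable);
            // get() blocks until the corresponding task has finished
            return userFuture.get() + " ordered " + productFuture.get();
        } catch (ExecutionException e) {
            throw new IllegalStateException("Failed to create order", e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(fetchBoth(7, 42)); // user-7 ordered product-42
    }
}
```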
Let’s see how we can make use of a Java 8 parallel stream.
A stream needs a source of elements to iterate over, so we have to bend our use case a little to fit.
Let’s create a functional interface:
interface Command<R, T> {
    R execute(T t);
}
We will pass the different commands to the parallel stream for execution.
public Order createOrder(int userId, int productId) {
    // Map each command to the input it should be executed with
    Map<Command<Object, Object>, Object> executionCommands = new HashMap<>();
    Command<Object, Object> userCommand = p -> userService.getUser((int) p);
    Command<Object, Object> productCommand = p -> productService.getProduct((int) p);
    executionCommands.put(userCommand, userId);
    executionCommands.put(productCommand, productId);
    try {
        // collect(toList()) keeps encounter order: index 0 is the product, index 1 the user
        List<Object> result = Stream.of(productCommand, userCommand)
                .parallel()
                .map(k -> k.execute(executionCommands.get(k)))
                .collect(Collectors.toList());
        Product product = (Product) result.get(0);
        User user = (User) result.get(1);
        if (Objects.nonNull(product) && Objects.nonNull(user)) {
            // Same argument order as the synchronous version: user first, then product
            return orderService.addOrder(user.getId(), product.getId());
        }
    } catch (Exception e) {
        throw new RuntimeException("Failed to process create order", e);
    }
    throw new RuntimeException("Failed to process create order");
}
We have created a Map of the commands and their inputs; each input is passed to the ‘execute’ method of its command.
We pass the user command and the product command to the parallel stream. After both commands have executed successfully, ‘addOrder’ is called.
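Note that this approach reads the results by index, so it relies on the stream keeping its encounter order even when it runs in parallel. For an ordered source, collecting with Collectors.toList() does preserve that order, as this small sketch (with a made-up class name) shows:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class EncounterOrderSketch {

    // Maps the elements in parallel; the collected list still follows the source order.
    static List<Integer> timesTen() {
        return Stream.of(1, 2, 3, 4)
                .parallel()
                .map(i -> i * 10)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(timesTen()); // [10, 20, 30, 40]
    }
}
```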
The same thing can be achieved using CompletableFuture.
public Order createOrder(int userId, int productId) throws InterruptedException {
    CompletableFuture<Product> productFuture = CompletableFuture.supplyAsync(() -> productService.getProduct(productId));
    CompletableFuture<User> userFuture = CompletableFuture.supplyAsync(() -> userService.getUser(userId));
    final CompletableFuture<Order> orderFuture = CompletableFuture.allOf(productFuture, userFuture).thenApplyAsync(u -> {
        User user = userFuture.join();
        Product product = productFuture.join();
        return orderService.addOrder(user.getId(), product.getId());
    });
    return orderFuture.join();
}
Here, instead of ‘runAsync’, we have called ‘supplyAsync’, which returns a CompletableFuture that carries the result of the computation. The code inside ‘thenApplyAsync’ runs once both the product and the user CompletableFutures have completed.
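When there are exactly two futures to combine, ‘thenCombine’ is an alternative to ‘allOf’ followed by ‘join’ calls. The following is a minimal sketch of that variant, not the code used in this post: the class name is invented and plain strings stand in for User, Product and Order.

```java
import java.util.concurrent.CompletableFuture;

class ThenCombineSketch {

    // Looks up a (fake) user and product in parallel and combines both results into an order.
    static String createOrder(int userId, int productId) {
        CompletableFuture<String> userFuture =
                CompletableFuture.supplyAsync(() -> "user-" + userId);       // stand-in for getUser
        CompletableFuture<String> productFuture =
                CompletableFuture.supplyAsync(() -> "product-" + productId); // stand-in for getProduct
        // The combining function runs only after both futures have completed successfully.
        return userFuture
                .thenCombine(productFuture, (user, product) -> "order(" + user + ", " + product + ")")
                .join();
    }

    public static void main(String[] args) {
        System.out.println(createOrder(7, 42)); // order(user-7, product-42)
    }
}
```

The combining function receives both results as typed parameters, so there is no need to call ‘join’ on the individual futures.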
Scenario 3: Independent computation with failures:
Now let’s think of a scenario where we have multiple requests to handle. If one particular request fails, the complete process fails; the failure of any other request can be ignored.
For example:
@Override
public UserDashboard getDashBoard(int userId) {
    User user = userService.getUser(userId);
    List<Order> orders;
    Set<Product> products;
    try {
        orders = orderService.getOrder(userId);
    } catch (Exception e) {
        orders = new ArrayList<>();
    }
    try {
        products = productService.getProducts();
    } catch (Exception e) {
        products = new HashSet<>();
    }
    UserDashboard userDashboard = new UserDashboard();
    userDashboard.setUserOrders(orders);
    userDashboard.setUser(user);
    userDashboard.setAllProducts(products);
    return userDashboard;
}
In the example code, if the call to ‘getUser’ fails, the whole execution fails, but for ‘getOrder’ and ‘getProducts’ we fall back to an empty result on failure.
If we have to use Callable, the solution could look like this:
public UserDashboard getDashBoard(int userId) throws InterruptedException {
    Callable<User> userCallable = () -> userService.getUser(userId);
    // Typed as Callable<Object> so that both optional tasks fit into one list for invokeAll
    Callable<Object> orderCallable = () -> orderService.getOrder(userId);
    Callable<Object> productCallable = () -> productService.getProducts();
    User user;
    Set<Product> products;
    List<Order> orders;
    try {
        // The user is mandatory, so fetch it first on the calling thread
        user = userCallable.call();
    } catch (Exception e) {
        throw new RuntimeException("Failed to get Dashboard", e);
    }
    ExecutorService executor = Executors.newFixedThreadPool(2);
    List<Callable<Object>> tasks = new ArrayList<>();
    tasks.add(orderCallable);
    tasks.add(productCallable);
    final List<Future<Object>> list = executor.invokeAll(tasks);
    executor.shutdown();
    try {
        products = (Set<Product>) list.get(1).get();
    } catch (Exception e) {
        products = new HashSet<>();
    }
    try {
        orders = (List<Order>) list.get(0).get();
    } catch (Exception e) {
        orders = new ArrayList<>();
    }
    final UserDashboard userDashboard = new UserDashboard();
    userDashboard.setUser(user);
    userDashboard.setUserOrders(orders);
    userDashboard.setAllProducts(products);
    return userDashboard;
}
We have to handle the exceptions explicitly. That is a lot of code, isn’t it?
The same can be achieved using a parallel stream.
@Override
public UserDashboard getDashBoard(int userId) {
    Map<Command<Object, Object>, Object> executionCommands = new HashMap<>();
    Command<Object, Object> userCommand = p -> userService.getUser((int) p);
    executionCommands.put(userCommand, userId);
    Command<Object, Object> orderCommand = p -> getOrder((int) p);
    executionCommands.put(orderCommand, userId);
    Command<Object, Object> productCommand = p -> getProducts();
    executionCommands.put(productCommand, null);
    try {
        // The stream runs inside the try block so that a failing user lookup is caught here
        List<Object> result = Stream.of(userCommand, orderCommand, productCommand)
                .parallel()
                .map(k -> k.execute(executionCommands.get(k)))
                .collect(Collectors.toList());
        User user = (User) result.get(0);
        List<Order> order = (List<Order>) result.get(1);
        Set<Product> product = (Set<Product>) result.get(2);
        UserDashboard userDashboard = new UserDashboard();
        userDashboard.setUser(user);
        userDashboard.setUserOrders(order);
        userDashboard.setAllProducts(product);
        return userDashboard;
    } catch (Exception e) {
        // The original created this exception without throwing it and then returned null
        throw new RuntimeException("Failed to get Dashboard details", e);
    }
}

private Set<Product> getProducts() {
    try {
        return productService.getProducts();
    } catch (RuntimeException e) {
        // Optional data: fall back to an empty set
        return new HashSet<>();
    }
}

private List<Order> getOrder(int p) {
    try {
        return orderService.getOrder(p);
    } catch (RuntimeException e) {
        // Optional data: fall back to an empty list
        return new ArrayList<>();
    }
}
Here we have handled the failure in the command itself.
If we have to do the same using CompletableFuture:
public UserDashboard getDashBoard(int userId) throws InterruptedException {
    CompletableFuture<User> userFuture = CompletableFuture.supplyAsync(() -> userService.getUser(userId));
    CompletableFuture<List<Order>> userOrdersFuture = CompletableFuture.supplyAsync(() -> orderService.getOrder(userId)).exceptionally(e -> new ArrayList<>());
    CompletableFuture<Set<Product>> productsFuture = CompletableFuture.supplyAsync(() -> productService.getProducts()).exceptionally(e -> new HashSet<>());
    CompletableFuture<UserDashboard> resultFuture = CompletableFuture.allOf(userFuture, userOrdersFuture, productsFuture).thenApplyAsync(f -> {
        final UserDashboard userDashboard = new UserDashboard();
        userDashboard.setUser(userFuture.join());
        userDashboard.setUserOrders(userOrdersFuture.join());
        userDashboard.setAllProducts(productsFuture.join());
        return userDashboard;
    });
    return resultFuture.join();
}
As you can see above, we have handled the failures using the ‘exceptionally’ callback, which is invoked when the computation fails and supplies a fallback value. This looks more elegant than the other solutions.
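To try out this behaviour in isolation, here is a self-contained sketch. All names in it are hypothetical: plain strings stand in for the user and the orders, and boolean flags simulate the service failures.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class ExceptionallySketch {

    // Optional data: a failure is replaced by an empty list.
    static CompletableFuture<List<String>> ordersFuture(boolean fail) {
        return CompletableFuture.<List<String>>supplyAsync(() -> {
            if (fail) {
                throw new IllegalStateException("order service down");
            }
            return Arrays.asList("order-1", "order-2");
        }).exceptionally(e -> Collections.emptyList());
    }

    // Mandatory data: no fallback, so a failure propagates through allOf and join.
    static String dashboard(boolean userFails, boolean ordersFail) {
        CompletableFuture<String> user = CompletableFuture.supplyAsync(() -> {
            if (userFails) {
                throw new IllegalStateException("user service down");
            }
            return "alice";
        });
        CompletableFuture<List<String>> orders = ordersFuture(ordersFail);
        try {
            return CompletableFuture.allOf(user, orders)
                    .thenApply(ignored -> user.join() + " " + orders.join())
                    .join();
        } catch (CompletionException e) {
            return "failed: " + e.getCause().getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(dashboard(false, false)); // alice [order-1, order-2]
        System.out.println(dashboard(false, true));  // alice []
        System.out.println(dashboard(true, false));  // failed: user service down
    }
}
```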
The examples above show different approaches to handling asynchronous calls. Which approach fits best depends on the scenario.
Here are some points to keep in mind before you start coding:
- ‘invokeAll’ and ‘allOf’ wait for all the tasks to complete, whereas a parallel stream is `fail fast`: as soon as one of the computations throws, the stream operation fails. However, you can handle the failure inside the functions you pass to operations such as ‘map’.
- With Callable/Runnable or CompletableFuture, you have the flexibility to choose your own thread pool/ExecutorService, so you stay in control of it. A parallel stream uses the common fork-join pool, and the stream API offers no parameter for supplying your own pool. That can lead to disaster for long-running tasks if the pool does not have enough threads.
- With CompletableFuture you get the flexibility to chain multiple operations.
- CompletableFuture provides nice failure handling, which helps you write clean code. Without it, the error-handling code you have to write ends up scattered all around.
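The points about choosing your own pool, chaining, and failure handling can be sketched together as follows. The class name, the pool size of 4, and the greeting logic are assumptions made purely for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CustomExecutorSketch {

    // Runs the lookup on a dedicated pool, chains two transformations, and falls back on failure.
    static String loadGreeting(int userId) {
        ExecutorService ioPool = Executors.newFixedThreadPool(4); // hypothetical pool size
        try {
            return CompletableFuture
                    .supplyAsync(() -> "user-" + userId, ioPool) // second argument selects the pool
                    .thenApply(String::toUpperCase)
                    .thenApply(name -> "Hello, " + name)
                    .exceptionally(e -> "Hello, guest")
                    .join();
        } finally {
            ioPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(loadGreeting(7)); // Hello, USER-7
    }
}
```

In a real application you would normally create the pool once and reuse it, rather than creating and shutting it down on every call as this sketch does.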
If you want to go through the code and fiddle around please check out the git repo.
You can also watch it on YouTube.