Combining smallrye mutiny and CompletionStage approach in quarkus

Alexei Rubinov
4 min readJun 15, 2020

Last half of the year we are working on the transformation of the old monolithic project to the bunch of micro-services. One of the challenges of this project (and rather expected challenge for all transformation projects) is that you don’t have a chance to transform project via big-bang. Instead you should build-up transformation strategy to replace part of application step-by-step, where the new parts are coexisting with the old ones.

We have selected quarkus framework as the framework for our target micro-services. Combined with creation of images with native applications via GraalVM, this has helped us to reduce resource consumption in the managed kubernetes cluster, where our bunch of micro-services is hosted and start-up time.

One of the recent tasks was a creation of rather simple backend-for-frontend, which should give to the front-end statistic information about trouble tickets for given customer in different statuses via REST-interface. This back-end in turn should communicate with old system (also via REST interface) to gather this statistics, but the interface for communication here is really not suited to our needs: we want back amount of the tickets in a set of statuses, but the via interface only ids of the tickets in given status could be given back. From another side this interface allows to execute concurrent requests, so we have decided to give a try to the brand new mutiny framework to perform the task.

Our first thought was rather straightforward. Let’s take quarkus resteasy mutiny tutorial and the task is completed. The implementation in our case appeared to be not so easy.

The implementation of REST client was very straight-forward, exactly as described in tutorials:

@Path("/url-to-old-system")
@RegisterRestClient(configKey="old-system-api")
public interface OldSystemRestClient {


@GET
@Path("/troubleTickets")
@Produces("application/json")
List<String> getTicketIdsByCustomerIdAndStatus(@QueryParam("customerId") String customerId,@QueryParam("ticketStatus") String ticketStatus);
}

Implementation of service layer was a little bit more complicated task, but also has been accomplished using snippet below:

public Uni<StatisticResponse> getStatisticByCustomerIdAndStatus(String customerId, String statuses) {

...
// Run for each status one request with given customerId to trouble ticket interface. Assemble result
// into amount and errorSet variables.
AtomicInteger amount = new AtomicInteger();
ConcurrentHashSet<Throwable> errorSet = new ConcurrentHashSet<>();
Multi.createFrom().items(statusSet.parallelStream())
.onItem().apply(r -> client.getTroubleTicketIds(customerId, r))
.subscribe().with(item -> amount.addAndGet(item.size()),
failure -> {
errorSet.add(failure);
});

if (!errorSet.isEmpty()) {
errorSet.parallelStream()
.forEach(Throwable::printStackTrace);
wrapper.setException(new WebApplicationException("Something is wrong!"));
}
if (!errorSet.isEmpty()) {
errorSet.parallelStream()
.forEach(Throwable::printStackTrace);
wrapper.setException(new WebApplicationException("Something is wrong!"));
}
StatisticResponse response = new StatisticResponse();
response.setTicketAmount(amount.intValue());
wrapper.setResponse(response);
return Uni.createFrom().item(wrapper);
}

The implementation of facade from the first look was also rather straight-forward. As a result we should return Uni<Response> created from Service method “getStatisticByCustomerIdAndStatus”.

public Uni<Response> getStatisticByCustomerIdAndStatus(@QueryParam("customerId") String customerId, @QueryParam(
"statuses") String statuses) {

Response.ResponseBuilder builder = Response.ok();

Uni.createFrom().item((Service.getStatisticByCustomerIdAndStatus(customerId,
statuses))).
subscribe().with(item -> {
StatisticResponseWrapper wrapper = item.await()
.indefinitely();
if (wrapper.getException() != null)
System.out.println(
"Failed with "
+ wrapper.getException()
.getMessage());
else {
builder.entity(wrapper.getResponse());
}
},
failure -> System.out.println("Failed with " + failure.getMessage()));

return Uni.createFrom().item(builder.build());
}

But there are several problems with the approach listed above:

  1. Usage of the response wrapper to properly communicate errors arising in service and client.
  2. As it turned out later, this approach (namely usage of Uni in the facade) is not compatible with micro-profile fault-tolerance.

While it was pretty simple to add retries to the REST client

@Retry(maxRetries = 3, delay = 50)
@GET
@Path("/troubleTicketIds")
@Produces("application/json")
List<String> getTicketIdsByCustomerIdAndStatus(@QueryParam("customerId") String customerId,@QueryParam("ticketStatus") String ticketStatus);

it was not possible accomplish the task regarding creation of the CircuitBreaker for the facade, preserving Uni<Response> as return type of the method. The problem, which we have encountered here was, that fault tolerance annotations are not working with mutiny Uni and Multi types. The solution was to convert our facade method to return CompletionStage<Response> instead of Uni. This approach has allowed us to use CircuitBreaker and Timeout micro-profile annotations on facade method:

@Timeout (25000)
@Asynchronous
@CircuitBreaker (requestVolumeThreshold = 10,failureRatio = 0.5,delay=10000)
@GET
@Path ("/statistic")
public CompletionStage<Response> getStatisticByCustomerIdAndStatus(@QueryParam ("customerId") String customerId,
@QueryParam ("statuses") String statuses) {

CompletableFuture<Response> response = null;
try {
response = service.getStatisticByCustomerIdAndStatus(customerId, statuses)
.subscribeAsCompletionStage()
.whenComplete((wrapper, failure) -> {
if (failure != null) {
LOGGER.errorf(ERROR_DURING_EXECUTION_OF_MVBACKEND_SERVICE_S, failure.getMessage());
throw new WebApplicationException(failure.getMessage());
}
})
.thenApply(wrapper -> CompletableFuture.completedFuture(builder.status(Response.Status.OK)
.entity(wrapper)
.build()))
.get();
} catch (ServiceException e) {
LOGGER.errorf(ERROR_DURING_EXECUTION_OF_MVBACKEND_SERVICE_S, e.getMessage());
throw new WebApplicationException(e.getMessage(), e.getStatus());
} catch (InterruptedException e ) {
LOGGER.errorf(ERROR_DURING_EXECUTION_OF_MVBACKEND_SERVICE_S, e.getMessage());
Thread.currentThread().interrupt();
throw new WebApplicationException(e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR.getStatusCode());
} catch (ExecutionException e) {
LOGGER.errorf(ERROR_DURING_EXECUTION_OF_MVBACKEND_SERVICE_S, e.getMessage());
throw new WebApplicationException(e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR.getStatusCode());
}

return response;
}

Also we have performed corresponding transformation on Service method to throw an exception, when something was wrong in the service layer or there was an error from client:

public Uni<StatisticResponse> getStatisticByCustomerIdAndStatus(String customerId, String statuses)  throws
ServiceException {

...
// Run for each status one request with given customerId to trouble ticket interface. Assemble result
// into amount and errorSet variables.
AtomicInteger amount = new AtomicInteger();
ConcurrentHashSet<Throwable> errorSet = new ConcurrentHashSet<>();
Multi.createFrom().items(statusSet.parallelStream())
.onItem().apply(status -> client.getTicketIdsByCustomerIdAndStatus(customerId, status))
.subscribe().with(item -> amount.addAndGet(item.size()), errorSet::add);

if (!errorSet.isEmpty()) {
errorSet.parallelStream()
.forEach(throwable ->
LOGGER.errorf(ERROR_FROM_TROUBLE_TICKET_INTERFACE_S,throwable.getMessage()));
throw new ServiceException("Error(s) from trouble ticket interface. See "
+ "trouble ticket backend-for-front-end log for "
+ "details", Response.Status.INTERNAL_SERVER_ERROR.getStatusCode());
}

StatisticResponse response = new StatisticResponse();
response.setTicketAmount(amount.intValue());
return Uni.createFrom().item(response);
}

Combination of java concurrency classes and mutiny framework in the beginning was considered by us, as some exotic construction, which might be used in rather rare cases. Instead, combination of java concurrency classes and mutiny classes has allowed us to combine power and simplicity of reactive mutiny framework with simplicity of eclipse micro-profile fault tolerance annotations.

--

--