Remote RPC based on messaging

Alberto Rossotto
Geek Culture
Published in
7 min readAug 18, 2021

Two solutions for remote calls performed over messaging systems.

Photo by John Barkiple on Unsplash

Connecting two systems via API is a straightforward solution but it’s not always applicable. Sometimes services are connected by an Enterprise Service Bus or by another messaging system (e.g. ActiveMQ, RabbitMQ, JMS…). In this configuration it is harder to implement a synchronous request-response logic. Here there are two solutions, one generic that can be easily adapted to different technologies, and one specific for RabbitMQ.

First solution, “manual” implementation

Doing the job “manually” without using a framework is not a difficult task. It is important to understand the problem and decompose it in simple parts.

There are three different capabilities that needs to be implemented:

  • being able to receive replies from the remote service: this is a service that constantly listens to replies and it is responsible for their storage.
  • being able to send requests to the remote service: this is a fire and forget functionality that completes and returns a future.
  • being able to correlate requests with replies: this is the functionality that completes the future using the data in the storage.
Diagram of the three flows

To demonstrate the flow I will describe both the client and the server. For the example I will use Java, Spring Boot, RabbitMQ, and Docker. For the full code please check the links at the end.

The flow:

  • the Client sends a request to the request-queue on RabbitMQ.
  • the Server processes the request and replies on the response-queue.
  • the Client receives the reply and proceeds.

The client will be a simple webservice with a REST interface to accept parameters. This will be useful to test the solution. This is modeled in the following diagram:

Overall flow

I will focus on the client because it is where most of the logic must be implemented. The structure is in the following diagram:

  • RestService: just a user interface to pass inputs.
  • BackendInvocationService: the core component with the logic to invoke the backend via RabbitMQ.
  • MessageRepository: the storage that keeps all the replies.
  • Receiver: the service that waits for replies from RabbitMQ and stores them in the MessageRepository.
  • RabbitTemplate: the spring bean to send messages.

Setup of RabbitMQ

For this example it is sufficient to use the default RabbitMQ on Docker.

docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.9-management

The GUI is available at http://localhost:15672/ (credentials guest/guest)

I will explain briefly how to setup the client to connect to RabbitMQ. Please refer to the official documentation for details and check the complete source code in the Links for the backend side.

@Bean
DirectExchange exchange() {
return new DirectExchange(responseExchangeName);
}
@Bean
Queue queue() {
return new Queue(responseQueueName, false);
}

@Bean
Binding binding(Queue queue, DirectExchange exchange) {
return
BindingBuilder
.bind(queue)
.to(exchange)
.with(responseManualRoutingKey);
}
@Bean
MessageListenerAdapter listenerAdapter(Receiver receiver) {
return new MessageListenerAdapter(receiver, "receiveMessage");
}
@Bean
SimpleMessageListenerContainer container(
ConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter) {
SimpleMessageListenerContainer container =
new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(responseQueueName);
container.setMessageListener(listenerAdapter);
return container;
}

RabbitMQ works with Exchanges that are the recipients of the messages, it exposes Queues, and it binds the two together with Bindings that are characterized by routing keys (sort of selectors). The code above does exactly that.

Client: RestService

This is a simple @RestController with an entry point for a GET. The service will simply calculate the cube of the input (it is just an example!) on /cube.

@RestController
public class RestService {
private BackendInvocationService backendInvocation;

public RestService(
BackendInvocationService backendInvocationService) {
this.backendInvocation = backendInvocationService;
}

@GetMapping("/cube")
public Response cube(
@RequestParam(value = "value", defaultValue = "1") int value) {
return backendInvocation.manuallyInvokeCube(
new Request(UUID.randomUUID().toString(), value));
}
}

Besides the value, the Request also contains a UUID that identifies the request. This UUID is very important because it will be used as correlationId to match a Request with the corresponding Result. Any type of id works, as long as it is unique.

Client: BackendInvocationService

This is half of the core solution. The method manuallyInvokeCube sends the request to the server using RabbitMQ and then gets a Future from the MessageRepository using the correlationId of the Request. MessageRepository doesn’t know the Response yet and the Future is a sort of placeholder that will be filled later asynchronously.

@Component
public class BackendInvocationService {

private RabbitTemplate rabbitTemplate;
private MessageRepository messageRepository;

public BackendInvocationService(
RabbitTemplate rabbitTemplate,
MessageRepository messageRepository) {
this.rabbitTemplate = rabbitTemplate;
this.messageRepository = messageRepository;
}

public Response manuallyInvokeCube(Request request) {
rabbitTemplate.convertAndSend(
requestExchangeName, requestManualRoutingKey, request);
Future<Response> futureResult =
messageRepository.getFuture(request.getCorrelationId());
try {
return futureResult.get(120, TimeUnit.SECONDS);
}catch (InterruptedException | ExecutionException e) {
return Response.PROCESSING_ERROR;
} catch (TimeoutException e) {
Thread.currentThread().interrupt();
return Response.PROCESSING_ERROR;
}
}
}

In the example the Future is immediately checked with a blocking get() to return the value. In a real service it is advisable to postpone the get() as much as possible to take advantage of the asynchronous execution.

Client: Receiver

This is a simple receiver that gets messages from the queue and stores them in the MessageRepository. When this happens, Message repository becomes able to complete the Future the returned in the previous step.

@Component
public class Receiver {
private MessageRepository messageRepository;

public Receiver(MessageRepository messageRepository) {
this.messageRepository = messageRepository;
}

public void receiveMessage(Response response) {
messageRepository.store(response);
}
}

Client: MessageRepository

This is the component that ties things together. It wraps a Map that holds the Futures indexed by correlationId (the uuid). The Futures become complete when the data is stored in the map and the containsKey(uuid) becomes true. It is important to remember that these operations happen concurrently therefore synchronization is a factor to consider.

@Component
public class MessageRepository {
private Map<String, Response> repo = new ConcurrentHashMap<>();

public Future<Response> getFuture(String uuid) {
return CompletableFuture.supplyAsync(() -> {
while (!repo.containsKey(uuid)) {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
return repo.remove(uuid);
});
}

public void store(Response response) {
repo.put(response.getCorrelationId(), response);
}
}

Please note that this implementation is a simplification. In particular it is fundamental to add mechanism to cleanup old Responses corresponding to transactions that had a timeout.

Server: Receiver

A brief look to the backend. It creates a Response marked by the Request’s correlationId and calculates the cube.

@Component
public class Receiver {

private RabbitTemplate rabbitTemplate;

private SecureRandom random = new SecureRandom();

public Receiver(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}

public void receiveMessageManual(Request request) {
Response response = new Response(
request.getCorrelationId(), cube(request));
rabbitTemplate.convertAndSend(
responseExchangeName, responseManualRoutingKey, response);
}

private int cube(Request request) {
return
request.getValue() * request.getValue() * request.getValue();
}
}

There is not much logic here. As long as the Response is marked with the Request’s correlationId, the server does not have particular concept to implement.

Test

To verify that everything works it is sufficient to create a few threads calling the REST service of the client in parallel with different inputs.

WebClient client = 
WebClient.builder()
.baseUrl("http://localhost:8080")
.defaultHeader(
HttpHeaders.CONTENT_TYPE,
MediaType.APPLICATION_JSON_VALUE)
.defaultUriVariables(
Collections.singletonMap(
"url", "http://localhost:8080")).build();
List<Thread> threads = new ArrayList<>();
for (int i = 0; i < 10; i++) {
int value = i;
threads.add(new Thread(() -> {
String result = client.get().uri(
builder -> builder.path("/cube")
.queryParam("value", value)
.build())
.retrieve()
.bodyToMono(String.class)
.block();
LOG.info("result for {} was {}", value, result);
}));
}
threads.forEach(t -> t.start());
threads.forEach(t -> {
try {
t.join();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});

This example will request the cubes of 0 to 9 and the output will be look as the following with the replies in random order.

[T7] result for 7 was {"correlationId":"7ca1bb96-7adc-4efc-9d39-17605a283009","value":343,"error":null}
[T2] result for 2 was {"correlationId":"1572b2bf-73d0-4fd2-9ab0-439fd181e7bd","value":8,"error":null}
[T4] result for 4 was {"correlationId":"0f9e55c7-68eb-459b-b5f9-eb55a18e3e83","value":64,"error":null}
[T6] result for 6 was {"correlationId":"056312ee-32ae-44d7-99af-2d2350329755","value":216,"error":null}
[T9] result for 9 was {"correlationId":"ff026711-68d3-45cb-be0b-6aea256a736f","value":729,"error":null}
[T1] result for 1 was {"correlationId":"6e0f050d-c6b6-498d-bf65-b9073f503ac1","value":1,"error":null}
[T3] result for 3 was {"correlationId":"a4823e86-fa55-4838-89fb-3356f12d7b90","value":27,"error":null}
[T0] result for 0 was {"correlationId":"4b57857f-e447-4a6f-82f3-50bc964ece0e","value":0,"error":null}
[T5] result for 5 was {"correlationId":"54a60575-4744-4e90-b2c0-5d8dd130ea31","value":125,"error":null}
[T8] result for 8 was {"correlationId":"50c20f17-182e-428d-b559-8b93abef9617","value":512,"error":null}

Second solution: use RabbitMQ RPC

The above implementation can be greatly simplified using RabbitMQ, because RabbitMQ includes a mechanism to perform automatically the Request-Response correlation. This is the difference:

Client: RestController

@GetMapping("/cube-rpc")
public Response cubeRpc(
@RequestParam(value = "value", defaultValue = "1") int value) {
return backendInvocation.rpcInvokeSquare(
new Request(null, value));
}

This is a replacement for the original entrypoint and it works in the same way. I used a different mapping to keep both in the code. Please note that there no need for a correlationId anymore, therefore it is left empty.

Client: BackendInvocationService

public Response rpcInvokeSquare(Request request) {
return (Response)
rabbitTemplate.convertSendAndReceive(
requestExchangeName, requestRpcRoutingKey, request);
}

The original call convertAndSend is replaced by convertSendAndReceive and this api hides all the complexity. There’s no more need for a MessageRepository or a Receiver in the client.

Backend: Receiver

public Response receiveMessageRpc(Request request) {
return new Response(null, cube(request));
}

The backend implementation is also simplified. Instead of sending a message to the response queue, it is sufficient to return the result in the method.

Conclusion

It is not very difficult to manually implement RPC using Messaging, but the usage of the native RPC functions of RabbitMQ can save lots of time and complexity in the code.

The very simple design with RabbitMQ RPC

Links

--

--