Reactive services with Vertx3 and Scala

Eclipse Vert.x is an event-driven, non-blocking, polyglot toolkit for building reactive applications. I tend to think of it as a simpler and easier Akka: it overlaps with some of Akka’s use cases, though not all of them, and it is more straightforward because you don’t need to set up an actor system. The counterpoint, obviously, is that it is less powerful than Akka in some respects (e.g. you lose actor-supervisor fault tolerance).

Being polyglot, Vertx supports a number of languages, such as Java, JavaScript, Groovy, Ruby, and Ceylon. As you can see, Scala is conspicuously missing from the list. Scala was supported on Vertx2, but support seems broken on Vertx3 at the time of this writing.

For most common use-cases, however, there are a number of simple ways to combine Vertx and Scala!

EIP to the Rescue!

Enterprise Integration Patterns have been around for quite a while now but to me it seems they keep being overlooked by developers. It seems only ESB users are aware of them. You don’t need an overblown ESB to use these helpful little patterns thanks to the Apache Camel library and, of particular interest to us, to the Camel Vertx Component which allows us to send and receive JSON events through the Vertx EventBus.

Communicate with a Vertx Cluster

First we need an instance of the Vertx object so that we communicate with the same cluster instead of starting a new one. Depending on the particulars of your application, you can obtain one by extending the `AbstractVerticle` class:

import io.vertx.core.AbstractVerticle

class ScalaVerticle extends AbstractVerticle {
  def process(address: String, message: String) =
    vertx.eventBus().send(address, message)
}

or you can pass the instance to a Scala companion object from an existing verticle (maybe from a Java8 class):

import io.vertx.core.Vertx

object MyComponent {
  def apply(vertx: Vertx) = ???
}

Now it’s a matter of starting the Camel context and creating the routes (whose implementation we leave for later):

import org.apache.camel.component.vertx.VertxComponent
import io.vertx.core.Vertx
import io.vertx.core.json.Json
import io.vertx.core.json.JsonObject
import io.vertx.core.json.JsonArray
import org.apache.camel.impl.DefaultCamelContext
import org.apache.camel.CamelContext
import org.apache.camel.ProducerTemplate
import org.apache.camel.scala.dsl.builder.RouteBuilder
import java.util.concurrent.CompletableFuture
import scala.collection.mutable.ListBuffer

object MyComponent {
  def apply(vertx: Vertx) = {
    val camelContext: CamelContext = new DefaultCamelContext
    val template = camelContext.createProducerTemplate()
    // Reuse the existing Vertx instance so we join the same cluster
    val vertxComponent = new VertxComponent()
    vertxComponent.setVertx(vertx)
    camelContext.addComponent("vertx", vertxComponent)
    camelContext.addRoutes(routeBuilder)
    camelContext.start()
    new MyComponent(template)
  }

  // The routes are defined later in the article
  val routeBuilder = ???
}

Asynchronous Request-Reply

Vertx’s golden rule is: Don’t Block the Event Loop! We will accomplish this using the Request-Reply pattern asynchronously.

This pattern is implemented by setting the ExchangePattern to InOut through the ProducerTemplate we created before. The template also has variants for InOnly (fire and forget) and for both sync and async messaging:

class MyComponent(template: ProducerTemplate) {
  def process(address: String, message: String) = {
    // Send the message to the route, passing the target address as a header
    val future: CompletableFuture[Object] =
      template.asyncRequestBodyAndHeader("direct:vertxCluster", message, "address", address)
    // Extract the reply body as a String once the future completes
    val json: String = template.extractFutureBody(future, classOf[String])
    ...
  }
}

Here we send the body message to the direct:vertxCluster route (to be defined shortly) and set an Exchange header called address to the address value we receive as a parameter. The asyncRequestBodyAndHeader method returns an implementation of Future whose result we can retrieve later with the extractFutureBody method. Sending the message and receiving the reply take place under the hood on a different thread, but it’s all transparent to us.
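As far as I can tell, extractFutureBody waits until the reply is ready, so it should not be called from an event-loop thread. One possible alternative, sketched here under the assumption that the template hands back a CompletableFuture as in the snippet above, is to bridge it to a Scala Future and react to the reply with callbacks. FutureBridge and toScalaFuture are hypothetical names of my own, not part of Camel or Vertx:

```scala
import java.util.concurrent.CompletableFuture
import java.util.function.BiConsumer
import scala.concurrent.{Future, Promise}

// Hypothetical helper, not part of Camel or Vertx
object FutureBridge {
  // Completes a Scala Future when the Java CompletableFuture completes,
  // without blocking the calling thread.
  def toScalaFuture[T](cf: CompletableFuture[T]): Future[T] = {
    val promise = Promise[T]()
    cf.whenComplete(new BiConsumer[T, Throwable] {
      override def accept(value: T, error: Throwable): Unit =
        if (error != null) promise.failure(error) else promise.success(value)
    })
    promise.future
  }
}
```

With something like this in place, the reply can be handled with map or onComplete on the resulting Future instead of waiting for it.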

To whom it may concern…

The route is actually very simple. Basically, it looks like this:

"direct:vertxCluster" --> "vertx:some.address"

If we were to send messages to a single address this would be enough, but most likely we will want to send messages to different addresses within the cluster, which is why we’re setting the address header in the first place. To accomplish this we will use the Recipient List pattern.

In the Scala DSL, it looks like this:

val routeBuilder = new RouteBuilder {
  "direct:vertxCluster" recipients(_.getIn.getHeader("address"))
}
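To make the idea behind the Recipient List pattern concrete, here is a toy model in plain Scala that does not use Camel at all. It assumes the header holds a comma-separated list of addresses and that endpoints are simple functions; RecipientListSketch, Message, and route are hypothetical names used only for illustration:

```scala
// Toy model of the Recipient List pattern; no Camel involved
object RecipientListSketch {
  // A message with a body and string headers, loosely modelled on a Camel Exchange
  final case class Message(body: String, headers: Map[String, String])

  // Delivers the body to every endpoint named in the given header and
  // returns the addresses for which no endpoint was registered.
  def route(message: Message,
            header: String,
            endpoints: Map[String, String => Unit]): Seq[String] = {
    val recipients = message.headers.get(header).toSeq
      .flatMap(_.split(",").toSeq)
      .map(_.trim)
      .filter(_.nonEmpty)
    val (known, unknown) = recipients.partition(endpoints.contains)
    known.foreach(address => endpoints(address)(message.body))
    unknown
  }
}
```

The point is that the destination is decided per message, from data carried by the message itself, rather than being fixed in the route.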

Advanced Integration

One of the main issues when integrating Scala with Vertx3 is that Vertx3 introduces some functional-style programming which, unfortunately, doesn’t work too well with Scala’s. Take, for instance, the consumer-accepting forEach function on io.vertx.core.json.JsonArray, which in Java 8 looks like this:

...
String json = template.extractFutureBody(future, String.class);
JsonArray jsonArray = new JsonArray(json);
List<MyPOJO> pojos = new ArrayList<MyPOJO>();
jsonArray.forEach(j -> pojos.add(jsonToPOJO((JsonObject) j)));

Here the reply we get from whatever verticle we end up calling is a String that represents an array of JSON objects. We iterate over the array, convert each JsonObject to our own POJO (through some fictional jsonToPOJO function) and store the POJO in our pojos list. In particular, Java’s -> is not the same as Scala’s lambda arrow, =>, so Scala’s compiler will not know what to do here.

Fortunately, this has a simple solution. Vertx is asking for a java.util.function.Consumer[T], while Scala’s lambdas have a Function1[U, V] type. Since Consumer[T]'s accept method (the one called here) returns void, all we need to do is convert Scala’s Function1[T, Unit] to a Consumer[T] by implementing its accept method:

import java.util.function.Consumer

// Note: Unit must not be declared as a type parameter, or it would shadow scala.Unit
def toVertxConsumer[T](f: Function1[T, Unit]): Consumer[T] = new Consumer[T] {
  override def accept(t: T): Unit = f(t)
}

We can call it like this:

val pojos: ListBuffer[MyPOJO] = new ListBuffer[MyPOJO]()
jsonArray.forEach { toVertxConsumer {
  j => pojos += jsonToPOJO(j.asInstanceOf[JsonObject])
}}
pojos.toList
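The same conversion can be tried out without Vertx, since forEach comes from java.lang.Iterable. The following is a minimal sketch against a plain java.util.List; ConsumerSketch, toConsumer, and lengths are hypothetical names chosen for this example:

```scala
import java.util.function.Consumer
import scala.collection.mutable.ListBuffer

// Hypothetical stand-alone version of the Function1-to-Consumer trick
object ConsumerSketch {
  // Wraps a Scala function in a java.util.function.Consumer
  def toConsumer[T](f: T => Unit): Consumer[T] = new Consumer[T] {
    override def accept(t: T): Unit = f(t)
  }

  // Collects the lengths of the strings in a Java list via Iterable.forEach
  def lengths(items: java.util.List[String]): List[Int] = {
    val out = ListBuffer.empty[Int]
    items.forEach(toConsumer[String](s => out += s.length))
    out.toList
  }
}
```

As far as I know, Scala 2.12 and later convert function literals to Java functional interfaces automatically, so a wrapper like this is mainly useful on older Scala versions.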

A final note

Can’t we use the trick above to send messages to Vertx’s EventBus without Camel? Let’s have a look. This is what it looks like in Java 8:

vertx.eventBus().send(address, message, options, reply -> {
  if (reply.succeeded()) {
    logger.info(reply.result());
  }
});

The function of interest is of type Handler<AsyncResult<Message<T>>>, so our transforming function should look like this:

import io.vertx.core.AsyncResultHandler
import io.vertx.core.AsyncResult
import io.vertx.core.Handler

//Edited on 07 Feb 2017
def toVertxHandler[T](f: Function1[AsyncResult[T], Unit]): Handler[AsyncResult[T]] = new AsyncResultHandler[T] {
  override def handle(result: AsyncResult[T]): Unit = f(result)
}

And we use it like this:

import io.vertx.core.eventbus.{DeliveryOptions, Message}

vertx.eventBus().send(address, new JsonObject(), new DeliveryOptions(), toVertxHandler {
  // String is an assumed reply body type; use whatever your verticle replies with
  reply: AsyncResult[Message[String]] => {
    if (reply.succeeded()) {
      logger.info(reply.result().body())
    } // closes if
  } // closes reply
} // closes toVertxHandler
) // closes send

This looks alright, but I recommend testing it thoroughly before using it in production ;-)