Reactive services with Vertx3 and Scala
Eclipse Vert.x is an event-driven, non-blocking, polyglot toolkit for building reactive applications. I tend to think of it as a simpler and easier Akka: it overlaps with some of Akka’s use cases, not all of them, but it is more straightforward, since you don’t need to set up an actor system. The counterpoint, obviously, is that it is less powerful than Akka in some respects (e.g. you lose actor-supervisor fault tolerance).
Being polyglot, Vertx supports a number of languages such as Java, JavaScript, Groovy, Ruby, and Ceylon. As you can see, Scala is suspiciously missing from the languages Vertx supports. Scala was supported on Vertx2, but seems broken on Vertx3 at the time of this writing.
For most common use-cases, however, there are a number of simple ways to combine Vertx and Scala!
EIP to the Rescue!
Enterprise Integration Patterns have been around for quite a while now but to me it seems they keep being overlooked by developers. It seems only ESB users are aware of them. You don’t need an overblown ESB to use these helpful little patterns thanks to the Apache Camel library and, of particular interest to us, to the Camel Vertx Component which allows us to send and receive JSON events through the Vertx EventBus.
Communicate with a Vertx Cluster
First we need an instance of the Vertx object so that we communicate with the existing cluster instead of starting a new one. Depending on the particulars of your application, you can obtain one by extending the `AbstractVerticle` class:
import io.vertx.core.AbstractVerticle

class ScalaVerticle extends AbstractVerticle {
  def process(address: String, message: String) =
    vertx.eventBus().send(address, message)
}
or you can pass the instance to a Scala companion object from an existing verticle (maybe from a Java8 class):
import io.vertx.core.Vertx

object MyComponent {
  def apply(vertx: Vertx) = ???
}
Now it’s a matter of starting the Camel context and creating the routes (whose implementation we leave for later):
import org.apache.camel.component.vertx.VertxComponent
import io.vertx.core.Vertx
import io.vertx.core.json.Json
import io.vertx.core.json.JsonObject
import io.vertx.core.json.JsonArray

import org.apache.camel.impl.DefaultCamelContext
import org.apache.camel.CamelContext
import org.apache.camel.ProducerTemplate
import org.apache.camel.scala.dsl.builder.RouteBuilder

import java.util.concurrent.CompletableFuture
import scala.collection.mutable.ListBuffer

object MyComponent {
  def apply(vertx: Vertx) = {
    val camelContext: CamelContext = new DefaultCamelContext
    val template = camelContext.createProducerTemplate()
    // Reuse the existing Vertx instance instead of letting Camel create a new one
    val vertxComponent = new VertxComponent()
    vertxComponent.setVertx(vertx)
    camelContext.addComponent("vertx", vertxComponent)
    camelContext.addRoutes(routeBuilder)
    camelContext.start()
    new MyComponent(template)
  }

  // Placeholder, implemented further down
  val routeBuilder = ???
}
Asynchronous Request-Reply
Vertx’s golden rule is — Don’t Block the Event Loop! We will accomplish this using the Request-Reply pattern asynchronously.
This pattern is implemented by setting the ExchangePattern to InOut through the ProducerTemplate we created before. The template also has variants for InOnly (fire and forget), and for both synchronous and asynchronous messaging:
class MyComponent(template: ProducerTemplate) {
  def process(address: String, message: String) = {
    // Send the request; the "address" header selects the target later in the route
    val future: CompletableFuture[Object] =
      template.asyncRequestBodyAndHeader("direct:vertxCluster", message, "address", address)
    // Retrieve the reply body as a String
    val json: String = template.extractFutureBody(future, classOf[String])
    ...
  }
}
Here we send the message body to the direct:vertxCluster route (to be defined shortly) and set an Exchange header called address to the address value we receive as a parameter. The asyncRequestBodyAndHeader method returns an implementation of Future, whose result we can retrieve later with the extractFutureBody method. Sending the message and receiving the reply take place under the hood on a different thread, but it’s all transparent to us.
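One caveat: extractFutureBody waits until the reply is ready, so calling it from an event-loop thread would block that thread. Below is a minimal sketch of one way around this, assuming the template hands back a CompletableFuture as in the snippet above. It bridges the Java future to a Scala Future through a Promise. The names FutureBridge and toScalaFuture are hypothetical, and only the JDK and the Scala standard library are used.

```scala
import java.util.concurrent.CompletableFuture
import java.util.function.BiConsumer
import scala.concurrent.{Future, Promise}

object FutureBridge {
  // Completes a Scala Future when the Java future completes,
  // without parking the calling thread on get().
  def toScalaFuture[T](cf: CompletableFuture[T]): Future[T] = {
    val promise = Promise[T]()
    // Explicit BiConsumer so the sketch does not rely on lambda conversion
    cf.whenComplete(new BiConsumer[T, Throwable] {
      override def accept(value: T, error: Throwable): Unit =
        if (error != null) promise.failure(error) else promise.success(value)
    })
    promise.future
  }
}
```

Inside process, something like FutureBridge.toScalaFuture(future) could then be mapped or given callbacks instead of being extracted synchronously.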
To whom it may concern…
The route is actually very simple. Basically, it looks like this:
"direct:vertxCluster" --> "vertx:some.address"
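If we were to send messages to a single address this would be enough, but most likely we will want to send messages to different addresses within the cluster, which is why we’re setting the address header in the first place. To accomplish this we will use the Recipient List pattern. Keep in mind that the Recipient List resolves each value it receives as a Camel endpoint URI, so the header has to carry a full URI such as vertx:some.address rather than a bare address.
In the Scala DSL it looks like this: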
val routeBuilder = new RouteBuilder {
  "direct:vertxCluster" recipients(_.getIn.getHeader("address"))
}
Advanced Integration
One of the main issues when integrating Scala with Vertx3 is that Vertx3 introduces some functional-style programming which, unfortunately, doesn’t work too well with Scala’s. Take for instance the forEach method on io.vertx.core.json.JsonArray, which takes a consumer function and in Java8 looks like this:
...
String json = template.extractFutureBody(future, String.class);
JsonArray jsonArray = new JsonArray(json);
List<MyPOJO> pojos = new ArrayList<>();
jsonArray.forEach(j -> pojos.add(jsonToPOJO((JsonObject) j)));
Here the reply we get from whatever verticle we end up calling is a String that represents an array of JSON objects. We iterate over the array, convert each JsonObject to our own POJO (through some fictional jsonToPOJO function) and store the POJO in our pojos list. In particular, -> is not the same as Scala’s equivalent for lambda functions, namely =>, so Scala’s compiler will not know what to do here.
Fortunately, this has a simple solution. Vertx is asking for a java.util.function.Consumer[T] while Scala’s lambdas have a Function1[U, V] type. Since Consumer[T]’s accept method (the one called here) returns void, all we need to do is wrap Scala’s Function1[T, Unit] in a Consumer[T] whose accept method calls it:
import java.util.function.Consumer

// Wraps a Scala function in the Java functional interface Vertx expects
def toVertxConsumer[T](f: T => Unit): Consumer[T] = new Consumer[T] {
  override def accept(t: T): Unit = f(t)
}
We can call it like this:
val pojos: ListBuffer[MyPOJO] = new ListBuffer[MyPOJO]()
jsonArray.forEach {
  toVertxConsumer { (j: Object) =>
    pojos += jsonToPOJO(j.asInstanceOf[JsonObject])
  }
}
pojos.toList
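To try the conversion without Vertx on the classpath, here is a minimal sketch that applies the same wrapper to a plain java.util list, standing in for the JsonArray. ConsumerDemo, toConsumer and lengths are hypothetical names used only for illustration.

```scala
import java.util.function.Consumer
import scala.collection.mutable.ListBuffer

object ConsumerDemo {
  // Same idea as toVertxConsumer: wrap a Scala function in a Java Consumer
  def toConsumer[T](f: T => Unit): Consumer[T] = new Consumer[T] {
    override def accept(t: T): Unit = f(t)
  }

  // Collects the length of each string from any java.lang.Iterable
  def lengths(items: java.lang.Iterable[String]): List[Int] = {
    val out = new ListBuffer[Int]()
    items.forEach(toConsumer[String] { s => out += s.length })
    out.toList
  }
}
```

For example, ConsumerDemo.lengths(java.util.Arrays.asList("a", "bcd")) yields List(1, 3). On Scala 2.12 and later the wrapper is often unnecessary, because the compiler can convert a lambda to a Java functional interface such as Consumer directly.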
A final note
Can’t we use the trick above to send messages to Vertx’s EventBus without Camel? Let’s have a look. This is what it looks like in Java8:
vertx.eventBus().send(address, message, options, reply -> {
  if (reply.succeeded()) {
    logger.info(reply.result());
  }
});
The function of interest is of type Handler<AsyncResult<Message<T>>>, so our transforming function should look like this:
import io.vertx.core.AsyncResultHandler
import io.vertx.core.AsyncResult
import io.vertx.core.Handler //Edited on 07 Feb 2017
def toVertxHandler[T](f: Function1[AsyncResult[T], Unit]): Handler[AsyncResult[T]] = new AsyncResultHandler[T] {
  override def handle(result: AsyncResult[T]): Unit = f(result)
}
And we use it like this:
val pojos: ListBuffer[MyPOJO] = new ListBuffer[MyPOJO]()
// Also needs: import io.vertx.core.eventbus.{DeliveryOptions, Message}
vertx.eventBus().send(address, new JsonObject(), new DeliveryOptions(), toVertxHandler {
  // Assumes the reply body is a String; adjust the type to your payload
  reply: AsyncResult[Message[String]] =>
    if (reply.succeeded()) {
      logger.info(reply.result().body())
    }
})
This looks alright, but I recommend testing it thoroughly before using it in production ;-)