Reactive services with Vertx3 and Scala

EIP to the Rescue!

Communicate with a Vertx Cluster

import io.vertx.core.AbstractVerticle

class ScalaVerticle extends AbstractVerticle {
  def process(address: String, message: String) =
    vertx.eventBus().send(address, message)
}

object MyComponent {
  def apply(vertx: Vertx) = ???
}
import io.vertx.core.Vertx
import org.apache.camel.component.vertx.VertxComponent
import io.vertx.core.json.Json
import io.vertx.core.json.JsonObject
import io.vertx.core.json.JsonArray
import org.apache.camel.impl.DefaultCamelContext
import org.apache.camel.CamelContext
import org.apache.camel.ProducerTemplate
import org.apache.camel.scala.dsl.builder.RouteBuilder
import java.util.concurrent.CompletableFuture
import scala.collection.mutable.ListBuffer

object MyComponent {
  def apply(vertx: Vertx) = {
    val camelContext: CamelContext = new DefaultCamelContext
    val template = camelContext.createProducerTemplate()
    val vertxComponent = new VertxComponent()
    // Reuse the given Vertx instance instead of letting Camel create an embedded one
    vertxComponent.setVertx(vertx)
    camelContext.addComponent("vertx", vertxComponent)
    new MyComponent(template)
  }
}

val routeBuilder = ???

Asynchronous Request-Reply

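class MyComponent(template: ProducerTemplate) {
  def process(address: String, message: String): String = {
    // Send the request asynchronously; the target address travels in the "address" header
    val future: CompletableFuture[Object] = template.asyncRequestBodyAndHeader("direct:vertxCluster", message, "address", address)
    // Block only here, when the reply body is actually needed
    val json: String = template.extractFutureBody(future, classOf[String])
    json
  }
}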
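
The request-reply shape above (fire the request, keep a future, extract the body later) can be tried without Camel or Vert.x on the classpath. The following is only a minimal sketch using the JDK's `CompletableFuture`; `AsyncRequestReplySketch`, `fakeRemoteReply`, `request`, and `awaitReply` are hypothetical names, and the "remote" side is a local stand-in that echoes the message as JSON rather than a real cluster consumer.

```scala
import java.util.concurrent.{CompletableFuture, TimeUnit}
import java.util.function.Supplier

object AsyncRequestReplySketch {
  // Hypothetical stand-in for the remote consumer: echoes the request as a JSON string.
  def fakeRemoteReply(address: String, message: String): String =
    s"""{"address":"$address","echo":"$message"}"""

  // Starts the "request" on another thread and returns immediately with a future.
  def request(address: String, message: String): CompletableFuture[String] =
    CompletableFuture.supplyAsync(new Supplier[String] {
      override def get(): String = fakeRemoteReply(address, message)
    })

  // Blocks only at the point where the reply is needed, with a timeout in milliseconds.
  def awaitReply(future: CompletableFuture[String], timeoutMs: Long): String =
    future.get(timeoutMs, TimeUnit.MILLISECONDS)
}
```

Calling `request("some.address", "ping")` returns at once; the caller can do other work and call `awaitReply` only when the JSON is required, which mirrors the split between `asyncRequestBodyAndHeader` and `extractFutureBody`.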

To whom it may concern…

"direct:vertxCluster" --> "vertx:some.address"

val routeBuilder = new RouteBuilder {
  "direct:vertxCluster" recipients(_.getIn.getHeader("address"))
}

Advanced Integration

String json = template.extractFutureBody(future, String.class);
JsonArray jsonArray = new JsonArray(json);
List<MyPOJO> pojos = new ArrayList<MyPOJO>();
jsonArray.forEach(j -> pojos.add(jsonToPOJO((JsonObject) j)));
import java.util.function.Consumer

// Wraps a Scala function in a java.util.function.Consumer
def toVertxConsumer[T](f: Function1[T, Unit]): Consumer[T] = new Consumer[T] {
  override def accept(t: T): Unit = f(t)
}

val pojos: ListBuffer[MyPOJO] = new ListBuffer[MyPOJO]()
jsonArray.forEach { toVertxConsumer {
  j => pojos += jsonToPOJO(j.asInstanceOf[JsonObject])
} }
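
The adapter idea can be checked in isolation against a plain `java.util.List`, whose `forEach` also expects a `java.util.function.Consumer`. This is a sketch under assumptions: `ConsumerAdapterSketch`, `toConsumer`, and `collectLengths` are hypothetical names, and string lengths stand in for the JSON-to-POJO conversion.

```scala
import java.util.function.Consumer
import scala.collection.mutable.ListBuffer

object ConsumerAdapterSketch {
  // Wraps a Scala function in a Java Consumer so Java-style forEach methods accept it.
  def toConsumer[T](f: T => Unit): Consumer[T] = new Consumer[T] {
    override def accept(t: T): Unit = f(t)
  }

  // Collects the length of each element, in iteration order, via the adapted callback.
  def collectLengths(items: java.util.List[String]): List[Int] = {
    val out = ListBuffer[Int]()
    items.forEach(toConsumer[String] { (s: String) => out += s.length; () })
    out.toList
  }
}
```

For example, `ConsumerAdapterSketch.collectLengths(java.util.Arrays.asList("a", "bcd"))` yields `List(1, 3)`. On Scala 2.12 and later, a function literal is converted to a Java functional interface automatically, so an explicit adapter like this is mainly needed on older Scala versions.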

A final note

vertx.eventBus().send(address, message, options, reply -> {
  if (reply.succeeded()) {
  }
});
import io.vertx.core.AsyncResultHandler
import io.vertx.core.AsyncResult
import io.vertx.core.Handler
import io.vertx.core.eventbus.DeliveryOptions
import io.vertx.core.eventbus.Message

//Edited on 07 Feb 2017
// Wraps a Scala function in a Vert.x Handler[AsyncResult[T]]
def toVertxHandler[T](f: Function1[AsyncResult[T], Unit]): Handler[AsyncResult[T]] = new AsyncResultHandler[T] {
  override def handle(result: AsyncResult[T]): Unit = f(result)
}

val pojos: ListBuffer[MyPOJO] = new ListBuffer[MyPOJO]()
// T below stands for the type of the reply body
vertx.eventBus().send(address, new JsonObject(), new DeliveryOptions(), toVertxHandler {
  reply: AsyncResult[Message[T]] => {
    if (reply.succeeded()) {
    } // closes if
  } // closes reply
} // closes toVertxHandler
) // closes send




Eric Rodriguez
