Java Vert.x Starter Guide — Part 2: Worker Verticles

Levon Tamrazov
5 min read · Feb 3, 2017


This is the second post in a three-part series on Vert.x in Java. Vert.x is a reactive framework for developing asynchronous applications; for a more thorough introduction, check out Part 1 of this series. This post expands on our HelloWorld application by adding worker verticles and introducing the Vert.x event bus.

This repo contains the code we have written so far, so feel free to clone it to follow along. You can also find the full code for Part 2 here.

So Far…

In the first part of this series, we deployed our service on a single server verticle. This was an event loop verticle that did all of its processing using executeBlocking. Behind the scenes, this method runs the code you give it on a worker thread. That thread is grabbed from a worker pool that every event loop verticle has, and we can configure the pool's size when we deploy the verticle via the setWorkerPoolSize option in DeploymentOptions.
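As a quick refresher, the executeBlocking pattern looks roughly like this (a sketch only; it assumes we are inside a verticle so vertx is in scope, and the slow call is simulated with simple string concatenation):

```java
// Sketch: executeBlocking hands the first lambda to a thread from the
// verticle's worker pool, then runs the second lambda back on the event
// loop with the result.
vertx.executeBlocking(future -> {
    // This runs on a worker thread, so blocking here is safe.
    String greeting = "Hello, " + "world";  // stand-in for a slow call
    future.complete(greeting);
}, res -> {
    // This runs back on the event loop once the blocking work is done.
    if (res.succeeded()) {
        System.out.println(res.result());
    } else {
        res.cause().printStackTrace();
    }
});
```

The worker pool backing these calls is the same one sized by setWorkerPoolSize.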

This technique allows us to nicely process our requests asynchronously. However, it couples our business logic with our server and limits our scaling and deployment options. Enter worker verticles.

Hello Worker

Workers are another type of verticle in Vert.x that are designed to execute blocking code. Unlike the regular verticles that run on the event loop thread, workers run on worker threads from a preconfigured pool. Because they are not part of the event loop, they can block while doing their processing. They can communicate with other verticles using the Vert.x event bus.

Now the cool part: since workers are verticles, we can deploy them completely independently of other verticles. In fact, we can deploy more instances, written in a completely different language, running on a different, beefier machine. A worker encapsulates our business logic and completely segregates it from our server.

Event Bus Producer

First we add a new endpoint in our router that will send a message over the event bus to whoever might be listening. The event bus supports two modes of communication:

  1. send — This is two-way communication that allows us to set a reply handler to process the response from our consumer.
  2. publish — This is fire-and-forget communication that can be used when you don’t care about a response. You are simply publishing an event that you need other processes to be aware of.
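Side by side, the two modes look roughly like this (a sketch; the address name greetings.greet is illustrative, and vertx is assumed to be in scope inside a verticle):

```java
// send: point-to-point. One consumer receives the message, and we can
// attach a reply handler to process its response.
vertx.eventBus().send("greetings.greet", "world", reply -> {
    if (reply.succeeded()) {
        System.out.println("Reply: " + reply.result().body());
    } else {
        System.out.println("No reply: " + reply.cause().getMessage());
    }
});

// publish: fire-and-forget. Every consumer registered on the address
// receives the message, and no reply is expected.
vertx.eventBus().publish("greetings.greet", "world");
```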

Our new handler looks like this:

.../web/HelloController.java

private void getGreetingW(RoutingContext ctx) {
    String data = new JsonObject()
        .put("name", ctx.request().getParam("name"))
        .encode();
    vertx.eventBus().send(Events.GREET, data, res -> {
        handleEventBusResponse(res, ctx);
    });
}

All messages are sent as buffers, and by default Vert.x allows sending String and Buffer types. For any other type, Vert.x looks up a MessageCodec that specifies how to serialize it to and from a buffer:

vertx.eventBus().send(event, SomeObject, handler)

The code above would require us to create a MessageCodec for SomeObject. Since Vert.x is polyglot, it is encouraged to use JSON as your message format, since it can be easily parsed by any language.
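If you do want to send a custom type, a codec is not hard to write. Here is a minimal sketch, assuming a hypothetical SomeObject POJO, that serializes it as length-prefixed JSON on the wire:

```java
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.MessageCodec;
import io.vertx.core.json.JsonObject;
import java.nio.charset.StandardCharsets;

// Hypothetical POJO we want to send over the event bus.
class SomeObject {
    final String name;
    SomeObject(String name) { this.name = name; }
}

class SomeObjectCodec implements MessageCodec<SomeObject, SomeObject> {
    @Override
    public void encodeToWire(Buffer buffer, SomeObject o) {
        // Length-prefix the JSON so the decoder knows how much to read.
        byte[] json = new JsonObject().put("name", o.name)
            .encode().getBytes(StandardCharsets.UTF_8);
        buffer.appendInt(json.length);
        buffer.appendBytes(json);
    }

    @Override
    public SomeObject decodeFromWire(int pos, Buffer buffer) {
        int length = buffer.getInt(pos);
        String json = new String(buffer.getBytes(pos + 4, pos + 4 + length),
            StandardCharsets.UTF_8);
        return new SomeObject(new JsonObject(json).getString("name"));
    }

    @Override
    public SomeObject transform(SomeObject o) {
        // Local (same-JVM) delivery bypasses the wire format entirely.
        return o;
    }

    @Override
    public String name() { return "someObjectCodec"; }

    @Override
    public byte systemCodecID() { return -1; } // -1 marks a user codec
}
```

Registering it once with vertx.eventBus().registerDefaultCodec(SomeObject.class, new SomeObjectCodec()) makes Vert.x pick it up automatically whenever a SomeObject is sent.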

We created another response handler to process our event bus responses, and modified our global error handler to catch ReplyException, the exception produced by our message consumers when they fail the response (see below). That code is self-explanatory, so it is omitted here, but I encourage you to check it out in the repo.

Setting Up the Worker Verticle

A worker is structured just like any other verticle, except that it uses an event bus consumer to listen for messages and reply to them. Here is what the start method looks like:

.../service/HelloWorker.java

@Override
public void start(Future<Void> done) {
    MessageConsumer<String> consumer =
        vertx.eventBus().consumer(Events.GREET);

    consumer.handler(m -> {
        JsonObject data = new JsonObject(m.body());
        Greeting retval = service.greet(data.getString("name"));
        try {
            m.reply(Json.encode(retval));
        } catch (EncodeException e) {
            m.fail(HttpResponseStatus.INTERNAL_SERVER_ERROR.code(),
                "Failed to encode data.");
        }
    });

    done.complete();
}

This is a verticle, so it must extend the AbstractVerticle and override the start method. The interesting part is in the event bus:

MessageConsumer<String> consumer = 
vertx.eventBus().consumer(Events.GREET);

This code creates a consumer on the event bus that listens for the GREET event. The consumer's type parameter specifies that we are expecting a String message. The next part sets a handler for this event:

consumer.handler(m -> { ... });

The body of the message can be accessed via m.body(), and it will be of the type specified for this consumer. Here you can do any blocking processing, and once you are done, call m.reply() to send the response back over the event bus. Should something go wrong, use m.fail() to indicate an error with a given code and message. Because this code runs in another verticle, it is important to catch all exceptions and fail the message, so that our server always gets a reply.

Deploying Our Verticle

Finally we modify our ServiceLauncher to deploy all verticles:

.../ServiceLauncher.java

@Override
public void start(Future<Void> done) {
    int WORKER_POOL_SIZE = 20;

    DeploymentOptions serverOpts = new DeploymentOptions()
        .setWorkerPoolSize(WORKER_POOL_SIZE);

    DeploymentOptions workerOpts = new DeploymentOptions()
        .setWorker(true)
        .setInstances(WORKER_POOL_SIZE)
        .setWorkerPoolSize(WORKER_POOL_SIZE);

    CompositeFuture.all(
        deploy(ServerVerticle.class.getName(), serverOpts),
        deploy(HelloWorker.class.getName(), workerOpts)
    ).setHandler(r -> {
        if (r.succeeded()) {
            done.complete();
        } else {
            done.fail(r.cause());
        }
    });
}

A few things to note here:

  1. We use setWorker(true) in workerOpts to indicate that this verticle should be deployed as a worker. By default, a worker verticle instance is only executed by one thread at any one time (it is not concurrent). This means you will not process more requests than the number of verticle instances, no matter how large your thread pool is, so make sure those are equal. Alternatively, you can use setMultiThreaded(true) to make this a multi-threaded worker. This allows several threads to run on the same instance concurrently, but then you must make sure your workers are thread safe.
  2. We use CompositeFuture to deploy our verticles at once. This ensures that our deployment succeeds only if all verticles are deployed successfully.
  3. deploy is a helper method that simply wraps the vertx.deployVerticle call. You can see the implementation here.
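For reference, a deploy helper like this can be written as a thin wrapper that adapts the callback-style deployVerticle to a Future, so that deployments compose with CompositeFuture.all (a sketch against the Vert.x 3 API; the repo's actual implementation may differ):

```java
// Adapts vertx.deployVerticle's callback into a Future so multiple
// deployments can be combined with CompositeFuture.all(...).
private Future<Void> deploy(String name, DeploymentOptions opts) {
    Future<Void> done = Future.future();
    vertx.deployVerticle(name, opts, res -> {
        if (res.succeeded()) {
            done.complete();
        } else {
            done.fail(res.cause());
        }
    });
    return done;
}
```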

Conclusion

We now have a worker verticle that can be used to offload any heavy processing from our server. In our case we deploy it together with our main verticle, but it could be deployed completely independently on another machine, with a different number of instances, and even in a different language.

That’s it for this post. Stay tuned for Part 3, which will cover using Guice to set up dependency injection in our code.
