Asynchronous fun with vert.x and cyclops-react

It’s possible, and in fact pretty straightforward, to integrate cyclops-react and vert.x, which means you can have cyclops-react future-based Streams execute all of their tasks on vert.x event loops. Let’s start with a very simple example and have vert.x and cyclops-react add some numbers together.

A simple example to start

In the above gist we configure a cyclops-react LazyReact instance to execute any tasks via vert.x. We can use the LazyReact instance to launch future-backed Streams (LazyFutureStream), and all of the associated future-based tasks will be executed via the vert.x event loop.
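As a rough, JDK-only sketch of the idea (not the original gist), a single named thread can stand in for a vert.x event loop, with every future-based task handed to an Executor that runs on it. The class and method names here are hypothetical. With vert.x itself, the Executor would instead hand each task to the event loop (for example via Context.runOnContext), and that Executor is what LazyReact would be configured with.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class EventLoopExecutorSketch {

    // Hypothetical stand-in for a vert.x event loop: one named daemon thread.
    static ExecutorService newEventLoop(String name) {
        return Executors.newSingleThreadExecutor(runnable -> {
            Thread thread = new Thread(runnable, name);
            thread.setDaemon(true);
            return thread;
        });
    }

    // Runs a future-based task on the executor and reports which thread executed it.
    static String threadUsedFor(Executor executor) {
        return CompletableFuture
                .supplyAsync(() -> Thread.currentThread().getName(), executor)
                .join();
    }

    public static void main(String[] args) {
        ExecutorService eventLoop = newEventLoop("event-loop-sketch");
        System.out.println(threadUsedFor(eventLoop)); // prints "event-loop-sketch"
        eventLoop.shutdown();
    }
}
```

The point of the sketch is only that the stream builder never needs to know about vert.x directly: anything that can be expressed as an Executor can be injected.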

Let’s try that out

Asynchronous non-blocking I/O

Now we have a very simple Stream that will execute its future-based tasks via vert.x. That is, the map operation will be applied asynchronously (reduce will run on the current thread). To make the whole thing run asynchronously via vert.x, we should make use of either the run() terminal operation in LazyFutureStream or the futureOperations operator, which makes all terminal operations asynchronous.
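The difference between the two modes can be sketched with plain JDK futures. This is an illustration under assumptions, not the cyclops-react API: the class name, the method names, and the "add 100" mapping are all made up, and the Executor parameter stands in for the vert.x event loop.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

class AsyncTerminalSketch {

    // The map step runs as futures on the event loop; the reduce runs on the calling thread.
    static int mapAsyncReduceOnCaller(List<Integer> numbers, Executor eventLoop) {
        List<CompletableFuture<Integer>> mapped = numbers.stream()
                .map(n -> CompletableFuture.supplyAsync(() -> n + 100, eventLoop))
                .collect(Collectors.toList());
        return mapped.stream().map(CompletableFuture::join).reduce(0, Integer::sum);
    }

    // Both the map and the reduce run on the event loop; the caller only gets a future back.
    static CompletableFuture<Integer> mapAndReduceAsync(List<Integer> numbers, Executor eventLoop) {
        return CompletableFuture.supplyAsync(
                () -> numbers.stream().map(n -> n + 100).reduce(0, Integer::sum),
                eventLoop);
    }

    public static void main(String[] args) {
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        List<Integer> numbers = Arrays.asList(1, 2, 3);
        System.out.println(mapAsyncReduceOnCaller(numbers, eventLoop));   // prints 306
        System.out.println(mapAndReduceAsync(numbers, eventLoop).join()); // prints 306
        eventLoop.shutdown();
    }
}
```

In the second method the calling thread is free to carry on as soon as the future is returned, which is the behaviour the asynchronous terminal operations are after.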

Let’s see how that works

In this gist (above) we are executing a sequential stream type, called ReactiveSeq, asynchronously via vert.x. ReactiveSeq does not utilise futures to execute tasks, and is in fact an extended JDK 8 Stream. In this example we are populating a (special) cyclops-react async Queue with some URLs we would like to download. This is all happening asynchronously on the vert.x event loop. We inject vert.x into cyclops-react by passing it to the futureOperations operator.
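A minimal JDK-only sketch of that step might look like the following. It assumes a LinkedBlockingQueue as a stand-in for the cyclops-react async Queue and a plain Executor as a stand-in for the vert.x event loop; the class name, method name, and URLs are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

class UrlQueueSketch {

    // Pushes every URL onto the queue from the event loop rather than the calling thread.
    static CompletableFuture<Void> enqueueAsync(List<String> urls,
                                                BlockingQueue<String> downloadQueue,
                                                Executor eventLoop) {
        return CompletableFuture.runAsync(() -> urls.forEach(downloadQueue::add), eventLoop);
    }

    public static void main(String[] args) {
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        BlockingQueue<String> downloadQueue = new LinkedBlockingQueue<>();
        // Placeholder URLs, used for illustration only.
        List<String> urls = Arrays.asList("http://example.com/a", "http://example.com/b");
        enqueueAsync(urls, downloadQueue, eventLoop).join();
        System.out.println(downloadQueue.size()); // prints 2
        eventLoop.shutdown();
    }
}
```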

Let’s use our future-backed Stream type to do the downloading.

Now we are injecting vert.x into both our LazyReact future Stream builder and our ReactiveSeq instance. Everything is occurring asynchronously via the vert.x event loop.

When run, our ReactiveSeq instance will push URLs onto the downloadQueue. Our LazyReact stream will receive new URLs to download from the downloadQueue and fetch them. Once each download completes, it pushes the content onto the completedQueue.
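The flow between the two queues can be sketched with JDK types alone. This is a simplification under stated assumptions: the queues are LinkedBlockingQueues rather than cyclops-react Queues, the Executor stands in for the vert.x event loop, the fetch function is a hypothetical placeholder that performs no real I/O, and the downloader drains a finished batch rather than consuming a live stream.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Function;

class DownloadPipelineSketch {

    // Stage 1 pushes URLs onto downloadQueue; stage 2 drains it, fetches each URL,
    // and pushes the result onto completedQueue. Both stages run on the event loop.
    static CompletableFuture<Void> run(List<String> urls,
                                       Function<String, String> fetch,
                                       BlockingQueue<String> downloadQueue,
                                       BlockingQueue<String> completedQueue,
                                       Executor eventLoop) {
        return CompletableFuture
                .runAsync(() -> downloadQueue.addAll(urls), eventLoop)
                .thenRunAsync(() -> {
                    String url;
                    // poll() never blocks, so the single event-loop thread is not held up.
                    while ((url = downloadQueue.poll()) != null) {
                        completedQueue.add(fetch.apply(url));
                    }
                }, eventLoop);
    }

    public static void main(String[] args) {
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        BlockingQueue<String> downloadQueue = new LinkedBlockingQueue<>();
        BlockingQueue<String> completedQueue = new LinkedBlockingQueue<>();
        // Hypothetical fetcher: a real one would perform non-blocking HTTP requests.
        Function<String, String> fetch = url -> "content of " + url;
        run(Arrays.asList("http://example.com/a", "http://example.com/b"),
                fetch, downloadQueue, completedQueue, eventLoop).join();
        completedQueue.forEach(System.out::println);
        eventLoop.shutdown();
    }
}
```

Using the non-blocking poll() matters here: a blocking take() on a single-threaded loop would stall every other task scheduled on it.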

Let’s add some more code to capture the completed results.

Now we have another Stream, this time executed on the current thread, that prints out the downloaded web content.

Now let’s build a simple server

Let’s build a simple asynchronous server that groups incoming requests and performs mathematical functions on them. We can start by configuring vert.x to act as a web server. We can use a CompletableFuture instance as a bridge into SimpleReact to alert on the server’s status.
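The bridging pattern itself can be sketched without vert.x. In the snippet below the listen method is a hypothetical callback-style start-up, loosely shaped like an asynchronous listen call, and no socket is actually bound; the only point is how a callback result is turned into a CompletableFuture that downstream code can react to.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.BiConsumer;

class ServerStatusBridgeSketch {

    // Hypothetical callback-style start-up: reports either the port or an error.
    // No real server is started here.
    static void listen(int port, Executor eventLoop, BiConsumer<Integer, Throwable> onResult) {
        eventLoop.execute(() -> {
            if (port < 0 || port > 65535) {
                onResult.accept(null, new IllegalArgumentException("bad port: " + port));
            } else {
                onResult.accept(port, null);
            }
        });
    }

    // Bridges the callback into a CompletableFuture carrying the server's status.
    static CompletableFuture<Integer> start(int port, Executor eventLoop) {
        CompletableFuture<Integer> status = new CompletableFuture<>();
        listen(port, eventLoop, (boundPort, error) -> {
            if (error != null) {
                status.completeExceptionally(error);
            } else {
                status.complete(boundPort);
            }
        });
        return status;
    }

    public static void main(String[] args) {
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        start(8080, eventLoop)
                .thenAccept(port -> System.out.println("listening on " + port)) // prints "listening on 8080"
                .join();
        eventLoop.shutdown();
    }
}
```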

If we configure an async cyclops-react Queue, we can use it to allow different user requests to interact. First up: setting up the queue and passing in user requests.

Now that we are capturing user requests in an async.Queue, we can create a future-based Stream from this queue and have it execute its tasks on the vert.x event loop. We can use the grouped operator to group every pair of requests, and generate a Tuple of four elements to capture the response object for each request and the parameters the users have sent in. For one user we will add each pair of numbers, and for the other we will multiply them.
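The pairing logic can be sketched in plain Java as follows. The Request class is a hypothetical stand-in for the tuple of response object and parameter, a CompletableFuture plays the role of the HTTP response, and the Executor stands in for the vert.x event loop; none of this is the cyclops-react grouped operator itself.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class PairedRequestsSketch {

    // Hypothetical request: the number a user sent plus a handle for replying to them.
    static final class Request {
        final int number;
        final CompletableFuture<Integer> response = new CompletableFuture<>();

        Request(int number) {
            this.number = number;
        }
    }

    // Groups requests two at a time: the first user gets the sum, the second the product.
    // A trailing request without a partner is left unanswered.
    static CompletableFuture<Void> answerInPairs(List<Request> requests, Executor eventLoop) {
        return CompletableFuture.runAsync(() -> {
            for (int i = 0; i + 1 < requests.size(); i += 2) {
                Request first = requests.get(i);
                Request second = requests.get(i + 1);
                first.response.complete(first.number + second.number);
                second.response.complete(first.number * second.number);
            }
        }, eventLoop);
    }

    public static void main(String[] args) {
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        Request userOne = new Request(3);
        Request userTwo = new Request(4);
        answerInPairs(Arrays.asList(userOne, userTwo), eventLoop).join();
        System.out.println(userOne.response.join()); // prints 7
        System.out.println(userTwo.response.join()); // prints 12
        eventLoop.shutdown();
    }
}
```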

The video below shows the execution of our code and the asynchronous calculation of two user requests.