Reactive NIO (Non-Blocking Input/Output)

Plugging and playing with AOL Cyclops in a Spring Boot application. ♻️


Ground Breakers: SpringBoot, Jersey, Cyclops, Postman, Karate, Gradle

🌛 Prerequisites: A basic understanding of the Apache NIO engine and of asynchronous computation using Java 8 futures.

🌜Why NIO and When do we need it?

NIO is useful in the world of Micro ➕ Services❗️ The diagram above illustrates where it generally fits.

Kayak makes a good business use case to illustrate this.

(Blocking Input/Output) BIO: When a user submits a flight search, Kayak has to communicate with, say, 10 airlines and fetch their prices. Some simple math: when the server receives the request, one thread picks it up and spins off another 10 threads to talk to the 10 airlines, for a total of 1 + 10 = 11 threads. So one request results in 11 threads, and 10,000 concurrent requests result in 11 × 10,000 = 110,000 threads. As more and more requests arrive, the system eventually runs out of threads. So how do we handle this case efficiently? The solution is NIO.

(Non-Blocking Input/Output) NIO: Instead of leaving a thread hanging while it waits for a response from a specific airline system, use that thread for another task and react to the responses as they arrive. In other words, the same thread does another job, such as sending the next request, instead of waiting for the response to the first one.
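To make the thread math concrete, here is a minimal sketch that uses only the JDK's CompletableFuture rather than Cyclops. The class name FlightSearchSketch, the fetchPrice stand-in, and the pool size of 4 are assumptions made for illustration; a real lookup would go over HTTP. It shows the react-to-completion style described above, not non-blocking sockets themselves.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class FlightSearchSketch {

    // Hypothetical stand-in for a remote airline call; a real one would go over HTTP.
    static int fetchPrice(int airlineId) {
        return 100 + airlineId;
    }

    // Fans out one lookup per airline and returns at once with a future that
    // completes when every lookup has completed. The caller is not blocked here.
    static CompletableFuture<List<Integer>> searchAll(int airlines, ExecutorService pool) {
        List<CompletableFuture<Integer>> lookups = IntStream.rangeClosed(1, airlines)
                .mapToObj(id -> CompletableFuture.supplyAsync(() -> fetchPrice(id), pool))
                .collect(Collectors.toList());

        return CompletableFuture.allOf(lookups.toArray(new CompletableFuture[0]))
                .thenApply(done -> lookups.stream()
                        .map(CompletableFuture::join) // already complete, so join() does not wait
                        .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4); // assumed size, for illustration
        try {
            // Block only here, at the edge of the demo, to print the result.
            System.out.println(searchAll(10, pool).join());
        } finally {
            pool.shutdown();
        }
    }
}
```

Here 4 pooled threads serve all 10 lookups, instead of one dedicated thread per airline.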

BIO 🚸NIO:

BIO is faster than NIO up to a few thousand requests, but it does not scale beyond that. You can’t handle millions of requests using BIO, but you can with NIO. On the other hand, the time needed to implement NIO is short. Wondering how? Let’s look at the example. Because its functions are predefined to handle NIO, AOL Cyclops lets you avoid many pitfalls, and it is the focal point of this article. An extensive and predictable collection of operators and APIs comes out of the box to handle map-reduce and failover scenarios. Enough description. Time to get hands-on.

🚶 Example:

A working example is on GitHub.

Make sure the below Cyclops dependencies are in place:

Successful start of CyclopsReactApplication

In favor of TDD (Test-Driven Development), let’s start with a Karate test.

Each test case kicks off one API call, either a POST or a GET, which in turn kicks off multiple other calls using NIO.

Running one of the 6 test cases defined above produces the log below:

Handling multiple subsequent requests and responses from a single individual thread.

In the log above, a single thread named *-8080-exec-5 receives the request and then starts sending multiple async and sync calls using Spring RestTemplates. The responses are handled reactively by the threads named *-worker-0. In this case the response handler does no real work, so NIO uses the same thread to handle all the incoming responses asynchronously. For the sync calls, the parent thread *-8080-exec-5 both sends the request and waits for the response; because we intentionally delayed the responses by 3 seconds, you can see the *Pool.commonPool-worker-* threads being brought into action by NIO to react to the responses. This is beautiful, as you no longer have to worry about handling the reactive nature of the responses yourself.

The code snippet below builds 10 (GET) + 10 (POST) = 20 HTTP calls that are fired whenever a request is received. Note the delay parameter, which lets us test that responses are handled by threads that may not be the ones that sent the requests.

?delay=3 → makes the response arrive after 3 seconds.
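As a rough illustration of that setup, the sketch below builds the 20 target URLs in plain Java. The base URL, the path segments, and the DelayedEndpoints class are hypothetical; only the ?delay=3 query parameter comes from the example.

```java
import java.util.ArrayList;
import java.util.List;

class DelayedEndpoints {

    // Hypothetical base URL; the real host and paths of the example are not shown here.
    static final String BASE = "http://localhost:8080/demo";

    // Builds `count` URLs of the given kind, each asking the server to delay its response.
    static List<String> urls(String kind, int count, int delaySeconds) {
        List<String> out = new ArrayList<>();
        for (int i = 1; i <= count; i++) {
            out.add(BASE + "/" + kind + "/" + i + "?delay=" + delaySeconds);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> all = new ArrayList<>(urls("get", 10, 3)); // 10 GET targets
        all.addAll(urls("post", 10, 3));                        // 10 POST targets
        all.forEach(System.out::println);
    }
}
```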

SimpleReact, Sync & Async Communication:

The snippet below shows how we call the APIs constructed above using the SimpleReact component. Depending on your requirements, you can either create a single instance of this class and reuse it, or create a new object on every request. If you are dealing with many subsequent calls from a parent call that can have lengthy delays, and you are not interested in configuring your own external pool, go with a new instance. The example below uses a single instance of SimpleReact across the application.

FutureConverter comes from the future-converter-spring-java8 dependency and converts a Spring future into a JDK future, which is needed because Cyclops depends entirely on JDK future objects.

By default, SimpleReact uses the standard parallel executor, backed by the ForkJoin common pool. You can also inject a custom thread pool executor through its overloaded constructors, as below. In many cases the standard one is enough.
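For comparison, the same idea of swapping the default pool for a bounded custom one can be sketched with the JDK alone. This is an analogy using CompletableFuture, not the SimpleReact constructor itself; the class name CustomPoolSketch, the pool size, and the airline-io- thread name prefix are assumptions.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class CustomPoolSketch {

    // Builds a small, bounded pool whose threads carry a recognizable (assumed) name.
    static ExecutorService boundedPool(int size) {
        AtomicInteger counter = new AtomicInteger();
        return Executors.newFixedThreadPool(size, runnable -> {
            Thread t = new Thread(runnable, "airline-io-" + counter.incrementAndGet());
            t.setDaemon(true); // do not keep the JVM alive for idle workers
            return t;
        });
    }

    // Runs a task on the given pool and reports which thread executed it.
    static String runOn(ExecutorService pool) {
        return CompletableFuture
                .supplyAsync(() -> Thread.currentThread().getName(), pool)
                .join();
    }

    public static void main(String[] args) {
        ExecutorService pool = boundedPool(2);
        System.out.println(runOn(pool)); // a thread from the custom pool, not the common pool
        pool.shutdown();
    }
}
```

Naming the threads makes it easy to tell in the logs which pool actually handled a response.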

When async is false → subsequent tasks are executed on the completing thread; otherwise, each task is resubmitted to a separate task executor.
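The JDK draws the same distinction between thenApply and thenApplyAsync, which gives a small way to see it in action. The sketch below is an analogy rather than Cyclops code; the class SyncVsAsyncSketch and the thread names completer and worker are made up for the demonstration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class SyncVsAsyncSketch {

    // Single-thread executor with a fixed thread name, so the result is easy to read.
    static ExecutorService namedSingleThread(String name) {
        return Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, name);
            t.setDaemon(true);
            return t;
        });
    }

    // Returns the name of the thread that ran the follow-up stage.
    static String followUpThread(boolean async) {
        ExecutorService completer = namedSingleThread("completer");
        ExecutorService worker = namedSingleThread("worker");
        try {
            CompletableFuture<String> source = new CompletableFuture<>();
            CompletableFuture<String> stage;
            if (async) {
                // Follow-up is resubmitted to a separate executor.
                stage = source.thenApplyAsync(v -> Thread.currentThread().getName(), worker);
            } else {
                // Follow-up runs on whichever thread completes the source.
                stage = source.thenApply(v -> Thread.currentThread().getName());
            }
            // Complete the source only after the follow-up has been registered.
            completer.execute(() -> source.complete("response"));
            return stage.join();
        } finally {
            completer.shutdown();
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("sync  follow-up ran on: " + followUpThread(false));
        System.out.println("async follow-up ran on: " + followUpThread(true));
    }
}
```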

from → starts a reactive buffer dataflow from a reactive stream; it can also take a collection, which it streams internally.

then → this is where transformation comes into the picture. Note that it doesn’t block any threads.

You can also block explicitly using the block() DSL on a failure.

The Sync & Async makers in the code snippet above are custom definitions that interact with the external API over HTTP using Spring RestTemplates.

Beyond the example: the Core API, Streams API, Sequence API, and operators such as zipping, sharding, control, batching, chunking, and futures are well worth a look; they can address several business use cases out of the box.

Finally, a bird’s-eye view:

  1. The SimpleReact object is used to start the NIO interaction.
  2. Don’t forget the Spring-to-JDK future converter dependency.
  3. When your system is already under heavy load with many external threads and your requirement can wait for responses, configure a limited custom thread pool executor.
  4. Use async to distribute work across threads.
  5. Use the DSL.
  • from() → starts a reactive buffer dataflow
  • allOf() → performs aggregation
  • anyOf() → reacts to the completion of any of the events
  • then() → starts a chain of React transformations
  • flatMap() → flattens to a single unsafe Stream
  • with() → applied to the results of currently active event tasks
  • filter() → removes elements that don’t match (async)
  • onFail(), block(), async(), sync() are most useful.
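Two of the ideas in this list, recovering from a failure and reacting to whichever event completes first, have close counterparts in the JDK. The sketch below uses CompletableFuture.exceptionally and CompletableFuture.anyOf as stand-ins; it is not the Cyclops DSL, and the class name RecoverAndRaceSketch and the price values are assumptions.

```java
import java.util.concurrent.CompletableFuture;

class RecoverAndRaceSketch {

    // Recovers from a failed stage with a fallback value instead of propagating the error.
    static int priceOrFallback(boolean fail, int fallback) {
        return CompletableFuture.<Integer>supplyAsync(() -> {
                    if (fail) {
                        throw new IllegalStateException("airline unavailable");
                    }
                    return 120; // assumed price, for illustration only
                })
                .exceptionally(error -> fallback)
                .join();
    }

    // Completes with the result of whichever future finishes first.
    static Object firstOf(CompletableFuture<?>... futures) {
        return CompletableFuture.anyOf(futures).join();
    }

    public static void main(String[] args) {
        System.out.println(priceOrFallback(true, -1));
        CompletableFuture<String> slow = new CompletableFuture<>(); // never completed here
        System.out.println(firstOf(slow, CompletableFuture.completedFuture("fast")));
    }
}
```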

You can find performance numbers for Cyclops here.

Many thanks to Pedro Betancourt for corrections.

Gopi Krishna Kancharla- Founder of http://allibilli.com
