Applying backpressure across Streams
Suppose we want to connect three Streams together: one for reading I/O, one for CPU-bound computation and one for writing I/O. Doing so introduces the possibility that the producing Streams will produce data faster than the consuming Streams can handle it.
cyclops-react offers a number of approaches to managing backpressure (including an implementation of the reactive-streams API). In this article we will look at connecting Streams via cyclops-react Queues, which can also manage backpressure for us. The two approaches are:
- Backpressure via a BlockingQueue
- Backpressure via a wait-free Queue
Backpressure via a BlockingQueue
We can set up three Streams:
a. A LazyFutureStream for handling input
b. A ReactiveSeq for CPU-bound computational work
c. A LazyFutureStream for handling output
There is more information on why these Stream choices make sense from a performance perspective in this article on Optimizing Streams with cyclops-react.
We can connect them by placing a transfer Queue between each pair of adjacent Streams.
In this instance the producing threads will block when a Queue reaches its capacity (and the consuming threads will block when a Queue is empty), freeing up the CPU for other work. This is achieved via a JDK BlockingQueue under the hood.
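To make the blocking behaviour concrete, here is a minimal sketch of the same three-stage shape (read, compute, write) built directly on a JDK ArrayBlockingQueue. It deliberately uses only the JDK and is not the cyclops-react API: the class name BlockingPipelineSketch, the run method, the POISON end-of-data marker and the doubling step are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical illustration using only the JDK; this is not the cyclops-react API.
final class BlockingPipelineSketch {

    // Sentinel telling a downstream stage that no more data is coming.
    private static final Integer POISON = Integer.MIN_VALUE;

    // "Compute" stage: takes from one queue, doubles each item, puts it on the next.
    private static void doubleAll(BlockingQueue<Integer> in, BlockingQueue<Integer> out)
            throws InterruptedException {
        for (Integer next = in.take(); !next.equals(POISON); next = in.take()) {
            out.put(next * 2); // blocks while 'out' is full
        }
        out.put(POISON);
    }

    static List<Integer> run(int items, int capacity) {
        BlockingQueue<Integer> readToCompute = new ArrayBlockingQueue<>(capacity);
        BlockingQueue<Integer> computeToWrite = new ArrayBlockingQueue<>(capacity);

        // "Read" stage: put() parks this thread whenever the bounded queue is full.
        Thread reader = new Thread(() -> {
            try {
                for (int i = 0; i < items; i++) {
                    readToCompute.put(i);
                }
                readToCompute.put(POISON);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread computer = new Thread(() -> {
            try {
                doubleAll(readToCompute, computeToWrite);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        reader.start();
        computer.start();

        // "Write" stage runs on the calling thread; take() parks it while the queue is empty.
        List<Integer> written = new ArrayList<>();
        try {
            for (Integer next = computeToWrite.take(); !next.equals(POISON);
                 next = computeToWrite.take()) {
                written.add(next);
            }
            reader.join();
            computer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("pipeline interrupted", e);
        }
        return written;
    }

    public static void main(String[] args) {
        System.out.println(run(5, 2)); // prints [0, 2, 4, 6, 8]
    }
}
```

With a capacity of 2, the reader can never run more than a couple of items ahead of the compute stage; a fast producer simply spends most of its time parked in put().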
Backpressure via a wait-free Queue
We can create a similar configuration that connects Streams via an Agrona wait-free Queue.
In this instance the producing and consuming threads never relinquish the CPU. Instead, they switch from processing data to polling the queue for capacity once it becomes full. Under the hood this uses an Agrona wait-free, non-blocking bounded queue. You can plug in different wait strategies should you wish (they are available on the WaitStrategy interface).
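The polling behaviour can be sketched with a small hand-rolled ring buffer. This is not Agrona code and not the cyclops-react API: it is a simplified stand-in that supports only one producer and one consumer per queue, and the names SpinningPipelineSketch, SpscQueue, spinOffer and spinPoll are hypothetical. The busy-spin loops play the role that a wait strategy would play, and Thread.onSpinWait() assumes Java 9 or later.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceArray;

// Hypothetical illustration; a single-producer/single-consumer stand-in, not Agrona.
final class SpinningPipelineSketch {

    // Bounded ring buffer: offer and poll return immediately and never take a lock.
    static final class SpscQueue<T> {
        private final AtomicReferenceArray<T> buffer;
        private final AtomicLong head = new AtomicLong(); // next slot to read (consumer only)
        private final AtomicLong tail = new AtomicLong(); // next slot to write (producer only)

        SpscQueue(int capacity) {
            buffer = new AtomicReferenceArray<>(capacity);
        }

        boolean offer(T value) {
            long t = tail.get();
            if (t - head.get() == buffer.length()) return false; // full
            buffer.set((int) (t % buffer.length()), value);
            tail.set(t + 1); // publish the slot to the consumer
            return true;
        }

        T poll() {
            long h = head.get();
            if (h == tail.get()) return null; // empty
            int index = (int) (h % buffer.length());
            T value = buffer.get(index);
            buffer.set(index, null);
            head.set(h + 1); // release the slot to the producer
            return value;
        }
    }

    private static final Integer POISON = Integer.MIN_VALUE; // end-of-data marker

    // Busy-spin until there is capacity; the thread keeps the CPU while it waits.
    static <T> void spinOffer(SpscQueue<T> queue, T value) {
        while (!queue.offer(value)) Thread.onSpinWait();
    }

    // Busy-spin until there is data.
    static <T> T spinPoll(SpscQueue<T> queue) {
        T value;
        while ((value = queue.poll()) == null) Thread.onSpinWait();
        return value;
    }

    static List<Integer> run(int items, int capacity) {
        SpscQueue<Integer> readToCompute = new SpscQueue<>(capacity);
        SpscQueue<Integer> computeToWrite = new SpscQueue<>(capacity);

        Thread reader = new Thread(() -> {
            for (int i = 0; i < items; i++) spinOffer(readToCompute, i);
            spinOffer(readToCompute, POISON);
        });
        Thread computer = new Thread(() -> {
            for (Integer next = spinPoll(readToCompute); !next.equals(POISON);
                 next = spinPoll(readToCompute)) {
                spinOffer(computeToWrite, next * 2);
            }
            spinOffer(computeToWrite, POISON);
        });
        reader.start();
        computer.start();

        // The "write" stage runs on the calling thread.
        List<Integer> written = new ArrayList<>();
        for (Integer next = spinPoll(computeToWrite); !next.equals(POISON);
             next = spinPoll(computeToWrite)) {
            written.add(next);
        }
        try {
            reader.join();
            computer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("pipeline interrupted", e);
        }
        return written;
    }
}
```

Compared with the blocking version, a stalled stage here burns CPU cycles while it waits, which is the trade-off for never parking and waking threads.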
Is this thread safe?
Yes, it supports multiple threads writing to the Queue (without synchronization). The wait-free configuration above uses a ManyToOneConcurrentArrayQueue from the Agrona library, which supports multiple concurrent producers and a single consumer. (Agrona comes from the same people who built the LMAX Disruptor, among others; Martin Thompson, Todd Montgomery and Richard Warburton are all major contributors.) Even when the consuming Stream is a parallel one, cyclops-react makes sure that the actual reads are done by a single consumer thread (before passing off tasks to multiple threads).
There are no locks or synchronized blocks used in the Agrona implementation at all, so you may find it gives a more consistent throughput rate than the alternatives. (Depending on your needs, though, a simple BlockingQueue might give higher throughput at the expense of less consistency and higher latency.)