Optimizing cyclops-react Streams

John McClean
6 min read · Oct 14, 2015


If you want to improve the performance of your cyclops-react Streams you have a number of options, which we will discuss in depth below.

Star-forming image, Creative Commons licensed, by Hubble Heritage: https://creativecommons.org/licenses/by-sa/2.0/

Choose the right Stream type

simple-react v0.99.3 and above support three Stream types.

a. LazyFutureStream : choose this Stream for lazy asynchronous operations, such as reading files from or writing files to disk, loading data from or saving data to a database, or exchanging data with a remote service.

LazyFutureStream performance is in red (orders of magnitude faster), sequential Stream performance in blue.

relative performance of LazyFutureStream for blocking I/O versus a sequential Stream implementation (SequenceM)

b. SimpleReactStream : choose this Stream for a finite set of asynchronous operations (again, any blocking I/O will be a good match). The set of operators for SimpleReactStream is more limited, but may suit your use case.

c. ReactiveSeq : choose this Stream for sequential CPU-bound operations. The other Streams are Streams of Futures; ReactiveSeq avoids that overhead and performs on a par with other Stream implementations, while providing a large variety of powerful operators.

The graph below shows the relative performance of ReactiveSeq versus LazyFutureStream without Object Pooling (blue) and with Object Pooling (red), while executing a fairly simple map & forEach operation, sequentially on a single thread. The graph shows the performance impact at various Stream sizes (200k, 20M and 20B). In general for single-threaded operations ReactiveSeq performs much better.

relative performance of ReactiveSeq, LazyFutureStream (w/ and without Object Pooling) for large sequential Streaming operation.

Turn autoOptimizeOn

LazyFutureStream is a Stream of Future tasks. Each operator creates a new task for each Future, to be executed on completion of the previous stage. The tasks can be executed on the same thread as the previous task, or submitted to an Executor, which will manage task execution.

async task execution, overhead is expensive

Executing a task on the same thread involves significantly less overhead. Balanced against this, you may want to distribute your workload over all available threads. autoOptimize automatically distributes work across threads when necessary, and keeps tasks executing on the same thread when it is more performant to do so.

task submission overhead is avoided

LazyFutureStream provides a large range of complex operators. Some of these operators require access to the aggregated or ordered results of Future tasks from the previous stage. simple-react handles this by making use of wait-free Queues to pass data from one operator to the next (where necessary). autoOptimize makes sure your operators always take full advantage of all available threads.

An example showing a Stream with autoOptimize on (this is the default for Streams created via the LazyReact builder).
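The embedded gist is not reproduced here, so the sketch below shows the rough shape of such a Stream, assuming the simple-react v0.99.x LazyReact builder API (the import path and exact method signatures are assumptions and vary between versions):

```java
// NOTE: import path is an assumption; it differs across simple-react versions
import com.aol.simple.react.stream.lazy.LazyReact;

public class AutoOptimizeExample {
    public static void main(String[] args) {
        new LazyReact()
                .autoOptimizeOn()       // explicit here, but the default for LazyReact-built Streams
                .range(0, 1_000)        // LazyReact#range, a creational operator
                .map(i -> i * 2)        // may execute on the thread that completed the previous stage
                .forEach(System.out::println);
    }
}
```

With autoOptimize on, you do not choose between same-thread and submitted execution per stage; the Stream makes that call for you.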

Turn autoMemoizeOn

Operations that make I/O calls or perform CPU-intensive tasks may be good candidates for caching. LazyFutureStream provides autoMemoization functionality that automatically caches the result of each task for future use. The actual cache implementation is pluggable: for example, auto-memoization can be backed by a Google Guava LRU cache with a time-to-live.
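A hedged sketch of plugging in a Guava cache (the autoMemoizeOn hook and its (key, fn) shape are assumptions based on the pluggable-cache description above; check your simple-react version):

```java
import java.util.concurrent.TimeUnit;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
// import for LazyReact omitted; the package varies by simple-react version

public class AutoMemoizeExample {
    public static void main(String[] args) {
        // A Guava cache with size-based (LRU-style) eviction and a TTL
        Cache<Object, Object> cache = CacheBuilder.newBuilder()
                .maximumSize(1_000)
                .expireAfterWrite(10, TimeUnit.MINUTES)
                .build();

        // Plug the cache in as the memoization backend: each task's
        // result is stored under its input key
        new LazyReact()
                .autoMemoizeOn((key, fn) -> {
                    try {
                        return cache.get(key, () -> fn.apply(key));
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                })
                .of("a", "b", "a")
                .map(AutoMemoizeExample::expensiveLookup) // "a" is computed only once
                .forEach(System.out::println);
    }

    private static String expensiveLookup(String s) {
        return s.toUpperCase(); // stand-in for an I/O or CPU heavy call
    }
}
```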

Object Pooling

If you have a constant Stream of data, you may get better performance by turning on Object Pooling. Object Pooling for LazyFutureStream ensures that the Futures used in the Stream come from the same pool: instead of producing garbage by creating a new Future Object for each element in the Stream, Futures are reused from a pool. For long-running processes this can significantly reduce Garbage Collector overhead.
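A sketch of enabling it (the objectPoolingOn() builder method name is an assumption; verify it against your simple-react version):

```java
// NOTE: import path and objectPoolingOn() are assumptions for this sketch
import com.aol.simple.react.stream.lazy.LazyReact;

public class ObjectPoolingExample {
    public static void main(String[] args) {
        new LazyReact()
                .objectPoolingOn()       // recycle Futures from a pool
                .range(0, 1_000_000)     // a long-running / continuous Stream
                .map(i -> i + 1)         // no fresh Future allocated per element
                .forEach(i -> {});
    }
}
```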

Object Pooling Benchmark Results

In previous versions of simple-react, throughput for continuous processing fell significantly, from ~330 million identity functions applied per second to less than half that. With Object Pooling in v0.99.1, throughput remains remarkably healthy at ~750 million identity functions applied per second, compared with a short-burst capacity of just over 800 million per second.

Operate directly on the underlying Futures

The standard mode of execution in LazyFutureStream is to operate on the results of the previous stage, but in many cases operating directly on the underlying Futures may yield better performance. For example, we can use the limit operator to either

  1. Stop the processing after a certain number of results have been received
  2. Only allow a number of tasks to be executed in the first place

Depending on your use case, 1 or 2 may be a better fit and result in better performance for your application. Option 1 uses the standard limit operator, while option 2 uses actOnFutures#limit.

The gist below shows example code for both options (the top is 1. and the bottom 2.).
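Since the gist itself is not reproduced here, the following sketch illustrates both options (LazyReact#react(Supplier...) and actOnFutures() are used as described in this section; exact signatures are assumptions):

```java
// NOTE: import path is an assumption; it differs across simple-react versions
import com.aol.simple.react.stream.lazy.LazyReact;

public class LimitExample {
    public static void main(String[] args) {
        // 1. Standard limit: all four tasks may start, but only the first
        //    three *results* to arrive flow downstream
        new LazyReact()
                .react(() -> load("a"), () -> load("b"),
                       () -> load("c"), () -> load("d"))
                .limit(3)
                .forEach(System.out::println);

        // 2. actOnFutures#limit: only three Futures are kept in the Stream
        //    at all, so only three tasks are ever executed
        new LazyReact()
                .react(() -> load("a"), () -> load("b"),
                       () -> load("c"), () -> load("d"))
                .actOnFutures()
                .limit(3)
                .forEach(System.out::println);
    }

    private static String load(String id) {
        return "result-" + id; // stand-in for blocking I/O
    }
}
```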

The first example may perform better by having 3 completed results available earlier. The second example may help your system overall perform better, by placing less load upon it (only starting 3 tasks).

There are a large number of operators for working directly with underlying Futures, and these can be mixed with operators that work directly with the results of a stage. You are free to choose the optimal combination for your application.

Manage queue types

By default LazyFutureStream uses the JDK wait-free Queue implementation (ConcurrentLinkedQueue), but you may get better performance from an alternative implementation. simple-react ships with Agrona and its ManyToOneConcurrentArrayQueue. This is a Java implementation of an algorithm from FastFlow, built by leading Java performance experts (Martin Thompson, Richard Warburton, Todd Montgomery) via their Agrona project, which provides the data structures and utilities used in the ultra-low-latency Aeron messaging system.

Performance characteristics

In simple benchmarking, LazyFutureStreams backed by an Agrona ManyToOneConcurrentArrayQueue can perform up to 40% faster than LazyFutureStreams backed by the JDK's unbounded wait-free Queue (ConcurrentLinkedQueue). While results for most queue types showed significant variation in performance, throughput from LazyFutureStreams backed by the Agrona ManyToOneConcurrentArrayQueue was much more stable. The performance difference between LazyFutureStreams backed by ManyToOneConcurrentArrayQueue and by ConcurrentLinkedQueue varied between 0 and over 40%. Non-blocking Queues performed up to twice as well as blocking queues (again with a lot of variation).

Bound size matters

Creating a bounded queue with a large buffer capacity can be expensive. Agrona ManyToOneConcurrentArrayQueues with a bound size of 200,000 entries were ~10 times slower than those with a bound size of 110.

JDK ConcurrentLinkedQueue is the default

Despite the improvements apparent from using the Agrona ManyToOneConcurrentArrayQueue as the backing Queue, we continue to use the JDK ConcurrentLinkedQueue as the default. This is because it is impossible to tell what the bound size should be for all operations. For many, if not most, operations the bound size can be very similar to the concurrency level as determined by the MaxActive settings, but for other operators, such as flatMap, it isn't possible to tell how many elements will need to be buffered on the queue. For that reason, the backing queue is configurable per stage via two simple operators.

Switching to an Agrona Queue

By using the boundedWaitFree operator on the LazyFutureStream you can switch to an Agrona ManyToOneConcurrentArrayQueue, and enjoy the inherent performance boost.

In order to switch between queue implementations the unboundedWaitFree and boundedWaitFree operators can be used per stage. For example the flatMap operator can result in multiple data points being created per existing element, potentially overwhelming an optimally configured bounded queue. The solution is to switch to an unboundedWaitFree Queue for the flatMap operation.
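A sketch combining the two operators named above (boundedWaitFree and unboundedWaitFree are taken from this section; the int bound-size parameter is an assumption):

```java
// NOTE: import path is an assumption; it differs across simple-react versions
import com.aol.simple.react.stream.lazy.LazyReact;

public class QueueSwitchExample {
    public static void main(String[] args) {
        new LazyReact()
                .range(0, 1_000)
                // Agrona ManyToOneConcurrentArrayQueue; a small bound
                // (close to the concurrency level) is cheap to allocate
                .boundedWaitFree(110)
                .map(i -> i * 2)
                // flatMap may emit many elements per input, so switch
                // this stage to an unbounded wait-free queue
                .unboundedWaitFree()
                .flatMap(i -> java.util.stream.Stream.of(i, i + 1))
                .forEach(i -> {});
    }
}
```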

Efficient Stream reversal

If you are making use of the reverse operator, or any of the 'right'-based operators (such as foldRight or scanRight), it makes sense to take advantage of the efficient Stream reversal functionality built into SequenceM and LazyFutureStream.

Creational operators that support efficient Stream reversal

  • LazyReact#range
  • LazyReact#fromIterable
  • LazyReact#reactIterable
  • LazyReact#react(Collection<Supplier<U>> actions)
  • ReactiveSeq#range
  • ReactiveSeq#fromList
  • ReactiveSeq#of
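For example, a range-based ReactiveSeq can be reversed without buffering the whole Stream (a sketch; the import path is an assumption, and operator availability is as listed above):

```java
import com.aol.cyclops.control.ReactiveSeq; // import path is an assumption

public class ReversalExample {
    public static void main(String[] args) {
        // Range-based creation stores only the bounds, so reverse()
        // can simply iterate downwards from the top of the range
        // instead of materializing and reversing a million elements
        ReactiveSeq.range(0, 1_000_000)
                   .reverse()
                   .limit(3)
                   .forEach(System.out::println);
    }
}
```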


Written by John McClean

Architecture @ Verizon Media. Maintainer of Cyclops. Twitter @johnmcclean_ie