How to schedule emission from a Stream in Java

John McClean
Feb 9, 2016 · 3 min read

Cyclops supports the notion of scheduling the emission of elements from a Stream using cron, fixed-rate or fixed-delay based semantics.

Cron based scheduling

The gist below shows how to schedule emission from a Stream at a rate of one per second using cron based scheduling. Examples below show emission from a number of different Stream

We can also schedule infinite Streams, useful when we’d like a job to constantly run on a schedule for example - and even entirely unextended standard java.util.stream.Stream instances.

Connecting to emitted data

So now that we’ve scheduled our collection to start emitting data, how can we connect to it? The schedule method returns a synchronous HotStream.

A HotStream represents a Stream that is already independently emitting data, and we can connect as many downstream consumers to the parent Stream as we like.

The connect method uses an internal transfer Queue to pass data from the HotStream generated by the schedules, to the connected Stream (the connected Stream is defined by the debounce/peek and toList operators).

Note, that the toList method will block the current thread in the above example, we can use the futureOperations operator to move the entire execution of the connected Stream to another thread also, leaving the current thread unblocked.

By default a OneToOneConcurrentArrayQueue from Agrona is used to pass data from HotStreams to connected Streams. This is a wait/lock free queue, which works well when the producing and consuming Streams are in balance. If the producing Stream is faster than the consuming Stream backpressure can be applied by using something like a LinkedBlockingQueue as the transfer Queue, this can be provided as a parameter in the connect method.

Equally if the consuming Stream is much faster than the producing Stream, for high throughput applications (as opposed to low latency apps), we may waste CPU cycles spin locking as the consumer tries to take data from the empty queue.

Applying Back Pressure

One way to apply backpressure to a HotStream is to connect using a LinkedBlockingQueue.

Another way (not applicable to scheduled HotStreams) could be to make use of a PauseableHotStream

In general listening Streams should have no problem in keeping up with Scheduled Streams, because scheduling itself controls / slows down emission rates.

John McClean

Written by

Architecture @ Verizon Media. Maintainer of Cyclops. Twitter @johnmcclean_ie