In my last blog post, I gave you an introduction to reactive programming that was meant to explain when to use it and, more importantly, when not to use it. This emerging programming paradigm is anything but easy to grasp at first. The abstraction level is relatively high, and there are a few rules for dealing with publishers and subscribers that you need to adhere to.
One of the most difficult things at the beginning is to start thinking reactive. To do so, it is helpful to get to know a few essential reactive operators and just play around with them. This article will highlight a few of the most important operators.
What The Operator?!
Without a doubt, the plethora of operators in reactive libraries is overwhelming at first. As the saying goes: “All beginnings are difficult”. This applies in particular to the first steps into reactive programming.
There are operators for creating reactive streams
- out of raw values,
- by supplying a generator function,
- by capturing other types, like Futures, Runnables, or Callables,
- or by consuming Arrays, Streams, or Iterables.
Multiple sequences may be concatenated together either
- by pairing each value,
- or each time a new value arrives at either side.
Then there are operators for
- mapping each element to one or multiple results,
- applying some predicate function to at least one or all values,
- or aggregating every element into one.
In total, more than 25 operators cover the filtering of sequences alone, and no fewer are available for handling exceptions and errors.
Keeping a clear understanding of these operators is not quite easy. Therefore, the first thing I recommend is to learn how to read Marble diagrams. These diagrams are a graphical representation of applying operators to reactive streams. They help a lot.
Elements that are emitted by the publisher are depicted as marbles (or bubbles if you will). The horizontal lines represent timelines that go from left to right. The top line represents the timeline of the publisher, the lower line the result of the transformation when applying the operation in the middle. Successful execution is illustrated as a vertical line, abrupt termination as a cross. If you have seen these representations a few times, you can assemble a visual picture in your head while programming.
All your operator are belong to us
With a few exceptions, all operators are defined on both reactive types of the Spring Reactor framework (Mono and Flux). Some operators you will use all the time; some have only special use cases, like Mono.never(). Its production usage is very limited (but does exist); nonetheless, it is quite useful in test scenarios to simulate timeout behaviour, because it will never emit any data, error, or completion signal.
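A minimal sketch of that test trick, assuming Reactor on the classpath (the 100 ms timeout and the fallback string are illustrative choices, not from the original):

```java
import java.time.Duration;

import reactor.core.publisher.Mono;

public class NeverTimeoutDemo {

    static String callWithTimeout() {
        // Mono.never() stands in for a service that never answers,
        // so the timeout is guaranteed to fire.
        Mono<String> slowService = Mono.never();
        return slowService
                .timeout(Duration.ofMillis(100)) // emits a TimeoutException after 100 ms
                .onErrorReturn("timed out")      // turn the error into a fallback value
                .block();
    }

    public static void main(String[] args) {
        System.out.println(callWithTimeout()); // prints "timed out"
    }
}
```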
Listing all operators here in detail would be pointless and would make this article gigantic. Instead, let’s examine a few operators you’ll probably encounter every day in your projects: the reactive workhorses.
Directly creating a sequence: just
just is the simplest way to create a Mono or Flux.
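A sketch of such a snippet, assuming Reactor on the classpath (the UUID is an illustrative value):

```java
import java.util.UUID;

import reactor.core.publisher.Mono;

public class JustDemo {

    static Mono<UUID> singleId() {
        // The value is computed once, eagerly, when the Mono is assembled.
        return Mono.just(UUID.randomUUID());
    }

    public static void main(String[] args) {
        Mono<UUID> id = singleId();
        id.subscribe(uuid -> System.out.println("first:  " + uuid));
        id.subscribe(uuid -> System.out.println("second: " + uuid));
        // Both subscribers print the identical UUID.
    }
}
```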
As you can see, the UUID is always the same. That’s because the value is evaluated strictly and will therefore be the same for every subscriber.
Be careful: if the passed value is produced by a blocking method call, that call still blocks the calling thread, and nothing is gained by making things “reactive”. Use one of the following operators instead.
Lazily creating a sequence: fromSupplier, defer
fromSupplier and defer share some functionality in the sense that both delay execution: the evaluation is triggered by each subscription. Therefore, every subscriber receives a new value.
This allows blocking calls to be non-blocking and is very useful for integrating legacy code into the reactive world.
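A sketch of the lazy variant, again with an illustrative UUID. To actually move a blocking call off the caller’s thread, you would additionally combine this with subscribeOn(Schedulers.boundedElastic()):

```java
import java.util.UUID;

import reactor.core.publisher.Mono;

public class DeferDemo {

    static Mono<UUID> freshId() {
        // The supplier is invoked once per subscription, not at assembly time.
        return Mono.fromSupplier(UUID::randomUUID);
    }

    public static void main(String[] args) {
        Mono<UUID> id = freshId();
        id.subscribe(uuid -> System.out.println("first:  " + uuid));
        id.subscribe(uuid -> System.out.println("second: " + uuid));
        // Each subscriber receives a different UUID.
    }
}
```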
Transforming a sequence: map
Just like the equivalent method from the Java Streams API, the map method transforms each individual element by applying a function.
Here, we are converting the database entity into the external representation by applying the map function to the sequence of queried database entities.
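A sketch of that conversion, with hypothetical entity and DTO types standing in for the article’s originals:

```java
import reactor.core.publisher.Flux;

public class MapDemo {

    // Hypothetical types standing in for the database entity
    // and its external representation.
    record GameEntity(long id, String title) {}
    record GameDto(String title) {}

    static Flux<GameDto> toExternal(Flux<GameEntity> entities) {
        // map applies a synchronous 1:1 conversion to every element.
        return entities.map(entity -> new GameDto(entity.title()));
    }

    public static void main(String[] args) {
        Flux<GameEntity> fromDb = Flux.just(
                new GameEntity(1, "Outer Wilds"),
                new GameEntity(2, "Disco Elysium"));

        toExternal(fromDb).subscribe(dto -> System.out.println(dto.title()));
    }
}
```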
It should be noted, though, that the transformation is applied synchronously (while the sequence as a whole remains non-blocking). The map operator is therefore mainly intended for simple conversions between types, such as mapping database entities to their external representation. If the operation itself is an HTTP call to an external system, however, flatMap is better suited.
Transforming a sequence: flatMap
The marble diagram for flatMap definitely looks confusing at first, so let’s start with an example to clear things up.
Let’s assume we want to query the newest critical review for games published in 2019. First, we search for games published in the given year, which already produces a Flux. For every emitted game, an HTTP call to a service hosting the reviews is made, resulting in zero or one review per game. The reviews are then returned to the client.
So, how does this map to the picture above?
The games published in 2019 are the line at the top, one marble per game found in our storage. For every emission, a subsequent call to the review service is performed, represented by the lines inside the box, each itself returning a publisher. In our case, at most one review will be returned; however, flatMap allows multiple emissions by these subsequent calls. The inner publishers are then flattened into a single Flux through merging. flatMap itself accepts a function that produces publishers, and the processing is completely asynchronous and non-blocking.
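The scenario above can be sketched as follows; the repository and review service are replaced by hypothetical in-memory stand-ins:

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class FlatMapDemo {

    record Game(String title) {}
    record Review(String game, String verdict) {}

    // Stand-in for the repository query from the article.
    static Flux<Game> findGamesPublishedIn(int year) {
        return Flux.just(new Game("Outer Wilds"), new Game("Disco Elysium"));
    }

    // Stand-in for the HTTP call to the review service:
    // zero or one review per game.
    static Mono<Review> newestCriticalReview(Game game) {
        return Mono.just(new Review(game.title(), "acclaimed"));
    }

    public static void main(String[] args) {
        Flux<Review> reviews = findGamesPublishedIn(2019)
                // Each game triggers an asynchronous inner publisher;
                // flatMap merges all inner results into one Flux.
                .flatMap(FlatMapDemo::newestCriticalReview);

        reviews.subscribe(r -> System.out.println(r.game() + ": " + r.verdict()));
    }
}
```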
Just one closing note: flatMap allows values from different inner publishers to interleave and does not preserve the original ordering. Depending on your requirements, an operator like switchMap may be better suited. But that’s material for another article.
Combining two sequences: merge
The merge operator combines multiple publishers by subscribing to all of them eagerly and merging their results into the final sequence as they arrive. The operator is especially useful in Scatter-Gather workflows, known from the Enterprise Integration Patterns.
A request is broadcast to multiple recipients (Scatter) and the responses are reaggregated into one single response for the client (Gather).
The emission order is not fixed, though, especially if some publishers produce results faster than others. Later, I will present a comprehensive example that also uses the merge operator.
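A minimal sketch of that arrival-order behaviour, using two illustrative sources where one is artificially delayed:

```java
import java.time.Duration;

import reactor.core.publisher.Flux;

public class MergeDemo {

    static Flux<String> merged() {
        Flux<String> fast = Flux.just("fast-1", "fast-2");
        Flux<String> slow = Flux.just("slow-1", "slow-2")
                .delayElements(Duration.ofMillis(50));

        // merge subscribes to both sources eagerly;
        // elements appear in arrival order, not in source order.
        return Flux.merge(fast, slow);
    }

    public static void main(String[] args) {
        // blockLast keeps the main thread alive until the delayed source is done.
        merged().doOnNext(System.out::println).blockLast();
    }
}
```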
Filtering sequences: filter
Another operator that works just like its counterpart in the Java Streams API is filter. If a source value matches the predicate, it is emitted; otherwise, it is discarded.
In our example, we are filtering out tweets we are not interested in by checking the associated tags. The remaining values are then processed further.
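A sketch of that tag check, with a hypothetical Tweet type and an illustrative tag:

```java
import java.util.Set;

import reactor.core.publisher.Flux;

public class FilterDemo {

    // Hypothetical type standing in for the article's tweet model.
    record Tweet(String text, Set<String> tags) {}

    static Flux<Tweet> interesting(Flux<Tweet> tweets) {
        // Keep only tweets carrying a tag we care about.
        return tweets.filter(tweet -> tweet.tags().contains("reactive"));
    }

    public static void main(String[] args) {
        Flux<Tweet> tweets = Flux.just(
                new Tweet("Reactor 3 released!", Set.of("reactive", "java")),
                new Tweet("Look at my lunch", Set.of("food")));

        interesting(tweets).subscribe(tweet -> System.out.println(tweet.text()));
    }
}
```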
Using multiple data streams
So far, the examples mainly consisted of individual operators. Therefore, I would like to continue with a slightly larger example.
Imagine you want to provide a service that visualizes a condensed view of metadata, critics, ratings, and so forth for gaming titles. Something similar to what you get when you visit the IMDB website. To achieve this, different providers for critics and ratings must be integrated first. Based on this, a cumulated rating is calculated on a normalized scale. The rating is then joined with data in our metadata store.
This use case results in one data stream with metadata and multiple data streams of rating information that ultimately need to be united into one final result. It makes sense to visualize the data streams first to get a clear target picture:
       /----------> details ------------------> overall
      /                                            /
     /     /---> metacritic ---\                  /
    /     /                     \                /
name ----+--------> ign ---------+--> rating ---/
          \                     /
           \----> gamespot ----/
Since we want to combine the results of the three integrated interfaces, we need an operator like merge. To get a faster overall result, we want to perform all requests against the external systems simultaneously. The concat operator requests the specified sources one after another, subscribing to the next only after the previous one has completed, so it is not suitable for this use case. Theoretically, the zip operator could be used, but since all data sources provide the same format, the merge operator is easier to work with.
The implementation exemplifies the typical Scatter-Gather workflow.
The game metadata is already persisted in our database. By using a framework like R2DBC, we can make the database queries reactive as well. To calculate the average rating, a request is sent to multiple providers using the merge operator. The average rating is then calculated with the reduce operator. Finally, the game details and the rating are combined using zip and transformed into one result stream using the map operator.
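The whole chain can be sketched as follows. The database query and the three providers are replaced by hypothetical in-memory stand-ins, and the averaging assumes exactly three providers:

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class RatingAggregationDemo {

    // Hypothetical types for the metadata store and the final view.
    record GameDetails(String name, String publisher) {}
    record GameOverview(String name, String publisher, double rating) {}

    // Stand-ins for the reactive database query and the three providers.
    static Mono<GameDetails> loadDetails(String name) {
        return Mono.just(new GameDetails(name, "Some Publisher"));
    }
    static Mono<Double> metacritic(String name) { return Mono.just(8.4); }
    static Mono<Double> ign(String name)        { return Mono.just(9.0); }
    static Mono<Double> gamespot(String name)   { return Mono.just(8.7); }

    static Mono<GameOverview> overview(String name) {
        // Scatter: query all providers in parallel; Gather: average the scores.
        Mono<Double> rating = Flux.merge(metacritic(name), ign(name), gamespot(name))
                .reduce(0.0, Double::sum)
                .map(sum -> sum / 3); // assumes three providers

        // Combine details and rating, then map the tuple to the final view.
        return loadDetails(name)
                .zipWith(rating)
                .map(tuple -> new GameOverview(
                        tuple.getT1().name(), tuple.getT1().publisher(), tuple.getT2()));
    }

    public static void main(String[] args) {
        overview("Outer Wilds").subscribe(o ->
                System.out.println(o.name() + " rated " + o.rating()));
    }
}
```

Note how the Scatter-Gather part is a single merge-reduce chain, while zip handles the final join with the metadata.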
Although the last example was simplified a bit by just aggregating the rating, you hopefully still get the idea. The entire calculation chain is non-blocking. The different providers are requested in parallel and the answers are merged without us having to worry about concurrency. Since the framework relieves us from the technical challenges, we may focus on the actual business case.
And therein lies the charm of reactive programming. There are countless operators, but they all have their use and their semantics. Sometimes you just have to experiment a little.
Thanks for reading! Feel free to comment or message me if you have questions or suggestions.
For those who want to delve further into the subject of “reactive operators”, I have put together a few more resources.
Operators and what they are meant for
Resources for marble diagrams