Into the jungle of reactive operators

Benedikt Jerat
Dec 20, 2019 · 8 min read
Image for post
Image for post
Photo by Martyna Bober on Unsplash

In my last blog post, I gave you an introduction to reactive programming that was meant to explain when to use and, even more important, when not to use reactive programming. This emerging programming paradigm is initially anything but easy to grasp. The abstraction level is relatively high and there are a few rules for dealing with publishers and subscribers that you need to adhere to.

One of the most difficult things at the beginning is to start thinking reactive. To do so, it is helpful to get to know a few essential reactive operators and just play around with them. This article will highlight a few of the most important operators.

What The Operator?!

Image for post
Image for post
Photo by Ohm Kittipong on Unsplash

With no doubt, the plethora of operators in Reactive libraries is overwhelming at first. As the saying goes: “All beginnings are difficult”. This, in particular, applies to the first steps into reactive programming.

There are operators for creating reactive streams

  • out of raw values,
  • by supplying a generator function,
  • by capturing other types, like Futures, Runnables, or Callables,
  • or by consuming Arrays, Streams, or Iterables.

Multiple sequences may be concatenated together either

  • sequentially,
  • by pairing each value,
  • or each time a new value arrives at either side.

Then there are operators for

  • mapping each element to one or multiple results,
  • applying some predicate function to at least one or all values,
  • or aggregating every element into one.

In total, more than 25 operators are available for all aspects concerning the filtering of sequences and no less for handling exceptions and errors.

Keeping a clear understanding of these operators is not quite easy. Therefore, the first thing I recommend is to learn how to read Marble diagrams. These diagrams are a graphical representation of applying operators to reactive streams. They help a lot.

Image for post
Image for post
https://projectreactor.io/docs/core/release/reference/#flux

Elements that are emitted by the publisher are depicted as marbles (or bubbles if you will). The horizontal lines represent timelines that go from left to right. The top line represents the timeline of the publisher, the lower line the result of the transformation when applying the operation in the middle. Successful execution is illustrated as a vertical line, abrupt termination as a cross. If you have seen these representations a few times, you can assemble a visual picture in your head while programming.

All your operator are belong to us

With a few exceptions, all the operators are defined on both reactive types of the Spring Reactor framework (Mono and Flux). Some operators you will use all the time, some just have special use cases, like Mono.never(). The production usage of this operator is very limited (but does exist), nonetheless it is quite useful in test scenarios to simulate timeout behaviour because it will never signal any data, error, or completion signal.

It would be pointless to list all operators here in detail. This article would become somewhat gigantic. Therefore, let’s examine a few operators you’ll probably encounter every day in your projects, the reactive workhorses.

Directly creating a sequence: just

Image for post
Image for post
https://github.com/reactor/reactor-core/blob/master/reactor-core/src/main/java/reactor/core/publisher/doc-files/marbles/just.svg

just is the most simple way to create a Mono or Flux.

Mono from simple value (strict evaluation)

As you can see, the UUID is always the same. That’s because the value is evaluated strictly and will therefore be the same for every subscriber.

Be careful: When the passed value is evaluated by calling some blocking method, the call will still block the calling thread and nothing is won by making things “reactive”. Use one of the following operators instead.

Lazily creating a sequence: fromSupplier, defer

Image for post
Image for post
https://github.com/reactor/reactor-core/blob/master/reactor-core/src/main/java/reactor/core/publisher/doc-files/marbles/fromSupplier.svg
Image for post
Image for post
https://github.com/reactor/reactor-core/blob/master/reactor-core/src/main/java/reactor/core/publisher/doc-files/marbles/deferForMono.svg

fromSupplier and defer share some functionality in the sense that both delay execution. The evaluation is triggered by each subscription. Therefore, every subscriber receives a new value.

Mono from simple value (deferred evaluation)

This allows blocking calls to be non-blocking and is very useful for integrating legacy code into the reactive world.

Transforming a sequence: map

Image for post
Image for post
https://rxmarbles.com/#map

Just like the equivalent method from the Collections API, the map method transforms each individual element by applying a function.

Directly transforming values by using the map operator

Here, we are converting the database entity into the external representation by applying the map function to the sequence of queried database entities.

It should be noted though that the transformation is applied synchronously (although still being non-blocking overall). The map operator is therefore mainly intended for simple conversions between types, such as mapping database entities to their external representation. However, if the operation itself is an http call against an external system, flatMap is better suited.

Transforming a sequence: flatMap

Image for post
Image for post
https://github.com/reactor/reactor-core/blob/master/reactor-core/src/main/java/reactor/core/publisher/doc-files/marbles/flatMapForFlux.svg

This marble chart for flatMap definitely looks confusing at first, so let’s start with an example to clear things up.

Transforming and flattening publishers by using the flatMap operator

Let’s assume, we want to query the newest critical review for games published in the year 2019. First, we’re searching for games published in the given year, which is already producing a Flux. For every emitted game an http call to a service hosting the reviews is made, resulting in zero or one review emitted per game. The reviews are then returned to the client.

So, how does this map to the picture above?

The games published in the year 2019 are the line at the top, one marble per game found in our storage. For every emission, a subsequent call to the reviews service is performed, represented by the lines inside the box, itself returning a publisher. In our case, only one review at most will be returned, however, flatMap allows multiple emissions by these subsequent calls. The inner publishers are then flattened into a single Flux through merging.

Since flatMap itself accepts a function that produces publishers, the processing is completely asynchronous and non-blocking.

Just one closing note: flatMap allows values of flattened values to interleave with different inner publishers and additionally does not preserve the original ordering. Dependent on your requirements operators like concatMap, flatMapSequential or switchMap may be better suited. But that’s material for another article.

Combining two sequences: merge

Image for post
Image for post
https://rxmarbles.com/#merge

The merge operator combines multiple publishers by subscribing to all of them eagerly and merge the results as they come into the final sequence. The operator is especially useful in Scatter-Gather workflows, typically known as one of the Enterprise Integration Patterns.

Image for post
Image for post
https://www.enterpriseintegrationpatterns.com/patterns/messaging/BroadcastAggregate.html

A request is broadcast to multiple recipients (Scatter) and the responses are reaggregated into one single response for the client (Gather).

The emission order is not fixed though, especially if some publishers produce results faster than others. I will later present a comprehensive example that also uses the merge operator.

Filtering sequences: filter

Image for post
Image for post
https://rxmarbles.com/#filter

Another operator that works similarly to the equivalent method in the Collections API is the filter operator. If the source value matches the predicate, the value will be emitted, otherwise, it will be discarded.

Filtering out values by applying a predicate function

In our example, we are filtering out tweets we are not interested in by checking the associated tags. The remaining values are then processed further.

Using multiple data streams

So far, the examples mainly consisted of individual operators. Therefore, I would like to continue with a slightly larger example.

Imagine you want to provide a service that visualizes a condensed view of metadata, critics, ratings, and so forth for gaming titles. Something similar to what you get when you visit the IMDB website. To achieve this, different providers for critics and ratings must be integrated first. Based on this, a cumulated rating is calculated on a normalized scale. The rating is then joined with data in our metadata store.

This use case results in one data stream with metadata and multiple data streams of rating information that ultimately need to be united to one final result. It makes sense to visualize the data streams first to be able to formulate a clear target picture:

         /----------> details ------------------> overall
/ /
/ /---> metacritic ---\ /
/ / \ /
name --------|--------> ign ---------> rating
\ /
\----> gamespot ----/

Since we want to combine the results of the three integrated interfaces, we need an operator like zip, concat or merge. To achieve a faster total result, we want to simultaneously perform all requests against the external systems. As the concat operator requests the specified sources one after the other whenever the previous source provided a result, the operator is not suitable for this use case. Theoretically, the zip operator could be used, but all data sources provide the same format. Therefore, the merge operator is easier to work with.

Single result by combining publishers from different sources

The implementation exemplifies the typical Scatter-Gather workflow.

The game metadata is already persisted in our database. By using a framework like R2DBC, we can also make the database queries reactive. To calculate the average rating, a request is sent to multiple providers by using the merge operator. An average rating is calculated by applying the reduce operator. Game details and the rating are combined by using zip and transformed into one result stream by using the map operator.

Although the last example was simplified a bit by just aggregating the rating, you hopefully still get the idea. The entire calculation chain is non-blocking. The different providers are requested in parallel and the answers are merged without us having to worry about concurrency. Since the framework relieves us from the technical challenges, we may focus on the actual business case.

And therein lies the charm of reactive programming. There are countless operators, but they all have their use and their semantics. Sometimes you just have to experiment a little.

Thanks for reading! Feel free to comment or message me, when you have questions or suggestions.

For those who want to delve further into the subject of “reactive operators”, I have put together a few more resources.

Digital Frontiers — Das Blog

Dies ist das Blog der Digital Frontiers GmbH & Co.

Medium is an open platform where 170 million readers come to find insightful and dynamic thinking. Here, expert and undiscovered voices alike dive into the heart of any topic and bring new ideas to the surface. Learn more

Follow the writers, publications, and topics that matter to you, and you’ll see them on your homepage and in your inbox. Explore

If you have a story to tell, knowledge to share, or a perspective to offer — welcome home. It’s easy and free to post your thinking on any topic. Write on Medium

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store