10 Pitfalls In Reactive Programming

with Pivotal’s Reactor

Jeroen Rosenberg
Nov 28 · 8 min read

I’ve been doing Scala projects with Akka Streams for quite a few years now and I have a reasonably good feel for the things to watch out for. On my current project we are doing Java and we are using a different implementation of the Reactive Streams Specification: Reactor. While learning the library I stumbled upon many common mistakes and bad practices which I’ll be listing here. Credits to Enric Sala for pointing out these bad practices.

Reactive Streams

Firstly, let’s have a look at the Reactive Streams Specification and see how Reactor maps to that. The spec is pretty straightforward:

There’s a Publisher that is a potential source of data. One can subscribe to a Publisher with a Subscriber. On subscription, the Publisher passes a Subscription to the Subscriber. The Subscriber uses that Subscription to demand elements from the Publisher. This is the core principle of Reactive Streams: the demand controls whether data can flow through.
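To make the demand mechanism concrete, here is a minimal sketch using the JDK’s own java.util.concurrent.Flow interfaces, which mirror the Reactive Streams ones. The class and field names (DemandSketch, OneByOne) are made up for illustration, and SubmissionPublisher is just a convenient ready-made Publisher, not something Reactor uses.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class DemandSketch {

    // Hypothetical subscriber that asks for exactly one element at a time.
    static class OneByOne implements Flow.Subscriber<String> {
        final List<String> received = new ArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // nothing flows until demand is signalled
        }

        @Override
        public void onNext(String item) {
            received.add(item);
            subscription.request(1); // ask for the next element
        }

        @Override
        public void onError(Throwable throwable) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    static List<String> run() {
        OneByOne subscriber = new OneByOne();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            publisher.submit("a");
            publisher.submit("b");
        } // close() completes the subscriber once the buffered items are delivered
        try {
            subscriber.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

If the subscriber never called request, the publisher would buffer the items and never deliver them.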

With Reactor there are roughly two basic types that you are dealing with:

  1. Mono, a Publisher emitting 0 or 1 element
  2. Flux, a Publisher emitting 0..N elements

There’s a type called CoreSubscriber which extends the Subscriber interface, but this is more like an internal API. As a user of the library you don’t really have to use this directly. One can “subscribe” to a Mono or Flux in a blocking way by using one of the block method variants. One can also use the subscribe method to, for instance, register a lambda. This will return a Disposable type which can be used to cancel the subscription.

10 pitfalls

Alright, enough theory. Let’s dive into some code. Below I’ll list 10 potentially problematic code snippets. Some will be plain wrong, others are more like a bad practice or a smell. Can you spot them?

Let’s start simple and try to use a Mono type.

So what’s going on here? In our problem method we are calling an update method which returns a Mono<Void>. It’s a void, because we don’t really care about the result, so what could be wrong here?

Well, the update method actually won’t be executed at all. Remember that the demand determines whether data can flow through? And that the demand is controlled by the subscription. In this snippet we didn’t subscribe at all to the Mono, hence it won’t be executed.

The fix is pretty simple. We just have to use a terminal operation, like one of the block or subscribe variants.

Alternatively, we could propagate the Mono to the caller of the problem method.
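A minimal sketch of this first pitfall and both fixes could look like the following. The update method here is a hypothetical stand-in that just bumps a counter, so we can see whether it ran; the class and method names are assumptions for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;

class NotSubscribedSketch {
    static final AtomicInteger updates = new AtomicInteger();

    // Hypothetical update: the counter only moves when someone subscribes.
    static Mono<Void> update() {
        return Mono.fromRunnable(updates::incrementAndGet);
    }

    // Pitfall: the Mono is assembled but never subscribed to.
    static void problem() {
        update();
    }

    // Fix 1: trigger execution with a terminal operation.
    static void fixedBlocking() {
        update().block();
    }

    // Fix 2: hand the Mono to the caller, who decides when to subscribe.
    static Mono<Void> fixedPropagating() {
        return update();
    }

    public static void main(String[] args) {
        problem();
        System.out.println("after problem: " + updates.get());
        fixedBlocking();
        System.out.println("after fixedBlocking: " + updates.get());
    }
}
```

Propagating the Mono is usually the nicer option in reactive code, since the caller keeps control over when and where the work runs.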

Now we know how to deal with reactive methods, let’s try to compose them.

First we are calling create and then we use doOnNext to make a call to the update method. The then() call ensures we are returning a Mono<Void> type. Should be fine, right?

It might surprise you that also in this case the update method won’t be executed. doOnNext, like all of the doOn* methods, does NOT subscribe to a publisher created inside its callback.

Cool, we know how to fix this! Just subscribe the inner publisher, right?

This might actually work, however the inner subscription won’t be nicely propagated. That means we don’t have any control over it as a subscriber to the publisher returned by the problem method.

The takeaway here is to only use doOn* methods for side-effects, e.g. logging, uploading metrics.

To fix this code properly and propagate the inner subscription we need to use one of the map flavours. Let’s use flatMap since we want to flatten the inner Mono and compose a single stream. We can also drop the then() call, because flatMap will already return the type of the inner publisher; Mono<Void>.
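Here is a small sketch of the doOnNext version next to the flatMap version. The create and update methods are hypothetical stand-ins (update again just bumps a counter), so treat this as an illustration under those assumptions.

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;

class ComposeSketch {
    static final AtomicInteger updates = new AtomicInteger();

    // Hypothetical stand-ins for the create and update methods.
    static Mono<String> create() {
        return Mono.just("foo");
    }

    static Mono<Void> update(String value) {
        return Mono.fromRunnable(updates::incrementAndGet);
    }

    // Pitfall: doOnNext throws away the Mono returned by update, so it never runs.
    static Mono<Void> problem() {
        return create().doOnNext(value -> update(value)).then();
    }

    // Fix: flatMap subscribes to the inner Mono as part of the outer stream.
    static Mono<Void> fixed() {
        return create().flatMap(value -> update(value));
    }

    public static void main(String[] args) {
        problem().block();
        System.out.println("after problem: " + updates.get());
        fixed().block();
        System.out.println("after fixed: " + updates.get());
    }
}
```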

Just flatMap that sh*t! Sweet :)

Are you ready for another one?

This time we do subscribe to the Mono returned by the update method. It could potentially throw an Exception so we apply defensive programming and wrap the call in a try-catch block.

However, as the subscribe method doesn’t necessarily block, we might not catch the exception at all. A simple try-catch structure doesn’t help with (potentially) asynchronous code.

To fix it we can either use block() again instead of subscribe() or we can use one of the built-in error handling mechanisms.

You can use any of the onError* methods to register an “on-error hook” and return a fallback publisher.
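As a sketch of the built-in error handling, the example below uses onErrorResume to swap in a fallback publisher. I’m assuming a hypothetical update that returns a Mono<String> and always fails, purely so the fallback value is visible; for a Mono<Void> the fallback would typically be Mono.empty().

```java
import reactor.core.publisher.Mono;

class ErrorHandlingSketch {

    // Hypothetical update that fails as soon as it is subscribed to.
    static Mono<String> update() {
        return Mono.error(new IllegalStateException("update failed"));
    }

    // The failure is handled inside the pipeline, so it works for
    // asynchronous publishers too, unlike a surrounding try-catch.
    static Mono<String> fixed() {
        return update().onErrorResume(error -> Mono.just("fallback"));
    }

    public static void main(String[] args) {
        System.out.println(fixed().block());
    }
}
```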

Let’s have a look at the following snippet

What we are trying to achieve here is to subscribe to the update and transform the result to a Mono<Integer>. Hence, we use the map operation to get the length of the string foo.

Although the update will be executed at some point, we are again not propagating the inner subscription, similar to pitfall #3. The inner subscription is detached and we have no control over it.

A better way would be to once again use flatMap and transform the result using the thenReturn operator.
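A sketch of the detached version and the flatMap plus thenReturn version, again with hypothetical create and update stand-ins:

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;

class TransformSketch {
    static final AtomicInteger updates = new AtomicInteger();

    // Hypothetical stand-ins for the create and update methods.
    static Mono<String> create() {
        return Mono.just("foo");
    }

    static Mono<Void> update(String value) {
        return Mono.fromRunnable(updates::incrementAndGet);
    }

    // Pitfall: the inner subscription is detached from the returned Mono.
    static Mono<Integer> problem() {
        return create().map(foo -> {
            update(foo).subscribe();
            return foo.length();
        });
    }

    // Fix: update runs as part of the stream and the length is
    // emitted only after it has completed.
    static Mono<Integer> fixed() {
        return create().flatMap(foo -> update(foo).thenReturn(foo.length()));
    }

    public static void main(String[] args) {
        System.out.println(fixed().block());
    }
}
```

With the fixed version, cancelling or timing out the returned Mono also affects the update, and an error in update reaches the subscriber instead of getting lost.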

Should you bother to use subscribe at all? Most of the time not. There are a few potential use cases:

  1. Short-lived fire-and-forget tasks (e.g. telemetry, uploading logs). Please be mindful about concurrency and execution context.
  2. Long-running background jobs. Remember the Disposable that is being returned. Use it for lifecycle control.

The next one might be a tricky one

Here we are simply accumulating all numbers flowing through our stream using a doOnNext operator and print out the resulting sum when the stream completes using the doOnComplete operator. We are using an AtomicInteger to guarantee thread-safe increments.

This might seem to work when calling problem().block() once or even multiple times. However, we will get a completely different outcome if we subscribe to the result of problem() multiple times. Moreover, if for whatever reason a subscription gets renewed downstream, the count will be off too. This happens because we are collecting state outside of the publisher. There is shared mutable state amongst all subscribers, which is a pretty bad smell.

The proper way would be to defer the initialisation of the state to the publisher, for instance by wrapping it in a Mono as well. That way every subscriber keeps its own count.
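The difference can be sketched as follows. To keep it checkable, this version returns the sum instead of printing it in doOnComplete, and it uses Mono.defer as one possible way to create the state per subscription; both choices are mine, as is the fixed range of 1 to 3.

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class SharedStateSketch {

    // Pitfall: one counter is shared by every subscriber of the returned Mono.
    static Mono<Integer> problem() {
        AtomicInteger sum = new AtomicInteger();
        return Flux.range(1, 3)
                .doOnNext(sum::addAndGet)
                .then(Mono.fromSupplier(() -> sum.get()));
    }

    // Fix: Mono.defer builds a fresh counter for each subscription.
    static Mono<Integer> fixed() {
        return Mono.defer(() -> {
            AtomicInteger sum = new AtomicInteger();
            return Flux.range(1, 3)
                    .doOnNext(sum::addAndGet)
                    .then(Mono.fromSupplier(() -> sum.get()));
        });
    }

    public static void main(String[] args) {
        Mono<Integer> shared = problem();
        System.out.println(shared.block() + " then " + shared.block());
        Mono<Integer> isolated = fixed();
        System.out.println(isolated.block() + " then " + isolated.block());
    }
}
```

Subscribing twice to the same problem() result keeps adding to the same counter, while every subscription to fixed() starts from zero.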

The next one has a similar issue. Can you spot it?

Here we are trying to upload an input stream and our UploadService is nice enough to close it for us when we are done using the doFinally operator. To ensure we finish the upload successfully we want to retry five times on any failure using the retry operator.

When a retry kicks in we will notice that the input stream is already closed and all our retries will be exhausted with an IOException. Similar to the previous case we are dealing with state outside of the publisher here, namely the input stream. We are closing it, hence changing its state, by using the doFinally operator. This is a side-effect which we should avoid.

The solution once again is to defer the creation of the input stream to the publisher.
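A rough sketch of the idea, under a few assumptions: TrackedStream is a made-up stream that remembers whether it was closed, and upload is a hypothetical stand-in that rejects closed streams and fails on its first attempt so that a retry actually happens.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Mono;

class RetrySketch {

    // Minimal stream that remembers whether it has been closed.
    static class TrackedStream extends ByteArrayInputStream {
        volatile boolean closed;

        TrackedStream() {
            super(new byte[] {1, 2, 3});
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    static final AtomicInteger attempts = new AtomicInteger();

    // Hypothetical upload: rejects closed streams and fails on the first attempt.
    static Mono<Void> upload(TrackedStream in) {
        return Mono.fromRunnable(() -> {
            if (in.closed) {
                throw new UncheckedIOException(new IOException("stream closed"));
            }
            if (attempts.incrementAndGet() == 1) {
                throw new IllegalStateException("transient failure");
            }
        });
    }

    // Pitfall: the same stream is reused, and closed, across retries.
    static Mono<Void> problem(TrackedStream in) {
        return upload(in).doFinally(signal -> in.close()).retry(5);
    }

    // Fix: each (re)subscription opens and closes its own stream.
    static Mono<Void> fixed() {
        return Mono.defer(() -> {
            TrackedStream in = new TrackedStream();
            return upload(in).doFinally(signal -> in.close());
        }).retry(5);
    }

    public static void main(String[] args) {
        fixed().block();
        System.out.println("attempts: " + attempts.get());
    }
}
```

Reactor also offers Mono.using for tying a resource’s lifecycle to a subscription, which may be a tidier fit than defer plus doFinally.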

The following issue is likely the most subtle one out of the ten, but nevertheless good to be aware of

Here we are doing everything right at first glance. We are once again composing two publishers, this time by using flatMap.

This code will probably work, but it’s worthwhile realising what’s going on behind the scenes. While flatMap looks like a simple mutator similar to the ones on a collection-like API, in Reactor it’s an asynchronous operator. The inner publisher will be subscribed to asynchronously. This leads to uncontrolled parallelism. Depending on how many elements our Flux<String> findAll() will emit, we are potentially starting hundreds of concurrent sub streams. This is probably not what you want, and I think the Reactor API should be more explicit about this, if not disallow it.

With Akka Streams for instance this wouldn’t even be possible. The corresponding operator is explicitly called mapAsync, which gives you a clear indication that you are dealing with concurrent execution here. Moreover, it strictly requires you to explicitly limit the concurrency by passing a parallelism integer parameter.

Luckily there’s an overload for flatMap in Reactor that allows you to configure the parallelism as well.

Often you wouldn’t even need parallelism at all. If you just want to compose two streams sequentially, one inner publisher at a time, you can use the concatMap operator.
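Both options can be sketched like this. The findAll and save methods are hypothetical stand-ins, and the concurrency of 2 is an arbitrary value picked for the example.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class ConcurrencySketch {

    // Hypothetical stand-ins for a repository and a save call.
    static Flux<String> findAll() {
        return Flux.just("a", "b", "c");
    }

    static Mono<String> save(String value) {
        return Mono.just(value.toUpperCase());
    }

    // At most two inner publishers are subscribed to at any time.
    static Flux<String> bounded() {
        return findAll().flatMap(ConcurrencySketch::save, 2);
    }

    // One inner publisher at a time, in source order.
    static Flux<String> sequential() {
        return findAll().concatMap(ConcurrencySketch::save);
    }

    public static void main(String[] args) {
        List<String> result = sequential().collectList().block();
        System.out.println(result);
    }
}
```

Note that flatMap does not guarantee the output order when the inner publishers are asynchronous, whereas concatMap preserves the order of the source.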

Almost there. When writing reactive code you sometimes have to integrate with non-reactive code. This is what the following snippet is about.

This code is almost too simple. We are dealing with a Flux<String>, but we don’t want our API to expose reactive types. Therefore, we are converting our stream to an Iterable<String> using the built-in toIterable method.

While this will probably lead to the expected result, transforming a Reactor stream to an Iterable in this way is a smell. An Iterable does not support closing so the publisher will never know when it’s done. Frankly, I don’t understand why toIterable is even part of the stream API. I think we should avoid it!

The alternative is to convert to the newer java.util.stream.Stream API using the toStream method. This does support closing of the resources neatly.
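A small sketch of how that could look, assuming a hypothetical findAll that returns a Flux<String>. The try-with-resources block closes the Stream, which in turn lets Reactor cancel the underlying subscription.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import reactor.core.publisher.Flux;

class BridgeSketch {

    // Hypothetical reactive source that we don't want to expose in our API.
    static Flux<String> findAll() {
        return Flux.just("a", "b");
    }

    // Blocking bridge: closing the Stream releases the subscription,
    // even if the caller stops reading early.
    static List<String> names() {
        try (Stream<String> stream = findAll().toStream()) {
            return stream.collect(Collectors.toList());
        }
    }

    public static void main(String[] args) {
        System.out.println(names());
    }
}
```

Like block, toStream blocks the calling thread, so it should only be used outside of Reactor’s non-blocking threads.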

If you came this far, congrats! You might not want this to end, like in the code snippet below

Here we continuously want to observe a stream and save each element as it flows through. This will be a potentially endless stream so we don’t want to block the main thread. Therefore, we are subscribing on the elastic Scheduler using the subscribeOn operator. This scheduler dynamically creates ExecutorService-based workers and caches the thread pools for reuse. Finally, we call subscribe() to make sure the stream will be executed.

The issue here is that any failure in either the observe upstream or the inner publisher created by save will result in termination of the stream. We are lacking error handlers or a retry mechanism.

One can for instance

  1. register an error handler using one of the onError* operators
  2. use any of the retry operator variants on either the inner or outer publisher
  3. use the doOnTerminate hook to restart the complete stream.
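The first two options can be combined as in the sketch below. The save method is a hypothetical stand-in that rejects one element, the source is passed in as a parameter so the pipeline can be tried with a finite Flux, and I’m using Schedulers.boundedElastic() where the text mentions the elastic Scheduler, since newer Reactor versions no longer ship the latter.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

class ResilientSketch {
    static final List<Integer> saved = new CopyOnWriteArrayList<>();

    // Hypothetical save that rejects one particular element.
    static Mono<Void> save(Integer item) {
        return Mono.fromRunnable(() -> {
            if (item == 3) {
                throw new IllegalStateException("cannot save " + item);
            }
            saved.add(item);
        });
    }

    // A failed save is dropped instead of terminating the stream;
    // retry() resubscribes if the upstream itself fails.
    static Flux<Void> resilient(Flux<Integer> source) {
        return source
                .concatMap(item -> save(item).onErrorResume(error -> Mono.empty()))
                .retry();
    }

    // Runs off the calling thread; keep the Disposable to stop it later.
    static Disposable start(Flux<Integer> source) {
        return resilient(source)
                .subscribeOn(Schedulers.boundedElastic())
                .subscribe();
    }

    public static void main(String[] args) {
        resilient(Flux.range(1, 5)).blockLast();
        System.out.println(saved);
    }
}
```

Silently dropping errors is rarely what you want in production, so in practice you would at least log inside onErrorResume, and probably bound or back off the retries.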

Conclusion

So, lessons learned. If you take away a few things from this, it should be the following:

  1. The upstream can fail, so you need to handle potential errors and think about retries and fallbacks
  2. Control concurrency and execution context. Keep things simple and prefer concatMap over flatMap if you don’t strictly need parallel execution. If you do need parallelism, be explicit about its limits using the flatMap(lambda, parallelism) overload. Also, in those cases use subscribeOn to use an appropriate Scheduler.
  3. Avoid side-effects and closing over mutable state outside the publisher
  4. It should be always safe to (re)subscribe

Thanks for reading! I hope you enjoyed it and learned some things, like I did :)

Jeroen Rosenberg

Dev of the Ops. Founder of Amsterdam.scala. Passionate about Agile, Continuous Delivery. Proud father of three.
