10 Pitfalls In Reactive Programming
with Pivotal’s Reactor
I’ve been doing Scala projects with Akka Streams for quite a few years now and I have a reasonably good feel for things to watch out for. At my current project we are doing Java and we are using a different implementation of the Reactive Streams Specification: Reactor. While learning the library I stumbled upon many common mistakes and bad practices, which I’ll be listing here. Credits to Enric Sala for pointing out these bad practices.
Firstly, let’s have a look at the Reactive Streams Specification and see how Reactor maps to it. The spec is pretty straightforward. It defines a Publisher, which is a potential source of data. One can subscribe to a Publisher with a Subscriber. The Publisher in turn passes a Subscription to the Subscriber, and that Subscription is used to demand elements from the Publisher. This is the core principle of Reactive Streams: the demand controls whether data can flow through.
With Reactor there are roughly two basic types that you are dealing with:
- Mono: a Publisher containing 0 or 1 elements
- Flux: a Publisher containing 0..N elements
There’s a type called CoreSubscriber which implements the Subscriber interface, but this is more of an internal API; as a user of the library you don’t really have to use it directly. One can “subscribe” to a Flux in a blocking way by using one of the block method variants. One can also use the subscribe method to, for instance, register a lambda. This will return a Disposable which can be used to cancel the subscription.
Alright, enough theory. Let’s dive into some code. Below I’ll list 10 potentially problematic code snippets. Some will be plain wrong, others are more like a bad practice or a smell. Can you spot them?
#1: Whoop Whoop Reactive!
Let’s start simple and try to use a Mono.
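A minimal sketch of the kind of snippet in question, assuming a hypothetical update method whose side-effect we can count:

```java
import reactor.core.publisher.Mono;
import java.util.concurrent.atomic.AtomicInteger;

class Pitfall1 {
    static final AtomicInteger updates = new AtomicInteger();

    // Hypothetical repository call: the side-effect only runs on subscription.
    static Mono<Void> update(int id) {
        return Mono.fromRunnable(() -> updates.incrementAndGet());
    }

    // The returned Mono is dropped: nobody subscribes, nothing happens.
    static void problem(int id) {
        update(id);
    }
}
```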
So what’s going on here? In our problem method we are calling an update method which returns a Mono&lt;Void&gt;. It’s a Void because we don’t really care about the result, so what could be wrong here?

Well, the update method actually won’t be executed at all. Remember that the demand determines whether data can flow through, and that the demand is controlled by the subscription? In this snippet we never subscribed to the Mono, hence it won’t be executed.
The fix is pretty simple. We just have to use a terminal operation, like one of the block method variants. Alternatively, we could propagate the Mono to the caller of the problem method and leave the subscribing to them.
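Both fixes, sketched with the same hypothetical update method:

```java
import reactor.core.publisher.Mono;
import java.util.concurrent.atomic.AtomicInteger;

class Pitfall1Fix {
    static final AtomicInteger updates = new AtomicInteger();

    static Mono<Void> update(int id) {
        return Mono.fromRunnable(() -> updates.incrementAndGet());
    }

    // Option A: block() is a terminal operation that subscribes for us.
    static void blockingFix(int id) {
        update(id).block();
    }

    // Option B (preferred): propagate the Mono and let the caller subscribe.
    static Mono<Void> propagatingFix(int id) {
        return update(id);
    }
}
```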
#2: Reactive + Reactive = Reactive
Now we know how to deal with reactive methods, let’s try to compose them.
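A sketch of the composition under discussion (create and update are hypothetical repository methods):

```java
import reactor.core.publisher.Mono;
import java.util.concurrent.atomic.AtomicInteger;

class Pitfall2 {
    static final AtomicInteger updates = new AtomicInteger();

    static Mono<String> create(String name) {
        return Mono.just(name);
    }

    static Mono<Void> update(String entity) {
        return Mono.fromRunnable(() -> updates.incrementAndGet());
    }

    // The Mono returned by update(...) is ignored inside doOnNext.
    static Mono<Void> problem(String name) {
        return create(name)
                .doOnNext(entity -> update(entity))
                .then();
    }
}
```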
First we are calling create and then we use doOnNext to make a call to the update method. The then() call ensures we are returning a Mono&lt;Void&gt; type. Should be fine, right?

It might surprise you that in this case, too, the update method won’t be executed. doOnNext, like all of the doOn* methods, does NOT subscribe to the publishers produced inside it.
#3: Subscribe all the Publishers!
Cool, we know how to fix this! Just subscribe to the inner publisher, right?
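That “fix” would look something like this (same hypothetical repository methods):

```java
import reactor.core.publisher.Mono;
import java.util.concurrent.atomic.AtomicInteger;

class Pitfall3 {
    static final AtomicInteger updates = new AtomicInteger();

    static Mono<String> create(String name) {
        return Mono.just(name);
    }

    static Mono<Void> update(String entity) {
        return Mono.fromRunnable(() -> updates.incrementAndGet());
    }

    // update() now runs, but its subscription is detached from the outer one.
    static Mono<Void> problem(String name) {
        return create(name)
                .doOnNext(entity -> update(entity).subscribe())
                .then();
    }
}
```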
This might actually work, however the inner subscription won’t be nicely propagated. That means we don’t have any control over it as a subscriber to the publisher returned by the problem method.

The takeaway here is to use doOn* methods only for side-effects, e.g. logging or uploading metrics.
To fix this code properly and propagate the inner subscription we need to use one of the map flavours. Let’s use flatMap, since we want to flatten the inner Mono and compose a single stream. We can also drop the then() call, because flatMap will already return the type of the inner publisher.
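Sketched with the same hypothetical methods, the flatMap version reads:

```java
import reactor.core.publisher.Mono;
import java.util.concurrent.atomic.AtomicInteger;

class Pitfall3Fix {
    static final AtomicInteger updates = new AtomicInteger();

    static Mono<String> create(String name) {
        return Mono.just(name);
    }

    static Mono<Void> update(String entity) {
        return Mono.fromRunnable(() -> updates.incrementAndGet());
    }

    // flatMap subscribes to the inner Mono and propagates its subscription;
    // no then() needed, the result is already a Mono<Void>.
    static Mono<Void> solution(String name) {
        return create(name)
                .flatMap(entity -> update(entity));
    }
}
```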
Just flatMap that sh*t! Sweet :)
#4: I didn’t quite catch that…
Are you ready for another one?
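A sketch of the trap, assuming a hypothetical update that fails asynchronously:

```java
import reactor.core.publisher.Mono;
import java.time.Duration;

class Pitfall4 {
    // Hypothetical async call that fails shortly after subscription.
    static Mono<Void> update(int id) {
        return Mono.delay(Duration.ofMillis(50))
                .then(Mono.error(new IllegalStateException("update failed")));
    }

    // subscribe() returns immediately; the error is signalled later, on a
    // scheduler thread, so the catch block is never reached.
    static boolean problem(int id) {
        try {
            update(id).subscribe();
            return false; // we get here right away, nothing was caught
        } catch (Exception e) {
            return true;
        }
    }
}
```

The error eventually surfaces on one of Reactor’s scheduler threads instead, which is exactly the problem: it never reaches our try-catch.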
This time we do subscribe to the Mono returned by the update method. It could potentially throw an Exception, so we apply defensive programming and wrap the call in a try-catch block.

However, as the subscribe method doesn’t necessarily block, we might not catch the exception at all. A simple try-catch doesn’t help with (potentially) asynchronous code.
To fix it we can either use block() again instead of subscribe(), or we can use one of the built-in error handling mechanisms. You can use any of the onError* methods to register an “on-error hook” and return a fallback publisher.
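For instance, with onErrorResume standing in for any of the onError* variants (the empty fallback is a hypothetical choice):

```java
import reactor.core.publisher.Mono;

class Pitfall4Fix {
    static Mono<Void> update(int id) {
        return Mono.error(new IllegalStateException("update failed"));
    }

    // The error travels through the pipeline and is replaced by a fallback
    // publisher, instead of being (not) caught around subscribe().
    static Mono<Void> solution(int id) {
        return update(id)
                .onErrorResume(e -> Mono.empty());
    }
}
```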
#5: Watch me
Let’s have a look at the following snippet
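A reconstruction of the pattern (create and update are hypothetical; here update transforms the string):

```java
import reactor.core.publisher.Mono;

class Pitfall5 {
    static Mono<String> create(String name) {
        return Mono.just(name);
    }

    static Mono<String> update(String entity) {
        return Mono.just(entity.toUpperCase());
    }

    // The inner subscription is detached again; we only keep the length.
    static Mono<Integer> problem(String name) {
        return create(name)
                .map(foo -> {
                    update(foo).subscribe();
                    return foo.length();
                });
    }
}
```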
What we are trying to achieve here is to subscribe to the update and transform the result to a Mono&lt;Integer&gt;. Hence, we use the map operation to get the length of the string foo.

While update will be executed at some point, we are again not propagating the inner subscription, similar to pitfall #3. The inner subscription is detached and we have no control over it.

A better way would be to once again use flatMap and transform the result using the map operator.
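With the same hypothetical methods:

```java
import reactor.core.publisher.Mono;

class Pitfall5Fix {
    static Mono<String> create(String name) {
        return Mono.just(name);
    }

    static Mono<String> update(String entity) {
        return Mono.just(entity.toUpperCase());
    }

    // flatMap composes the update into the stream; map transforms its result.
    static Mono<Integer> solution(String name) {
        return create(name)
                .flatMap(foo -> update(foo))
                .map(String::length);
    }
}
```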
Should you bother to use subscribe at all? Most of the time, no. There are a few potential use cases:
- Short-lived fire-and-forget tasks (e.g. telemetry, uploading logs). Please be mindful about concurrency and execution context.
- Long-running background jobs. Remember the Disposable that is being returned; use it for lifecycle control.
#6: Don’t count on it…
The next one might be a tricky one
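A sketch of the counting stream (Flux.range stands in for whatever the real source was):

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.concurrent.atomic.AtomicInteger;

class Pitfall6 {
    // The AtomicInteger lives outside the publisher chain: it is shared by
    // every subscriber of the Mono returned from one problem() call.
    static Mono<Integer> problem() {
        AtomicInteger sum = new AtomicInteger();
        return Flux.range(1, 5)
                .doOnNext(sum::addAndGet)
                .doOnComplete(() -> System.out.println("sum: " + sum.get()))
                .then(Mono.fromCallable(sum::get));
    }
}
```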
Here we are simply accumulating all numbers flowing through our stream using a doOnNext operator, and we print out the resulting sum when the stream completes using the doOnComplete operator. We are using an AtomicInteger to guarantee thread-safe increments.
This might seem to work when calling problem().block() once or even multiple times. However, we will see a completely different outcome if we subscribe to the result of a single problem() call multiple times. Moreover, if for whatever reason a downstream subscription gets renewed, the count will be off too. This happens because we are keeping state outside of the publisher: there is shared mutable state amongst all subscribers, which is a pretty bad smell.
The proper way would be to defer the initialisation of the state to the publisher, for instance by wrapping it in a
Mono as well. That way every subscriber keeps its own count.
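Mono.defer gives every subscriber its own state:

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.concurrent.atomic.AtomicInteger;

class Pitfall6Fix {
    // defer() runs the supplier once per subscription, so each subscriber
    // gets a fresh AtomicInteger and an independent count.
    static Mono<Integer> solution() {
        return Mono.defer(() -> {
            AtomicInteger sum = new AtomicInteger();
            return Flux.range(1, 5)
                    .doOnNext(sum::addAndGet)
                    .then(Mono.fromCallable(sum::get));
        });
    }
}
```

For a plain sum, the built-in reduce operator would avoid the mutable state altogether.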
#7: Close, but no Cigar
The next one has a similar issue. Can you spot it?
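A runnable sketch; the flaky upload and the closed-stream behaviour are simulated with hypothetical stand-ins:

```java
import reactor.core.publisher.Mono;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.concurrent.atomic.AtomicInteger;

class Pitfall7 {
    // Hypothetical stand-in for a real stream: refuses reads once closed.
    static class OneShotStream extends ByteArrayInputStream {
        volatile boolean closed;
        OneShotStream(byte[] buf) { super(buf); }
        @Override public void close() { closed = true; }
        @Override public synchronized byte[] readAllBytes() {
            if (closed) throw new UncheckedIOException(new IOException("stream closed"));
            return super.readAllBytes();
        }
    }

    static final AtomicInteger attempts = new AtomicInteger();

    // Hypothetical UploadService call: reads the stream, fails the first time.
    static Mono<Void> upload(InputStream in) {
        return Mono.fromRunnable(() -> {
            try {
                in.readAllBytes();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            if (attempts.incrementAndGet() == 1) {
                throw new IllegalStateException("flaky upload");
            }
        });
    }

    // Pitfall: one stream instance, created outside the publisher. The first
    // failure closes it via doFinally; every retry then reads a closed stream.
    static Mono<Void> problem() {
        InputStream in = new OneShotStream("data".getBytes());
        return upload(in)
                .doFinally(signal -> {
                    try { in.close(); } catch (IOException e) { /* ignore */ }
                })
                .retry(5);
    }
}
```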
Here we are trying to upload an input stream, and our UploadService is nice enough to close it for us when we are done, using the doFinally operator. To ensure we finish the upload successfully we want to retry five times on any failure using the retry operator.

When a retry kicks in, we will notice that the input stream is already closed, and all our retries will be exhausted with an IOException. Similar to the previous case, we are dealing with state outside of the publisher here, namely the input stream. We are closing it, hence changing its state, in the doFinally operator. This is a side-effect which we should avoid.
The solution once again is to defer the creation of the input stream to the publisher.
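With Mono.defer, every retry receives a fresh stream (same hypothetical flaky upload, which fails on the first attempt only):

```java
import reactor.core.publisher.Mono;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.util.concurrent.atomic.AtomicInteger;

class Pitfall7Fix {
    static final AtomicInteger attempts = new AtomicInteger();

    static Mono<Void> upload(InputStream in) {
        return Mono.fromRunnable(() -> {
            try {
                in.readAllBytes();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
            if (attempts.incrementAndGet() == 1) {
                throw new IllegalStateException("flaky upload");
            }
        });
    }

    // Fix: defer the stream creation into the publisher. Every subscription,
    // including each retry, creates and closes its own fresh stream.
    static Mono<Void> solution() {
        return Mono.defer(() -> {
            InputStream in = new ByteArrayInputStream("data".getBytes());
            return upload(in)
                    .doFinally(signal -> {
                        try { in.close(); } catch (IOException e) { /* ignore */ }
                    });
        }).retry(5);
    }
}
```

Reactor’s Mono.using and Flux.using operators formalise this acquire-use-release pattern as well.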
#8: Trick or Thread
The following issue is likely the most subtle one out of the ten, but nevertheless good to be aware of
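A sketch of the composition (findAll and update are hypothetical):

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class Pitfall8 {
    static Flux<String> findAll() {
        return Flux.range(1, 1000).map(i -> "id-" + i);
    }

    static Mono<Void> update(String id) {
        return Mono.empty();
    }

    // Looks harmless, but flatMap subscribes to inner publishers eagerly and
    // concurrently (default concurrency: 256 in current Reactor versions).
    static Mono<Void> problem() {
        return findAll()
                .flatMap(id -> update(id))
                .then();
    }
}
```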
Here we are doing everything right at first glance. We are once again composing two publishers, this time by using flatMap.

This code will probably work, but it’s worthwhile realising what’s going on behind the scenes. While flatMap looks like a simple mapper similar to the ones on a collection-like API, in Reactor it’s an asynchronous operator: the inner publishers will be subscribed to concurrently. This leads to uncontrolled parallelism. Depending on how many elements Flux&lt;String&gt; findAll() emits, we are potentially starting hundreds of concurrent sub-streams. This is probably not what you want, and I think the Reactor API should be more explicit about this, if not disallow it.
With Akka Streams for instance this wouldn’t even be possible. The corresponding operator is explicitly called
mapAsync, which gives you a clear indication that you are dealing with concurrent execution here. Moreover, it strictly requires you to explicitly limit the concurrency by passing a parallelism integer parameter.
Luckily there’s an overload for
flatMap in Reactor that allows you to configure the parallelism as well.
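The overload takes the maximum concurrency as its second argument:

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.util.concurrent.atomic.AtomicInteger;

class Pitfall8Fix {
    static final AtomicInteger updates = new AtomicInteger();

    static Flux<String> findAll() {
        return Flux.range(1, 1000).map(i -> "id-" + i);
    }

    static Mono<Void> update(String id) {
        return Mono.fromRunnable(updates::incrementAndGet);
    }

    // At most 4 inner publishers are subscribed to at any time.
    static Mono<Void> solution() {
        return findAll()
                .flatMap(id -> update(id), 4)
                .then();
    }
}
```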
Often you wouldn’t even need parallelism at all. If you just want to compose two streams and handle elements one at a time, you can use the concatMap operator instead.
#9: My Stream is Leaking
Almost there. When writing reactive code you sometimes have to integrate with non-reactive code. This is what the following snippet is about.
This code is almost too simple. We are dealing with a Flux&lt;String&gt;, but we don’t want our API to expose reactive types. Therefore, we are converting our stream to an Iterable&lt;String&gt; using the built-in toIterable method.
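A minimal sketch of such a conversion:

```java
import reactor.core.publisher.Flux;

class Pitfall9 {
    // Iterable has no notion of closing; the publisher never learns
    // when the consumer is done with it.
    static Iterable<String> problem() {
        return Flux.just("a", "b", "c").toIterable();
    }
}
```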
While this will probably lead to the expected result, transforming a Reactor stream to an
Iterable in this way is a smell. An
Iterable does not support closing so the publisher will never know when it’s done. Frankly, I don’t understand why
toIterable is even part of the stream API. I think we should avoid it!
The alternative is to convert to the java.util.stream.Stream API using the toStream method. This does support closing the underlying resources neatly.
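Stream is AutoCloseable, and closing the stream returned by toStream cancels the underlying subscription, so try-with-resources does the right thing:

```java
import reactor.core.publisher.Flux;
import java.util.stream.Stream;

class Pitfall9Fix {
    static Stream<String> solution() {
        return Flux.just("a", "b", "c").toStream();
    }

    // Use try-with-resources so the subscription is always cleaned up.
    static long consume() {
        try (Stream<String> s = solution()) {
            return s.count();
        }
    }
}
```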
#10: I don’t want this to end
If you came this far, congrats! You might not want this to end, like in the code snippet below
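A sketch of such a pipeline (observe and save are hypothetical; note that Schedulers.elastic() has since been deprecated in favour of boundedElastic, which is used here):

```java
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import java.time.Duration;

class Pitfall10 {
    static Flux<String> observe() {
        return Flux.interval(Duration.ofMillis(100)).map(i -> "event-" + i);
    }

    static Mono<Void> save(String event) {
        return Mono.empty();
    }

    // Pitfall: no error handling anywhere; any failure in observe() or
    // save() terminates the whole stream for good.
    static Disposable problem() {
        return observe()
                .flatMap(event -> save(event))
                .subscribeOn(Schedulers.boundedElastic())
                .subscribe();
    }
}
```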
Here we continuously want to observe a stream and save each element as it flows through. This will be a potentially endless stream, so we don’t want to block the main thread. Therefore, we subscribe on the elastic Scheduler using the subscribeOn operator. This scheduler dynamically creates ExecutorService-based workers and caches the thread pools for reuse. Finally, we call subscribe() to make sure the stream will be executed.
The issue here is that any failure in either the observe upstream or the inner publisher created by save will result in termination of the stream. We are lacking error handlers or a retry mechanism.
One can for instance:
- register an error handler using one of the onError* methods
- use any of the retry operator variants on either the inner or outer publisher
- use the doOnTerminate hook to restart the complete stream
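Combining some of those options in a sketch (which hooks and retry policies make sense depends on the actual use case):

```java
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import java.time.Duration;

class Pitfall10Fix {
    static Flux<String> observe() {
        return Flux.interval(Duration.ofMillis(100)).map(i -> "event-" + i);
    }

    static Mono<Void> save(String event) {
        return Mono.empty();
    }

    static Disposable solution() {
        return observe()
                .flatMap(event -> save(event)
                        .retry(3)                          // retry a failing save
                        .onErrorResume(e -> Mono.empty())) // then drop this element only
                .retry()                                   // resubscribe if observe() itself fails
                .subscribeOn(Schedulers.boundedElastic())
                .subscribe();
    }
}
```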
So, lessons learned. If you take away a few things from this, let it be the following.

Don’t make any assumptions about other publishers:
- The upstream can fail, so you need to handle potential errors and think about retries and fallbacks
- Control concurrency and execution context. Keep things simple and prefer concatMap if you don’t strictly need parallel execution. If you do need parallelism, be explicit about its limits using the flatMap(lambda, concurrency) overload. Also, in those cases use subscribeOn to run on an appropriate Scheduler.

Don’t make any assumptions about other subscribers:
- Avoid side-effects and closing over mutable state outside the publisher
- It should always be safe to (re)subscribe
Thanks for reading! I hope you enjoyed it and learned some things, like I did :)