Merging RxJava Observables considered harmful — Part III
Implementing and verifying safeMergeArray
In the first part of the series, we described the possible crash when merging RxJava streams that run in parallel threads, while in the second part we wrote the tests and the documentation of our custom solution — safeMergeArray()
, which will try to merge the streams safely without the risk of crashing the app. In this final post, we’ll work on the actual implementation of our function and confirm that the tests will pass.
Requirements of safeMergeArray
As we already explained in our series, our function will need to serve as a replacement of Completable.mergeArray()
which merges one or more Completables into one.
A Completable is a type of Observable that doesn’t emit any value but instead it emits either a completion or an error event. It’s used when we don’t care about the value that an operation returns — we only care if the operation succeeded or failed.
That means that our function will need to run the Completables in the specified scheduler and return a complete event if all streams complete successfully, or an error event if at least one of them signals an error. And of course it will need to prevent the crash that Completable.mergeArray()
can cause when the streams run in parallel threads and they emit an error simultaneously.
How do we prevent the crash when two streams emit an error simultaneously?
When merging Completables, the first error that is emitted normally terminates the downstream. In this case, we do not want to terminate it on the first error — we want instead to collect the result of each stream and then build our own logic for how to react. This will prevent the downstream from being disposed when multiple streams signal an error at the same time, which is the cause of the crash.
In order to collect the result of each stream, we will need to transform our Completables into Singles that emit Result
objects. A Single is a type of Observable that can emit one value only or signal an error.
Implementing safeMergeArray
1st step: Transform the Completables into Singles
Result
is a custom sealed class, used for the return type of our Singles, that will represent the event emitted by the respective Completable:
sealed class Result {
object Complete : Result()
class Error(val throwable: Throwable) : Result()
}
Converting the Completables into Singles can be done using the toSingleDefault()
function:
val singles: List<Single<Result>> = completables.map {
it.toSingleDefault<Result>(Result.Complete)
.onErrorReturn { throwable ->
Result.Error(throwable)
}
}
The code above takes a number of Completables and transforms them into a list of Singles. If a Completable
completes successfully, the respective Single
will return a Result.Complete
while if it emits an error, we will catch it and the respective Single
will return a Result.Error(throwable)
. That way, we will be able to identify later whether a Completable succeeded or emitted an error.
2nd step: Zip the Singles
Now that we have the list of Singles, we’ll need to merge them into one using the Single.zip()
operator:
val result = Single.zip(singles) { results: Array<Any> ->
results.firstOrNull { it is Result.Error } ?: Result.Complete
}
The code above will wait for our streams to execute and will return a Single
with the result of the operation. If at least one of the streams emits an error, the merged Single
will return a Result.Error
while if all streams complete successfully, it will return a Result.Complete
.
Last step: Transform it back to a Completable
Since we’re working on a replacement of Completable.mergeArray()
we still need to return a Completable
. That would mean transforming the “zipped” Single<Result>
into a Completable
as the last step:
return result.flatMapCompletable {
if (it is Result.Error) {
Completable.error(it.throwable)
} else {
Completable.complete()
}
}
Et voilà! We managed to merge our original Completables, and we signal a complete event if all of them completed successfully or an error event if at least one of them emitted an error.
If we put all the pieces together, we have our custom implementation of merging Completables — a function called safeMergeArray()
that takes a number of Completables and merges them into one. It serves as a direct replacement of Completable.mergeArray()
:
Verifying the behavior of safeMergeArray
Now that we wrote the implementation of our safeMergeArray()
function, we are ready to run our tests again and hope that they’ll pass.
Hooray! 🎉 Every test is passing, so that means that we can now start using our safeMergeArray()
function in production, replacing all existing calls of Completable.mergeArray()
.
Our complete test suite can be found in GitHub.
Bonus tip: As you may have noticed, there is some duplicate logic in our tests with Completables either succeeding or throwing an error. This hints that our test suite can be transformed into a parameterized test, but we will leave that refactoring to you so that we won’t make our blog post series even lengthier.
Is there a catch to our solution?
We wouldn’t call it a catch but there is an implementation detail that you need to be aware of. As you noticed, when transforming the Completables into Singles, we catch every exception and we map them to a Result.Error
using the onErrorReturn()
operator. That means that if one of our streams signals an error, the rest of the streams will continue the execution until they complete or signal an error themselves, before we notify the downstream. In contrast, Completable.mergeArray()
will stop the execution of all streams if at least one of them signals an error.
Another thing worth noting, is that in case multiple streams signal an error, our merged Completable
will signal the first error it finds in the results list, which won’t be necessarily the error that was thrown first during the execution of the streams. That’s because we filter the results of the Single.zip()
operation with firstOrNull
. If we want our merged Completable
to emit the error that was thrown first, we can simply add an additional val time: Long
parameter in our Result.Error
class, store the timestamp when a stream signals an error, and in the results of the Single.zip()
operation, return the error that was thrown first.
Conclusion
In this blog post series, we defined a clear reproducible problem when merging RxJava Observables, we followed a TDD approach to test and document or custom solution that prevents the issue, and lastly we worked on the implementation.
It’s worth mentioning that this crash was bothering us for years and we are so excited that we finally understood the underlying issue and managed to implement a workaround. As most developers working with RxJava know, figuring out RxJava stack traces is not something straightforward especially when dealing with UndeliverableException
s. On the contrary, they will require a rather deep investigation.
Libraries such as RxDogTag
, that can surface in the stack traces the observers that do not implement onError()
, are definitely helpful, not though in the crash that we described in our series where onError()
was in fact implemented, but the downstream was disposed.
Lastly, we need to point out that the same issue affects other RxJava combine operators as well, such as the Single.zip()
operator. Following the same logic with safeMergeArray()
, we can create replacement functions for those as well.
More in this series
- Part I: The hidden cause of UndeliverableExceptions in RxJava
- Part II: How to merge RxJava streams safely following TDD
- Part III: Implementing and verifying safeMergeArray ← you are here
About the author
Stelios Frantzeskakis is a Senior Software Engineer for Perry Street Software, publisher of the LGBTQ+ dating apps SCRUFF and Jack’d, with more than 20M members worldwide.