Merging RxJava Observables considered harmful — Part III

Implementing and verifying safeMergeArray

Stelios Frantzeskakis
Perry Street Software Engineering
6 min readAug 10, 2021

--

Yes, we all enjoy seeing those green lights when testing (Photo by Blue Ox Studio on Pexels)

In the first part of the series, we described the possible crash when merging RxJava streams that run in parallel threads, while in the second part we wrote the tests and the documentation of our custom solution — safeMergeArray(), which will try to merge the streams safely without the risk of crashing the app. In this final post, we’ll work on the actual implementation of our function and confirm that the tests will pass.

Requirements of safeMergeArray

As we already explained in our series, our function will need to serve as a replacement of Completable.mergeArray() which merges one or more Completables into one.

A Completable is a type of Observable that doesn’t emit any value but instead it emits either a completion or an error event. It’s used when we don’t care about the value that an operation returns — we only care if the operation succeeded or failed.

That means that our function will need to run the Completables in the specified scheduler and return a complete event if all streams complete successfully, or an error event if at least one of them signals an error. And of course it will need to prevent the crash that Completable.mergeArray() can cause when the streams run in parallel threads and they emit an error simultaneously.

How do we prevent the crash when two streams emit an error simultaneously?

When merging Completables, the first error that is emitted normally terminates the downstream. In this case, we do not want to terminate it on the first error — we want instead to collect the result of each stream and then build our own logic for how to react. This will prevent the downstream from being disposed when multiple streams signal an error at the same time, which is the cause of the crash.

Hope you agree with Jeniffer

In order to collect the result of each stream, we will need to transform our Completables into Singles that emit Result objects. A Single is a type of Observable that can emit one value only or signal an error.

Implementing safeMergeArray

1st step: Transform the Completables into Singles

Result is a custom sealed class, used for the return type of our Singles, that will represent the event emitted by the respective Completable:

Converting the Completables into Singles can be done using the toSingleDefault() function:

The code above takes a number of Completables and transforms them into a list of Singles. If a Completable completes successfully, the respective Single will return a Result.Complete while if it emits an error, we will catch it and the respective Single will return a Result.Error(throwable). That way, we will be able to identify later whether a Completable succeeded or emitted an error.

2nd step: Zip the Singles

Now that we have the list of Singles, we’ll need to merge them into one using the Single.zip() operator:

The code above will wait for our streams to execute and will return a Single with the result of the operation. If at least one of the streams emits an error, the merged Single will return a Result.Error while if all streams complete successfully, it will return a Result.Complete.

Last step: Transform it back to a Completable

Since we’re working on a replacement of Completable.mergeArray() we still need to return a Completable. That would mean transforming the “zipped” Single<Result> into a Completable as the last step:

Et voilà! We managed to merge our original Completables, and we signal a complete event if all of them completed successfully or an error event if at least one of them emitted an error.

If we put all the pieces together, we have our custom implementation of merging Completables — a function called safeMergeArray() that takes a number of Completables and merges them into one. It serves as a direct replacement of Completable.mergeArray():

Verifying the behavior of safeMergeArray

Now that we wrote the implementation of our safeMergeArray() function, we are ready to run our tests again and hope that they’ll pass.

Hooray! 🎉 Every test is passing, so that means that we can now start using our safeMergeArray() function in production, replacing all existing calls of Completable.mergeArray().

Our complete test suite can be found in GitHub.

Bonus tip: As you may have noticed, there is some duplicate logic in our tests with Completables either succeeding or throwing an error. This hints that our test suite can be transformed into a parameterized test, but we will leave that refactoring to you so that we won’t make our blog post series even lengthier.

Is there a catch to our solution?

We wouldn’t call it a catch but there is an implementation detail that you need to be aware of. As you noticed, when transforming the Completables into Singles, we catch every exception and we map them to a Result.Error using the onErrorReturn() operator. That means that if one of our streams signals an error, the rest of the streams will continue the execution until they complete or signal an error themselves, before we notify the downstream. In contrast, Completable.mergeArray() will stop the execution of all streams if at least one of them signals an error.

Another thing worth noting, is that in case multiple streams signal an error, our merged Completable will signal the first error it finds in the results list, which won’t be necessarily the error that was thrown first during the execution of the streams. That’s because we filter the results of the Single.zip() operation with firstOrNull. If we want our merged Completable to emit the error that was thrown first, we can simply add an additional val time: Long parameter in our Result.Error class, store the timestamp when a stream signals an error, and in the results of the Single.zip() operation, return the error that was thrown first.

Conclusion

In this blog post series, we defined a clear reproducible problem when merging RxJava Observables, we followed a TDD approach to test and document or custom solution that prevents the issue, and lastly we worked on the implementation.

It’s worth mentioning that this crash was bothering us for years and we are so excited that we finally understood the underlying issue and managed to implement a workaround. As most developers working with RxJava know, figuring out RxJava stack traces is not something straightforward especially when dealing with UndeliverableExceptions. On the contrary, they will require a rather deep investigation.

Libraries such as RxDogTag, that can surface in the stack traces the observers that do not implement onError(), are definitely helpful, not though in the crash that we described in our series where onError() was in fact implemented, but the downstream was disposed.

Lastly, we need to point out that the same issue affects other RxJava combine operators as well, such as the Single.zip() operator. Following the same logic with safeMergeArray(), we can create replacement functions for those as well.

More in this series

About the author

Stelios Frantzeskakis is a Senior Software Engineer for Perry Street Software, publisher of the LGBTQ+ dating apps SCRUFF and Jack’d, with more than 20M members worldwide.

--

--

Perry Street Software Engineering
Perry Street Software Engineering

Published in Perry Street Software Engineering

The engineering blog of Perry Street Software, which publishes two of the world’s largest LGBTQ+ dating apps, SCRUFF and Jack’d

Stelios Frantzeskakis
Stelios Frantzeskakis

Written by Stelios Frantzeskakis

Staff Engineer @ PSS. Advocate for XP, TDD, BDD, and DDD. https://twitter.com/SteliosFran