How to use RxJava share() operator?
Use Case: Let’s assume, you have an Observable stream from a database for your email inbox which populates your Inbox itself, and also have a count on the top which indicates your unread emails.
Problem: You can’t simply do…
srcObservable.subscribe(::populateList)srcObservable.filter(::isUnread).subscribe(::showUnreadCount)
Because…
There is no “First subscriber: String2” in the output.
So this means, you cannot have multiple subscribers for the same Observable.
The first subscriber gets killed as soon as the second subscriber is registered.
But with share()…
Hence now you can use:
Observable<String> shared = srcObservable.share()shared.subscribe(::populateList)shared.filter(::isUnread).subscribe(::showUnreadCount)
Trivia
share() is a shorthand for publish().refcount(), making share() an RxJava Subject.
And refcount() will kill off the Observable as soon as all observers unsubscribe (marble diagram above). So this property is carried over to share() operator too.
Edit: Another possible solution
You could create a new observable everytime, like so:srcObservable()
that provides separate streams. Having separate streams could potentially be duplication of an expensive operation, especially in database. So, the benefit of using share()
is that it’s guaranteed to have synchronicity in your UI. That is, your unread count and what you see in the list will be synced correctly. Also, it’s guaranteed to be cheap too. Happy coding!
Twitter — twitter.com/rakshakhegde
Please hit the clap button if you liked this article :)