Understanding `share` operator

Rohan Sanap
Swift India
Published in
5 min readOct 10, 2019

Introduction

The chain of operators get re-executed with each new subscription. Consider the following code:

let observable = Observable<Int>.create { observer -> Disposable in
// Simulating a very resource expensive operation
print("Creating observable...")
DispatchQueue.global(qos: .background).asyncAfter(deadline: .now() + 10, execute: {
observer.onNext(1)
observer.onCompleted()
})
return Disposables.create()
}
let subscriptionA = observable.subscribe()
let subscriptionB = observable.subscribe()

Output:

Creating observable...
Creating observable...

Here you see that Creating observable… is printed twice! That is because the code inside the observable creation closure is executed on every new subscription. In this example we performed it twice in the last two lines. This is where shareoperator comes into picture. This operator shares single subscription across all subscribers. Thus the observable won’t be recreated and the chain of operations (if any) won’t be re-executed.

let sharedSubscription = observable.share()
let subscriptionA = sharedSubscription.subscribe()
let subscriptionB = sharedSubscription.subscribe()

This is a very simple example. But, image if the code inside the closure was a network request. You would be performing the same network request twice. Consider following example:

Class SomeViewModel {
...
let result = userTappedButton.flatMapLatest { apiService.rx.getFriend() }
let name = result.map { $0.name }
let phone = result.map { $0.phone }
...
}

You have a result stream which reacts to a user’s tap on a button, and fires a network request to some API service. You map these results into two separate streams of the friend’s name and phone number. Later on, in your view controller, you will subscribe to the name and phone streams and expect to get the latest friend’s name and phone when your user taps on the button.

Class SomeViewController: UIViewController {
...
//Bind your viewModel's outputs to your UI
viewModel.name.bind(to: nameLabel.rx.text).disposed(by: bag)
viewModel.phone.bind(to: phoneLabel.rx.phone).disposed(by: bag)
...
}

In the following example, every time you subscribe to name and phone you will actually be getting an entirely different stream, meaning an entirely different network request per subscription!

share to the rescue! It lets you define streams that share resources among their subscribers. Meaning, each subscriber to the stream will get the exact same stream, and will not invoke additional computations or resources for that origin stream. In the example above, merely adding share operator after original flatMapLatest will solve the problem and only one network request will be done.

Class SomeViewModel {
...
let result = userTappedButton
.flatMapLatest { apiService.rx.getFriend() }
.share()

let name = result.map { $0.name }
let phone = result.map { $0.phone }
...
}

Going deeper

share() is abstract form of function share(replay: Int = 0, scope: SubjectLifetimeScope = .whileConnected) with it’s default values. Let’s break these two parameters for better understanding.

Replay

This is pretty self explanatory. It is count of elements that we wish to replay to a new subscriber. In vague terms, with it’s default value as 0 it makes your stream act as a PublishSubject, meaning new subscriber only get’s future values but don’t know of anything that happened before the point of subscription.

In contrast, setting replay’s value to 1 makes your stream act like a BehaviourSubject and setting it’s value to any other bigger value makes your stream act as a ReplaySubject.

Scope

This argument has two possible values, viz., .whileConnected and .forever. In most of the cases you will use .whileConnected and it will be enough for you. In very rare cases you will have to use value .forever.

This argument has two possible values, viz., .whileConnected and .forever. In most of the cases you will use .whileConnected and it will be enough for you. In very rare cases you will have to use value .forever.

Here’s how each of these works:

  • .whileConnected: Values are replayed (when replay is larger than 0) in a reference-counting manner, much like ARC. When the number of subscribers drops from 1 to 0, the internal “cache” of the shared stream is cleared. It’s safe to use operators like retry since these will get a fresh stream for each retry and have a cleared internal state.
  • .forever: The internal cache of the stream is not cleared, even after the number of subscribers drops from 1 to 0. Meaning, future subscribers could potentially get stale events from the internal cache of the shared stream. It’s not recommended using operators such as retry in this case, as the retry might “carry” stale events and cause unexpected behaviour.

Consider the following example and you will get a proper idea about difference between two

Example for .whileConnected

let observable = Observable<Int>
.interval(.seconds(1), scheduler: MainScheduler.instance)
.share(replay: 1, scope: .whileConnected)
var subscription1: Disposable = observable
.debug("Sub 1")
.subscribe()
DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
print("After t+3 seconds")
subscription1.dispose()
}
var subscription2: Disposable?
DispatchQueue.main.asyncAfter(deadline: .now() + 10) {
print("After t+10 seconds")
subscription2 = observable
.debug("Sub 2")
.subscribe()
}

Output -

2019-07-18 16:10:42.994: Sub 1 -> subscribed
2019-07-18 16:10:43.996: Sub 1 -> Event next(0)
2019-07-18 16:10:44.996: Sub 1 -> Event next(1)
2019-07-18 16:10:45.997: Sub 1 -> Event next(2)
After t+3 seconds
2019-07-18 16:10:46.388: Sub 1 -> isDisposed
After t+10 seconds
2019-07-18 16:10:53.096: Sub 2 -> subscribed
2019-07-18 16:10:54.098: Sub 2 -> Event next(0)
2019-07-18 16:10:55.097: Sub 2 -> Event next(1)
After t+13 seconds
2019-07-18 16:10:56.097: Sub 2 -> isDisposed

After t+10, when subscription2 is subscribed, it restarts the sequence and no older value is replayed on it even when we have replay number set to 1. This is because before subscription2 is subscribed, we dispose subscription1 and the causes the observable to have 0 subscribers/subscriptions connect. This is when the cache is cleared and all older values are dump.

Example for .forever

In the above example code snippet, just change .whileConnected to .forever on line number 3 and you will get following output

2019-07-18 16:11:51.165: Sub 1 -> subscribed
2019-07-18 16:11:52.168: Sub 1 -> Event next(0)
2019-07-18 16:11:53.168: Sub 1 -> Event next(1)
2019-07-18 16:11:54.168: Sub 1 -> Event next(2)
After t+3 seconds
2019-07-18 16:11:54.463: Sub 1 -> isDisposed
After t+10 seconds
2019-07-18 16:12:01.170: Sub 2 -> subscribed
2019-07-18 16:12:01.170: Sub 2 -> Event next(2)
2019-07-18 16:12:02.171: Sub 2 -> Event next(0)
2019-07-18 16:12:03.171: Sub 2 -> Event next(1)
After t+13 seconds
2019-07-18 16:12:04.170: Sub 2 -> isDisposed

If we observe the difference between this output and the previous output, we see that after subscription of subscription2, value 2 is replayed on it from the previous stream as the cache is not cleared because the scope is .forever.

References

For a more holistic and deeper understanding of Connectable Observable Sequences in RxSwift with operators such as publish, replay, and refCount, please refer to these articles and playground:

--

--