Understanding Connectable Observable Sequences in RxSwift
This is the first of two posts on sharing subscriptions in RxSwift, to help developers learn how to use replay and share operators with RxSwift’s playground examples
No matter if you’re a rookie or a wizard in RxSwift, you will eventually forget (or find out) that the chain of operators gets re-executed with each new subscription. Consider the following code:
If you run the snippet above, you will see that Creating observable...
is printed twice! That’s because the code inside the closure passed to the create method gets executed as many times as we subscribe. In the example, I performed two subscriptions in the last couple of lines.
This is a contrived example, but imagine that the operation inside the create closure is a network request. You would be performing the same request multiple times! This is where the share operator goes into action. We apply it for a single subscription to be shared across all subscribers. Thus, the observable won’t be recreated and the chain of operators (if any) won’t be re-executed.
Seems pretty easy, right? It is indeed, but things get complicated as we start to use share, replay, share with replay, share with scope, etc. Suddenly, you find yourself in complete chaos where you’re totally confused about how to use all of these operators properly.
To help overcome the confusion, I’ll explain how these operators work and how they should be used. The code I’m sharing is mostly an adaptation of the code found in the Connectable Operators playground within the RxSwift repo.
This first post focuses on the auxiliary functions used to perform the analyses and explains the publish
, replay
and refCount
operators. Those are fundamental for understanding the share
operator, which is the focus of the second post.
These posts are written for developers with at least a basic understanding of the Swift 5.0 language and the RxSwift 5.0 library. Let’s get started!
Explaining the structure of code examples
Before going too far, I need first to explain how the code examples are structured:
The interval
operator is used to create intSequence
, an infinite stream of Ints that starts from 0 and where each new value is emitted after 1 second. Since I will perform a time-based analysis, it’s important to keep in mind the fact that the intSequence
takes 1 second to emit the first value.
And because many subscriptions to intSequence
are required, I defined the printNext(:_)
function within an extension of ObservableType
to keep the code concise. It subscribes to an observable and prints the emitted values prefixed by a tag it receives as a parameter.
RxSwift’s playground also brings the delay(_:closure)
function, which executes the given closure after a certain amount of time. Since this function and intSequence
both work with time, the order in which the closure is executed and values are emitted may vary. So, I’ve just added an offset to that function to keep the output deterministic. My changes force the closure to execute before values are emitted by intSequence. Now, there’s a delay(_:offset:closure)
function that is depicted in Listing 5.
To help visualize the output, I also added the printSecondBoundary()
function, which groups the logs that happen between one second and the next. To build that function, I used another interval
operator delayed by 0.1 seconds so that it’s executed after intSequence
emits values. Listing 6 shows the definition of that function and Output 1 presents example logs obtained with its usage.
To make things crystal clear, let’s briefly inspect the behavior of these auxiliary functions together. intSequence
emits values at one-second intervals (e.g. 1.0, 2.0, etc.), while the delay function dispatches code to execute just before (e.g. 0.9, 1.9, etc.) and printSecondBoundary()
closes the boundary right after (e.g. 1.1, 2.1, etc.) each interval. Output 2 depicts this behavior.
In this series, the subscriptions mostly take place inside a delay function. Therefore, keep in mind that a subscription delayed by T
actually happens slightly before T
.
Connectable Observable sequences
After knowing all about the auxiliary functions, there’s still one topic we still need to cover before having fun with share
: Connectable Observable sequences. As defined by the RxSwift community:
Connectable Observable sequences resemble ordinary Observable sequences, except that they do not begin emitting elements when subscribed to, but instead, only when their connect() method is called. In this way, you can wait for all intended subscribers to subscribe to a connectable Observable sequence before it begins emitting elements.
Conceptually, that’s pretty self-explanatory. But let’s see what that really looks like in practice with some examples using publish
, replay
, and refCount
operators.
Creating Connectable Observable sequences
The publish operator wraps a standard Observable
into a ConnectableObservable
. As per the definition, this wrapper is an Observable
that starts emitting values after its connect()
method is called. That behavior is shown in Listing 7 and Output 3 below:
The first thing to notice here is that, despite A
subscribing at t0
(before any delay), no values are printed until t3
. That’s because values are emitted only after connecting, which happens at t2
.
Second, notice that when B
subscribes at t4
it emits the same value of A
right away. What happens here is that the publish
operator turns the original sequence into a single shared sequence (similar to the share operator we’ll see in Part 2), which started after connecting at t2
. Also, remember that discussion about the delay function? B
subscribes slightly before the next value (1) is emitted, causing it to print at t4
.
There can only be 0 or 1 — and no more than that — shared subscriptions at any given time. Two or more subscribers actually share a single subscription.
Also, did you notice that connect()
has a return value I just discarded? Connecting a ConnectableObservable
actually means subscribing to the underlying Observable
it wraps around. Thus, that return value is just a Disposable
, which we can use for disconnecting (disposing of the underlying subscription). Let’s play with that:
In Listing 8, I am using a debug operator instead of the printNext(:_)
function. That is for verifying the behavior of the underlying subscription, which I called S
. Notice that when connect()
is called, S
is subscribed to. After that, all values emitted to S
are passed along to A
(which subscribes to the wrapping ConnectableObservable
).
Moreover, when S
is disposed at t4
, both S
and A
stop printing values. Notice, however, that A
is not disposed. Also, when S
is resubscribed to at t6
, values restart emitting to it at t7
. That doesn’t happen for A
. You should pay extra attention to this or you will wind up with a subscription in limbo.
Buffering elements in a Connectable Observable sequence
The replay
operator is very similar to publish
. The difference is that it uses a buffer for replaying the last emitted elements. Let’s look at an example followed by its output:
Note that both the code example and the output are very similar to the ones for the publish
operator (Listing 7 and Output 3). The only point of difference is in t4
, where there is an additional 0 being printed for B
. The explanation is that the replay operator buffered the last sent element, 0, and re-emitted it upon B
's subscription. After that, A
and B
print the same values, since they’re sharing the same single subscription.
There is much more to understand about the replay
operator. For example, it is sometimes confused with the share(replay:scope:)
operator, but I’ll save that disambiguation and other details for the second part of this series.
Tracking subscriptions in Connectable Observable sequences
There is also an operator called refCount
, which performs the opposite operation of publish
. It turns a ConnectableObservable
into a standard Observable
. It also tracks the number of subscribers to decide whether to connect or disconnect the underlying connectable observable. The rule is straightforward:
If the number of subscribers changes from 0 to 1, it connects. If that number changes from 1 to 0, it disconnects. For any other cases, it just delivers a shared subscription.
Let’s take a look at Listing 10 and Output 6 to better understand this rule:
Here we have three subscriptions. First, A
subscribes at t0
. At this point, refCount
's internal counter goes from 0 to 1, causing the connection of the underlying connectable observable and the actual creation of the sequence. A
then starts printing values at t1
.
At t2
, B
subscribes. Now the counter goes from 1 to 2, which means that the previously created (shared) subscription is delivered. A
and B
are now printing the same values.
By t6
, both A
and B
are disposed, causing the counter to reach 0 again. Thus, the underlying connectable observable is disconnected. Notice that when C
subscribes at t8
, it starts printing from 0 since the connectable observable had to be reconnected and the sequence recreated.
Next up: the share operator
In this blog post, I explored Connectable Observable sequences and explained in detail the behavior of the publish
, replay
and refCount
operators by performing time-based analysis on RxSwift’s playground adapted code. There’s also another connectable operator called multicast
that I won’t cover in this series because it is not commonly used (and it is less important in understanding shared subscriptions).
In the second post, we will use the same connectable operators and auxiliary functions to look in-depth at the share
operator. I’ll explain its key features and how it differs from the replay
operator.
Originally published at arctouch.com.