RxSwift Deep Dive Part 1: Observable.create & Observable.subscribe
RxSwift is undoubtedly one of the most commonly used library for data flow in iOS where it provides a different way of tackling the asynchronous programming. Me myself have used RxSwift for quite a long time as a backbone of the architecture. Lots of other framework that I used is based on the RxSwift too. However, I only used RxSwift as a user, like using the Observable
, PublishSubject
, and some of the Rx operator without truly understanding it.
The problem starts when I got a bug and I don’t understand what I am looking for in the stack trace 😥
That was the reason I want to know RxSwift code deeper. At first, I watched objc.io videos about reactive programming to get the basic concept. After that, I decided what part of RxSwift should I learn first, and I chose Observable.create
& Observable.subscribe
. Then I copied the implementation of those function and try to compile it. Every time it failed, I copy other class/struct that missing until it compiles successfully.
Let’s check the code ofObservable.create
first.
We get AnyObserver
, Disposable
, and AnonymousObservable
.
AnyObserver
is a typed erased ofObserverType
. For me it’s responsibility is to forwards event (next, error, complete).
Disposable
is just reference to the subscription that you made.AnonymousObservable
is a private class (not visible to user) that based on theProducer
(Producer
is based onObservable
class which conform toObservableType
protocol). As the prefix suggest,Anonymous
is term that is used by RxSwift to indicate that the class is only a shell that in this case hold(AnyObserver<Element>) -> Disposable
.
The second part is the longer one, which is Observable.subscribe
.
intObservable.subscribe(MyObserver()).disposed(by: disposeBag)
The basic implementation of subscribe is accepting ObserverType
as the only parameter. So I created MyObserver
class that conform to ObserverType. the only method that need to be implemented is func on(_ event: Event<Element>)
. This is the function that called whenever the Observer gets a new Event
.
It’s quite difficult if we need to create/reuse a class every time we subscribe to an observable. But no need to worry, RxSwift has created 2 syntactic sugar to help us create easier subscription.
The first syntactic sugar is accepting closure of event so you don’t need to create a class.
func subscribe(_ on: @escaping (Event<Element>) -> Void) -> Disposable
And here is how you use it.
The second syntactic sugar is created to avoid doing switch or if you only need to listen to specific type of the event.
Enough for tutorial part, let’s dive into the implementation of subscribe
method.
AnonymousObservable
is using the default implementation of subscribe from its base class which is Producer
.
This part is quite complicated for me to understand. Firstly, the Producer
creates aSinkDisposer
, which passes to run method of the AnonymousObservableSink
.
After that, AnonymousObservableSink
that returned is hold by the SinkDisposer
in disposer.setSinkAndSubscription
method and create a reference cycle between SinkDisposer
and the AnonymousObservableSink
that can be show in graph below. This reference cycle is what keeps the subscription alive.
Finally, the SinkDisposer
is being returned as Disposable
that you usually insert it into the DisposeBag
via disposed(by:)
.
When you call disposed on the SinkDisposer
or deallocate the DisposeBag that store the SinkDisposer
, it will remove the Sink (AnonymousObservableSink
) from itself, breaking the reference cycle, and remove both of them from the memory.
That’s it for part 1, see you next time for the part 2.
Thanks Ferico Samuel and Arya Surya for reviewing the draft 🙏🏻