RxSwift Deep Dive Part 1: Observable.create & Observable.subscribe

Jefferson Setiawan
4 min readDec 1, 2021
RxSwift Logo

RxSwift is undoubtedly one of the most commonly used library for data flow in iOS where it provides a different way of tackling the asynchronous programming. Me myself have used RxSwift for quite a long time as a backbone of the architecture. Lots of other framework that I used is based on the RxSwift too. However, I only used RxSwift as a user, like using the Observable, PublishSubject, and some of the Rx operator without truly understanding it.

The problem starts when I got a bug and I don’t understand what I am looking for in the stack trace 😥

RxSwift Stacktrace
RxSwift stack trace with lot of chaining operator τ_0_0

That was the reason I want to know RxSwift code deeper. At first, I watched objc.io videos about reactive programming to get the basic concept. After that, I decided what part of RxSwift should I learn first, and I chose Observable.create & Observable.subscribe. Then I copied the implementation of those function and try to compile it. Every time it failed, I copy other class/struct that missing until it compiles successfully.

Let’s check the code ofObservable.create first.

We get AnyObserver, Disposable, and AnonymousObservable.

  • AnyObserver is a typed erased of ObserverType. For me it’s responsibility is to forwards event (next, error, complete).
  • Disposable is just reference to the subscription that you made.
  • AnonymousObservable is a private class (not visible to user) that based on the Producer (Producer is based on Observable class which conform to ObservableType protocol). As the prefix suggest, Anonymous is term that is used by RxSwift to indicate that the class is only a shell that in this case hold (AnyObserver<Element>) -> Disposable.
Diagram for Observable.create
Diagram for Observable.create

The second part is the longer one, which is Observable.subscribe .

intObservable.subscribe(MyObserver()).disposed(by: disposeBag)

The basic implementation of subscribe is accepting ObserverType as the only parameter. So I created MyObserver class that conform to ObserverType. the only method that need to be implemented is func on(_ event: Event<Element>). This is the function that called whenever the Observer gets a new Event.

It’s quite difficult if we need to create/reuse a class every time we subscribe to an observable. But no need to worry, RxSwift has created 2 syntactic sugar to help us create easier subscription.

The first syntactic sugar is accepting closure of event so you don’t need to create a class.

func subscribe(_ on: @escaping (Event<Element>) -> Void) -> Disposable

And here is how you use it.

The second syntactic sugar is created to avoid doing switch or if you only need to listen to specific type of the event.

Enough for tutorial part, let’s dive into the implementation of subscribe method.

subscribe flow

AnonymousObservable is using the default implementation of subscribe from its base class which is Producer.

This part is quite complicated for me to understand. Firstly, the Producer creates aSinkDisposer, which passes to run method of the AnonymousObservableSink.

After that, AnonymousObservableSink that returned is hold by the SinkDisposer in disposer.setSinkAndSubscription method and create a reference cycle between SinkDisposer and the AnonymousObservableSink that can be show in graph below. This reference cycle is what keeps the subscription alive.

The subscription object graph

Finally, the SinkDisposer is being returned as Disposable that you usually insert it into the DisposeBag via disposed(by:).

When you call disposed on the SinkDisposer or deallocate the DisposeBag that store the SinkDisposer, it will remove the Sink (AnonymousObservableSink) from itself, breaking the reference cycle, and remove both of them from the memory.

That’s it for part 1, see you next time for the part 2.

Thanks Ferico Samuel and Arya Surya for reviewing the draft 🙏🏻

--

--