Learn & Master ⚔️ the Basics of RxSwift in 10 Minutes

Every programmer should have heard of Rx. Whether it happened at the last developer conference or while reading a fresh blog article like this one 😎. It is almost impossible not to have heard of it, but what exactly is Reactive Programming? Let’s take a look on the Internet:

In computing, reactive programming is a programming paradigm oriented around data flows and the propagation of change. This means that it should be possible to express static or dynamic data flows with ease in the programming languages used, and that the underlying execution model will automatically propagate changes through the data flow. — Wikipedia

I am honest, after reading this paragraph, most people will have no idea what reactive programming actually is, so do I. This is the reason why I will try to create a simple, easy to understand Introduction for this modern approach to software development using the Swift Version of Rx: RxSwift.


1. Observable Sequences 🎞

The first thing you need to understand is that everything in RxSwift is an observable sequence or something that operates on or subscribes to events emitted by an observable sequence.

Arrays, Strings or Dictionaries will be converted to observable sequences in RxSwift. You can create an observable sequence of any Object that conforms to the Sequence Protocol from the Swift Standard Library.

Let’s create some observable sequences:

let helloSequence = Observable.just("Hello Rx")
let fibonacciSequence = Observable.from([0,1,1,2,3,5,8])
let dictSequence = Observable.from([1:"Hello",2:"World"])

You subscribe to observable sequences by calling 
subscribe(on:(Event<T>)-> ())
The passed block will receive all events emitted by that sequence.

let helloSequence = Observable.of("Hello Rx")
let subscription = helloSequence.subscribe { event in
print(event)
}
OUTPUT: 
next("Hello Rx")
completed

Observable sequences can emit zero or more events over their lifetimes. 
In RxSwift an Event is just an Enumeration Type with 3 possible states:

  • .next(value: T) — When a value or collection of values is added to an observable sequence it will send the next event to its subscribers as seen above. The associated value will contain the actual value from the sequence.
  • .error(error: Error) — If an Error is encountered, a sequence will emit an error event. This will also terminate the sequence.
  • .completed — If a sequence ends normally it sends a completed event to its subscribers
let helloSequence = Observable.from(["H","e","l","l","o"])
let subscription = helloSequence.subscribe { event in
switch event {
case .next(let value):
print(value)
case .error(let error):
print(error)
case .completed:
print("completed")
}
}
OUTPUT:
H e l l o
completed

If you want to cancel a subscription you can do that by calling dispose on it. You can also add the subscription to a Disposebag which will cancel the subscription for you automatically on deinit of the DisposeBag Instance. Another thing you can do is to subscribe just to a specific Event. For Example, if just want to receive the error events emitted by a sequence, you can use: subscribe(onError:(Error->())).

This Code snippet will aggregate all the things you learned by now:

// Creating a DisposeBag so subscribtion will be cancelled correctly
let bag = DisposeBag()
// Creating an Observable Sequence that emits a String value
let observable = Observable.just("Hello Rx!")
// Creating a subscription just for next events
let subscription = observable.subscribe (onNext:{
print($0)
})
// Adding the Subscription to a Dispose Bag
subscription.addDisposableTo(bag)

2. Subjects 📫

A Subject is a special form of an Observable Sequence, you can subscribe and dynamically add elements to it. There are currently 4 different kinds of Subjects in RxSwift

  • PublishSubject: If you subscribe to it you will get all the events that will happen after you subscribed.
  • BehaviourSubject: A behavior subject will give any subscriber the most recent element and everything that is emitted by that sequence after the subscription happened.
  • ReplaySubject: If you want to replay more than the most recent element to new subscribers on the initial subscription you need to use a ReplaySubject. With a ReplaySubject, you can define how many recent items you want to emit to new subscribers.
  • Variable: A Variable is just a BehaviourSubject wrapper that feels more natural to a none reactive programmers. It can be used like a normal Variable.

I’ll just show you how the PublishSubject works in this Article. If you want to know more about the other subject types, you need to take a look at the accompanying material on GitHub. They basically differ only in the number of past events emitted and received by their subscribers on initial subscription.

Publish: 0
Behaviour & Variable: 1
Replay: N

Let’s take a look at the PublishSubject.

The first thing we need to do is to create an actual PublishSubject instance. This is super easy, we can use the default initializer for that.

let bag = DisposeBag()
var publishSubject = PublishSubject<String>()

You can add new Values to that sequence by using the onNext() function. onCompleted() will complete the sequence and onError(error) will result in emitting an error event. Let’s add some values to our PublishSubject.

publishSubject.onNext("Hello")
publishSubject.onNext("World")

If you subscribe to that subject after adding “Hello” and “World” using onNext(), you won’t receive these two values through events. In contrast to a BehaviourSubject, that will receive “World”, which is the most recent event.

Now let’s create a subscription and add some new values to the Subject. 
We also create a second subscription and add even more values to it. 
Please read the comments to understand what actually is going on.

let subscription1 = publishSubject.subscribe(onNext:{
print($0)
}).addDisposableTo(bag)
// Subscription1 receives these 2 events, Subscription2 won't
publishSubject.onNext("Hello")
publishSubject.onNext("Again")
// Sub2 will not get "Hello" and "Again" because it susbcribed later
let subscription2 = publishSubject.subscribe(onNext:{
print(#line,$0)
})
publishSubject.onNext("Both Subscriptions receive this message")

Congratulations 🎉. If you kept up reading to this point you should know the basics of RxSwift. There is a lot more to learn, but everything around Rx is based on these simple principles. You can take a short break now and play around with these concepts to fully understand them. If you are ready let us continue because there is a lot more interesting stuff to uncover.

3. Marble Diagrams 🙌🏼

If you work with RxSwift or Rx in general, you should get to know Marble Diagrams. A Marble Diagram visualizes the transformation of an observable sequence. It consists of the input stream on top, the output stream at the bottom and the actual transformation function in the middle.

For Instance, let’s take look at an operation that delays your emitted events from an observable sequence by 150 milliseconds. Please ignore the scheduler parameter because I will introduce it later in the article:

Easy to understand, right?

There are great open source projects for both iOS and Android which allows you to interactively play around with these diagrams on your mobile devices. Play around with them and I promise you that you will learn a lot about Rx in a short amount of time.

Web-App: http://rxmarbles.com
iOS-App: https://itunes.apple.com/com/app/rxmarbles/id1087272442
Android: https://goo.gl/b5YD8K

4. Transformations ⚙️

Sometimes you want to transform, combine or filter the elements emitted by an observable sequence before the subscriber receives them. I will introduce you to the basic transformation operators, tell you something about Filters and possibilities to combine sequences. At last I will show you how to perform transformations, combinations etc. on different threads.
Let’s get started.

4.1 Map

To transform Elements emitted from an observable Sequence, before they reach their subscribers, you use the map operator. Imagine a transformation that multiplies each value of a sequence with 10 before emitting.

Observable<Int>.of(1,2,3,4).map { value in 
return value * 10
}.subscribe(onNext:{
print($0)
})
OUTPUT: 10 20 30 40

4.2 FlatMap

Imagine an Observable Sequence that consists of objects that are themselves Observables and you want to create a new Sequence from those. This is where FlatMap comes into play. FlatMap merges the emission of these resulting Observables and emitting these merged results as its own sequence.

let sequence1  = Observable<Int>.of(1,2)
let sequence2 = Observable<Int>.of(1,2)
let sequenceOfSequences = Observable.of(sequence1,sequence2)
sequenceOfSequences.flatMap{ return $0 }.subscribe(onNext:{
print($0)
})
OUTPUT: 1 2 1 2

4.3 Scan

Scan starts with an initial seed value and is used to aggregate values just like reduce in Swift.

Observable.of(1,2,3,4,5).scan(0) { seed, value in
return seed + value
}.subscribe(onNext:{
print($0)
})
OUTPUT: 1 3 6 10 15

4.4 Buffer

The Buffer operator transforms an Observable that emits items into an Observable that emits buffered collections of those items.

SequenceThatEmitsWithDifferentIntervals
.buffer(timeSpan: 150, count: 3, scheduler:s)
.subscribe(onNext:{
print($0)
})
OUTPUT: [1] [2,3] [4] [5,6] [7] [] 

5.Filter 🚬

If you only want to react on next events based on certain criteria you should use a filter operator.

5.1 Filter

The Basic filter Operation works similar to the swift equivalent. You just define a condition that needs to be passed and if the condition is fulfilled a .next event will be emitted to its subscribers.

Observable.of(2,30,22,5,60,1).filter{$0 > 10}.subscribe(onNext:{
print($0)
})
OUTPUT: 30 22 60

5.2 DistinctUntilChanged

If you just want to emit next Events if the value changed from previous ones you need to use distinctUntilChanged.

Observable.of(1,2,2,1,3).distinctUntilChanged().subscribe(onNext:{
print($0)
})
OUTPUT: 1 2 1 3

Other filter operators you should try:

  • Debounce
  • TakeDuration
  • Skip

6. Combine 💑

Combining sequences is a common Task. RxSwift provides a lot of operators for you. Here are 3 of them:

6.1 StartWith

If you want an Observable to emit a specific sequence of items before it begins emitting the items normally expected from it, use the startWith operator.

Observable.of(2,3).startWith(1).subscribe(onNext:{
print($0)
})
OUTPUT: 1 2 3

6.2 Merge

You can combine the output of multiple Observables so that they act like a single Observable, by using the Merge operator.

let publish1 = PublishSubject<Int>()
let publish2 = PublishSubject<Int>()
Observable.of(publish1,publish2).merge().subscribe(onNext:{
print($0)
})
publish1.onNext(20)
publish1.onNext(40)
publish1.onNext(60)
publish2.onNext(1)
publish1.onNext(80)
publish2.onNext(2)
publish1.onNext(100)
OUTPUT: 20 40 60 1 80 2 100

6.3 Zip

You use the Zip method if you want to merge items emitted by different observable sequences to one observable sequence. Zip will operate in strict sequence, so the first two elements emitted by Zip will be the first element of the first sequence and the first element of the second sequence combined. Keep also in Mind that Zip will only emit as many items as the number of items emitted of the source Observables that emits the fewest items.

let a = Observable.of(1,2,3,4,5)
let b = Observable.of("a","b","c","d")
Observable.zip(a,b){ return ($0,$1) }.subscribe {
print($0)
}
OUTPUT: (1, "a")(2, "b") (3, "c") (4, "d")

Other combination filters you should try:

  • Concat
  • CombineLatest
  • SwitchLatests

7. Side Effects 👉

If you want to register callbacks that will be executed when certain events take place on an Observable Sequence you need to use the doOn Operator. It will not modify the emitted elements but rather just pass them through. 
You can use …

  • do(onNext:) - if you want to do something just if a next event happened
  • do(onError:) - if errors will be emitted and
  • do(onCompleted:) - if the sequence finished successfully.
Observable.of(1,2,3,4,5).do(onNext: {
$0 * 10 // This has no effect on the actual subscription
}).subscribe(onNext:{
print($0)
})

8.Schedulers ⏰

Operators will work on the same thread as where the subscription is created. In RxSwift you use schedulers to force operators do their work on a specific queue. You can also force that the subscription should happen on a specifc Queue. You use subscribeOn and observeOn for those tasks. If you are familiar with the concept of operation-queues or dispatch-queues this should be nothing special for you. A scheduler can be serial or concurrent similar to GCD or OperationQueue. There are 5 Types of Schedulers in RxSwift:

  • MainScheduler — “Abstracts work that needs to be performed on MainThread. In case schedule methods are called from the main thread, it will perform the action immediately without scheduling.This scheduler is usually used to perform UI work.”
  • CurrentThreadScheduler — “Schedules units of work on the current thread. This is the default scheduler for operators that generate elements.”
  • SerialDispatchQueueScheduler — “Abstracts the work that needs to be performed on a specific dispatch_queue_t. It will make sure that even if a concurrent dispatch queue is passed, it's transformed into a serial one.Serial schedulers enable certain optimizations for observeOn.The main scheduler is an instance of SerialDispatchQueueScheduler"
  • ConcurrentDispatchQueueScheduler — “Abstracts the work that needs to be performed on a specific dispatch_queue_t. You can also pass a serial dispatch queue, it shouldn't cause any problems. This scheduler is suitable when some work needs to be performed in the background.”
  • OperationQueueScheduler — “Abstracts the work that needs to be performed on a specific NSOperationQueue. This scheduler is suitable for cases when there is some bigger chunk of work that needs to be performed in the background and you want to fine tune concurrent processing using maxConcurrentOperationCount.”

Here is a code snippet that shows you how to observe something concurrently on a background queue und subscribe on the main-queue.

let publish1 = PublishSubject<Int>()
let publish2 = PublishSubject<Int>()
let concurrentScheduler = ConcurrentDispatchQueueScheduler(qos: .background)
Observable.of(publish1,publish2)
.observeOn(concurrentScheduler)
.merge()
.subscribeOn(MainScheduler())
.subscribe(onNext:{
print($0)
})
publish1.onNext(20)
publish1.onNext(40)
OUTPUT: 20 40 

It’s a wrap 🎁

Congratulation, you learned the basics of RxSwift. Happy Coding 🎉

Learn & Master RxCocoa will be coming soon …


Show your support

Clapping shows how much you appreciated Sebastian Boldt 👨🏻‍💻’s story.