Fundamentals of RxJava with Kotlin for absolute beginners

Gabriel Leon de Mattos
16 min readMar 31, 2019

--

Reactive programming provides a solid foundation towards a scalable application, and today I will give you an introduction on how to use RxJava with Kotlin.

What’s RxJava?

RxJava is a reactive programming library for composing asynchronous and event-based programs by using observable sequences.

Reactive programming is based on data streams and the propagation of change. With reactive programming, you can express static (such as arrays), or dynamic (such as event emitters) data streams with ease.

Explain like I’m Five, What’s RxJava?

Let’s say Donna is a McDonald’s cashier and the manager thinks she is either stealing some cash or she gave out the wrong change. So, he asked Josh to keep an eye on Donna and report back to him on everything she does.

Josh watched closely that day as Donna gave out the wrong change twice and dropped some change by mistake since she has a hand twitching condition.

Josh, being the good employee that he is, immediately reported to his manager as the events took place.

In this situation, Josh is the Observer and Donna is the data. Josh was told to watch and report Donna as her state changes, and he’s to make a callback to whoever is listening to him (the manager).

Let’s transform this little story on some code.

Now, where can you use RxJava?

There is a multitude of places you can use RxJava, and below are the most common places where you can implement it:

  • Network Calls (such as API calls through HTTP with Retrofit, which fully supports RxJava);
  • UI events that should trigger actions;
  • Database Read and Write and/or files in the system;
  • Data coming out of the sensors;
  • Etc

Now let’s take a look at some of the main elements of RxJava.

Observable, Operator, Observer

Source: Understanding Types Of Observables In RxJava — Mindorks

An Observable is where the data stream comes from, it does some work and emits values.

An Operator has the capability to modify the data from one form to another.

An Observer receives the values.

Think of it this way, Observable is the Speaker, Operator is the Translator, and the Observer is the Listener.

Let’s fire up some examples, but first things first.

Adding dependency

Open the app level Build.gradle and add the following:

dependencies {
...
implementation "io.reactivex.rxjava2:rxjava:2.2.7"
implementation "io.reactivex.rxjava2:rxandroid:2.1.1"
}

Let’s create an Observable

There are many ways to do so, and we’ll list some of them below. The examples may get somewhat complex, but take your time to understand what’s going on in each line.

just

The just operator converts an Item into an Observable and emits it.

Observable.just("Hello Reactive World")
.subscribe { value -> println(value) }

Results in:

Hello Reactive World

Let’s break it down, just converts the string "Hello Reactive World" to an Observable and the subscribe method receives the value.

You may get a warning in the IDE stating that the result of the subscribeis not used. That’s because you need to add a disposable, but we’ll get to that later on.

Let’s add some complexity to it. We want to know when the item is received if there’s an error and when it completes.

Observable.just("Apple", "Orange", "Banana")
.subscribe(
{ value -> println("Received: $value") }, // onNext
{ error -> println("Error: $error") }, // onError
{ println("Completed!") } // onComplete
)

Result:

Received: Apple
Received: Orange
Received: Banana
Completed!

In this situation we have onNext , onError , and onComplete , used in a lambda expression. Their names are pretty much self-explanatory, but I’d like to generate an error just to test it out. Don’t worry about the map method for now, as we will talk about it later on.

Observable.just("Apple", "Orange", "Banana")
.map({ input -> throw RuntimeException() } )
.subscribe(
{ value -> println("Received: $value") },
{ error -> println("Error: $error") },
{ println("Completed!") }
)

Result:

I/System.out: Error: java.lang.RuntimeException

from*

There are a few ways you can use from, and some of them are listed below:

Observable.fromArray("Apple", "Orange", "Banana")
.subscribe { println(it) }

Which will result in:

Apple
Orange
Banana

Another example:

Observable.fromIterable(listOf("Apple", "Orange", "Banana"))
.subscribe(
{ value -> println("Received: $value") }, // onNext
{ error -> println("Error: $error") }, // onError
{ println("Completed") } // onComplete
)

And this will result in:

Received: Apple
Received: Orange
Received: Banana
Completed

create

This way you can create an Observable from the ground up. Let’s see an example.

First, let’s create the function that will convert the list to Observable

fun getObservableFromList(myList: List<String>) =
Observable.create<String> { emitter ->
myList.forEach { kind ->
if (kind == "") {
emitter.onError(Exception("There's no value to show"))
}
emitter.onNext(kind)
}
emitter
.onComplete()
}

The function above will create an Observableof the type string , and then it will read each item of the list and perform a check if it’s empty, and if it’s the case an error is supposed to show, otherwise, go to the next until complete.

Secondly, let’s call that function from onCreate to test it out. Make a note of the exception onError as we will test it out afterward.

getObservableFromList(listOf("Apple", "Orange", "Banana"))
.subscribe { println("Received: $it") }

And the above will result in:

Received: Apple
Received: Orange
Received: Banana

Now let’s test the onError by just removing the string"Orange" , replacing it with an empty string and adding the onError on the subscribe method.

getObservableFromList(listOf("Apple", "", "Banana"))
.subscribe(
{ v -> println("Received: $v") },
{ e -> println("Error: $e") }
)

Result:

Received: Apple
Error: java.lang.Exception: There's no value to show

Note that this time we only received the first item, and since there was an error with the second it interrupted the data stream and the error message showed up.

interval

This function will create an infinite sequence of ticks, separated by the specified duration.

Observable.intervalRange(
10L, // Start
5L, // Count
0L, // Initial Delay
1L, // Period
TimeUnit.SECONDS
).subscribe { println("Result we just received: $it") }

Result:

Result we just received: 10
Result we just received: 11
Result we just received: 12
Result we just received: 13
Result we just received: 14

In the example above, the Observable emits each second. Let’s test a simple, infinite interval.

Observable.interval(1000, TimeUnit.MILLISECONDS)
.subscribe { println("Result we just received: $it") }

Result:

Result we just received: 0
Result we just received: 1
Result we just received: 2
...

The code above will keep emitting the value each second indefinitely.

Before we talk about other types of Emitters, let’s talk a little about Backpressure.

What is Backpressure?

Backpressure is the process of handling an emitter that will produce a lot of items very fast. Let’s say an Observable produces 100000 items per second, how will a subscriber that can only handle 100 items per second process those items?

The Observable class has an unbounded buffer size, it buffers everything and pushed onto the subscriber, and if it’s emitting more than it can handle, you’re bound to get OutOfMemoryException .

We can handle such a stream of data if we apply Backpressure to the items as needed, in this way it the unnecessary items will be discarded or even let the producer know when to create or when to push the newly emitted item.

How can I solve it?

The solution is easy. Instead of an Observable you can use a Flowable which will handle the Backpressure for you since it takes it into consideration whereas Observable do not.

Think of it as a funnel when there’s too much liquid flowing in. This is the representation of an Observable :

Source: Observable vs Flowable rxjava2

And with Flowable taking Backpressure into consideration you would get:

Source: Observable vs Flowable rxjava2

Let’s code an example of backpressure and the solution.

val observable = PublishSubject.create<Int>()
observable.observeOn(Schedulers.computation())
.subscribe (
{
println("The Number Is: $it")
},
{t->
print(t.message)
}
)
for (i in 0..1000000){
observable.onNext(i)
}

The above code might result in OutOfMemoryException if the device is not top notch.

In order to handle the backpressure in this situation, we will convert it to Flowable .

val observable = PublishSubject.create<Int>()
observable
.toFlowable(BackpressureStrategy.DROP)
.observeOn(Schedulers.computation())
.subscribe (
{
println("The Number Is: $it")
},
{t->
print(t.message)
}
)
for (i in 0..1000000){
observable.onNext(i)
}

The above code is using the Backpressure strategy DROP which will drop some of the items in order to preserve memory capabilities.

Now that we’ve talked about Backpressure, which is an important part of RxJava, let’s move on to other types of Emitters.

Emitter types

We’ve been talking a lot about Observable, however, there are other types of emitters that can be used instead of an Observable . Let’s talk about some of them.

Flowable

It works exactly like an Observable but it supports Backpressure.

Flowable.just("This is a Flowable")
.subscribe(
{ value -> println("Received: $value") },
{ error -> println("Error: $error") },
{ println("Completed") }
)

Maybe

This class is used when you’d like to return a single optional value. The methods are mutually exclusive, in other words, only one of them is called. If there is an emitted value, it calls onSuccess , if there’s no value, it calls onComplete or if there’s an error, it calls onError .

Maybe.just("This is a Maybe")
.subscribe(
{ value -> println("Received: $value") },
{ error -> println("Error: $error") },
{ println("Completed") }
)

Single

It’s used when there’s a single value to be returned. If we use this class and there is a value emitted, onSuccess will be called. If there’s no value, onError will be called.

Single.just("This is a Single")
.subscribe(
{ v -> println("Value is: $v") },
{ e -> println("Error: $e")}
)

Completable

A completable won’t emit any data, what it does is let you know whether the operation was successfully completed. If it was, it calls onComplete and if it wasn’t it calls onError . A common use case of completable is for REST APIs, where successful access will return HTTP 204 , and errors can ranger from HTTP 301 , HTTP 404 , HTTP 500 , etc. We might do something with the information.

Completable.create { emitter ->
emitter.onComplete()
emitter.onError(Exception())
}

You can also manually call the methods doOnSubscribe, doOnNext, doOnError, doOnComplete.

Observable.just("Hello")
.doOnSubscribe { println("Subscribed") }
.doOnNext { s -> println("Received: $s") }
.doAfterNext { println("After Receiving") }
.doOnError { e -> println("Error: $e") }
.doOnComplete { println("Complete") }
.doFinally { println("Do Finally!") }
.doOnDispose { println("Do on Dispose!") }
.subscribe { println("Subscribe") }

Schedulers

Although RxJava is heavily marketed as an asynchronous way of doing reactive programming, it’s important to clarify that RxJava is single threaded by default, and you need to specify otherwise, and that’s where Schedulers come in.

A quick reminder of the difference between Synchronous vs Asynchronous.

Source: Asynchronous I/O for External Data Access

With Synchronous programming, only one thing happens at a time. The code fires up method a, which Reads/Write from the database, and waits for a to finish before moving on to b. So you get one thing happening at a time, and it’s the most common cause for UI freeze since the code will also run in the same thread as the UI.

With Asynchronous programming, you can call many methods at once, without waiting for another to finish. It’s one of the most fundamentals aspects of Android Development, you do not want to run every code on the same thread as the UI, especially computational code.

subscribeOn and observeOn

These methods allow you to control the action of the subscription and how you receive the changes.

subscribeOn

With subscribeOn you get to decide which thread your Emitter (such as Observable , Flowable , Single , etc) is executed.

The subscribeOn (as well as the observeOn ) needs the Scheduler param to know which thread to run on. Let’s talk about the difference between the threads.

Scheduler.io() This is the most common types of Scheduler that are used. They’re generally used for IO related stuff, such as network requests, file system operations, and it’s backed by a thread pool. A Java Thread Pool represents a group of worker threads that are waiting for the job and reuse many times.

Observable.just("Apple", "Orange", "Banana")
.subscribeOn(Schedulers.io())
.subscribe{ v -> println("Received: $v") }

Scheduler.computation() This is quite similar to IO as it’s also backed up by the thread pool, however, the number of threads that can be used is fixed to the number of cores present in the device. Say you have 2 cores, it means you’ll get 2 threads, 4 cores, 4 threads, and so on.

Observable.just("Apple", "Orange", "Banana")
.subscribeOn(Schedulers.computation())
.subscribe{ v -> println("Received: $v") }

Scheduler.newThread() The name here is self-explanatory, as it will create a new thread for each active Observable . You may want to be careful using this one as if there are a high number of Observable actions it may cause instability.

Observable.just("Apple", "Orange", "Banana")
.subscribeOn(Schedulers.newThread())
.subscribe{ v -> println("Received: $v") }

Remember, you can also set how many concurrent threads you want running, so you could do .subscribeOn(Schedulers.newThread(), 8) to have a maximum of 8 concurrent threads.

Scheduler.single() This Scheduler is backed up by a single thread. No matter how many Observable there are, it will only run in a single thread. Think about it as a replacement for the main thread.

Observable.just("Apple", "Orange", "Banana")
.subscribeOn(Schedulers.single())
.subscribe{ v -> println("Received: $v") }

Scheduler.trampoline() This will run on whatever the current thread is. If it’s the main thread, it will run the code on the queue of the main thread. Similar to Immediate Scheduler, it also blocks the thread. The trampoline may be used when we have more than one Observable and we want them to execute in order.

Observable.just("Apple", "Orange", "Banana")
.subscribeOn(Schedulers.trampoline())
.subscribe{ v -> println("Received: $v") }

Executor Scheduler This is a custom IO Scheduler, where we can set a custom pool of threads by specifying how many threads we want in that pool. It can be used in a scenario where the number of Observable can be huge for IO thread pool.

val executor = Executors.newFixedThreadPool(10)
val pooledScheduler = Schedulers.from(executor)

Observable.just("Apple", "Orange", "Banana")
.subscribeOn(pooledScheduler)
.subscribe{ v -> println("Received: $v") }

AndroidSchedulers.mainThread() Calling this on observeOn will bring the thread back to the Main UI thread, and thus make any modification you need to your UI.

observeOn

The method subscribeOn() will instruct the source Observable which thread to emit the items on and push the emissions on our Observer . But if it finds an observeOn() in the chain, it switches the emissions using the selected scheduler for the remaining operation.

Observable.just("Apple", "Orange", "Banana")
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe{ v -> println("Received: $v") }

Usually, the observing thread in Android is the Main UI thread.

Transformers

With a transformer, we can avoid repeating some code by applying the most commonly used chains among your Observable , we’ll be chaining subscribeOn and observeOn to a couple of Observable below.

override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)

Observable.just("Apple", "Orange", "Banana")
.compose(applyObservableAsync())
.subscribe { v -> println("The First Observable Received: $v") }

Observable.just("Water", "Fire", "Wood")
.compose(applyObservableAsync())
.subscribe { v -> println("The Second Observable Received: $v") }

}

fun <T> applyObservableAsync(): ObservableTransformer<T, T> {
return ObservableTransformer { observable ->
observable

.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
}
}

The example above will print:

The First Observable Received: Apple
The First Observable Received: Orange
The First Observable Received: Banana
The Second Observable Received: Water
The Second Observable Received: Fire
The Second Observable Received: Wood

It’s important to keep in mind that this example is for Observable , and if you’re working with other Emitters you need to change the type of the transformer, as follows.

ObservableTransformer

FlowableTransformer

SingleTransformer

MaybeTransformer

CompletableTransformer

Operators

There are many operators that you can add on the Observable chain, but let’s talk about the most common ones.

map()

Transforms values emitted by an Observable stream into a single value. Let’s take a look at a simple example below:

Observable.just("Water", "Fire", "Wood")
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.map { m -> m + " 2" }
.subscribe { v -> println("Received: $v") }

The above example will result in the following:

Received: Water 2
Received: Fire 2
Received: Wood 2

flatMap()

Unlike the map() operator, the flatMap() will transform each value in an Observable stream into another Observable , which are then merged into the output Observable after processing. Let’s do a visual representation of the difference between those:

map():
input: Observable<T>
transformation: (T -> R)
output: Observable<R>

flatMap():
input: Observable<T>
transformation: (T -> Observable<R>)
output: Observable<R>

Thanks to James Shvarts

As you can see, with a map you get T (generic type) for the value and R for the result, plain as that. And for the flatMap() you transform T into an Observable, which can have its own specific chains, including specifying its own thread.

Let’s use the same example from the map() above in a flatMap() and change the thread to computation.

Observable.just("Water", "Fire", "Wood")
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.flatMap { m ->
Observable.just(m + " 2")
.subscribeOn(Schedulers.io())
}
.subscribe { v -> println("Received: $v") }

This will result in:

Received: Water 2
Received: Fire 2
Received: Wood 2

The above example simply transformed each value emitted by the Observable into separate Observable .

zip()

The zip() operator will combine the values of multiple Observable together through a specific function.

Observable.zip(
Observable.just(
"Roses", "Sunflowers", "Leaves", "Clouds", "Violets", "Plastics"),
Observable.just(
"Red", "Yellow", "Green", "White or Grey", "Purple"),
BiFunction<String, String, String> { type, color ->
"$type are $color"
}
)
.subscribe { v -> println("Received: $v") }

The above example will print:

Received: Roses are Red
Received: Sunflowers are Yellow
Received: Leaves are Green
Received: Clouds are White or Grey
Received: Violets are Purple

Notice that the value "Plastics" did not reach its destination. That’s because the second Observable had no corresponding value.

A real-world use case would be attaching a picture to an API result, such as avatar to name, so on and so forth.

The BiFunction <String, String, String> simply means that the value of the first and secondObservable are both a string and the resulting Observable is also a String.

concat()

As the name suggests, concat() will concatenate (join together) two or more Observable .

val test1 = Observable.just("Apple", "Orange", "Banana")
val test2 = Observable.just("Microsoft", "Google")
val test3 = Observable.just("Grass", "Tree", "Flower", "Sunflower")

Observable.concat(test1, test2, test3)
.subscribe{ x -> println("Received: " + x) }

The above example should print:

Received: Apple
Received: Orange
Received: Banana
Received: Microsoft
Received: Google
Received: Grass
Received: Tree
Received: Flower
Received: Sunflower

merge()

merge() works similarly to concat() , except merge will intercalate the emissions from both Observable , whereas concat() will wait for one to finish to show another.

Let’s visualize this difference to make it easier.

Observable.merge(
Observable.interval(250, TimeUnit.MILLISECONDS).map { i -> "Apple" },
Observable.interval(150, TimeUnit.MILLISECONDS).map { i -> "Orange" })
.take(10)
.subscribe{ v -> println("Received: $v") }

Both of the Observable above will keep emitting the item "Apple" and "Orange” , one every 250ms and the other every 150ms, then we take(10) results. The code will result in the following:

Received: Orange
Received: Apple
Received: Orange
Received: Orange
Received: Apple
Received: Orange
Received: Apple
Received: Orange
Received: Orange
Received: Apple

As you can see, the results intercalate between each other. What would happen if instead of themerge() operator we used concat() in this situation?

Observable.concat(
Observable.interval(250, TimeUnit.MILLISECONDS).map { i -> "Apple" },
Observable.interval(150, TimeUnit.MILLISECONDS).map { i -> "Orange" })
.take(10)
.subscribe{ v -> println("Received: $v") }

The code will result in:

Received: Apple
Received: Apple
Received: Apple
Received: Apple
Received: Apple
Received: Apple
Received: Apple
Received: Apple
Received: Apple
Received: Apple

The concat() operator is waiting for the first Observable to finish emitting values before it moves on to the next Observable . But since our Observable are designed to run forever it will never move on to the next one.

To sum up, merge() intercalates emissions whereas concat() doesn’t.

filter()

Filter the values according to a set condition.

Observable.just(2, 30, 22, 5, 60, 1)
.filter{ x -> x < 10 }
.subscribe{ x -> println("Received: " + x) }

The above code will only transmit the values that are less than 10, according to our condition x < 10 .

Received: 2
Received: 5
Received: 1

repeat()

This operator will repeat the emission of the values however many times we may need.

Observable.just("Apple", "Orange", "Banana")
.repeat(2)
.subscribe { v -> println("Received: $v") }

This should output the values 2:

Received: Apple
Received: Orange
Received: Banana
Received: Apple
Received: Orange
Received: Banana

take()

The take() operator is meant to grab however many emissions you’d like. A very simple example would be:

Observable.just("Apple", "Orange", "Banana")
.take(2)
.subscribe { v -> println("Received: $v") }

The above will result in:

Received: Apple
Received: Orange

So we only took the first two emissions.

A more real-world case would be checking if we have internet connectivity, and if we do, load it from the network, if we don’t, load it from a local cache.

Disposable

Now that we’ve moved on from basic operators, let’s start talking about Disposable (using Disaposable will remove the dark yellow highlight from Android Studio IDE).

A Disposable will release memory, resources, and threads used by an Observable . So, the main purpose of disposable is to free up system resources and make your app more stable.

As antekm on reddit pointed out, you should not do the following:

Observable.just("Apple", "Orange", "Banana")
.subscribe(
{ v -> println("Received: $v") }
).dispose()

This will result in:

Received: Apple
Received: Orange
Received: Banana

But in this situation, it only worked because the code is being parsed in the main thread. Let’s see how it would fail.

Observable.just("Apple", "Orange", "Banana")
.subscribeOn(Schedulers.io()) //Changing the thread
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
{ v -> println("Received: $v") }
).dispose()

You would be doing network calls with Schedulers.io() , and in this situation, you will get no results, because since it changed threads the main thread called for disposal, thus erasing the work it was performing on the io thread.

Another method of using the disposable method is to use CompositeDisposable , and you may dispose of multiple Observable , such as the example below.

val compositeDisposable = CompositeDisposable()

val observableOne = Observable.just("Tree")
.subscribe { v -> println("Received: $v") }
val observableTwo = Observable.just("Blue")
.subscribe { v -> println("Received: $v") }

compositeDisposable.add(observableOne)
compositeDisposable.add(observableTwo)
compositeDisposable.clear()

And this will result in:

Reveived: Tree
Received: Blue

Remember when dealing with dispose() you should use it when you know for certain that your thread has finished. The most elegant way of doing it is disposing of your Observable through the activity lifecycle, especially onDestroy .

I also recommend taking a look at Uber’s AutoDispose, as it may be a better way of handling the dispose of an Observable .

In order for AutoDispose to work, you must have minSdk set to 26 in your app gradle file.

PublishSubject

PublishSubject works like an Observable and an Observer at the same time.

A quick refresh on the difference between those two.

  • Observer — Any object that wants to be notified when another object changes.
  • Observable — Any object whose state may be of interest, and in whom another object may register an interest.

So, a PublishSubject will receive the values as an Observer would and also emit it as an Observable would.

A very simple use-case would be hooking it up to the UI. Let’s see an example:

class MainActivity : AppCompatActivity() {
val source = PublishSubject.create<String>()
val disposables = CompositeDisposable()
var count = 0
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
disposables.add(
source.subscribe(
{ v -> textView.text = v },
{ e -> textView.text = "Error: $e" }
)
)
button.setOnClickListener {
source
.onNext("Counter: ${++count}")
if (count == 10) {
source.onComplete()
}
}
}
override fun onDestroy() {
super.onDestroy()
disposables.clear()
}
}

The above code will increment the counter every time the button is clicked.

In Conclusion

RxJava is a highly important library when the subject is Android Development, it’s more than just thread management, using it just for that would be like using the most advanced video card ever to play minecraft.

With proper use of Rx your app will become more fluid and scalable.

--

--