Fundamentals of RxJava with Kotlin for absolute beginners
Reactive programming provides a solid foundation for building scalable applications, and today I’ll give you an introduction to using RxJava with Kotlin.
What’s RxJava?
RxJava is a reactive programming library for composing asynchronous and event-based programs by using observable sequences.
Reactive programming is based on data streams and the propagation of change. With reactive programming, you can express static data streams (such as arrays) or dynamic ones (such as event emitters) with ease.
Explain like I’m Five, What’s RxJava?
Let’s say Donna is a McDonald’s cashier and the manager thinks she is either stealing some cash or she gave out the wrong change. So, he asked Josh to keep an eye on Donna and report back to him on everything she does.
Josh watched closely that day as Donna gave out the wrong change twice and dropped some change by mistake since she has a hand twitching condition.
Josh, being the good employee that he is, immediately reported to his manager as the events took place.
In this situation, Josh is the Observer and Donna is the source of the data. Josh was told to watch Donna and report as her state changes, calling back to whoever is listening to him (the manager).
Let’s transform this little story into some code.
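As a rough sketch of how the story might map to code (the names CashierEvent, donnasShift, and joshReports are made up for illustration, and the RxJava calls used here are explained further down):

```kotlin
import io.reactivex.Observable

// Hypothetical event type for the story; it is not part of RxJava.
data class CashierEvent(val description: String)

// Donna's shift, modelled as a stream of the things she does.
fun donnasShift(): Observable<CashierEvent> =
    Observable.just(
        CashierEvent("gave out the wrong change"),
        CashierEvent("gave out the wrong change"),
        CashierEvent("dropped some change")
    )

// Josh observes the stream and writes one report line per event.
fun joshReports(): List<String> {
    val reports = mutableListOf<String>()
    donnasShift().subscribe { event -> reports += "Donna ${event.description}" }
    return reports
}

fun main() {
    // The manager is whoever consumes Josh's reports.
    joshReports().forEach { println("Report to manager: $it") }
}
```

Each event is reported as soon as it happens, which is the whole idea: Josh doesn’t poll Donna, he reacts to what she does.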
Now, where can you use RxJava?
There is a multitude of places you can use RxJava, and below are the most common places where you can implement it:
- Network Calls (such as API calls through HTTP with Retrofit, which fully supports RxJava);
- UI events that should trigger actions;
- Database reads and writes, as well as file system access;
- Data coming from sensors;
- Etc.
Now let’s take a look at some of the main elements of RxJava.
Observable, Operator, Observer
An Observable is where the data stream comes from: it does some work and emits values.
An Operator transforms the data from one form into another.
An Observer receives the values.
Think of it this way: the Observable is the Speaker, the Operator is the Translator, and the Observer is the Listener.
Let’s fire up some examples, but first things first.
Adding dependency
Open the app-level build.gradle and add the following:
dependencies {
...
implementation "io.reactivex.rxjava2:rxjava:2.2.7"
implementation "io.reactivex.rxjava2:rxandroid:2.1.1"
}
Let’s create an Observable
There are many ways to do so, and we’ll list some of them below. The examples may get somewhat complex, but take your time to understand what’s going on in each line.
just
The just operator converts an Item into an Observable and emits it.
Observable.just("Hello Reactive World")
.subscribe { value -> println(value) }
Results in:
Hello Reactive World
Let’s break it down: just converts the string "Hello Reactive World" to an Observable, and the subscribe method receives the value.
You may get a warning in the IDE stating that the result of subscribe is not used. That’s because subscribe returns a Disposable that we’re ignoring here, but we’ll get to that later on.
Let’s add some complexity to it. We want to know when an item is received, if there’s an error, and when the stream completes.
Observable.just("Apple", "Orange", "Banana")
.subscribe(
{ value -> println("Received: $value") }, // onNext
{ error -> println("Error: $error") }, // onError
{ println("Completed!") } // onComplete
)
Result:
Received: Apple
Received: Orange
Received: Banana
Completed!
In this situation we have onNext, onError, and onComplete, each passed as a lambda expression. Their names are pretty much self-explanatory, but I’d like to generate an error just to test it out. Don’t worry about the map method for now, as we will talk about it later on.
Observable.just("Apple", "Orange", "Banana")
.map({ input -> throw RuntimeException() } )
.subscribe(
{ value -> println("Received: $value") },
{ error -> println("Error: $error") },
{ println("Completed!") }
)
Result:
Error: java.lang.RuntimeException
from*
There are a few ways you can use from, and some of them are listed below:
Observable.fromArray("Apple", "Orange", "Banana")
.subscribe { println(it) }
Which will result in:
Apple
Orange
Banana
Another example:
Observable.fromIterable(listOf("Apple", "Orange", "Banana"))
.subscribe(
{ value -> println("Received: $value") }, // onNext
{ error -> println("Error: $error") }, // onError
{ println("Completed") } // onComplete
)
And this will result in:
Received: Apple
Received: Orange
Received: Banana
Completed
create
This way you can create an Observable from the ground up. Let’s see an example.
First, let’s create the function that will convert the list to an Observable:
fun getObservableFromList(myList: List<String>) =
    Observable.create<String> { emitter ->
        for (kind in myList) {
            if (kind.isEmpty()) {
                // Signal the error and stop: nothing may be emitted after onError
                emitter.onError(Exception("There's no value to show"))
                return@create
            }
            emitter.onNext(kind)
        }
        emitter.onComplete()
    }
The function above creates an Observable of type String. It reads each item of the list and checks whether it’s empty: if it is, it signals an error; otherwise, it emits the item and moves on to the next one until the list is complete.
Secondly, let’s call that function from onCreate to test it out. Make a note of the onError call, as we will test it afterward.
getObservableFromList(listOf("Apple", "Orange", "Banana"))
.subscribe { println("Received: $it") }
And the above will result in:
Received: Apple
Received: Orange
Received: Banana
Now let’s test the onError by replacing the string "Orange" with an empty string and adding the onError callback to the subscribe method.
getObservableFromList(listOf("Apple", "", "Banana"))
.subscribe(
{ v -> println("Received: $v") },
{ e -> println("Error: $e") }
)
Result:
Received: Apple
Error: java.lang.Exception: There's no value to show
Note that this time we only received the first item, and since there was an error with the second it interrupted the data stream and the error message showed up.
interval
This function creates an infinite sequence of ticks, separated by the specified duration. Its sibling intervalRange does the same for a bounded range of values, so let’s start with that one.
Observable.intervalRange(
10L, // Start
5L, // Count
0L, // Initial Delay
1L, // Period
TimeUnit.SECONDS
).subscribe { println("Result we just received: $it") }
Result:
Result we just received: 10
Result we just received: 11
Result we just received: 12
Result we just received: 13
Result we just received: 14
In the example above, the Observable emits one value each second and stops after five. Let’s test a simple, infinite interval.
Observable.interval(1000, TimeUnit.MILLISECONDS)
.subscribe { println("Result we just received: $it") }
Result:
Result we just received: 0
Result we just received: 1
Result we just received: 2
...
The code above will keep emitting a new value each second indefinitely.
Before we talk about other types of Emitters, let’s talk a little about Backpressure.
What is Backpressure?
Backpressure is the process of handling an emitter that produces a lot of items very fast. Let’s say an Observable produces 100000 items per second. How will a subscriber that can only handle 100 items per second process those items?
The Observable class has an unbounded buffer: it buffers everything and pushes it onto the subscriber, and if the source emits more than the subscriber can handle, you’re bound to get an OutOfMemoryError.
We can handle such a stream of data by applying Backpressure as needed. That way, the unnecessary items are discarded, or the producer is told when to create and push the next item.
How can I solve it?
The solution is easy. Instead of an Observable you can use a Flowable, which handles Backpressure for you since it takes it into consideration, whereas Observable does not.
Think of it as a funnel when there’s too much liquid flowing in. This is the representation of an Observable:
And with Flowable taking Backpressure into consideration you would get:
Let’s code an example of backpressure and the solution.
val observable = PublishSubject.create<Int>()
observable.observeOn(Schedulers.computation())
.subscribe (
{
println("The Number Is: $it")
},
{t->
print(t.message)
}
)
for (i in 0..1000000){
observable.onNext(i)
}
The above code might result in an OutOfMemoryError if the device is not top notch.
In order to handle the backpressure in this situation, we will convert it to a Flowable.
val observable = PublishSubject.create<Int>()
observable
.toFlowable(BackpressureStrategy.DROP)
.observeOn(Schedulers.computation())
.subscribe (
{
println("The Number Is: $it")
},
{t->
print(t.message)
}
)
for (i in 0..1000000){
observable.onNext(i)
}
The above code uses the Backpressure strategy DROP, which drops the items the subscriber can’t keep up with in order to preserve memory.
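To see DROP in isolation, here is a small sketch that should behave deterministically. It assumes a producer that pushes ten items at once and a subscriber that only asks for a limited number of them. receivedWithDrop is a hypothetical helper written for this example, and test(n) is RxJava’s built-in shortcut for a TestSubscriber that requests n items up front.

```kotlin
import io.reactivex.BackpressureStrategy
import io.reactivex.Flowable

// Hypothetical helper: pushes 1..10 as fast as possible and returns
// what a subscriber that requested only `requested` items received.
fun receivedWithDrop(requested: Long): List<Int> =
    Flowable.create<Int>({ emitter ->
        for (i in 1..10) emitter.onNext(i)   // the producer ignores demand
        emitter.onComplete()
    }, BackpressureStrategy.DROP)            // items without demand are dropped
        .test(requested)                     // TestSubscriber requesting `requested` items
        .values()

fun main() {
    println(receivedWithDrop(3))
}
```

With a request of 3, the subscriber should receive 1, 2, and 3, and the remaining seven items are dropped rather than buffered.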
Now that we’ve talked about Backpressure, which is an important part of RxJava, let’s move on to other types of Emitters.
Emitter types
We’ve been talking a lot about Observable. However, there are other types of emitters that can be used instead of an Observable. Let’s talk about some of them.
Flowable
It works exactly like an Observable, but it supports Backpressure.
Flowable.just("This is a Flowable")
.subscribe(
{ value -> println("Received: $value") },
{ error -> println("Error: $error") },
{ println("Completed") }
)
Maybe
This class is used when you’d like to return a single optional value. The methods are mutually exclusive; in other words, only one of them is called. If there is an emitted value, it calls onSuccess; if there’s no value, it calls onComplete; and if there’s an error, it calls onError.
Maybe.just("This is a Maybe")
.subscribe(
{ value -> println("Received: $value") },
{ error -> println("Error: $error") },
{ println("Completed") }
)
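To make the "only one of them is called" part concrete, here is a minimal sketch that feeds the same subscriber a Maybe with a value, an empty Maybe, and a failing Maybe. The describe function is a hypothetical helper written for this example.

```kotlin
import io.reactivex.Maybe

// Hypothetical helper: reports which of the three callbacks fired.
fun describe(source: Maybe<String>): String {
    var outcome = "nothing happened"
    source.subscribe(
        { value -> outcome = "onSuccess: $value" },          // a value was emitted
        { error -> outcome = "onError: ${error.message}" },  // something went wrong
        { outcome = "onComplete" }                           // finished without a value
    )
    return outcome
}

fun main() {
    println(describe(Maybe.just("a value")))
    println(describe(Maybe.empty()))
    println(describe(Maybe.error(IllegalStateException("boom"))))
}
```

These sources are all synchronous, so the callback has already run by the time describe returns.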
Single
It’s used when there’s a single value to be returned. If we use this class and there is a value emitted, onSuccess will be called. If there’s no value, onError will be called.
Single.just("This is a Single")
.subscribe(
{ v -> println("Value is: $v") },
{ e -> println("Error: $e")}
)
Completable
A Completable won’t emit any data; what it does is let you know whether the operation completed successfully. If it did, it calls onComplete, and if it didn’t, it calls onError. A common use case for Completable is REST APIs, where a successful call returns HTTP 204, and failures can range from HTTP 301 to HTTP 404, HTTP 500, etc. We might do something with that information.
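Completable.create { emitter ->
    try {
        // Do the actual work here, then signal success
        emitter.onComplete()
    } catch (e: Exception) {
        // Signal failure instead; only one terminal event may be sent
        emitter.onError(e)
    }
}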
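The snippet above only builds a Completable, so here is a small sketch of subscribing to one. deleteRecord and describeDelete are hypothetical names standing in for a real operation such as an API call; Completable.fromAction is used here as a shorter alternative to create.

```kotlin
import io.reactivex.Completable

// Hypothetical operation: pretends to delete a record and fails for negative ids.
fun deleteRecord(id: Int): Completable =
    Completable.fromAction {
        if (id < 0) throw IllegalArgumentException("invalid id: $id")
    }

// Reports which callback the Completable ended up calling.
fun describeDelete(id: Int): String {
    var outcome = ""
    deleteRecord(id).subscribe(
        { outcome = "onComplete" },                         // success, no data
        { error -> outcome = "onError: ${error.message}" }  // failure
    )
    return outcome
}

fun main() {
    println(describeDelete(1))
    println(describeDelete(-1))
}
```

An exception thrown inside fromAction is routed to onError, so the caller never has to wrap the subscription in a try/catch.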
You can also hook into the stream’s lifecycle with side-effect methods such as doOnSubscribe, doOnNext, doOnError, and doOnComplete.
Observable.just("Hello")
.doOnSubscribe { println("Subscribed") }
.doOnNext { s -> println("Received: $s") }
.doAfterNext { println("After Receiving") }
.doOnError { e -> println("Error: $e") }
.doOnComplete { println("Complete") }
.doFinally { println("Do Finally!") }
.doOnDispose { println("Do on Dispose!") }
.subscribe { println("Subscribe") }
Schedulers
Although RxJava is heavily marketed as an asynchronous way of doing reactive programming, it’s important to clarify that RxJava is single-threaded by default: everything runs on the thread that calls subscribe unless you specify otherwise. That’s where Schedulers come in.
A quick reminder of the difference between Synchronous vs Asynchronous.
With Synchronous programming, only one thing happens at a time. The code fires up method a, which reads from or writes to the database, and waits for a to finish before moving on to b. So you get one thing happening at a time, and it’s the most common cause of UI freezes, since the code also runs on the same thread as the UI.
With Asynchronous programming, you can call many methods at once, without waiting for one to finish before starting another. It’s one of the most fundamental aspects of Android development: you do not want to run all of your code on the same thread as the UI, especially computational code.
subscribeOn and observeOn
These methods allow you to control the action of the subscription and how you receive the changes.
subscribeOn
With subscribeOn you get to decide which thread your Emitter (such as Observable, Flowable, Single, etc.) is executed on.
subscribeOn (as well as observeOn) needs a Scheduler param to know which thread to run on. Let’s talk about the difference between the Schedulers.
Schedulers.io()
This is the most commonly used type of Scheduler. It’s generally used for IO-related work, such as network requests and file system operations, and it’s backed by a thread pool. A Java thread pool is a group of worker threads that wait for jobs and are reused many times.
Observable.just("Apple", "Orange", "Banana")
.subscribeOn(Schedulers.io())
.subscribe{ v -> println("Received: $v") }
Schedulers.computation()
This is quite similar to IO as it’s also backed by a thread pool. However, the number of threads that can be used is fixed to the number of cores present in the device. Say you have 2 cores, you’ll get 2 threads; 4 cores, 4 threads; and so on.
Observable.just("Apple", "Orange", "Banana")
.subscribeOn(Schedulers.computation())
.subscribe{ v -> println("Received: $v") }
Schedulers.newThread()
The name here is self-explanatory, as it will create a new thread for each active Observable. Be careful with this one: if there is a high number of active Observables, it may cause instability.
Observable.just("Apple", "Orange", "Banana")
.subscribeOn(Schedulers.newThread())
.subscribe{ v -> println("Received: $v") }
Keep in mind that subscribeOn only takes a Scheduler, so you can’t cap the number of threads there. If you need a maximum number of concurrent threads, use a Scheduler backed by a fixed-size thread pool, as in the Executor Scheduler section further down.
Schedulers.single()
This Scheduler is backed by a single thread. No matter how many Observables there are, they will all run on that one thread. Think about it as a replacement for the main thread.
Observable.just("Apple", "Orange", "Banana")
.subscribeOn(Schedulers.single())
.subscribe{ v -> println("Received: $v") }
Schedulers.trampoline()
This will run on whatever the current thread is. If it’s the main thread, it will run the code on the queue of the main thread. Similar to RxJava 1’s Immediate Scheduler, it also blocks the thread. The trampoline may be used when we have more than one Observable and we want them to execute in order.
Observable.just("Apple", "Orange", "Banana")
.subscribeOn(Schedulers.trampoline())
.subscribe{ v -> println("Received: $v") }
Executor Scheduler
This is a custom IO Scheduler, where we can set a custom pool of threads by specifying how many threads we want in that pool. It can be used in a scenario where the number of Observables could be too large for the IO thread pool.
val executor = Executors.newFixedThreadPool(10)
val pooledScheduler = Schedulers.from(executor)
Observable.just("Apple", "Orange", "Banana")
.subscribeOn(pooledScheduler)
.subscribe{ v -> println("Received: $v") }
AndroidSchedulers.mainThread()
Passing this to observeOn brings the emissions back to the main UI thread, where you can make any modification you need to your UI.
observeOn
The method subscribeOn() instructs the source Observable which thread to emit the items on, and those emissions are pushed all the way to our Observer on that thread. But if there is an observeOn() in the chain, the emissions switch to the selected Scheduler for the remaining operations.
Observable.just("Apple", "Orange", "Banana")
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe{ v -> println("Received: $v") }
Usually, the observing thread in Android is the Main UI thread.
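If you’d like to see the switch happen, here is a minimal sketch that records the thread name before and after observeOn. It is meant to run on a plain JVM, so it swaps AndroidSchedulers.mainThread() for Schedulers.computation(); that substitution and the threadsUsed helper are assumptions made for this example only.

```kotlin
import io.reactivex.Observable
import io.reactivex.schedulers.Schedulers

// Hypothetical helper: returns (thread above observeOn, thread below observeOn).
fun threadsUsed(): Pair<String, String> =
    Observable.just("Apple")
        .map { Thread.currentThread().name }       // runs where subscribeOn says
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.computation())       // mainThread() on Android
        .map { upstream -> upstream to Thread.currentThread().name }
        .blockingFirst()                           // wait for the result (demo only)

fun main() {
    val (upstream, downstream) = threadsUsed()
    println("Emitted on:  $upstream")
    println("Received on: $downstream")
}
```

The first name should belong to the io pool and the second to the computation pool, which shows that everything below observeOn runs on a different thread than the source.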
Transformers
With a transformer, we can avoid repeating code by packaging the most commonly used chains of operators for your Observables. We’ll be applying subscribeOn and observeOn to a couple of Observables below.
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
Observable.just("Apple", "Orange", "Banana")
.compose(applyObservableAsync())
.subscribe { v -> println("The First Observable Received: $v") }
Observable.just("Water", "Fire", "Wood")
.compose(applyObservableAsync())
.subscribe { v -> println("The Second Observable Received: $v") }
}
fun <T> applyObservableAsync(): ObservableTransformer<T, T> {
return ObservableTransformer { observable ->
observable
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
}
}
The example above will print:
The First Observable Received: Apple
The First Observable Received: Orange
The First Observable Received: Banana
The Second Observable Received: Water
The Second Observable Received: Fire
The Second Observable Received: Wood
It’s important to keep in mind that this example is for Observable, and if you’re working with other Emitters you need to change the type of the transformer to one of the following:
ObservableTransformer
FlowableTransformer
SingleTransformer
MaybeTransformer
CompletableTransformer
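As an illustration, the same idea for a Single might look like the sketch below. applySingleAsync and loadGreeting are hypothetical names, and Schedulers.computation() stands in for AndroidSchedulers.mainThread() so the snippet can run outside of Android.

```kotlin
import io.reactivex.Single
import io.reactivex.SingleTransformer
import io.reactivex.schedulers.Schedulers

// Same threading recipe as applyObservableAsync, but typed for Single.
fun <T> applySingleAsync(): SingleTransformer<T, T> =
    SingleTransformer { single ->
        single
            .subscribeOn(Schedulers.io())
            .observeOn(Schedulers.computation())   // mainThread() on Android
    }

// Hypothetical usage: the value passes through the transformer unchanged.
fun loadGreeting(): String =
    Single.just("Hello")
        .compose(applySingleAsync<String>())
        .blockingGet()                             // wait for the result (demo only)

fun main() {
    println(loadGreeting())
}
```

Only the transformer type and the lambda’s parameter change; the chain inside stays the same.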
Operators
There are many operators that you can add to the Observable chain, but let’s talk about the most common ones.
map()
Transforms each value emitted by an Observable stream into another value by applying a function to it. Let’s take a look at a simple example below:
Observable.just("Water", "Fire", "Wood")
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.map { m -> m + " 2" }
.subscribe { v -> println("Received: $v") }
The above example will result in the following:
Received: Water 2
Received: Fire 2
Received: Wood 2
flatMap()
Unlike the map() operator, flatMap() transforms each value in an Observable stream into another Observable, and these are then merged into the output Observable after processing. Let’s do a visual representation of the difference between those:
map():
input: Observable<T>
transformation: (T -> R)
output: Observable<R>
flatMap():
input: Observable<T>
transformation: (T -> Observable<R>)
output: Observable<R>
Thanks to James Shvarts
As you can see, with map() you get T (a generic type) for the value and R for the result, plain as that. With flatMap() you transform T into an Observable, which can have its own specific chain, including specifying its own thread.
Let’s use the same example from the map() above in a flatMap() and change the thread to computation.
Observable.just("Water", "Fire", "Wood")
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.flatMap { m ->
Observable.just(m + " 2")
.subscribeOn(Schedulers.computation())
}
.subscribe { v -> println("Received: $v") }
This will result in the following (the order may vary, since each inner Observable runs on its own thread):
Received: Water 2
Received: Fire 2
Received: Wood 2
The above example simply transformed each value emitted by the Observable into a separate Observable.
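The difference from map() is easier to see when one input turns into several outputs. Here is a minimal sketch, assuming we want to split lines of text into individual words; splitWords is a hypothetical helper.

```kotlin
import io.reactivex.Observable

// Each line becomes its own Observable of words, and flatMap
// merges those inner Observables into a single stream.
fun splitWords(lines: List<String>): List<String> =
    Observable.fromIterable(lines)
        .flatMap { line -> Observable.fromIterable(line.split(" ")) }
        .toList()        // collect the merged stream into a Single<List>
        .blockingGet()   // wait for the result (demo only)

fun main() {
    println(splitWords(listOf("red green", "blue")))
}
```

Two lines go in and three words come out, which a plain map() cannot do since it always produces exactly one output per input. The inner Observables here are synchronous, so the words keep their original order.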
zip()
The zip() operator combines the values of multiple Observables through a specific function.
Observable.zip(
Observable.just(
"Roses", "Sunflowers", "Leaves", "Clouds", "Violets", "Plastics"),
Observable.just(
"Red", "Yellow", "Green", "White or Grey", "Purple"),
BiFunction<String, String, String> { type, color ->
"$type are $color"
}
)
.subscribe { v -> println("Received: $v") }
The above example will print:
Received: Roses are Red
Received: Sunflowers are Yellow
Received: Leaves are Green
Received: Clouds are White or Grey
Received: Violets are Purple
Notice that the value "Plastics" did not reach its destination. That’s because the second Observable had no corresponding value.
A real-world use case would be attaching a picture to an API result, such as an avatar to a name, and so on.
The BiFunction<String, String, String> simply means that the values of the first and second Observable are both Strings and the resulting value is also a String.
concat()
As the name suggests, concat() will concatenate (join together) two or more Observables.
val test1 = Observable.just("Apple", "Orange", "Banana")
val test2 = Observable.just("Microsoft", "Google")
val test3 = Observable.just("Grass", "Tree", "Flower", "Sunflower")
Observable.concat(test1, test2, test3)
.subscribe{ x -> println("Received: " + x) }
The above example should print:
Received: Apple
Received: Orange
Received: Banana
Received: Microsoft
Received: Google
Received: Grass
Received: Tree
Received: Flower
Received: Sunflower
merge()
merge() works similarly to concat(), except merge() will interleave the emissions from both Observables, whereas concat() will wait for one to finish before emitting from the next.
Let’s see this difference in code to make it easier.
Observable.merge(
Observable.interval(250, TimeUnit.MILLISECONDS).map { i -> "Apple" },
Observable.interval(150, TimeUnit.MILLISECONDS).map { i -> "Orange" })
.take(10)
.subscribe{ v -> println("Received: $v") }
Both of the Observables above will keep emitting the items "Apple" and "Orange", one every 250ms and the other every 150ms, and then we take(10) results. The code will result in the following:
Received: Orange
Received: Apple
Received: Orange
Received: Orange
Received: Apple
Received: Orange
Received: Apple
Received: Orange
Received: Orange
Received: Apple
As you can see, the results are interleaved. What would happen if instead of the merge() operator we used concat() in this situation?
Observable.concat(
Observable.interval(250, TimeUnit.MILLISECONDS).map { i -> "Apple" },
Observable.interval(150, TimeUnit.MILLISECONDS).map { i -> "Orange" })
.take(10)
.subscribe{ v -> println("Received: $v") }
The code will result in:
Received: Apple
Received: Apple
Received: Apple
Received: Apple
Received: Apple
Received: Apple
Received: Apple
Received: Apple
Received: Apple
Received: Apple
The concat() operator waits for the first Observable to finish emitting values before it moves on to the next Observable. But since our Observables are designed to run forever, it will never move on to the next one.
To sum up, merge() interleaves emissions whereas concat() doesn’t.
filter()
Filters the values according to a set condition.
Observable.just(2, 30, 22, 5, 60, 1)
.filter{ x -> x < 10 }
.subscribe{ x -> println("Received: " + x) }
The above code will only transmit the values that are less than 10, according to our condition x < 10.
Received: 2
Received: 5
Received: 1
repeat()
This operator will repeat the emission of the values however many times we may need.
Observable.just("Apple", "Orange", "Banana")
.repeat(2)
.subscribe { v -> println("Received: $v") }
This should output the values twice:
Received: Apple
Received: Orange
Received: Banana
Received: Apple
Received: Orange
Received: Banana
take()
The take() operator is meant to grab however many emissions you’d like. A very simple example would be:
Observable.just("Apple", "Orange", "Banana")
.take(2)
.subscribe { v -> println("Received: $v") }
The above will result in:
Received: Apple
Received: Orange
So we only took the first two emissions.
A more real-world case would be checking whether we have internet connectivity: if we do, load the data from the network; if we don’t, load it from a local cache.
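One possible way to sketch that fallback is to combine take(1) with the concat() operator from earlier. Everything here is an assumption for illustration: fromNetwork and fromCache are hypothetical stand-ins for real data sources, and isOnline would come from your own connectivity check.

```kotlin
import io.reactivex.Observable

// Hypothetical sources standing in for a real API call and a real cache.
fun fromNetwork(): Observable<String> = Observable.just("fresh article")
fun fromCache(): Observable<String> = Observable.just("cached article")

fun load(isOnline: Boolean): String {
    // When offline, the network source simply emits nothing.
    val network = if (isOnline) fromNetwork() else Observable.empty<String>()
    return Observable.concat(network, fromCache())
        .take(1)           // keep the first item from whichever source emits first
        .blockingFirst()   // wait for the result (demo only)
}

fun main() {
    println(load(isOnline = true))
    println(load(isOnline = false))
}
```

Because concat() subscribes to its sources in order and take(1) stops after the first item, the cache is only consulted when the network source emits nothing.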
Disposable
Now that we’ve moved on from basic operators, let’s start talking about Disposable (using a Disposable will remove the dark yellow highlight from the Android Studio IDE).
A Disposable lets you release the memory, resources, and threads used by an Observable. So, the main purpose of a Disposable is to free up system resources and make your app more stable.
As antekm on reddit pointed out, you should not do the following:
Observable.just("Apple", "Orange", "Banana")
.subscribe(
{ v -> println("Received: $v") }
).dispose()
This will result in:
Received: Apple
Received: Orange
Received: Banana
But in this situation, it only worked because the code runs synchronously on the main thread, so all items were delivered before dispose() was called. Let’s see how it would fail.
Observable.just("Apple", "Orange", "Banana")
.subscribeOn(Schedulers.io()) //Changing the thread
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
{ v -> println("Received: $v") }
).dispose()
You would typically be doing network calls with Schedulers.io(), and in this situation you will get no results: since the work moved to another thread, the main thread calls dispose() right away, cancelling the work that was being performed on the io thread.
Another way of handling disposal is to use a CompositeDisposable, which lets you dispose of multiple subscriptions at once, as in the example below.
val compositeDisposable = CompositeDisposable()
// subscribe() returns a Disposable, not an Observable
val disposableOne = Observable.just("Tree")
    .subscribe { v -> println("Received: $v") }
val disposableTwo = Observable.just("Blue")
    .subscribe { v -> println("Received: $v") }
compositeDisposable.add(disposableOne)
compositeDisposable.add(disposableTwo)
// Disposes of everything that was added
compositeDisposable.clear()
And this will result in:
Received: Tree
Received: Blue
Remember, when dealing with dispose() you should call it only once you know for certain that the work has finished or that its results are no longer needed. The most elegant way of doing it is disposing of your subscriptions through the activity lifecycle, especially in onDestroy.
I also recommend taking a look at Uber’s AutoDispose, as it may be a better way of handling the disposal of an Observable.
In order for AutoDispose to work, you must have minSdk set to 26 in your app gradle file.
PublishSubject
PublishSubject works like an Observable and an Observer at the same time.
A quick refresher on the difference between those two.
Observer: Any object that wants to be notified when another object changes.
Observable: Any object whose state may be of interest, and in whom another object may register an interest.
So, a PublishSubject will receive the values as an Observer would and also emit them as an Observable would.
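One detail worth knowing is that a PublishSubject only forwards items to whoever is subscribed at the moment they arrive. Here is a minimal sketch of that behaviour; lateSubscriberDemo is a hypothetical helper.

```kotlin
import io.reactivex.subjects.PublishSubject

// Returns what an early and a late subscriber each received.
fun lateSubscriberDemo(): Pair<List<String>, List<String>> {
    val subject = PublishSubject.create<String>()
    val early = mutableListOf<String>()
    val late = mutableListOf<String>()

    subject.subscribe { early += it }   // subscribed before anything is emitted
    subject.onNext("one")               // the subject acts as an Observer here
    subject.subscribe { late += it }    // subscribed after "one" was emitted
    subject.onNext("two")
    subject.onComplete()

    return early to late
}

fun main() {
    val (early, late) = lateSubscriberDemo()
    println("Early subscriber: $early")
    println("Late subscriber:  $late")
}
```

The early subscriber sees both items while the late one only sees "two", since "one" was emitted before it subscribed.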
A very simple use-case would be hooking it up to the UI. Let’s see an example:
class MainActivity : AppCompatActivity() {
val source = PublishSubject.create<String>()
val disposables = CompositeDisposable()
var count = 0
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
disposables.add(
source.subscribe(
{ v -> textView.text = v },
{ e -> textView.text = "Error: $e" }
)
)
button.setOnClickListener {
source.onNext("Counter: ${++count}")
if (count == 10) {
source.onComplete()
}
}
}
override fun onDestroy() {
super.onDestroy()
disposables.clear()
}
}
The above code will increment the counter every time the button is clicked.
In Conclusion
RxJava is a highly important library when it comes to Android development. It’s more than just thread management; using it just for that would be like using the most advanced video card ever to play Minecraft.
With proper use of Rx your app will become more fluid and scalable.