Why? How? RxJava for Android?

Abhishek Pathak
12 min read · Dec 28, 2022


In this article you will learn why RxJava is worth using, how to use it, what its basic building blocks are, and how to apply it in an MVVM architecture with Retrofit.

Why learn RxJava?

On Android, multithreading is an essential requirement. Enterprise-level applications must scale without compromising the user experience, and the only way to achieve that is to move long-running work off the main thread and handle it asynchronously.

Here are some reasons why RxJava is commonly used in Android development:

  1. Asynchronous programming: In Android development, asynchronous programming is crucial to avoid blocking the UI thread and to keep the app responsive. RxJava provides an easy and efficient way to handle asynchronous operations using Observable and Observer.
  2. Data streams: RxJava provides a way to handle data streams in a reactive way, making it easy to handle events such as user input or network responses. This allows developers to easily chain together multiple operations, such as filtering, mapping, and combining data streams.
  3. Error handling: RxJava provides an easy way to handle errors and exceptions that occur during data processing or network operations. By using operators such as onErrorResumeNext or onErrorReturn, developers can gracefully handle errors and continue processing data streams.
  4. Testing: RxJava provides test helpers such as TestObserver (for Observable, Single, Maybe, and Completable) and TestSubscriber (for Flowable). This allows developers to write unit tests for reactive code, making it easier to identify and fix bugs.
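To make points 3 and 4 concrete, here is a minimal sketch, assuming RxJava 2 (the `io.reactivex` packages used later in this article). The function names and the simulated failure are hypothetical and only serve the illustration.

```kotlin
import io.reactivex.Observable
import io.reactivex.observers.TestObserver

// Hypothetical source: emits two prices, then fails as a network call might.
fun pricesWithFallback(): Observable<Int> =
    Observable.just(10, 20)
        .concatWith(Observable.error<Int>(IllegalStateException("network down")))
        .onErrorReturn { -1 } // replace the error with a fallback item, then complete

// test() subscribes immediately and records every event for assertions.
fun testPrices(): TestObserver<Int> =
    pricesWithFallback().test()
        .assertValues(10, 20, -1)
        .assertComplete()
        .assertNoErrors()
```

Because the error is converted into a normal item, the observer sees a completed stream rather than a failure, and the test can assert on that without any Android dependency.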

RxJava has made life easier for many Android developers.

What is RxJava or Reactive Programming?

RxJava is a popular library for reactive programming in Java and Android applications. It provides a way to handle asynchronous events and data streams in a more concise and declarative way, making it easier to write reactive and responsive applications.

RxJava exists for situations where a data source emits a stream of data over time.

It is a reactive programming library for asynchronous, event-based programs built on observable sequences.

It follows a push-based design: the source pushes items to its observers as they become available, instead of the consumer pulling them.

Its main building blocks are Observables, Operators, and Observers, also known as the three O’s.

It also lets us decide which thread each task runs on. For example, API calls and other I/O work that may take time should run on a background thread.
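As a rough sketch of choosing a thread, assuming RxJava 2: the function below is a hypothetical stand-in for a slow API call and simply reports the name of the thread it ran on.

```kotlin
import io.reactivex.Single
import io.reactivex.schedulers.Schedulers

// Hypothetical stand-in for a slow API call; it returns the name of the
// thread that executed it.
fun slowCallThreadName(): String =
    Single.fromCallable { Thread.currentThread().name }
        .subscribeOn(Schedulers.io()) // run the callable on an I/O thread
        .blockingGet()                // demo only: block until the result arrives
```

In an Android app you would not block. You would add `observeOn(AndroidSchedulers.mainThread())` from RxAndroid and consume the result in `subscribe`.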

What is RxAndroid?

RxAndroid is an extension of RxJava for Android development. It adds Android-specific schedulers, most importantly AndroidSchedulers.mainThread(), which runs work on the main thread required for UI updates.

Before going further, here is what the rest of this article covers: the types of Observables, the most common Operators, the Observer, typical Android use cases, cold vs hot Observables, and an MVVM implementation.

Types of Observables in RxJava

  1. Observable

Represents a stream of data that can emit zero or more items, and optionally terminates with either a completion or an error. Observable is commonly used when dealing with asynchronous data streams or events that can emit multiple items.

Observable.just("Titanic", "Conjuring", "Cars")
.subscribe(
{ item -> println("latest item is $item") }, // This is onNext Block
{ error -> println("Error is $error") }, // This is onError Block
{ println("Task got completed") } // This is onComplete Block
)

Output:

latest item is Titanic
latest item is Conjuring
latest item is Cars
Task got completed

2. Single
Represents a stream that emits a single item or an error.

  • Subscribers of a Single can handle the emitted item via the onSuccess method or handle an error via the onError method.
  • Single is useful when you expect only a single result from an operation, such as making a network request and receiving a response.
/*
* Single is used when exactly one item is expected.
* The item is delivered through onSuccess; if an error occurs,
* onError is called instead.
*/
Single.just("Example of Single Observable")
.subscribe(
{ result -> println("result is $result") },
{ error -> println("error is $error") }
)

3. Maybe
Represents a stream that can emit either a single item, no item at all, or terminate with an error.

  • Subscribers of a Maybe can handle the emitted item via the onSuccess method, handle the absence of an item via the onComplete method, or handle an error via the onError method.
  • Maybe is suitable when you need to handle situations where the result may or may not be present.
/*
* Maybe is used when the result is a single optional value.
* Exactly one callback fires: onSuccess (a value), onComplete (no value),
* or onError. They are mutually exclusive.
*/
Maybe.just("Example of Maybe Observable")
.subscribe(
{ result -> println("result is $result") },
{ error -> println("error is $error") },
{ println("Completed the task") }
)

4. Completable
Represents an asynchronous operation that either completes successfully without emitting any item or terminates with an error.

  • Subscribers of a Completable only need to handle the completion event or an error, as it does not emit any items.
  • Completable is useful when you are only interested in the completion status of an operation, rather than receiving data.
/*
* Completable is used when only the outcome matters, not a value.
* Exactly one terminal event may be signalled: onComplete or onError.
*/
Completable.create { emitter ->
    try {
        // do the work here, e.g. write to a file
        emitter.onComplete() // signal success
    } catch (e: Exception) {
        emitter.onError(e) // signal failure instead
    }
}
.subscribe(
    { println("Completed the task") },
    { error -> println("error is $error") }
)

5. Flowable

Similar to Observable, but designed to handle backpressure, which is a mechanism to handle situations where an Observable is emitting data faster than the subscriber can consume.

  • Flowable supports asynchronous data streams that can emit zero or more items, and optionally terminate with either a completion or an error.
  • It provides additional operators for dealing with backpressure and controlling the flow of data between the producer and the consumer.
/*
* It is very similar to Observable but it also handles the BackPressure mechanism.
*/
Flowable.just("Example of Flowable Observable")
.subscribe(
{ result -> println("result is $result") },
{ error -> println("error is $error") },
{ println("Completed the task") }
)
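The snippet above never puts any pressure on the consumer, so here is a small sketch of backpressure itself, assuming RxJava 2. It uses `Flowable.test(initialRequest)` to play the role of a slow consumer that asks for only a few items at a time; the function name is hypothetical.

```kotlin
import io.reactivex.Flowable

// Returns what a slow consumer has received after each request.
fun backpressureDemo(): List<List<Int>> {
    // The consumer asks for 3 items only, although the source has 10.
    val subscriber = Flowable.range(1, 10).test(3)
    val afterFirstRequest = subscriber.values().toList()

    // The consumer signals it is ready for 2 more items.
    subscriber.requestMore(2)
    val afterSecondRequest = subscriber.values().toList()

    return listOf(afterFirstRequest, afterSecondRequest)
}
```

The source emits only as many items as were requested, which is the contract that separates Flowable from Observable.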

Types of Operators

  1. just(): the simplest operator; it emits the given items as they are and then completes.
Observable.just("Titanic", "Conjuring", "Cars")
.subscribe(
{ item -> println("latest item is $item") }, // This is onNext Block
{ error -> println("Error is $error") }, // This is onError Block
{ println("Task got completed") } // This is onComplete Block
)

2. map()

Observable.just("Titanic", "Conjuring", "Cars")
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.map { it -> "$it is old movie" }
.subscribe(
{ item -> Log.i("tag","$item") }, // This is onNext Block
{ error -> Log.i("tag","Error is $error") }, // This is onError Block
{ Log.i("tag","Task got completed") } // This is onComplete Block
)

3. flatMap()

Observable.just(1, 2, 3, 4)
.flatMap { item -> Observable.just(item + 2) }
.subscribeOn(Schedulers.io())
.subscribe { values -> Log.i("tag","$values") }

map() vs flatMap()

  • Map transforms the items emitted by an Observable by applying a function to each item.
  • FlatMap transforms each item emitted by an Observable into an Observable of its own, then flattens the emissions of those inner Observables into a single Observable.
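The difference is easiest to see when one input item produces several output items. A minimal sketch, assuming RxJava 2; the sample strings and function names are made up for illustration.

```kotlin
import io.reactivex.Observable

fun sentences(): Observable<String> = Observable.just("Rx Java", "Rx Android")

// map: one input item becomes exactly one output item.
fun lengths(): List<Int> =
    sentences().map { it.length }.toList().blockingGet()

// flatMap: one input item becomes an inner Observable, and all inner
// Observables are flattened into a single stream.
fun words(): List<String> =
    sentences().flatMap { Observable.fromIterable(it.split(" ")) }
        .toList().blockingGet()
```

Here `lengths()` yields two numbers, one per sentence, while `words()` yields four strings. Note that flatMap does not guarantee ordering when the inner Observables are asynchronous; concatMap does.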

4. filter()

Observable.just(100, 300, 800, 1100, 33, 99, 222)
.filter { item -> item < 100 }
.subscribe { result -> println("Selected items are $result") }
Output:

Selected items are 33
Selected items are 99

5. zip()

/*
zip combines the items of multiple source Observables pairwise, by position,
and passes each pair through the given function
*/
Observable.zip(
Observable.just("Android"),
Observable.just("Lollipop", "Marshmallow", "Nougat", "Oreo", "Pie"),
BiFunction<String, String, String> { OS, Version -> "$OS are $Version" }
).subscribe { result -> Log.i("tag", "$result") }

Notice that the result contains only a single item. zip pairs items by position and stops as soon as the shortest source completes: "Android" is combined with the first item of the second source, and after that the first source has no more items to pair.
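The same rule can be checked with two sources of different lengths. This is a small sketch assuming RxJava 2; the city and country lists are illustrative only.

```kotlin
import io.reactivex.Observable
import io.reactivex.functions.BiFunction

fun zipCitiesWithCountries(): List<String> {
    val cities = Observable.just("London", "Delhi")
    val countries = Observable.just("England", "India", "Japan") // one extra item
    return Observable.zip(
        cities,
        countries,
        BiFunction<String, String, String> { city, country -> "$city is in $country" }
    ).toList().blockingGet()
}
```

"Japan" never appears in the result because there is no third city to pair it with.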

6. concat()

/*
concat joins two or more source Observables end to end: it emits everything
from the first source, then everything from the next, and so on
*/
val source1 = Observable.just("London", "NewYork", "Delhi", "Tokyo")
val source2 = Observable.just("England", "USA", "India", "Japan")
val source3 =
Observable.just(
"British English",
"American English",
"English/Hindi",
"Japanese")
Observable.concat(source1, source2, source3)
.subscribe { result -> Log.i("tag", "$result") }

7. merge() with take()

/*
merge() with take(): merge combines sources like concat, except that merge
interleaves the emissions from both sources as they arrive, while concat waits
for one source to finish before subscribing to the next. take(10) stops the
stream after the first 10 items.
*/
Observable.merge(
Observable.interval(250, TimeUnit.MILLISECONDS).map { i -> "JAVA" },
Observable.interval(250, TimeUnit.MILLISECONDS).map { i -> "Kotlin" })
.take(10)
.subscribe { result -> Log.i("tag","$result") }

8. take()

/*
take(n) emits only the first n items from the source and ignores the rest
*/
Observable.just("Swimming", "Playing", "Boating", "Fishing")
.take(2)
.subscribe { result -> Log.i("tag", "Your main 2 hobbies are $result") }

9. repeat()

/*
repeat(n) resubscribes to the source so that its whole sequence is emitted n times
*/
Log.i("tag","Which team wins the repeated world cup in Football")
Observable.just("France", "Argentina", "England", "USA")
.repeat(2)
.subscribe { result -> Log.i("tag", "$result") }

10. buffer()

/*
buffer(n) collects the emitted items into lists of up to n items each
*/
Observable.range(1, 50)
.buffer(8)
.subscribe { println("Result: $it") }

11. buffer() with take()

Observable.range(1, 50)
.buffer(5) // group items into lists of 5: [1, 2, 3, 4, 5], [6, 7, 8, 9, 10], ...
.take(4) // keep only the first 4 lists
.subscribe { println("Result: $it") }

The operators discussed above are the most commonly used ones, but there are many others that can be used as required, such as concatMap(), switchMap(), and debounce().
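As one example from that list, here is a hedged sketch of debounce(), assuming RxJava 2. The `PublishSubject` stands in for a hypothetical search box, and a `TestScheduler` replaces real time so the behaviour is repeatable.

```kotlin
import io.reactivex.schedulers.TestScheduler
import io.reactivex.subjects.PublishSubject
import java.util.concurrent.TimeUnit

fun debouncedQueries(): List<String> {
    val scheduler = TestScheduler()             // virtual clock
    val typed = PublishSubject.create<String>() // hypothetical search-box input
    val observer = typed
        .debounce(300, TimeUnit.MILLISECONDS, scheduler)
        .test()

    typed.onNext("r")
    scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS)
    typed.onNext("rx")
    scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS)
    typed.onNext("rxjava")
    scheduler.advanceTimeBy(300, TimeUnit.MILLISECONDS) // 300 ms of silence

    return observer.values().toList()
}
```

Only the last query survives, because each new keystroke arrives before the previous one has been quiet for 300 ms. This is why debounce is a common choice for search-as-you-type.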

Observer

  • onSubscribe() -> called first, as soon as the observer subscribes to the Observable; the data flow starts after this.
  • onNext() -> called for each item emitted by the Observable.
  • onError() -> called if an error occurs while observing; no further items follow.
  • onComplete() -> called once all the data has been received from the Observable.
fun getObserver(): Observer<String> {
    return object : Observer<String> {
        override fun onNext(t: String) {
            println("onNext $t") // called for each item until the stream ends
        }
        override fun onError(e: Throwable) {
            e.printStackTrace() // called when an error occurs
        }
        override fun onComplete() {
            println("completed") // called once all data has been received
        }
        override fun onSubscribe(d: Disposable) {
            println("Subscribed") // called when the subscription starts
        }
    }
}
Above, every callback is implemented by hand. You can also hook into the same lifecycle events with the doOn... operators:
Observable.just("Hey I am doing RxJava")
.doOnSubscribe { Log.i("tag", "Want to call on subscribed") }
.doOnNext { Log.i("tag", "Want to call on each next item") }
.doAfterNext { Log.i("tag", "Want to call after onNext") }
.doOnComplete { Log.i("tag", "Want to call on task complete") }
.doOnDispose { Log.i("tag", "Want to call on when disposal is going to collect it") }
.subscribe { Log.i("tag", "Your result") }
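To see the order in which the Observer callbacks fire, here is a minimal sketch, assuming RxJava 2. It records each callback in a list instead of logging; the function name is hypothetical.

```kotlin
import io.reactivex.Observable
import io.reactivex.Observer
import io.reactivex.disposables.Disposable

fun callbackOrder(): List<String> {
    val events = mutableListOf<String>()
    // Observable.just emits synchronously, so the list is complete
    // by the time subscribe() returns.
    Observable.just("A", "B").subscribe(object : Observer<String> {
        override fun onSubscribe(d: Disposable) { events += "onSubscribe" }
        override fun onNext(t: String) { events += "onNext($t)" }
        override fun onError(e: Throwable) { events += "onError" }
        override fun onComplete() { events += "onComplete" }
    })
    return events
}
```

onSubscribe always comes first, and the stream ends with either onComplete or onError, never both.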

RxJava Use-Cases in Android during development

Case 1: When you call an API and save the response to local storage or a file. This is a long-running task, and doing it on the main thread leads to problems such as an "Application Not Responding" error. RxJava lets you move the work to a background thread.

Case 2: When you want to combine the results of two API calls, use the zip() operator to merge the results of several different calls into a single response.

Case 3: A common approach is to make an API call and then filter the returned collection for a specific user based on some conditions, using the filter() operator directly in the chain, with thread management handled in the same chain.
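Case 2 could look roughly like the sketch below, assuming RxJava 2. The two `fetch...` functions are hypothetical stand-ins for Retrofit endpoints that return `Single`, and the data is made up.

```kotlin
import io.reactivex.Single
import io.reactivex.functions.BiFunction
import io.reactivex.schedulers.Schedulers

// Hypothetical stand-ins for two Retrofit endpoints.
fun fetchUserName(): Single<String> = Single.fromCallable { "Asha" }
fun fetchOrderCount(): Single<Int> = Single.fromCallable { 3 }

// Each call gets its own subscribeOn, so the two requests can run in
// parallel; zip waits for both results and combines them.
fun userSummary(): Single<String> =
    Single.zip(
        fetchUserName().subscribeOn(Schedulers.io()),
        fetchOrderCount().subscribeOn(Schedulers.io()),
        BiFunction<String, Int, String> { name, orders -> "$name has $orders orders" }
    )
```

If either call fails, the combined Single fails too, so a single onError handler covers both requests.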

Cold vs Hot Observables

In RxJava, observables are sequences of data or events that can be observed by subscribers. Observables can be categorized into two main types: cold observables and hot observables. These categories describe how the data or events are produced and distributed to subscribers. Let’s explore the differences between cold and hot observables:

Cold Observables

  • One Producer per Subscriber: Cold observables are like traditional collections or sequences. Each subscription to a cold observable creates a new producer of data or events. In other words, each subscriber gets its own independent sequence of data.
  • Lazy Execution: The data or events in a cold observable are produced only when a subscriber subscribes to it. The producer starts emitting items from the beginning of the sequence for each subscriber.

Cold Observable (e.g., Netflix): Imagine streaming Mission: Impossible on Netflix. When you press play, a dedicated data producer is crafted exclusively for you. Every subscriber initiates a new producer, ensuring everyone receives the entire movie, regardless of when they start.

Hot Observables

  • Shared Producer, Single or Multiple Consumers: Hot observables, on the other hand, have a producer that emits data or events independently of subscribers and is shared by all of them. Subscribers that subscribe to a hot observable at different times may receive different parts of the sequence.
  • Eager Execution: Data or events in hot observables are produced regardless of whether there are subscribers. Subscribers receive events that are emitted after they subscribe.

Hot Observable (e.g., Movie Theater): Contrastingly, a hot observable is akin to a movie theater experience. Picture the show starting at 4 p.m. The producer is created at that moment, and latecomers miss the beginning, only joining the stream from their time of arrival. Subscribers share a single producer.

Examples:

private fun hotObservable() {
val myObservable = Observable.interval(1, TimeUnit.SECONDS)
val hotObservable = myObservable.publish().refCount()

val subscription1 = hotObservable
.doOnSubscribe { Log.i("tag", "Observer 1 subscribed") }
.doFinally { Log.i("tag", "Observer 1 unsubscribed") }
.subscribe { Log.i("tag", "Observer 1 is $it") }
Thread.sleep(3000)

val subscription2 = hotObservable
.doOnSubscribe { Log.i("tag", "Observer 2 subscribed") }
.doFinally { Log.i("tag", "Observer 2 unsubscribed") }
.subscribe { Log.i("tag", "Observer 2 is $it") }
Thread.sleep(3000)

subscription1.dispose()
val subscription3 = hotObservable
.doOnSubscribe { Log.i("tag", "Observer 3 subscribed") }
.doFinally { Log.i("tag", "Observer 3 unsubscribed") }
.subscribe { Log.i("tag", "Observer 3 is $it") }
Thread.sleep(3000)
subscription2.dispose()
subscription3.dispose()
}

@SuppressLint("CheckResult")
private fun coldObservable() {
val source: Observable<String> = Observable.just(
"Spiderman",
"John wick",
"Ant man",
"Titanic",
"FF7",
"James bond series 007",
"Jumanji"
)

source.map { it }
.subscribe { println("Luan is $it") }
Thread.sleep(3000)
source.map { it }
.subscribe { println("Alex is $it") }
Thread.sleep(3000)
source.map { it }
.subscribe { println("Thomas is $it") }
Thread.sleep(3000)
source.map { it }
.subscribe { println("Josh is $it") }
Thread.sleep(3000)
}
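The two functions above rely on Thread.sleep and log output. As a complementary sketch, assuming RxJava 2, the version below uses a `TestScheduler` as a virtual clock so the cold and hot behaviour can be compared side by side. It makes the source hot with `publish()` and `connect()` rather than `refCount()`, and the function name is hypothetical.

```kotlin
import io.reactivex.Observable
import io.reactivex.schedulers.TestScheduler
import java.util.concurrent.TimeUnit

// Returns what an early and a late subscriber receive, for a cold source
// and for the same source made hot.
fun lateSubscriberDemo(): Map<String, List<Long>> {
    val clock = TestScheduler() // virtual time instead of Thread.sleep
    val cold = Observable.interval(1, TimeUnit.SECONDS, clock)

    val hot = cold.publish()    // ConnectableObservable: one shared producer
    val hotEarly = hot.test()
    val coldEarly = cold.test()
    hot.connect()               // the shared producer starts ticking now

    clock.advanceTimeBy(2, TimeUnit.SECONDS)
    val hotLate = hot.test()    // joins the stream that is already running
    val coldLate = cold.test()  // gets a fresh producer that starts from 0
    clock.advanceTimeBy(2, TimeUnit.SECONDS)

    return mapOf(
        "hotEarly" to hotEarly.values().toList(),
        "hotLate" to hotLate.values().toList(),
        "coldEarly" to coldEarly.values().toList(),
        "coldLate" to coldLate.values().toList()
    )
}
```

The late hot subscriber picks up in the middle of the sequence, like a latecomer at the movie theater, while the late cold subscriber starts again from the first item, like pressing play on Netflix.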

Let’s walk through an RxJava MVVM implementation

Step 1: Add required dependencies


// RxJava 2
implementation("io.reactivex.rxjava2:rxjava:2.2.7")
implementation("io.reactivex.rxjava2:rxandroid:2.1.1")

//RxJava2 with Retrofit
implementation("com.squareup.retrofit2:adapter-rxjava2:2.9.0")

//Retrofit
implementation("com.squareup.retrofit2:retrofit:2.9.0")
implementation("com.squareup.retrofit2:converter-gson:2.9.0")
implementation("com.squareup.picasso:picasso:2.71828")

Step 2: Implement Service Layer using Observables as data source

interface ApiService {
@GET(END_POINT)
fun getRandomDog(): Single<DogImageResponse>
}

Step 3: Implement Retrofit Builder

object RetrofitBuilder {
private lateinit var retrofit: Retrofit
fun getRetrofit(): Retrofit {
if (!this::retrofit.isInitialized) {
retrofit = Retrofit.Builder()
.baseUrl(BASE_URL)
.addConverterFactory(GsonConverterFactory.create())
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.build()
}
return retrofit
}
}

Step 4: Implement the ViewModel

class DogViewModel : ViewModel() {
val dogResponse = MutableLiveData<DogImageResponse>()
val error = MutableLiveData<String>()
private lateinit var retrofit: Retrofit
private lateinit var apiService: ApiService
private lateinit var disposable: Disposable
fun getRandomDogs() {
retrofit = RetrofitBuilder.getRetrofit()
apiService = retrofit.create(ApiService::class.java)
disposable = apiService.getRandomDog()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
{
dogResponse.postValue(it)
}, {
error.postValue(it.toString())
}
)
}
override fun onCleared() {
super.onCleared()
if (this::disposable.isInitialized) {
disposable.dispose()
}
}
}
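The ViewModel above keeps a single Disposable, so calling getRandomDogs() again overwrites the reference to the previous subscription. One common alternative is a CompositeDisposable. The sketch below assumes RxJava 2; `SubscriptionBag` is a hypothetical class used only to show the idea outside of Android.

```kotlin
import io.reactivex.Observable
import io.reactivex.disposables.CompositeDisposable

// Hypothetical holder; in a ViewModel this would be a field plus onCleared().
class SubscriptionBag {
    private val disposables = CompositeDisposable()

    fun start() {
        // Each subscribe() returns a Disposable; collect them all in one place.
        disposables.add(Observable.never<String>().subscribe())
        disposables.add(Observable.never<Int>().subscribe())
    }

    fun activeCount(): Int = disposables.size()

    // Call this from onCleared(): clear() disposes everything collected so
    // far and leaves the container reusable.
    fun clear() = disposables.clear()
}
```

With this approach every request started by the ViewModel is cancelled when the ViewModel is cleared, no matter how many times the user taps the button.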

Step 5: Implement the UI (here, an Activity)

class MainActivity : AppCompatActivity() {
private lateinit var binding: ActivityMainBinding
private lateinit var viewModel: DogViewModel
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
binding = ActivityMainBinding.inflate(layoutInflater)
setContentView(binding.root)
setUpViewModel()
initViews()
setUpObserver()
}
private fun setUpObserver() {
with(viewModel) {
dogResponse.observe(this@MainActivity) {
Picasso.get()
.load(it.message)
.placeholder(R.drawable.ic_launcher_background)
.error(android.R.drawable.ic_dialog_alert)
.into(binding.imageOfDog)
}
viewModel.error.observe(this@MainActivity) {
Toast.makeText(this@MainActivity, it, Toast.LENGTH_SHORT).show()
}
}
}
private fun setUpViewModel() {
viewModel = ViewModelProvider(this)[DogViewModel::class.java]
}
private fun initViews() {
binding.btnNewDog.setOnClickListener {
viewModel.getRandomDogs()
}
}
}

Another example of mine, available on GitHub, implements search using filter in an MVVM-based Kotlin app.

Conclusion

RxJava is a vast library, and it is very useful in Android development for managing threads and keeping an architecture scalable. Overall, it is a powerful library that can simplify the development of reactive and responsive Android applications. Its use in Android development has become increasingly popular due to its numerous benefits and ease of use.

I love to code with RxJava, please try it :)

Thank you for reading my article. I really appreciate 👏 your response.

Clap if this article helped you; it means a lot to me. If I got something wrong, please leave a comment so I can improve it.
Let’s connect on LinkedIn and GitHub.
