Kotlin’s Reactive Tools: A Deep Dive into RxKotlin and Flow — Part I

Summit Kumar
Aug 20, 2023

In the world of asynchronous programming in Kotlin, two powerful contenders have emerged: RxKotlin and Flow. Both libraries provide mechanisms for handling data streams, reactive programming, and asynchronous operations. In this blog post, we'll dive into the differences between these libraries, explore their unique features, and guide you in making the right choice for your projects.

Introduction to RxKotlin and Flow

RxKotlin: RxKotlin is part of the ReactiveX (Rx) family. It is a thin layer of Kotlin extensions on top of RxJava, a library built on Reactive Programming principles. It offers tools for producing and transforming asynchronous data streams, called Observables. Observables emit data items over time, and those items can be processed with operators such as filter, map, and merge. Because the operators chain, they can express intricate data-processing pipelines, which makes RxKotlin a popular option for handling asynchronous events.

Flow: Flow is part of Kotlin's coroutines library (kotlinx.coroutines). It fits Kotlin's structured concurrency model and is collected from within coroutines. A Flow is an asynchronous stream that emits values over time. Its simple syntax and familiar coroutine-based structure make asynchronous tasks easy to handle. It also offers operators similar to RxKotlin's for transforming, combining, and managing stream events, resulting in clean, readable code.
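One property that matters in practice is that a flow created with the flow builder is cold: its block runs only when the flow is collected, and it runs again for every new collection. The following is a minimal sketch of that behaviour, assuming only kotlinx.coroutines on the classpath; builderRuns, numbers, and collectTwice are hypothetical names introduced for this illustration.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical counter used only to observe how often the builder block runs.
var builderRuns = 0

// A cold flow: the block below does not run until someone collects it.
fun numbers(): Flow<Int> = flow {
    builderRuns++
    emit(1)
    emit(2)
    emit(3)
}

// Collects the same flow twice and reports the values plus the number of builder runs.
fun collectTwice(): Pair<List<Int>, Int> = runBlocking {
    builderRuns = 0
    val flow = numbers()        // creating the flow executes nothing yet
    check(builderRuns == 0)
    val first = flow.toList()   // first collection runs the block
    val second = flow.toList()  // second collection runs it again
    check(first == second)
    first to builderRuns
}
```

Calling collectTwice() returns the collected values together with the number of times the builder block ran, which makes the per-collection execution visible.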

Key Differences:

Programming Paradigm:
RxKotlin adheres to the Reactive Programming philosophy, which involves utilizing Observables and operators to combine and modify asynchronous data streams.

Flow is an API in Kotlin's coroutines library that follows the structured concurrency approach. It lets developers write asynchronous code in a sequential style based on coroutines.

Reactive Programming in RxKotlin:

val messageStream: Observable<String> = fetchMessages()
    .filter { it.isNew }
    .map { it.content } // the stream now carries String content, not Message
    .observeOn(AndroidSchedulers.mainThread()) // deliver results on the main thread

messageStream.subscribe { message ->
    showMessage(message)
}

In this RxKotlin example, fetchMessages() retrieves messages, and a chain of operators filters and processes the data. The observable stream reacts to new messages, updating the UI in real-time.

Now, let’s achieve the same result using Flow’s structured concurrency for a sequential flow of operations:

Structured Concurrency (Flow):

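// collect is a suspend function, so this must run inside a coroutine
// (for example, within lifecycleScope.launch { ... }).
flow {
    val messages = fetchMessages() // assumed here to return a List<Message>
    for (message in messages) {
        if (message.isNew) {
            emit(message.content)
        }
    }
}.flowOn(Dispatchers.IO) // run the upstream block on the IO dispatcher
    .collect { message ->
        showMessage(message)
    }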

In this Flow example, the flow builder establishes a structured sequence of operations. The coroutine-based approach ensures that each message is handled sequentially, maintaining order and structure.
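The same pipeline can also be written with Flow operators, which keeps it close to the RxKotlin chain. This is a minimal sketch under stated assumptions: the Message data class and the function names below are hypothetical stand-ins, since the article does not show the real message type.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the article's message type.
data class Message(val content: String, val isNew: Boolean)

// Same filtering and mapping as the flow { } builder version, expressed with operators.
fun newMessageContents(messages: List<Message>): Flow<String> =
    messages.asFlow()
        .filter { it.isNew }
        .map { it.content }

// Runs the pipeline on sample data and returns what a collector would receive.
fun demoNewMessageContents(): List<String> = runBlocking {
    val messages = listOf(
        Message("hello", isNew = true),
        Message("old news", isNew = false),
        Message("world", isNew = true)
    )
    newMessageContents(messages).toList()
}
```

Which form to prefer is mostly a matter of taste: the builder reads like ordinary imperative code, while the operator chain is easier to compare with an existing Rx pipeline.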

Integration with Coroutines:
RxKotlin can work alongside Kotlin coroutines, but it is a separate library with its own way of handling concurrency and asynchronous operations.

Flow is built on top of Kotlin coroutines and fully integrates with them. It provides a more cohesive experience when working with coroutines and asynchronous operations.

RxKotlin with Coroutines:
The kotlinx-coroutines-rx3 module provides the asFlow extension function, which converts an RxJava Observable to a Flow and bridges the gap between the two worlds. Here's how we can do it:

import android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import androidx.lifecycle.lifecycleScope
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.schedulers.Schedulers
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.rx3.asFlow // from the kotlinx-coroutines-rx3 artifact

class MainActivity : AppCompatActivity() {

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        val rxObservable: Observable<String> = fetchUserData()

        // Wrap the Observable so it can be collected from a coroutine
        val flow: Flow<String> = rxObservable.asFlow()

        lifecycleScope.launch {
            flow.collect { value ->
                println("User: $value")
            }
        }
    }

    private fun fetchUserData(): Observable<String> {
        return Observable.just("User1", "User2", "User3")
            .subscribeOn(Schedulers.io())
    }
}

In the above example, an RxJava Observable is converted to a Flow and consumed within a coroutine scope. Observable.just stands in for a real network call here, so the scenario only simulates fetching data from a remote API with RxKotlin and integrating it with Kotlin coroutines.
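The bridge works in both directions and does not depend on Android. The sketch below assumes RxJava 3 and the kotlinx-coroutines-rx3 artifact are on the classpath; usersAsFlow, usersAsObservable, and demoBridge are hypothetical names used only for illustration.

```kotlin
import io.reactivex.rxjava3.core.Observable
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.rx3.asFlow
import kotlinx.coroutines.rx3.asObservable

// Rx -> Flow: wrap an Observable so it can be collected in a coroutine.
fun usersAsFlow(): Flow<String> =
    Observable.just("User1", "User2", "User3").asFlow()

// Flow -> Rx: expose a Flow to code that still expects an Observable.
fun usersAsObservable(): Observable<String> =
    flowOf("User1", "User2", "User3").asObservable()

// Consumes each bridged stream with the API of the other library.
fun demoBridge(): Pair<List<String>, List<String>> {
    val fromRx = runBlocking { usersAsFlow().map { "User: $it" }.toList() }
    val fromFlow = usersAsObservable().toList().blockingGet()
    return fromRx to fromFlow
}
```

This kind of two-way bridge is useful during a gradual migration, when some layers of an app still expose Observables while newer code is written with coroutines.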

Flow with Coroutines:
Flow is naturally aligned with Kotlin coroutines, making it simple to launch and collect values within coroutine scopes. Here’s how we can work with Flow and coroutines:

import android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import androidx.lifecycle.lifecycleScope
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

class MainActivity : AppCompatActivity() {

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        val userDataFlow: Flow<String> = fetchUserDataFlow()

        // Collection is tied to the activity's lifecycle scope
        lifecycleScope.launch {
            userDataFlow.collect { value ->
                println("User: $value")
            }
        }
    }

    private fun fetchUserDataFlow(): Flow<String> {
        return flow {
            emit("User1")
            emit("User2")
            emit("User3")
        }
    }
}

In the above example, we use Flow directly and collect its values within a coroutine scope. The hard-coded emit calls stand in for a real network request, so the scenario only simulates fetching data from a remote API with Flow and Kotlin coroutines.

Syntax and API:
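RxKotlin builds streams by chaining operators that transform and combine Observables, and even stream creation is callback-based, through an emitter. This leads to a more functional programming style.

Flow offers similar chainable operators, but the flow builder itself is ordinary Kotlin code: loops, conditionals, and suspending calls written sequentially, much like regular imperative programming. This can make Flow code more readable for those familiar with Kotlin.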

Operator Chaining in RxKotlin:

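// anotherObservable is assumed to be an Observable<String> defined elsewhere
Observable.create<String> { emitter ->
    emitter.onNext("ButtonClick")
    emitter.onNext("Swipe")
    emitter.onNext("ButtonClick")
    emitter.onComplete() // signal that no more events will follow
}.filter { event -> event == "ButtonClick" }
    .map { _ -> "User123" }
    .mergeWith(anotherObservable)
    .subscribe { data -> println("Processed: $data") }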

In this RxKotlin example, we create a stream of events, then use the filter operator to keep only "ButtonClick" events. Next, we use the map operator to transform these events into user IDs. Finally, we merge this stream with another observable, and the subscribe method handles the output.

Sequential Syntax in Flow:

// collect is a suspend function, so this chain must run inside a coroutine
flow<String> {
    emit("ButtonClick")
    emit("Swipe")
    emit("ButtonClick")
}.filter { event -> event == "ButtonClick" }
    .map { _ -> "User123" }
    .collect { data -> println("Processed: $data") }

In this Flow example, we use the flow builder to create a stream of events. Then, we apply the filter and map operations sequentially. Finally, we use the collect method to process and consume the data.
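The RxKotlin version also merges in a second stream, which the Flow version leaves out. A rough Flow counterpart to mergeWith is the merge function from kotlinx.coroutines.flow. In this sketch, otherUserIds is a hypothetical stand-in for anotherObservable, and the function names are illustrative only.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// The event pipeline from the example above: keep clicks, map them to a user ID.
fun clickUserIds(): Flow<String> =
    flowOf("ButtonClick", "Swipe", "ButtonClick")
        .filter { event -> event == "ButtonClick" }
        .map { "User123" }

// Hypothetical second stream, standing in for anotherObservable.
fun otherUserIds(): Flow<String> = flowOf("User456")

// merge collects both flows concurrently; the interleaving of values
// from different sources is not guaranteed.
fun mergedUserIds(): List<String> = runBlocking {
    merge(clickUserIds(), otherUserIds()).toList()
}
```

Because the two sources are collected concurrently, code that consumes the merged flow should not rely on a particular ordering between them.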

Cancellation Propagation and Back Pressure Handling:
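Cancellation works differently in the two libraries. In RxKotlin, subscribing returns a Disposable, and calling dispose() on it propagates cancellation up the operator chain; tying that to a lifecycle is manual, typically by collecting disposables in a CompositeDisposable. Flow relies on structured concurrency: a collection runs inside a coroutine, so cancelling that coroutine or its scope cancels the whole chain automatically.

In RxKotlin, back pressure support depends on the stream type. Since RxJava 2, Observable has no back pressure support; Flowable is the back-pressure-aware type, with strategies such as buffering, dropping, or keeping only the latest item when the producer outpaces the consumer.

Flow handles back pressure through suspension: emit suspends until the collector is ready for the next value. Operators such as buffer, conflate, and collectLatest adjust how emission and consumption rates are decoupled.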
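To make the three Flow operators concrete, here is a small sketch that pairs a fast producer with a deliberately slow collector. It assumes only kotlinx.coroutines; the function names are hypothetical, and the 20 ms delay simply simulates slow processing.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.flow.conflate
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// A producer that emits faster than the collectors below can process.
fun fastProducer(): Flow<Int> = flow {
    for (i in 1..5) emit(i)
}

// buffer(): the producer runs ahead into a buffer; nothing is dropped.
fun withBuffer(): List<Int> = runBlocking {
    fastProducer().buffer().onEach { delay(20) }.toList()
}

// conflate(): a slow collector may skip intermediate values,
// but it still receives the most recent one.
fun withConflate(): List<Int> = runBlocking {
    fastProducer().conflate().onEach { delay(20) }.toList()
}

// collectLatest(): processing of a value is cancelled when a newer value arrives.
fun withCollectLatest(): List<Int> = runBlocking {
    val processed = mutableListOf<Int>()
    fastProducer().collectLatest { value ->
        delay(20)             // cancelled here if a newer value arrives
        processed.add(value)  // reached only for values that were not superseded
    }
    processed
}
```

As a rule of thumb, buffer suits cases where every value matters, while conflate and collectLatest suit UI state or progress updates where only the newest value is of interest.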

Cancellation Propagation and Back Pressure Handling in RxKotlin:

val disposable: Disposable = Observable.create<Int> { emitter ->
    for (i in 1..1000) {
        if (!emitter.isDisposed) {
            emitter.onNext(i)
        } else {
            return@create
        }
    }
    emitter.onComplete()
}
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(
        { data -> println("Received: $data") },
        { error -> println("Error: $error") },
        { println("Completed") }
    )

In this example, disposing of the Disposable before the stream completes cancels the subscription, and the isDisposed check lets the emitter loop stop early. Back pressure is not handled here, because Observable does not support it: observeOn queues items the main thread has not processed yet in an unbounded buffer, which can grow without limit if the producer is much faster than the consumer.
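When back pressure matters on the Rx side, the usual answer is Flowable, which takes an explicit strategy for the case where the consumer falls behind. The sketch below assumes RxJava 3 and uses BackpressureStrategy.BUFFER as one possible choice; bufferedNumbers and demoBufferedNumbers are hypothetical names.

```kotlin
import io.reactivex.rxjava3.core.BackpressureStrategy
import io.reactivex.rxjava3.core.Flowable
import io.reactivex.rxjava3.schedulers.Schedulers

// Flowable is the RxJava type that carries back pressure information;
// the strategy says what to do when the consumer falls behind.
fun bufferedNumbers(): Flowable<Int> =
    Flowable.create<Int>({ emitter ->
        for (i in 1..1000) {
            if (emitter.isCancelled) break  // stop producing once the subscriber cancels
            emitter.onNext(i)
        }
        emitter.onComplete()
    }, BackpressureStrategy.BUFFER)         // keep items the consumer has not requested yet

// Consumes the stream on another thread and blocks until it completes.
fun demoBufferedNumbers(): List<Int> =
    bufferedNumbers()
        .observeOn(Schedulers.computation())
        .toList()
        .blockingGet()
```

Other strategies, such as BackpressureStrategy.DROP or BackpressureStrategy.LATEST, trade completeness for bounded memory and may be a better fit for high-frequency sources.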

Cancellation Propagation and Back Pressure Handling in Flow:

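val flow: Flow<Int> = flow {
    for (i in 1..1000) {
        emit(i) // suspends when the downstream cannot keep up
    }
}.flowOn(Dispatchers.IO)

// launchIn needs a CoroutineScope; a lifecycle-bound scope such as
// lifecycleScope is used here instead of GlobalScope so that the
// collection is cancelled together with its owner.
val job: Job = flow
    .onEach { data -> println("Received: $data") }
    .launchIn(lifecycleScope)

In this Flow example, cancellation follows the coroutine scope: cancelling the returned Job, or the scope passed to launchIn, stops both the collector and the upstream flow block. That is why the scope should be tied to a lifecycle; GlobalScope is never cancelled automatically. Back pressure is handled through suspension: emit suspends while the collector is busy, so the producer cannot run arbitrarily far ahead. Note that flowOn places a bounded buffer between the two dispatchers, so the producer may get a limited number of items ahead of the collector.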
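The following sketch shows cancellation of a running collection through its Job, without any Android dependency. It assumes only kotlinx.coroutines; CancelResult, cancelRunningCollection, and the sawThree signal are hypothetical names introduced for the illustration.

```kotlin
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.runBlocking

// Result holder for this sketch (hypothetical, not part of any library).
data class CancelResult(val received: List<Int>, val upstreamCleanedUp: Boolean)

fun cancelRunningCollection(): CancelResult = runBlocking {
    val received = mutableListOf<Int>()
    var cleanedUp = false
    val sawThree = CompletableDeferred<Unit>()

    val ticker = flow {
        try {
            var i = 1
            while (true) {       // would run forever if nobody cancelled it
                emit(i++)
                delay(10)
            }
        } finally {
            cleanedUp = true     // runs when the collecting coroutine is cancelled
        }
    }

    val job = ticker
        .onEach { value ->
            received.add(value)
            if (value == 3) sawThree.complete(Unit)
        }
        .launchIn(this)          // the collection lives in this scope

    sawThree.await()             // wait until a few values have arrived
    job.cancelAndJoin()          // cancelling the Job stops the upstream loop too
    CancelResult(received.toList(), cleanedUp)
}
```

The finally block in the upstream is a convenient place to release resources, since it runs whether the flow completes, fails, or is cancelled.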

Error Handling
In RxKotlin, errors are handled using the onError callback within the subscriber. This allows us to specify what should happen when an error occurs during the stream.

Flow’s error handling is managed using the catch operator, which allows us to handle errors and emit replacement values or perform recovery operations.

Error Handling in RxKotlin:

Observable.create<String> { emitter ->
    try {
        if (someCondition) {
            throw RuntimeException("Something went wrong!")
        }
        emitter.onNext("Data")
        emitter.onComplete()
    } catch (e: Exception) {
        emitter.onError(e)
    }
}
    .subscribe(
        { data -> println("Received: $data") },
        { error -> println("Error: $error") }
    )

In this example, the onError callback handles any errors thrown within the stream. Subsequent operators and the subscriber can react accordingly to the error.
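RxJava can also recover inside the chain instead of in the subscriber. One option is the onErrorReturnItem operator, which replaces the error with a final item. This is a minimal sketch assuming RxJava 3; loadData, loadDataWithFallback, and the shouldFail flag are hypothetical names for illustration.

```kotlin
import io.reactivex.rxjava3.core.Observable

// Emits one value and then either fails or completes;
// shouldFail is a hypothetical flag for this sketch.
fun loadData(shouldFail: Boolean): Observable<String> =
    Observable.create<String> { emitter ->
        emitter.onNext("Data")
        if (shouldFail) {
            emitter.onError(RuntimeException("Something went wrong!"))
        } else {
            emitter.onComplete()
        }
    }

// onErrorReturnItem swaps the error for one last item and then completes the stream.
fun loadDataWithFallback(shouldFail: Boolean): List<String> =
    loadData(shouldFail)
        .onErrorReturnItem("Error Replacement")
        .toList()
        .blockingGet()
```

With shouldFail set to true the subscriber sees the value emitted before the failure followed by the replacement item; with false it sees only the regular value.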

Error Handling in Flow:

val flow: Flow<String> = flow {
    if (someCondition) {
        throw RuntimeException("Something went wrong!")
    }
    emit("Data")
}.catch {
    // Handles exceptions thrown upstream and emits a fallback value
    emit("Error Replacement")
}.flowOn(Dispatchers.IO)

// collect is a suspend function, so call it from a coroutine
flow.collect { data ->
    println("Received: $data")
}

In this Flow example, the catch operator intercepts exceptions thrown by the upstream flow and emits a replacement value ("Error Replacement" in this case). The failed upstream does not resume: the collector receives the replacement value, and the flow then completes normally instead of failing.
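To see exactly what a collector receives when an upstream fails partway through, consider this small sketch. It assumes only kotlinx.coroutines; flakySource and collectWithFallback are hypothetical names, and -1 is an arbitrary fallback value.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Emits 1 and 2, then fails before it can emit the remaining values.
fun flakySource(): Flow<Int> = flow {
    for (i in 1..5) {
        if (i == 3) throw IllegalStateException("Something went wrong!")
        emit(i)
    }
}

// catch replaces the failure with a single fallback value;
// the values the source never reached are not produced.
fun collectWithFallback(): List<Int> = runBlocking {
    flakySource()
        .catch { emit(-1) }
        .toList()
}
```

If the stream should be attempted again rather than ended with a fallback, the retry operator from kotlinx.coroutines.flow is the usual tool, since it restarts the upstream from the beginning.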

Stay tuned for Part II to learn more about when to choose one over the other based on your project’s requirements, coding style, and familiarity with reactive programming and Kotlin’s coroutines. The decision between RxKotlin and Flow depends on your project’s unique needs and your comfort level with the different paradigms they offer.
