Coroutines and RxJava — An Asynchronicity Comparison (Part 5): Operators

Manuel Vivo
May 2, 2018 · 3 min read


In this blog series I will compare Kotlin Coroutines and RxJava since they are both trying to solve a common problem in Android development: Asynchronous Programming.

In Part 3 and Part 4 we talked about transferring streams of values and the Coroutines and RxJava interop library.

Now it’s time to compare how we can transform those streams with Operators.

Common Operators

Some RxJava operators are available in the Kotlin standard library as part of Kotlin Collections. Here you can see a table comparing simple operators.

[Image: table comparing simple RxJava operators with their Kotlin Collections equivalents]

These operators transform the stream in the same way even though some operators differ in name: e.g. skip in RxJava is called drop in Coroutines.
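As a minimal sketch of how a few of these operators chain, here is a pipeline over a plain List using only the Kotlin standard library. The function name transform and the sample values are assumptions made for illustration, not something taken from the comparison table.

```kotlin
// Hypothetical helper: chains a few Kotlin Collections operators
// that have direct RxJava counterparts.
fun transform(values: List<Int>): List<Int> =
    values
        .drop(2)                 // RxJava: skip(2)
        .filter { it % 2 == 0 }  // RxJava: filter
        .map { it * 10 }         // RxJava: map
        .take(2)                 // RxJava: take(2)

fun main() {
    // Sample input chosen only for illustration.
    println(transform(listOf(1, 2, 3, 4, 5, 6, 7, 8))) // [40, 60]
}
```

Apart from the skip/drop naming, the chain reads the same as it would on an RxJava stream.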

Create Your Own Operator

Some operators are not part of Kotlin Collections. However, you can create them yourself with little effort.

We can replicate what the RxJava range operator does with a few lines of code and a simple for loop.

fun range(
    context: CoroutineContext,
    start: Int,
    count: Int
) = publish(context) {
    // Emit count consecutive integers, beginning at start
    for (x in start until start + count) send(x)
}
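To try the same loop without any reactive dependency, here is a sketch that swaps the publisher for the standard library's sequence builder. This is a different technique from the publish-based version above: values are produced lazily with yield() rather than sent to subscribers, and the name rangeSequence is hypothetical.

```kotlin
// Hypothetical stand-in for the publisher-based range: same loop,
// but values are yielded lazily from a stdlib sequence.
fun rangeSequence(start: Int, count: Int): Sequence<Int> = sequence {
    for (x in start until start + count) yield(x)
}

fun main() {
    println(rangeSequence(5, 3).toList()) // [5, 6, 7]
}
```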

Some other operators require more work. The example below implements a counterpart to the RxJava zip operator: it takes two blocks of code and waits for both of them to finish.

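suspend fun zip(
    context: CoroutineContext,
    block: () -> Unit,
    block2: () -> Unit
) {
    // Start both blocks concurrently in the given context
    val deferred1 = async(context) { block() }
    val deferred2 = async(context) { block2() }
    // Suspend until both blocks have finished
    deferred1.await()
    deferred2.await()
}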

As you may have noticed, we pass a CoroutineContext as a parameter. We do that so we can cancel the operator easily by calling .cancel() on that context’s Job, as we discussed in Part 2.
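The cancellation idea can be sketched as follows. This assumes a current kotlinx.coroutines 1.x release, where async needs a CoroutineScope, so the sketch uses coroutineScope and is not the same code as the zip shown above. The names zipBlocks and cancelDemo are hypothetical.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: runs both blocks concurrently and suspends
// until both have finished.
suspend fun zipBlocks(block: suspend () -> Unit, block2: suspend () -> Unit) {
    coroutineScope {
        val first = async { block() }
        val second = async { block2() }
        awaitAll(first, second)
    }
}

// Returns true when cancelling the parent Job also cancelled the zipped work.
fun cancelDemo(): Boolean = runBlocking {
    val parent = Job()
    // The launched coroutine becomes a child of parent.
    val work = launch(parent) {
        zipBlocks({ delay(1_000L) }, { delay(1_000L) })
    }
    delay(100L)      // let both blocks start
    parent.cancel()  // cancels work and both async blocks inside it
    work.join()
    work.isCancelled
}

fun main() {
    println("cancelled = ${cancelDemo()}")
}
```

Cancelling the Job propagates to every coroutine started under it, which is why a single cancel() call is enough to stop both blocks.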

Complex Operators

What about even more complex RxJava operators like debounce?

You can find debounce as an extension function on ReceiveChannel. Similarly, RxJava’s timeout has an equivalent in Kotlin Coroutines: withTimeoutOrNull.
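Here is a small sketch of the timeout half of that comparison, assuming a current kotlinx.coroutines 1.x release. It covers only withTimeoutOrNull, not debounce. The helper fetchWithin and its delays are hypothetical and stand in for real work.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: returns "done" if the simulated work finishes
// within timeoutMs, or null if the timeout expires first.
suspend fun fetchWithin(timeoutMs: Long, workMs: Long): String? =
    withTimeoutOrNull(timeoutMs) {
        delay(workMs) // stands in for real asynchronous work
        "done"
    }

fun main() = runBlocking {
    println(fetchWithin(timeoutMs = 500L, workMs = 50L)) // done
    println(fetchWithin(timeoutMs = 50L, workMs = 500L)) // null
}
```

Where RxJava’s timeout signals an error downstream, withTimeoutOrNull simply returns null, so the caller handles the timeout with an ordinary null check.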

Similarities and Differences

We have seen that most operators are available in both libraries and, where they are not, you can easily build them.

The only difference I can see between these two libraries is the point at which you apply those operators.

Whereas in RxJava you can apply operators before subscribing to the stream, you have to do it after opening a subscription in Coroutines. Let’s see how we map an element in RxJava:

.map { result -> map(result) }
.subscribe()

And now how we do it in Coroutines:

.openSubscription()
.map { result -> map(result) }

In Coroutines we have to do it after opening a subscription because map in Coroutines is an extension function on ReceiveChannel<E>. The same is true of other operators such as filter and drop. Calling openSubscription() returns a SubscriptionReceiveChannel<T> object, which extends ReceiveChannel<T>.

In my opinion, Coroutines requires extra work if you want multiple Observers to apply the same operators. Of course you can do it! But it requires more code.

What’s Coming Next?

Threading is the topic for the sixth part of this series.

If you want to know how you can swap threading in Coroutines the same way you do it in RxJava with subscribeOn() and observeOn(), don’t miss next week’s blog post!
