Coroutines and RxJava — An Asynchronicity Comparison (Part 3): Transferring Stream of Values

Introduction

In this blog series I will compare Kotlin Coroutines and RxJava since they are both trying to solve a common problem in Android development: Asynchronous Programming.

In Part 1 and Part 2, we learned how to perform heavy computation tasks in the background and how to cancel them.

Part 3 is about transferring streams of values.

If you want to learn more about Channels, Subjects, Actors, and BroadcastChannels, keep reading!

Observables in RxJava

We can use an Observable to transfer streams of values. We can see in the following example how we send five items to an Observer (subscriber to the Observable).

Observable.create<Int> { emitter ->
    for (i in 1..5) {
        emitter.onNext(i)
    }
    emitter.onComplete()
}

We just created a source of information that is going to emit five items to the Observer (Subscriber) that subscribes to it.

The way we transfer an element is with emitter.onNext(i). We use emitter.onComplete() to notify the Observer that we’re done emitting items.

Remember, every time a Subscriber calls .subscribe(), the Observable.create code will be executed again. Every Observer will receive different objects.
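To make that re-execution visible, here is a minimal sketch, assuming RxJava 2 (the io.reactivex package) is on the classpath. The countExecutions helper and its counter are made up for this illustration; they are not part of RxJava.

```kotlin
import io.reactivex.Observable

// Hypothetical helper: counts how many times the create block runs
// when two Observers subscribe to the same Observable.
fun countExecutions(): Int {
    var executions = 0
    val observable = Observable.create<Int> { emitter ->
        executions++ // runs once per subscription
        for (i in 1..5) emitter.onNext(i)
        emitter.onComplete()
    }

    val first = mutableListOf<Int>()
    val second = mutableListOf<Int>()
    observable.subscribe { first.add(it) }
    observable.subscribe { second.add(it) }

    // No schedulers are involved, so both subscriptions have
    // already received their five items at this point.
    check(first == listOf(1, 2, 3, 4, 5))
    check(second == listOf(1, 2, 3, 4, 5))
    return executions
}

fun main() {
    println(countExecutions()) // prints 2
}
```

Each subscription gets its own run of the block, which is why the counter ends up at two.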

Observers subscribing at different times to the same Observable

We can see how different observers will get the five different objects plus the completion signal. How can we do the same thing with Coroutines? We have to introduce Channels.

Channels

Channels are how we transfer streams of values with Coroutines.

A Channel<E> carries elements of type E. Channels can be shared between different Coroutines and, by default, they are rendezvous channels: they have no buffer (capacity 0), so a sender and a receiver have to meet for an element to be handed over.

Channels implement the SendChannel and ReceiveChannel interfaces. Let’s take a look at some of their methods:

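public interface SendChannel<in E> {
    public suspend fun send(element: E)
    public fun offer(element: E): Boolean
    public fun close(cause: Throwable? = null): Boolean
}

public interface ReceiveChannel<out E> {
    public suspend fun receive(): E
    public fun cancel(cause: Throwable? = null): Boolean
}

SendChannel exposes the method close(cause: Throwable? = null): Boolean, which closes the channel from the sending side. ReceiveChannel exposes cancel instead, which lets the consumer signal that it is no longer interested in the remaining elements.

A channel can be closed. When that happens, you can’t send more elements to it. Elements that were sent before the close can still be received; after that, receive throws a ClosedReceiveChannelException.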

You can send items to the channel with the method send(element: E) and receive from it with receive(): E. As you can see, they’re marked with the suspend modifier. That means those methods are suspending functions: they need to be called from a Coroutine (which you can create with a CoroutineBuilder, as we saw in Part 1 of this series) or from another suspending function.

What is the difference between blocking and suspending? You can read this part of the Kotlin documentation to learn more about it.

If you try to receive from an empty channel, the Coroutine that is calling that method will suspend execution until there’s an element in the channel. In the same way, if you call send on a channel that is full, the coroutine calling the method will suspend until the channel is not full.

If you want to send an element to the channel and not get suspended if the channel is full, you can use offer(element: E). As you can see, this method is not a suspending function so there is no need to call this method inside a Coroutine.

The caveat with offer is that it doesn’t guarantee that the element will be added to the channel. If the channel is full, the element is not added and offer returns false.
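Here is a small sketch of that non-suspending behavior. It assumes a recent kotlinx.coroutines release (1.5 or later), where trySend plays the role that offer has in this article; the firstTwoAttempts helper is a made-up name for the example.

```kotlin
import kotlinx.coroutines.channels.Channel

// Hypothetical helper: tries to put two elements into a channel
// that only has room for one, without ever suspending.
fun firstTwoAttempts(): Pair<Boolean, Boolean> {
    val channel = Channel<Int>(capacity = 1)
    val first = channel.trySend(1)  // fits in the buffer
    val second = channel.trySend(2) // buffer is full, element is dropped
    return first.isSuccess to second.isSuccess
}

fun main() {
    // No coroutine is needed: trySend is a regular function.
    println(firstTwoAttempts()) // prints (true, false)
}
```

The second element is simply not added, so the caller has to check the result if losing it matters.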

Let’s show some code! How can we create a channel?

val channel = Channel<Int>()

Sending an Object

We just created a channel of type Int. It will be able to transfer streams of Integers. How can we send a value?

launch {
    channel.send(1)
}

In the example above, we created a Coroutine with the CoroutineBuilder launch and sent an element to the channel. This is the visual representation of it:

Visual representation of a Channel with an element

How can we consume this element? With the receive function.

launch {
val value = channel.receive()
}

We created another Coroutine and consumed the element that was in the channel that we created before.

Visual representation of a Channel with a consumed element
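Putting the two snippets together, a runnable version could look like the sketch below. It assumes kotlinx.coroutines 1.x and uses runBlocking to provide the scope for launch; sendAndReceive is a hypothetical name chosen for the example.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: one coroutine sends, the caller receives.
fun sendAndReceive(): Int = runBlocking {
    val channel = Channel<Int>()

    // Sender coroutine: suspends in send until the element is taken.
    launch {
        channel.send(1)
    }

    // receive suspends until the sender has handed over the element.
    channel.receive()
}

fun main() {
    println(sendAndReceive()) // prints 1
}
```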

Sending Multiple Objects

What if we want to send two items?

val channel = Channel<Int>()
launch {
    channel.send(1)
    channel.send(2)
}

As we can expect, the Coroutine will start executing the code and call channel.send(1). Since a default channel has no buffer, the coroutine suspends there until another coroutine receives the object 1. The second line, channel.send(2), is not reached before that happens.

Coroutine suspended when Channel capacity is 0

If we create another Coroutine and consume the value, then the first coroutine will resume execution and it will send the second element.

First coroutine has resumed execution and sent the second object to the Channel

Previous RxJava Example with a Channel

How can we implement something similar to the RxJava example we saw at the beginning of the article?

val channel = Channel<Int>()
launch {
    for (i in 1..5) {
        channel.send(i)
    }
}

That’s a coroutine that will send five elements to the channel with a simple for loop. You can consume those elements in the following way:

launch {
    for (value in channel) {
        consumeValue(value)
    }
}

This for loop will call channel.receive() implicitly.

What’s happening here? Since the channel has no buffer by default…

  • The first coroutine calls channel.send(1) and suspends until a receiver is ready to take the element. The second coroutine, in turn, suspends in its for loop while there is nothing to receive.
  • As soon as both sides are ready, the element is handed over: the second coroutine resumes execution and consumes the value. Then, it suspends again until another element is sent.
  • Now that the value has been handed over, the first coroutine resumes to send the second Integer.

The two coroutines keep suspending and resuming in turn, each one waiting for the other to send or consume the next value.

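When the first coroutine finishes executing its suspending lambda, the five objects have been consumed. The second coroutine, however, stays suspended in its for loop waiting for more elements: the loop only ends once the channel is closed, which the sender can do by calling channel.close() after its last send.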

Channel with 5 objects consumed
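If we want the channel version to signal completion the way emitter.onComplete() does in the RxJava example, the sender can close the channel when it is done. The following is a sketch under that assumption, written against kotlinx.coroutines 1.x; collectAll is a hypothetical helper name.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: sends five elements, closes the channel,
// and returns everything the consumer saw.
fun collectAll(): List<Int> = runBlocking {
    val channel = Channel<Int>()

    launch {
        for (i in 1..5) {
            channel.send(i)
        }
        channel.close() // plays the role of emitter.onComplete()
    }

    val received = mutableListOf<Int>()
    // The loop ends once the channel is closed and drained.
    for (value in channel) {
        received.add(value)
    }
    received
}

fun main() {
    println(collectAll()) // prints [1, 2, 3, 4, 5]
}
```

Without the close() call, the for loop would stay suspended after the fifth element and runBlocking would never return.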

What if we launch another coroutine and want to consume an element from the channel?

launch {
consumeValue(channel.receive())
}

That coroutine will be suspended since there are no elements in the channel.

Coroutine suspended since there are no elements in the channel

Difference with Observables

What’s the difference with RxJava Observables? RxJava Observables are going to execute the code inside Observable.create every time you subscribe to it. That doesn’t happen with Channels.

When you consume an object from the channel, no other coroutine will be able to get the same object.

Buffered Channels

By default, a channel has no buffer, so every send has to wait for a receiver. You can create buffered channels that will allow senders to send multiple elements before suspending.

val channel = Channel<Int>(3) // Channel of capacity 3

You can specify the capacity as a parameter in the constructor. That is how we can create a channel of capacity three.
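To see the buffer at work, the sketch below counts how many elements a sender manages to send before it has to suspend, with nobody receiving. It assumes kotlinx.coroutines 1.x and relies on runBlocking running everything on a single thread; sentBeforeSuspending is a made-up helper.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical helper: how many of five sends complete
// while no coroutine is receiving from the channel?
fun sentBeforeSuspending(capacity: Int): Int = runBlocking {
    val channel = Channel<Int>(capacity)
    var sent = 0

    val sender = launch {
        for (i in 1..5) {
            channel.send(i) // suspends once the buffer is full
            sent++
        }
    }

    yield() // let the sender run until it suspends
    val result = sent
    sender.cancel() // nobody will receive, so stop the sender
    result
}

fun main() {
    println(sentBeforeSuspending(3)) // prints 3
}
```

With a capacity of three, the first three sends go straight into the buffer and the fourth one suspends.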

Another Way to Transfer Elements: Produce

You can use the CoroutineBuilder produce to transfer a stream of elements. produce creates a coroutine with a channel built in.

Produce = Coroutine + Channel

How can we use produce?

val publisher = produce(capacity = 2) {
    for (i in 1..5) send(i)
}

What’s the Benefit of Using produce?

Only the code inside produce can send elements to the channel, so it prevents other coroutines from calling send on that channel. This is because produce only returns a ReceiveChannel<E>: callers get the receiving side and nothing else. It’s “safer” than creating a channel object.

Produce is a pretty useful way to create custom operators.

You can consume the values in that channel like this:

launch {
    publisher.consumeEach {
        consumeValue(it)
    }
}

The channel is going to be automatically closed when the produce CoroutineBuilder finishes executing its suspending lambda and the elements from the channel have been consumed.
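As a runnable sketch of the same idea, assuming kotlinx.coroutines 1.x (where produce is an extension on CoroutineScope and is still marked experimental): the numbers and receiveFromProducer names are hypothetical, chosen only for this example.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.runBlocking

// Hypothetical producer: callers only get the receiving side.
@OptIn(ExperimentalCoroutinesApi::class)
fun CoroutineScope.numbers(): ReceiveChannel<Int> = produce(capacity = 2) {
    for (i in 1..5) send(i)
}

// Hypothetical helper: consumes everything the producer sends.
fun receiveFromProducer(): List<Int> = runBlocking {
    val received = mutableListOf<Int>()
    // The loop ends on its own: the channel is closed
    // when the produce block finishes.
    for (value in numbers()) {
        received.add(value)
    }
    received
}

fun main() {
    println(receiveFromProducer()) // prints [1, 2, 3, 4, 5]
}
```

Notice that there is no explicit close() here; finishing the produce block is what closes the channel.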

Actors

An actor also creates a coroutine with a channel built in. What is the difference with produce? produce returns a ReceiveChannel<E> whereas actor returns a SendChannel<E>.

Only the coroutine created by the actor can consume the elements from the channel.

You can think of an actor as a mailbox that receives and processes elements. Since the code inside a coroutine is executed sequentially, only one element will be processed at a time. How can you create one?

val actor = actor<Int>(CommonPool) {
    for (int in channel) {
        // iterate over received Integers
    }
}

As we said before, this actor is only going to process one Integer at a time. How can we send an element to the actor?

launch {
    actor.send(2)
}
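Here is a self-contained sketch of the mailbox idea. It assumes kotlinx.coroutines 1.x, where actor is still available on the JVM but marked as obsolete, and it inherits the dispatcher from runBlocking instead of using CommonPool. The processAll helper is a hypothetical name.

```kotlin
import kotlinx.coroutines.ObsoleteCoroutinesApi
import kotlinx.coroutines.channels.actor
import kotlinx.coroutines.runBlocking

// Hypothetical helper: sends every item to an actor and returns
// the items in the order the actor processed them.
@OptIn(ObsoleteCoroutinesApi::class)
fun processAll(items: List<Int>): List<Int> {
    val processed = mutableListOf<Int>()
    runBlocking {
        val mailbox = actor<Int> {
            // Only this coroutine reads from the mailbox,
            // one element at a time.
            for (item in channel) {
                processed.add(item)
            }
        }
        for (item in items) mailbox.send(item)
        mailbox.close() // lets the actor's for loop finish
    }
    // runBlocking returns only after the actor coroutine completes.
    return processed
}

fun main() {
    println(processAll(listOf(1, 2, 3))) // prints [1, 2, 3]
}
```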

Race Conditions in Channels

Until now, we’ve been showing easy examples with only a sender and a consumer that don’t cause us any problems. What happens if there are multiple consumers and a single sender?

val channel = Channel<Int>()
launch {
    val value1 = channel.receive()
}
launch {
    val value2 = channel.receive()
}
launch {
    channel.send(1)
}

With the above example… what variable is going to contain the value 1? value1 from the first coroutine or value2 from the second coroutine? There’s no way we can get a consistent result. A race condition is going to happen.
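The sketch below illustrates the same situation with two consumers and five elements, assuming kotlinx.coroutines 1.x. What it can show reliably is that every element is delivered exactly once; which consumer gets which element is not something to rely on. The fanOut helper and the consumer names "A" and "B" are made up for the example.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: two consumers read from one channel and
// record (consumer name, element) for everything they receive.
fun fanOut(): List<Pair<String, Int>> = runBlocking {
    val channel = Channel<Int>()
    val deliveries = mutableListOf<Pair<String, Int>>()

    val consumers = listOf("A", "B").map { name ->
        launch {
            for (value in channel) {
                deliveries.add(name to value)
            }
        }
    }

    for (i in 1..5) channel.send(i)
    channel.close()
    consumers.forEach { it.join() }
    deliveries
}

fun main() {
    val deliveries = fanOut()
    // Every element shows up once, whoever received it.
    println(deliveries.map { it.second }.sorted()) // prints [1, 2, 3, 4, 5]
    println(deliveries) // the split between A and B may vary
}
```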

If you want to send the same object to multiple Observers, you need a BroadcastChannel.

Let’s see the RxJava way to do broadcasting: Subjects.

Subjects

You can do broadcasting in RxJava with Subjects. When you create a subject and send an element, all subscribers will get the same object at the same time.

val subject: PublishSubject<Int> = PublishSubject.create()
subject.subscribe {
    consumeValue(it)
}
subject.subscribe {
    println(it)
}

We have created a subject and subscribed to it with two different Observers. When we send an element with subject.onNext(1), both observers will get the same object.
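A runnable version of that broadcast could look like this, assuming RxJava 2 (the io.reactivex package) is on the classpath. The broadcastOne helper is a hypothetical name, and the two lists stand in for consumeValue and println.

```kotlin
import io.reactivex.subjects.PublishSubject

// Hypothetical helper: two Observers subscribe, then one element is sent.
fun broadcastOne(): Pair<List<Int>, List<Int>> {
    val subject: PublishSubject<Int> = PublishSubject.create()
    val first = mutableListOf<Int>()
    val second = mutableListOf<Int>()

    subject.subscribe { first.add(it) }
    subject.subscribe { second.add(it) }

    subject.onNext(1) // delivered to both Observers
    return first to second
}

fun main() {
    println(broadcastOne()) // prints ([1], [1])
}
```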

How can we do the same with Coroutines?

Broadcast Channels

We can achieve the same behavior with BroadcastChannels. They emit the same object to multiple consumers that listen for the elements using the openSubscription method.

Let’s create a BroadcastChannel.

val channel = BroadcastChannel<Int>(2)

How can we consume the elements sent to the channel? We can use the openSubscription method and the extension function use which is going to handle the subscription for us. The subscription will be closed when the suspending function inside use finishes.

val observer1Job = launch {
    channel.openSubscription().use { channel ->
        for (value in channel) {
            consumeValue(value)
        }
        // subscription will be closed
    }
}

Another way we can consume from the channel is with the extension function consumeEach:

val observer2Job = launch {
    channel.consumeEach { value ->
        consumeValue(value)
    }
}

consumeEach will open a subscription and consume all the elements that the channel emits.

ConflatedBroadcastChannel

Conflated is a special type of capacity. ConflatedBroadcastChannel behaves in a similar way to RxJava BehaviorSubject. What does that mean?

An observer will receive all the elements that the source of information emits after its subscription, but also the last item emitted by the source of information before the observer subscribed.

Let’s take a look at an example:

Resubscribing to a ConflatedBroadcastChannel

Imagine we have a ConflatedBroadcastChannel with an observer that has already received two objects. If that observer closes the subscription and later resubscribes to the same channel, it will receive the last object emitted by the channel.

This behavior happens to Observers that resubscribe but also to new Observers. They all are going to get the last item emitted by the Channel if it had already emitted one.
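Since the comparison is with BehaviorSubject, here is a sketch of the RxJava side of that behavior, assuming RxJava 2 (the io.reactivex package). The lateSubscriber helper is a made-up name; the ConflatedBroadcastChannel side is not shown here.

```kotlin
import io.reactivex.subjects.BehaviorSubject

// Hypothetical helper: an Observer subscribes after two items
// have already been emitted, then one more item is emitted.
fun lateSubscriber(): List<Int> {
    val subject = BehaviorSubject.create<Int>()
    subject.onNext(1)
    subject.onNext(2)

    val received = mutableListOf<Int>()
    subject.subscribe { received.add(it) } // gets the last item, 2, right away

    subject.onNext(3)
    return received
}

fun main() {
    println(lateSubscriber()) // prints [2, 3]
}
```

The late Observer never sees the first item, only the most recent one plus everything emitted afterwards.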

What about RxJava Back-Pressure?

We haven’t talked about back-pressure in the Coroutines section. In RxJava, back-pressure comes when the source of information sends elements faster than the Observer can consume them. How is that handled in Coroutines?

Coroutines handle back-pressure by default.

We don’t have to worry about back-pressure in Coroutines. Since receive and send are suspending functions, when we are in a back-pressure scenario, the sender(s) will suspend until there’s capacity in the channel.

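We can say that a buffered channel is loosely comparable to BackpressureStrategy.BUFFER in RxJava, with one important difference: the channel’s buffer is bounded, and once it is full the sender suspends instead of the buffer growing without limit.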

Coroutines and RxJava Comparison

We can compare the different terminology we’ve seen in this part of the series.

Coroutines and RxJava Sources of information comparison

We can say that an Observable is cold and the rest of the terms are hot.

Hot sources of information can send information even though there are no observers listening for the events.

An Observable is cold because it’s going to start emitting items only when an Observer subscribes to it.

Observables and Channels are unicast because only one observer is going to receive the object that has been transmitted. However, with a Subject or a BroadcastChannel, you can send the same object to multiple Observers.

Cold Observable Behavior with Coroutines?

How can you get a cold observable behavior with Coroutines? You remember the CoroutineBuilder produce that creates a coroutine with a channel built in?

You can use publish and use it in the same way you use produce.

val publisher = publish {
    for (i in 1..5) send(i)
}

The difference is that every time you want to consume the elements from the channel, you’re going to see different objects.

There is one caveat here, though: publish is not part of the standard Coroutines library. You will have to import the Coroutines and RxJava Interop library.

Want to learn more about this interop library? You’ll have to wait for Part 4 of this series that will be available next week.


What’s Coming Next?

In the fourth part of this series, we will see how we can use RxJava and Coroutines hand in hand. The Coroutines and RxJava interop library will allow us to connect them and use them together in the same project.


Like what you read? Give Manuel Vicente Vivo a round of applause.
