ChannelFlow Under the Hood

Sahil Thakar
9 min read · Apr 28, 2024

Alrighty folks,

We’re back and ready to dive into Flow again. This time, we’re tackling a hot topic: where on earth should we use channelFlow?

A lot of people throw out guesses, but rarely do we get straight answers.

Well, we’re going deep to find out what’s really happening under the hood and where channelFlow fits into the big picture. No assumptions, just straight-up code to see what's going on. channelFlow has its own purpose, separate from standard Flow.

So let's get into it! 💻🔍

ChannelFlow combines two key concepts: Channels and Flow. We talk about Flow a lot, but what about Channels? Here’s a quick rundown:

  • Channels were introduced as a communication bridge between coroutines, filling the gap where no direct coroutine-to-coroutine communication existed before. They enable sending and receiving data asynchronously among coroutines, providing a foundation for inter-coroutine messaging.
  • If you want a deep dive into how Channels work under the hood, we could do a whole separate blog on that — just let me know in the comments.
  • For now, the short summary above is all we need.

So, in this blog, we’re going to cover two key things: the channelFlow builder and the ChannelFlow abstract class.

Let’s jump in!

ChannelFlow Builder:-

  • channelFlow is a cold flow, just like a regular flow—it only emits values when a collector starts collecting. But here's the twist: channelFlow uses channels, which are inherently hot. So, how can a hot channel create a cold flow?
  • Here’s the secret: Internally, channelFlow creates a new channel every time it's collected. The coroutine block in channelFlow sends data through this newly created channel. Even though the channel itself is hot (it exists and accepts data whether or not anyone is receiving), channelFlow creates a fresh channel and re-runs its block each time a new collector starts collecting. This setup ensures that every collector gets its own independent stream, making channelFlow behave like a cold flow.
  • In short, while the internal channel is hot, channelFlow itself acts like a cold flow because it only starts running when there's an active collector, and stops when there's none. This flexibility is why channelFlow is a cold flow at heart.
  • Let’s dive deeper to see what Kotlin says about channelFlow.
  • Here's what it does under the hood.

If you read the first paragraph, you’ll get a clear understanding that ChannelFlow is a cold flow, not a hot flow.
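If you'd rather see the cold behaviour than take it on faith, here is a minimal sketch. The `producerStarts` counter and the `numbers()` function are names I made up for illustration; the only assumption is that kotlinx.coroutines is on the classpath.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical counter: how many times the producer block has started.
val producerStarts = AtomicInteger(0)

fun numbers(): Flow<Int> = channelFlow {
    producerStarts.incrementAndGet() // runs once per collector
    send(1)
    send(2)
}

fun main() = runBlocking {
    val flow = numbers()
    println(producerStarts.get()) // 0: nothing runs until someone collects
    println(flow.toList())        // [1, 2]
    println(flow.toList())        // [1, 2] again, through a brand-new channel
    println(producerStarts.get()) // 2: the block ran once per collection
}
```

Creating the flow does nothing on its own; the block only starts when a collector shows up, and it starts over for every collector.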

  • However, when you examine the screenshot, you might be curious about what ProducerScope is. To clear that up, let's take a look at the internal codebase to understand what it does and how it plays into ChannelFlow. Let's dig in! 🔍💻

Explanation:-

  • ProducerScope in Kotlin Coroutines is a specialized interface that combines the features of CoroutineScope and SendChannel, allowing coroutines to produce and send data asynchronously to a channel.
  • It is primarily used in coroutine builders like produce, callbackFlow, and channelFlow, where you need to create a coroutine that sends elements to a channel or flow.
  • The ProducerScope provides a reference to the underlying SendChannel, letting you send elements, manage coroutine context, launch child coroutines, and handle cancellation, all within a single scope.
  • This capability is crucial for building concurrent data streams and creating flexible asynchronous workflows in Kotlin.
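To make the "two interfaces in one" idea concrete, here is a small sketch. The function name `greetings()` and the two local variables are hypothetical; they exist only to show that the receiver inside channelFlow can be viewed both as a CoroutineScope and as a SendChannel.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun greetings(): Flow<String> = channelFlow {
    // Inside the block, `this` is a ProducerScope<String>.
    val asScope: CoroutineScope = this        // so we can launch children
    val asChannel: SendChannel<String> = this // and we can send elements

    asScope.launch { asChannel.send("hello from a child coroutine") }
    asChannel.send("hello from the producer block")
}

fun main() = runBlocking {
    // Both messages arrive; their relative order is not something to rely on.
    println(greetings().toList().toSet())
}
```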

By examining this, we can see that you can’t achieve this with a regular flow. A normal flow doesn’t let you emit from coroutines launched inside it: the flow builder hands you a FlowCollector rather than a coroutine scope, and emit has to be called from the collecting coroutine. This is one of the key differences with channelFlow: it offers a scope where you can launch additional coroutines and send values from them. Keep this in mind as you explore what sets channelFlow apart from standard flow.
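Here is a rough sketch of that difference. `brokenFlow()` and `workingFlow()` are hypothetical names; the first one deliberately emits from a child coroutine to show what a regular flow does in that situation.

```kotlin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Emitting from a child coroutine inside flow { } violates the flow invariant.
fun brokenFlow(): Flow<Int> = flow {
    coroutineScope {
        launch { emit(1) } // emit is called from a different coroutine
    }
}

// The same idea with channelFlow: send() may be called from child coroutines.
fun workingFlow(): Flow<Int> = channelFlow {
    launch { send(1) }
    launch { send(2) }
}

fun main() = runBlocking {
    val failure = runCatching { brokenFlow().toList() }.exceptionOrNull()
    println(failure is IllegalStateException) // true
    println(workingFlow().toList().sorted())  // [1, 2]
}
```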

  • Before we go any further, let’s put other examples on hold for now. We’ll dive into the “merge” concept in this blog, so let’s focus on that and leave the rest for later.

ChannelFlow abstract class:-

  • If we dig a bit deeper, we find that the channelFlow builder returns a ChannelFlowBuilder, which extends the ChannelFlow abstract class.
  • Let's take a look at the code to understand what's going on. Here's what you'll find:

Explanation:-

If you examine the code, you’ll notice that ChannelFlowBuilder inherits from the ChannelFlow abstract class. This abstract class contains some abstract methods and some open methods, which are used by the ChannelFlowBuilder class to construct the channel flow.

Essentially, ChannelFlow is designed as a base class for flows that use channels to handle data, buffer it, and control backpressure. It implements FusibleFlow<T>, an internal interface that lets operators such as buffer and flowOn fuse with the flow instead of stacking another channel on top of it. If you're interested in FusibleFlow<T>, check out our SharedFlow video for more insights.
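To get a feel for the design, here is a toy version of the idea. This is not the library's class: `MiniChannelFlow` and `TwoSources` are hypothetical names, there is no fusion and no configurable context, and the only thing borrowed from the real thing is the shape (a `collectTo` hook that fills a channel created fresh for each collection).

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.ProducerScope
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical, simplified stand-in for the idea behind ChannelFlow.
@OptIn(ExperimentalCoroutinesApi::class)
abstract class MiniChannelFlow<T>(private val capacity: Int) {
    // Subclasses describe how elements get into the channel.
    protected abstract suspend fun collectTo(scope: ProducerScope<T>)

    // Every collection creates a fresh channel via produce, then drains it.
    fun asFlow(): Flow<T> = flow {
        coroutineScope {
            val channel = produce<T>(capacity = capacity) { collectTo(this) }
            emitAll(channel)
        }
    }
}

// Hypothetical subclass: forwards two source flows into the same channel.
class TwoSources(
    private val a: Flow<Int>,
    private val b: Flow<Int>
) : MiniChannelFlow<Int>(capacity = 16) {
    override suspend fun collectTo(scope: ProducerScope<Int>) {
        scope.launch { a.collect { scope.send(it) } }
        scope.launch { b.collect { scope.send(it) } }
    }
}

fun main() = runBlocking {
    val merged = TwoSources(flowOf(1, 2), flowOf(10, 20)).asFlow()
    println(merged.toList().sorted()) // [1, 2, 10, 20]
}
```

The base class owns the channel plumbing, and subclasses only decide what gets sent into it.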

Now, we know that channelFlow provides a coroutine scope. But what exactly is the purpose of this scope? Let's dive into that next.

Well, suppose you have multiple flows that are independent of each other and you want them to run in parallel. How can you do that without channelFlow? Give it a thought, then come back.

You could use combine or zip to tackle this problem by merging multiple flows into a single flow. These operators can indeed be useful, but they come with some limitations.

The combine operator waits for all flows to emit their first value, then it uses the latest emitted value from each flow to produce the combined result. The downside is, if one flow hasn't emitted yet, the combined output waits.

The zip operator pairs values one-to-one, so it needs every flow to emit a value at each step. If any flow doesn't emit, zip can't continue, causing delays or halts, and it finishes as soon as the shortest flow completes.
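A quick sketch of both behaviours, assuming kotlinx.coroutines is available. The function names `zipDemo()` and `combineDemo()` and the delay values are made up for illustration.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.zip
import kotlinx.coroutines.runBlocking

// zip pairs elements one-to-one and stops when the shorter flow ends.
suspend fun zipDemo(): List<String> =
    flowOf(1, 2, 3).zip(flowOf("a", "b")) { n, s -> "$n$s" }.toList()

// combine waits for a first value from both sides,
// then re-emits with the latest values whenever either side changes.
suspend fun combineDemo(): List<String> {
    val numbers = flow { emit(1); delay(200); emit(2) }
    val letters = flow { delay(100); emit("x") }
    return numbers.combine(letters) { n, s -> "$n$s" }.toList()
}

fun main() = runBlocking {
    println(zipDemo())     // [1a, 2b]  (3 is dropped, it has no partner)
    println(combineDemo()) // [1x, 2x]  (nothing is emitted until "x" arrives)
}
```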

Here, the requirement is to have completely independent flows running in parallel without waiting for each other.

But Father, there are not just 2 operators, there are 3. You forgot about “merge.”

My child, the merge operator is the one that helps us solve this problem efficiently, because it's built on top of the same ChannelFlow machinery that powers channelFlow.

What???

But how father?

Hold on, son. Before we dive into merge, let's see how we can tackle this with just the channelFlow builder alone, no merging involved.

Let’s check out the code to understand how it works without merge.


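import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.launch

// Emits 1, 2, 3, ... once every second.
// Flow builders don't suspend, so the functions need no `suspend` modifier.
fun flow1(): Flow<Int> = flow {
    var i = 0
    while (true) {
        delay(1000)
        emit(++i)
    }
}

// Emits 1, 2, 3, ... once every two seconds.
fun flow2(): Flow<Int> = flow {
    var i = 0
    while (true) {
        delay(2000)
        emit(++i)
    }
}

// Collects both flows concurrently and forwards every value into one stream.
fun mergeFlow(): Flow<String> = channelFlow {
    // Each launch starts a child coroutine of the ProducerScope.
    launch {
        flow1().collect {
            send("#Flow1 $it")
        }
    }

    launch {
        flow2().collect {
            send("#Flow2 $it")
        }
    }
}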

Output:-

#Flow1 1
#Flow2 1
#Flow1 2
#Flow1 3
#Flow2 2
#Flow1 4
#Flow1 5
#Flow2 3
#Flow1 6
#Flow1 7
#Flow2 4
#Flow1 8
#Flow1 9
#Flow2 5
#Flow1 10
#Flow1 11
#Flow2 6
#Flow1 12
#Flow1 13
#Flow2 7
#Flow1 14
#Flow1 15
#Flow2 8
....

Explanation:-

See how straightforward it is? We created a channelFlow, and within it, we started two separate coroutines using launch to collect and send values.

In channelFlow, when you want to emit a value, you use send instead of emit, because send is part of SendChannel, which we discussed earlier.

If you’d like to look inside the code, here’s what you need to understand.

If you look inside the SendChannel interface, you'll find a send suspend function.
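To see what "suspend" buys us here, this little sketch compares send with its non-suspending sibling trySend on a plain channel. The function name `sendDemo()` and the capacity of 1 are arbitrary choices for illustration.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

suspend fun sendDemo(): List<Any> = coroutineScope {
    val channel = Channel<Int>(capacity = 1)

    // trySend never suspends; it just reports whether the element fit.
    val first = channel.trySend(1).isSuccess  // the buffer had room
    val second = channel.trySend(2).isSuccess // the buffer is full now

    // send suspends instead of failing, until a receiver frees a slot.
    val sender = launch { channel.send(2) }
    val received = listOf(channel.receive(), channel.receive())
    sender.join()
    channel.close()

    listOf(first, second, received)
}

fun main() = runBlocking {
    println(sendDemo()) // [true, false, [1, 2]]
}
```

That suspension is the backpressure: a fast producer inside channelFlow simply waits in send until the collector catches up.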

Now, Father, I understand how we can achieve this without using merge by employing the channelFlow builder. But can you explain what merge brings to the table?

Absolutely! The third operator, merge, is designed to combine multiple flows into a single flow. It uses the ChannelFlow abstract class under the hood, allowing you to create a flow that pulls from multiple sources. Unlike combine and zip, merge lets multiple flows run independently and in parallel without waiting for each other to emit. This makes it perfect for scenarios where you need high flexibility and throughput, ensuring that no flow gets held back while others catch up. With merge, you're essentially blending flows without worrying about synchronization issues.

Explanation:-

If you look at the code inside the merge extension function, there's a class called ChannelLimitedFlowMerge.

This class extends the ChannelFlow<T> abstract class and implements the same two methods that the ChannelFlowBuilder class does. However, ChannelLimitedFlowMerge adds one extra override method: produceImpl.

produceImpl() Method

  • This method overrides the base class’s produceImpl() to create a ReceiveChannel<T>, which acts as the communication channel for merging flows.
  • It uses scope.produce() to create a coroutine-based channel with the specified context and capacity. The block parameter is set to collectToFun, which contains the flow merging logic.

collectTo() Method

  • This is where the actual merging of flows happens. The method takes a ProducerScope<T> and collects data into it from the various source flows.
  • It creates a SendingCollector (a class designed to send data to the ProducerScope), then launches separate coroutines for each flow in flows. These coroutines concurrently collect elements from each flow and send them to the scope.
  • This concurrent collection and sending operation is what allows the merging of multiple flows into a single channel, providing a unified stream of data.
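The same launch-per-flow idea can be sketched with the public channelFlow builder. To be clear, `mergeSketch` is a hypothetical function written for this post, not the library's implementation; it only mirrors the shape of collectTo described above.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical re-creation of the merging logic using channelFlow.
fun <T> mergeSketch(vararg flows: Flow<T>): Flow<T> = channelFlow {
    for (source in flows) {
        // One child coroutine per source flow, all sending into the same channel.
        launch { source.collect { send(it) } }
    }
}

fun main() = runBlocking {
    val merged = mergeSketch(flowOf(1, 2, 3), flowOf(10, 20))
    // Interleaving between sources isn't guaranteed, so sort before printing.
    println(merged.toList().sorted()) // [1, 2, 3, 10, 20]
}
```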

Let’s look at the example now:-

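import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.runBlocking

// Emits 1..1000. `name` only labels the flow for the reader; it isn't emitted.
fun createFlowWith1000Data(name: String): Flow<Int> = flow {
    for (i in 1..1000) {
        emit(i)
    }
}

// Merging 3 flows emitting 1000 data points each.
// runBlocking already bridges into coroutines, so no `suspend` modifier is needed.
fun mergeExample() = runBlocking {
    val flow1 = createFlowWith1000Data("Flow1")
    val flow2 = createFlowWith1000Data("Flow2")
    val flow3 = createFlowWith1000Data("Flow3")

    merge(flow1, flow2, flow3).collect {
        // every value from any of the three flows arrives here
    }
}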

Explanation:-

It’s as simple as that — just merge different flows into one and start collecting the values. Easy, clean, and effective!

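Note:-
The flows don’t all have to emit the same data type, but the merged flow is then typed as their common supertype (for example Flow<Any>), so the collector has to check the type of each value.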
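Here is a small sketch of merging flows with different element types. The names `mixed()` and `describe()` are hypothetical, and I pass the type argument explicitly as `Any` to keep the inferred type obvious.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// An Int flow and a String flow merged into a single Flow<Any>.
fun mixed(): Flow<Any> = merge<Any>(flowOf(1, 2), flowOf("a", "b"))

// The collector side has to branch on the runtime type.
fun describe(value: Any): String = when (value) {
    is Int -> "number $value"
    is String -> "text $value"
    else -> "unknown $value"
}

fun main() = runBlocking {
    mixed().map(::describe).collect { println(it) }
}
```

If the sources really belong together, a shared sealed interface is usually nicer than Any, but that is a design choice outside this post.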

Well, this is not it.

It’s not just merge that relies on ChannelFlow; flatMapMerge and flowOn also use the ChannelFlow<T> abstract class for their implementation. This shows the versatility of ChannelFlow in handling different flow operations.

If we examine the code for flowOn, here's what we find:

ChannelFlowOperatorImpl extends the ChannelFlowOperator<S, T> abstract class, which in turn extends ChannelFlow<T>. This architecture indicates that ChannelFlow<T> is a fundamental building block used in various Flow operations like channelFlow, flatMapMerge, flowOn, and merge.
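From the outside, all of that machinery shows up as a simple context switch. Here is a minimal JVM-only sketch (it reads `Thread.currentThread()`); `threadNames()` is a hypothetical function name.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.runBlocking

// The flow body runs on Dispatchers.Default because of flowOn.
fun threadNames(): Flow<String> = flow {
    emit(Thread.currentThread().name)
}.flowOn(Dispatchers.Default)

fun main() = runBlocking {
    val collectorThread = Thread.currentThread().name
    val producerThread = threadNames().first()
    // The value was produced on a different thread than the one collecting it.
    println(producerThread != collectorThread) // true
}
```

The value crosses from the producing coroutine to the collecting one through a channel, which is why flowOn sits on top of ChannelFlow.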

By examining this internal code, we can understand that it’s possible to create custom flow operators by extending the ChannelFlow<T> abstract class, allowing you to build flows tailored to your specific use case.

So, when people make a big deal about choosing between channelFlow and flow, or are confused about when to use each, there's no need to worry. These are two distinct concepts with different purposes, and both have their place in the Kotlin Flow ecosystem.

Yeah, now that’s it for this class.

Did you guys enjoy it?

Try it out, and drop a question in the comments if you have any.

Follow me here on Medium for more interesting topics.

Follow me on LinkedIn as well to grow our developer network.

See you guys soon with some new amazing topics.

Sahil Thakar

I'm Sahil, an Android developer with 4 years of experience in Java/Kotlin. I've built apps with millions of downloads, like Punch, Woovly and Pratilipi.