ChannelFlow Under the Hood
Alrighty Folks,
we’re back and ready to dive into Flow again. This time, we’re tackling a hot topic: where on earth should we use channelFlow?
A lot of people throw out guesses, but rarely do we get straight answers.
Well, we’re going deep to find out what’s really happening under the hood and where channelFlow fits into the big picture. No assumptions, just straight-up code to see what's going on. channelFlow has its own purpose, separate from standard Flow.
So, let's get into it! 💻🔍
ChannelFlow combines two key concepts: Channels and Flow. We talk about Flow a lot, but what about Channels? Here’s a quick rundown:
- Channels were introduced as a communication bridge between coroutines. Before them, there was no direct way for one coroutine to hand data to another. They let coroutines send and receive data asynchronously, and they form the foundation for inter-coroutine messaging.
- If you want a deep dive into how Channels work under the hood, we could do a whole separate blog on that. Just let me know in the comments.
- For this blog, though, this short summary is all we need.
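Here is a minimal sketch that makes the "communication bridge" idea concrete. It assumes kotlinx-coroutines-core 1.6+ is on the classpath. One coroutine sends three strings through a Channel, and another coroutine receives them. The function name pingThroughChannel is hypothetical and used only for illustration.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch

// Hypothetical helper name, for illustration only.
// One coroutine sends, the calling coroutine receives: the Channel is the bridge.
suspend fun pingThroughChannel(): List<String> = coroutineScope {
    val channel = Channel<String>() // rendezvous channel: send waits for a receiver
    launch {
        for (word in listOf("ping", "pong", "done")) {
            channel.send(word)
        }
        channel.close() // tells the receiver no more values are coming
    }
    val received = mutableListOf<String>()
    for (word in channel) { // loops until the channel is closed
        received += word
    }
    received
}
```

Closing the channel is what ends the receiving loop. Without close(), the for loop would keep waiting for more values.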
So, in this blog, we’re going to cover two key things: the channelFlow builder and the ChannelFlow abstract class.
Let’s jump in!
ChannelFlow Builder:-
channelFlow is a cold flow, just like a regular flow: it only emits values when a collector starts collecting. But here's the twist: channelFlow uses channels, which are inherently hot. So, how can a hot channel create a cold flow?
- Here’s the secret: internally, channelFlow creates a new channel every time it's collected. The coroutine block in channelFlow sends data through this newly created channel. The channel itself is hot, because it exists and can hold values independently of whoever is receiving. Even so, channelFlow re-creates the channel each time a new collector starts collecting. This ensures that every collector gets its own independent stream, which makes channelFlow behave like a cold flow.
- In short, the internal channel is hot, but channelFlow itself acts like a cold flow. Its block only starts running when there's an active collector, and it stops when that collection ends. That is why channelFlow is a cold flow at heart.
- Let’s dive deeper to see what Kotlin says about channelFlow. Here's what it does under the hood.
If you read the first paragraph of that documentation, you’ll see clearly that ChannelFlow is a cold flow, not a hot flow.
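You can check the "new channel per collection" behavior yourself by counting how often the channelFlow block runs. The sketch below assumes kotlinx-coroutines-core 1.6+. The names blockRuns, numbers and collectTwice are hypothetical and used only for illustration. Building the flow runs nothing, and each toList() call runs the block again from scratch.

```kotlin
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical counter: how many times has the channelFlow block executed?
var blockRuns = 0

// Hypothetical cold flow: defining it does not run the block
val numbers = channelFlow<Int> {
    blockRuns++ // runs once per collection, never at construction time
    send(1)
    send(2)
}

// Hypothetical helper: collect the same flow twice
fun collectTwice(): Pair<Int, List<List<Int>>> = runBlocking {
    val before = blockRuns        // still 0: nothing has collected yet
    val first = numbers.toList()  // new channel, block runs
    val second = numbers.toList() // another new channel, block runs again
    before to listOf(first, second)
}
```

After collectTwice(), blockRuns is 2 and both collectors saw the full sequence 1, 2. That is exactly what "cold" means.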
- However, when you examine the screenshot, you might be curious about what ProducerScope is. To clear that up, let's look at the internal codebase to understand what it does and how it plays into ChannelFlow. Let's dig in! 🔍💻
Explanation:-
ProducerScope in Kotlin Coroutines is a specialized interface that combines the features of CoroutineScope with SendChannel, allowing coroutines to produce and send data asynchronously to a channel.
- It is primarily used in coroutine builders like produce, callbackFlow, and channelFlow, where you need to create a coroutine that sends elements to a channel or flow.
- ProducerScope gives you a reference to the underlying SendChannel through its channel property. Within a single scope, you can send elements, manage the coroutine context, launch child coroutines, and handle cancellation.
- This capability is crucial for building concurrent data streams and flexible asynchronous workflows in Kotlin.
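To see ProducerScope from the inside, here is a sketch using the produce builder. It assumes kotlinx-coroutines-core 1.6+. produce is marked experimental, hence the opt-in. The block's receiver is a ProducerScope<Int>, so it can both launch a child coroutine and call send. The name evensAndOdds is hypothetical.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.ProducerScope
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.launch

// Hypothetical producer, for illustration only.
// `this` inside produce is a ProducerScope<Int>:
// a CoroutineScope (so launch works) and a SendChannel<Int> (so send works).
@OptIn(ExperimentalCoroutinesApi::class)
fun CoroutineScope.evensAndOdds(): ReceiveChannel<Int> = produce {
    val scope: ProducerScope<Int> = this
    scope.launch {
        for (n in listOf(2, 4, 6)) send(n) // a child coroutine sends too
    }
    for (n in listOf(1, 3, 5)) send(n)
    // the channel is closed automatically once this block and its children finish
}
```

Because two coroutines send concurrently, the receiver may see the even and odd numbers interleaved in any order.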
By examining ProducerScope, we can see why you can’t achieve the same thing with a regular flow. The lambda passed to the flow builder receives a FlowCollector, not a CoroutineScope, so there is nothing to call launch on. Even if you wrap the body in coroutineScope, emitting from a launched child coroutine violates the flow's context-preservation rule and fails at runtime with an IllegalStateException. This is one of the key differences with channelFlow: it offers a scope where you can launch additional coroutines while emitting values. Keep this in mind as you explore what sets channelFlow apart from standard flow.
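The difference is easy to demonstrate. In the sketch below, which assumes kotlinx-coroutines-core 1.6+ and uses hypothetical function names, a plain flow tries to emit from a launched child coroutine and fails. The same shape written with channelFlow works. The exact exception message may vary between versions, but the type is IllegalStateException.

```kotlin
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical: a plain flow may only emit from the coroutine collecting it.
// coroutineScope makes `launch` available, but emitting from the launched
// child breaks the flow invariant, and collection throws IllegalStateException.
fun plainFlowEmitFromChild(): Result<List<Int>> = runCatching {
    runBlocking {
        flow<Int> {
            coroutineScope {
                launch { emit(1) } // emission from another coroutine
            }
        }.toList()
    }
}

// Hypothetical: channelFlow hands the block a ProducerScope,
// so sending from launched children is the intended usage
fun channelFlowSendFromChild(): List<Int> = runBlocking {
    channelFlow<Int> {
        launch { send(1) }
        launch { send(2) }
    }.toList()
}
```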
- Before we go any further, let’s put other use cases on hold. In this blog we’ll focus on the “merge” concept and leave the rest for later.
ChannelFlow abstract class:-
- If we dig a bit deeper, we find that the channelFlow builder simply returns an instance of ChannelFlowBuilder.
- Let's take a look at the code to understand what's going on. Here's what you'll find:
Explanation:-
If you examine the code, you’ll notice that ChannelFlowBuilder inherits from the ChannelFlow abstract class. This abstract class contains some abstract methods (create and collectTo) and some open methods (such as produceImpl). The ChannelFlowBuilder class builds on these to construct the channel flow.
Essentially, ChannelFlow is designed as a base class for flows that use channels to pass data between coroutines, buffer it, and control backpressure. It implements FusibleFlow<T>, an internal interface whose fuse method lets downstream buffer and flowOn operators be fused into it. That way, a chain of such operators shares a single channel instead of stacking one channel per operator. If you're interested in FusibleFlow<T>, check out our SharedFlow video for more insights.
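To make this more tangible, here is a deliberately simplified, hypothetical model of what a channelFlow-like builder has to do on each collection. It is not the real ChannelFlowBuilder, which adds buffering, fusion and more. The name miniChannelFlow is invented, and the sketch assumes kotlinx-coroutines-core 1.6+.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.ProducerScope
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow

// HYPOTHETICAL simplified model, not the library's ChannelFlowBuilder.
// Every collection opens its own scope, starts a producer coroutine with a
// brand-new channel, and forwards everything it receives downstream.
@OptIn(ExperimentalCoroutinesApi::class)
fun <T> miniChannelFlow(block: suspend ProducerScope<T>.() -> Unit): Flow<T> = flow {
    coroutineScope {
        val channel = produce(block = block) // a new channel for this collector
        for (value in channel) emit(value)   // emitting from coroutineScope's body is allowed
    }
}
```

Because the channel is created inside flow { }, nothing happens until someone collects. Each collector also gets its own producer, which mirrors the cold behavior described earlier.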
Now we know that channelFlow provides a coroutine scope. But what exactly is the purpose of this scope? Let's dive into that next.
Suppose you have multiple flows that are independent of each other, and you want them to run in parallel. How can you do that without channelFlow? Give it some thought, then come back.
You could use combine or zip to tackle this problem by merging multiple flows into a single flow. These operators can indeed be useful, but they come with some limitations.
The combine operator waits for every flow to emit its first value. After that, it uses the latest value from each flow to produce each combined result. The downside is that if one flow hasn't emitted yet, the combined output waits.
With zip, values are paired step by step: the nth value of one flow is combined with the nth value of the other. If any flow doesn't emit, zip can't continue, which causes delays or halts. It also completes as soon as either flow completes.
Here, the requirement is to have completely independent flows running in parallel without waiting for each other.
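Both limitations are easy to observe with finite flows. In this sketch, which assumes kotlinx-coroutines-core 1.6+ and uses hypothetical function names, zip stops once the shorter flow runs out. combine produces nothing at all, because one of its inputs never emits.

```kotlin
import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.zip
import kotlinx.coroutines.runBlocking

// Hypothetical demo: zip pairs values one-to-one and stops with the shorter flow
fun zipDemo(): List<String> = runBlocking {
    flowOf(1, 2, 3).zip(flowOf("a", "b")) { n, s -> "$n$s" }.toList()
}

// Hypothetical demo: combine needs a first value from every flow,
// so one silent flow means no combined output at all
fun combineDemo(): List<String> = runBlocking {
    combine(flowOf(1, 2, 3), emptyFlow<String>()) { n, s -> "$n$s" }.toList()
}
```

zipDemo() returns ["1a", "2b"], and the 3 is dropped. combineDemo() returns an empty list.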
But Father, there aren’t just two operators, there are three. You forgot about “merge”.
My child, the merge operator is the one that helps us solve this problem efficiently, because it's built on top of ChannelFlow.
What???
But how, Father?
Hold on, son. Before we dive into merge, let's see how we can tackle this with the channelFlow builder alone, no merging involved.
Let’s check out the code to understand how it works without merge.
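import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Emits 1, 2, 3, ... once per second, forever
fun flow1(): Flow<Int> = flow {
    var i = 0
    while (true) {
        delay(1000)
        emit(++i)
    }
}

// Emits 1, 2, 3, ... once every two seconds, forever
fun flow2(): Flow<Int> = flow {
    var i = 0
    while (true) {
        delay(2000)
        emit(++i)
    }
}

// Collects both flows concurrently, each in its own coroutine,
// and forwards their values into the single channel behind channelFlow
fun mergeFlow(): Flow<String> = channelFlow {
    launch {
        flow1().collect { send("#Flow1 $it") }
    }
    launch {
        flow2().collect { send("#Flow2 $it") }
    }
}

fun main() = runBlocking {
    // Both sources are infinite, so take() stops collection after a few values
    mergeFlow().take(10).collect { println(it) }
}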
Output:-
#Flow1 1
#Flow2 1
#Flow1 2
#Flow1 3
#Flow2 2
#Flow1 4
#Flow1 5
#Flow2 3
#Flow1 6
#Flow1 7
#Flow2 4
#Flow1 8
#Flow1 9
#Flow2 5
#Flow1 10
#Flow1 11
#Flow2 6
#Flow1 12
#Flow1 13
#Flow2 7
#Flow1 14
#Flow1 15
#Flow2 8
....
Explanation:-
See how straightforward it is? We created a channelFlow, and within it, we started two separate coroutines using launch to collect values and send them downstream.
In channelFlow, when you want to emit a value, you use send instead of emit. That's because send is part of SendChannel, which ProducerScope extends, as we discussed earlier.
If you’d like to look inside the code, here’s what you need to understand: the SendChannel interface declares send as a suspend function.
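Because send is a suspend function, a sender simply waits when the channel's buffer is full. This sketch assumes kotlinx-coroutines-core 1.5+, and its function name is hypothetical. It uses a channel with a single buffer slot, plus the non-suspending counterpart trySend, to show that a second value cannot be accepted until the first is received.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking

// Hypothetical demo, for illustration only.
// send() would suspend here; trySend() reports failure instead of waiting.
fun bufferFullDemo(): Pair<Boolean, Int> = runBlocking {
    val channel = Channel<Int>(capacity = 1)
    channel.send(1)                                   // fits in the single buffer slot
    val secondAccepted = channel.trySend(2).isSuccess // buffer full, so this fails
    val first = channel.receive()                     // frees the slot again
    secondAccepted to first
}
```

This suspension is how channelFlow applies backpressure: a fast producer calling send slows down to the pace of the collector.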
Now, Father, I understand how we can achieve this without merge by using the channelFlow builder. But can you explain what merge brings to the table?
Absolutely! The third operator, merge, is designed to combine multiple flows into a single flow. It uses the ChannelFlow abstract class under the hood, allowing you to create a flow that pulls from multiple sources. Unlike combine and zip, merge lets multiple flows run independently and in parallel without waiting for each other to emit. This makes it a great fit when you need flexibility and throughput, because no flow gets held back while the others catch up. With merge, you're essentially interleaving flows without having to synchronize them.
Explanation:-
If you look at the code inside the merge function, you'll find a class called ChannelLimitedFlowMerge.
This class extends the ChannelFlow<T> abstract class and implements the same two methods that the ChannelFlowBuilder class does (create and collectTo). However, ChannelLimitedFlowMerge adds one extra override: produceImpl.
produceImpl() Method
- This method overrides the base class’s produceImpl() to create a ReceiveChannel<T>, which acts as the communication channel for merging flows.
- It uses scope.produce() to create a coroutine-based channel with the specified context and capacity. The block parameter is set to collectToFun, which simply calls collectTo, where the flow-merging logic lives.
collectTo() Method
- This is where the actual merging of flows happens. The method takes a ProducerScope<T> and collects data into it from the various source flows.
- It creates a SendingCollector (a class designed to send data to the ProducerScope), then launches a separate coroutine for each flow in flows. These coroutines concurrently collect elements from their flow and send them to the scope.
- This concurrent collection and sending is what merges multiple flows into a single channel, providing a unified stream of data.
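The collectTo logic described above can be approximated with the public channelFlow builder. The sketch below defines a hypothetical simpleMerge, not the library's implementation, and assumes kotlinx-coroutines-core 1.6+. It launches one coroutine per source flow, and each coroutine forwards its values with send into the shared ProducerScope.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// HYPOTHETICAL re-creation of the collectTo idea using the public channelFlow
// builder instead of the internal ChannelFlow class:
// one coroutine per source flow, all sending into the same ProducerScope.
fun <T> simpleMerge(vararg flows: Flow<T>): Flow<T> = channelFlow {
    for (source in flows) {
        launch {
            source.collect { value -> send(value) }
        }
    }
}

// Hypothetical usage helper
fun simpleMergeDemo(): List<Int> = runBlocking {
    simpleMerge(flowOf(1, 2, 3), flowOf(10, 20)).toList()
}
```

Values from different sources may interleave in any order. Within a single source, though, the order is preserved, because one coroutine sends them sequentially.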
Let’s look at the example now:-
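import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

// Emits 1..1000; `name` is unused and only documents which source this is
fun createFlowWith1000Data(name: String): Flow<Int> = flow {
    for (i in 1..1000) {
        emit(i)
    }
}

// Benchmarking the `merge` operator with 3 flows emitting 1000 data points each
fun runMergeBenchmark() = runBlocking {
    val flow1 = createFlowWith1000Data("Flow1")
    val flow2 = createFlowWith1000Data("Flow2")
    val flow3 = createFlowWith1000Data("Flow3")
    var count = 0
    val elapsed = measureTimeMillis {
        merge(flow1, flow2, flow3).collect { count++ }
    }
    println("Collected $count values in $elapsed ms")
}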
Explanation:-
It’s as simple as that — just merge different flows into one and start collecting the values. Easy, clean, and effective!
Note:-
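It doesn’t require all flows to emit values of the same data type. Because Flow is covariant, merging a Flow<Int> and a Flow<String>, for example, gives you a flow of their common supertype.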
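For example, in this sketch (hypothetical function name, kotlinx-coroutines-core 1.6+ assumed), an explicitly typed Flow<Any> lets merge accept a Flow<Int> and a Flow<String> together. The order between the two sources is not guaranteed, so the check looks only at the contents.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.merge
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical demo: Flow<out T> is covariant, so Flow<Int> and Flow<String>
// are both Flow<Any>, and merge infers T = Any from the expected type
fun mixedMerge(): List<Any> = runBlocking {
    val numbers: Flow<Int> = flowOf(1, 2)
    val words: Flow<String> = flowOf("a", "b")
    val mixed: Flow<Any> = merge(numbers, words)
    mixed.toList()
}
```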
But that's not all.
merge isn't the only operator that relies on ChannelFlow. flatMapMerge and flowOn also use the ChannelFlow<T> abstract class in their implementation, which shows how versatile ChannelFlow is across different flow operations.
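Here is a small sketch of both operators in action, assuming kotlinx-coroutines-core 1.6+. flatMapMerge has carried a FlowPreview or ExperimentalCoroutinesApi marker depending on the version, hence the opt-in. The function name is hypothetical. The delays are only there to make the inner flows overlap, and the completion order is not guaranteed.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flatMapMerge
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical demo.
// flatMapMerge: each upstream value starts an inner flow, and the inner flows
// are collected concurrently.
// flowOn: the upstream runs on Dispatchers.Default, and values are handed
// back to the collector's coroutine.
@OptIn(FlowPreview::class, ExperimentalCoroutinesApi::class)
fun flatMapMergeDemo(): List<String> = runBlocking {
    flowOf(1, 2, 3)
        .flatMapMerge { n ->
            flow {
                delay(10L * (4 - n)) // usually makes later inputs finish first
                emit("item$n")
            }
        }
        .flowOn(Dispatchers.Default)
        .toList()
}
```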
If we examine the code for flowOn, here's what we find:
ChannelFlowOperatorImpl extends the ChannelFlowOperator<S, T> abstract class, which in turn extends ChannelFlow<T>. This architecture shows that ChannelFlow<T> is a fundamental building block used across Flow operations like channelFlow, flatMapMerge, flowOn, and merge.
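From this internal code, we can see that it’s technically possible to create custom flow operators by extending the ChannelFlow<T> abstract class. Keep in mind, though, that ChannelFlow lives in the kotlinx.coroutines.flow.internal package and is marked @InternalCoroutinesApi, so it can change between releases. For most custom operators, the public channelFlow builder is the safer way to get the same channel-backed behavior.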
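As a sketch of that safer route, here is a hypothetical custom operator, mapConcurrently, built on the channelFlow builder rather than on ChannelFlow directly. It assumes kotlinx-coroutines-core 1.6+. It launches one coroutine per upstream value, so results arrive in completion order, not input order.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// HYPOTHETICAL custom operator: applies `transform` to every value concurrently.
// channelFlow waits for all launched children before closing its channel.
fun <T, R> Flow<T>.mapConcurrently(transform: suspend (T) -> R): Flow<R> = channelFlow {
    val upstream = this@mapConcurrently
    upstream.collect { value ->
        launch { send(transform(value)) }
    }
}

// Hypothetical usage helper
fun mapConcurrentlyDemo(): List<Int> = runBlocking {
    flowOf(1, 2, 3).mapConcurrently { it * 10 }.toList()
}
```

This design keeps everything on stable, public API. The trade-off is that you don't control fusion or buffering the way the internal operators do.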
So, when people make a big deal about choosing between channelFlow and flow, or are confused about when to use each, there's no need to worry. These are two distinct tools with different purposes, and both have their place in the Kotlin Flow ecosystem.
Yeah, that’s it for this class.