How to Use Firebase Realtime Database With Kotlin Coroutine Flow
In this article we will talk about Kotlin Coroutines and, more specifically, about Flow. Probably the best way to understand it is through a sample, so we will see how to use it with Firebase Realtime Database.
Before we start, a short introduction is essential. As you know, Kotlin coroutines are the Kotlin way to write asynchronous, non-blocking code. You can use them in various ways with different kinds of operators; the most common is to invoke a suspend function and wait for a single result. With Flow the game is different, because it lets us create a stream of data and listen to it through a FlowCollector. If you are comfortable with RxJava, a Flow works like an Observable and the FlowCollector like an Observer.
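To make the producer and collector roles concrete, here is a minimal sketch of a Flow that is independent of Firebase. The countdown() function is a hypothetical name used only for illustration, and the snippet assumes the kotlinx-coroutines-core library is on the classpath.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical producer: a cold stream, so the block runs each time a collector subscribes.
fun countdown(from: Int): Flow<Int> = flow {
    for (i in from downTo 1) {
        emit(i) // hands the value to whoever is collecting
    }
}

fun main() = runBlocking {
    // collect() is a suspend function, so it must be called from a coroutine.
    countdown(3).collect { value -> println(value) } // prints 3, 2, 1 on separate lines
}
```

Unlike a suspend function, which returns once, the collector lambda here is called once per emitted value.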
Firebase Realtime Database
This is our scenario: we have a Realtime Database where we store a set of notifications. These notifications are generated server-side, so we don't control them; we can only listen to the source and read the result in our application. As the Firebase docs show, the client implementation is quite simple, but it relies on a callback interface.
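A callback-based listener of this kind might look roughly like the sketch below. It is not the article's original snippet: the "notifications" path, the listenForNotifications() function, the log tag, and the fields of StoredNotification are all assumptions made for illustration.

```kotlin
import android.util.Log
import com.google.firebase.database.DataSnapshot
import com.google.firebase.database.DatabaseError
import com.google.firebase.database.FirebaseDatabase
import com.google.firebase.database.ValueEventListener

// Hypothetical model: the fields are assumed. Firebase needs a no-argument
// constructor and setters, hence the default values and `var`.
data class StoredNotification(var title: String = "", var body: String = "")

fun listenForNotifications(): ValueEventListener {
    // "notifications" is an assumed path in the database.
    val reference = FirebaseDatabase.getInstance().getReference("notifications")

    val listener = object : ValueEventListener {
        // Called once with the current data, then again on every change.
        override fun onDataChange(snapshot: DataSnapshot) {
            val items = snapshot.children.mapNotNull {
                it.getValue(StoredNotification::class.java)
            }
            Log.d("Notifications", "Received ${items.size} items")
        }

        // Called when the listener fails or is removed by the server (e.g. rules).
        override fun onCancelled(error: DatabaseError) {
            Log.w("Notifications", "Listener cancelled", error.toException())
        }
    }
    // Returns the same listener so the caller can remove it later.
    return reference.addValueEventListener(listener)
}
```

The results only exist inside the two callbacks, which is exactly what makes this style awkward to compose with coroutine code.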
Our goal is to implement it by using Coroutine Flow, but how?
Do it with Flow: the producer
I suggest implementing it inside a class that follows the Repository pattern, which keeps the implementation simple and testable.
Start by defining the interface.
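A minimal sketch of such an interface follows. The names NotificationsRepository and fetchNotifications() are assumptions, as are the fields of StoredNotification; only the return type, a Flow of Result, comes from the article.

```kotlin
import kotlinx.coroutines.flow.Flow

// Hypothetical model: the fields are assumed for illustration.
data class StoredNotification(var title: String = "", var body: String = "")

// Hypothetical repository contract: callers see a stream, not Firebase types.
interface NotificationsRepository {
    // Emits a new Result every time the underlying data changes or the source fails.
    fun fetchNotifications(): Flow<Result<List<StoredNotification>>>
}
```

Because the interface exposes no Firebase type, a test can replace it with a fake that returns flowOf(...).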
This method returns a Flow of Result because we want to handle both outcomes: Result.success() carries the list of notifications mapped to a model (StoredNotification), while Result.failure() carries the related exception.
The Kotlin standard library documentation describes Result in full.
Now for the most interesting part: the data source implementation.
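The sketch below shows one way such a data source could be written. It is an approximation under stated assumptions, not the article's original code: the class name, the constructor parameter, and the model fields are hypothetical. It also swaps in trySend() where the article refers to sendBlocking(), because sendBlocking() is deprecated in recent kotlinx.coroutines releases.

```kotlin
import com.google.firebase.database.DataSnapshot
import com.google.firebase.database.DatabaseError
import com.google.firebase.database.DatabaseReference
import com.google.firebase.database.ValueEventListener
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow

// Hypothetical model: the fields are assumed for illustration.
data class StoredNotification(var title: String = "", var body: String = "")

// Hypothetical data source; in the article's setup it would implement the repository interface.
class FirebaseNotificationsDataSource(private val notificationsRef: DatabaseReference) {

    fun fetchNotifications(): Flow<Result<List<StoredNotification>>> = callbackFlow {
        val listener = object : ValueEventListener {
            override fun onDataChange(snapshot: DataSnapshot) {
                // Map each child to the model, skipping entries that fail to deserialize.
                val items = snapshot.children.mapNotNull {
                    it.getValue(StoredNotification::class.java)
                }
                trySend(Result.success(items))
            }

            override fun onCancelled(error: DatabaseError) {
                // Surface the failure as a value instead of throwing.
                trySend(Result.failure(error.toException()))
            }
        }

        notificationsRef.addValueEventListener(listener)

        // Suspends until the collector cancels, then detaches the listener.
        awaitClose { notificationsRef.removeEventListener(listener) }
    }
}
```

Nothing touches Firebase until the flow is collected, and each collector registers its own listener.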
The whole Firebase implementation is wrapped inside callbackFlow(). This builder lets us emit data from a different concurrency context, such as a listener callback, and make it available to a consumer. The method used to emit from the callback is sendBlocking().
Another important part is awaitClose(). This call is mandatory: it suspends the block and keeps the flow open, and callbackFlow() throws an IllegalStateException if the block finishes without it. Its lambda is invoked when the flow's channel is cancelled or closed, which makes it the right place for any disposal logic. In my example I use it to remove the ValueEventListener.
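The emit-and-dispose behavior can be observed without Firebase. The sketch below uses FakeSource, a hypothetical stand-in for a callback API, and the hypothetical extension asFlow(); it emits with trySend(), the non-deprecated counterpart of sendBlocking(), and shows that the awaitClose() lambda runs once the collector stops.

```kotlin
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for a callback API such as a Firebase reference.
class FakeSource {
    private val listeners = mutableListOf<(String) -> Unit>()
    val listenerCount: Int get() = listeners.size

    fun addListener(listener: (String) -> Unit) {
        listeners += listener
        listener("hello") // deliver an initial value right away
    }

    fun removeListener(listener: (String) -> Unit) {
        listeners -= listener
    }
}

fun FakeSource.asFlow(): Flow<String> = callbackFlow {
    // The callback pushes values into the flow's channel.
    val listener: (String) -> Unit = { value -> trySend(value) }
    addListener(listener)
    // Keeps the flow open; the lambda runs when the collector cancels.
    awaitClose { removeListener(listener) }
}

fun main() = runBlocking {
    val source = FakeSource()
    val value = source.asFlow().first() // first() cancels the flow after one value
    println(value)                // hello
    println(source.listenerCount) // 0, the listener was removed by awaitClose
}
```

If the awaitClose() line were removed, the listener would never be detached and callbackFlow() would fail with an IllegalStateException as soon as the block returned.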
Read it: the consumer
Finally, we can define the consumer, which is very simple to add. In my sample I implemented it inside a ViewModel. It works through a FlowCollector, whose behavior is very similar to a LiveData Observer: we attach it to the source (in my case the callbackFlow) and put our logic inside the collect() lambda.
FlowCollector is used as an intermediate or a terminal collector of the flow and represents an entity that accepts values emitted by the Flow.
Each time sendBlocking() is invoked inside the flow, it triggers the collector with the new result; there we can check whether it is a success or a failure and update the view accordingly. collect() is a suspend function, so it needs a CoroutineScope; since I am inside a ViewModel, I took advantage of viewModelScope.
The code is very simple, but it shows the power of this operator and how it could replace some RxJava operators. This is only my humble opinion, of course.
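A consumer along these lines might look like the sketch below. The ViewModel name, the LiveData properties, the observeNotifications() function, and the repository interface are assumptions; only the use of collect() inside viewModelScope comes from the article. It assumes the AndroidX lifecycle-viewmodel-ktx and lifecycle-livedata artifacts are available.

```kotlin
import androidx.lifecycle.LiveData
import androidx.lifecycle.MutableLiveData
import androidx.lifecycle.ViewModel
import androidx.lifecycle.viewModelScope
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch

// Hypothetical model and repository contract, repeated so the block stands alone.
data class StoredNotification(var title: String = "", var body: String = "")

interface NotificationsRepository {
    fun fetchNotifications(): Flow<Result<List<StoredNotification>>>
}

// Hypothetical ViewModel exposing the stream to the view through LiveData.
class NotificationsViewModel(
    private val repository: NotificationsRepository
) : ViewModel() {

    private val _notifications = MutableLiveData<List<StoredNotification>>()
    val notifications: LiveData<List<StoredNotification>> = _notifications

    private val _error = MutableLiveData<String>()
    val error: LiveData<String> = _error

    fun observeNotifications() {
        // The coroutine is cancelled when the ViewModel is cleared, which
        // closes the flow and lets the producer detach its listener.
        viewModelScope.launch {
            repository.fetchNotifications().collect { result ->
                result
                    .onSuccess { items -> _notifications.value = items }
                    .onFailure { e -> _error.value = e.message ?: "Unknown error" }
            }
        }
    }
}
```

Tying collection to viewModelScope means no manual unsubscribe call is needed in the view layer.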