How to use Kotlin Flows with Huawei Cloud DB
In this article we will talk about how we can use Kotlin Flows with Huawei Cloud DB.
Since both Kotlin Flows and Huawei Cloud DB are large topics, we will not cover them in depth. Instead we will look at general usage and at how the two can be used together.
You can refer to this article about Kotlin Flows and this documentation for Cloud DB for more detailed explanations.
Kotlin Flows
A flow is an asynchronous version of a Sequence, a type of collection whose values are lazily produced. Just like a sequence, a flow produces each value on-demand whenever the value is needed, and flows can contain an infinite number of values.
Flows are based on suspending functions and they are completely sequential, while a coroutine is an instance of computation that, like a thread, can run concurrently with the other code.
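To make the comparison with Sequence concrete, here is a minimal sketch. The function names numbersSequence and numbersFlow are made up for illustration. Both produce an infinite series lazily, but only the flow is allowed to call a suspending function such as delay between values.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical helper: an infinite, lazily computed sequence.
fun numbersSequence(): Sequence<Int> = sequence {
    var n = 1
    while (true) yield(n++) // the next value is computed only when requested
}

// Hypothetical helper: the flow counterpart, which may suspend between values.
fun numbersFlow(): Flow<Int> = flow {
    var n = 1
    while (true) {
        delay(10) // suspending call: allowed in a flow, not in a sequence builder
        emit(n++)
    }
}

fun main() = runBlocking {
    println(numbersSequence().take(3).toList()) // [1, 2, 3]
    println(numbersFlow().take(3).toList())     // [1, 2, 3]
}
```

In both cases take(3) stops the producer after three values, which is why an infinite loop inside the builder is safe here.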
We can easily create a flow with the flow builder and emit data:
private fun getData() = flow {
    val data = fetchDataFromNetwork()
    emit(data)
}
fetchDataFromNetwork is a simple function that simulates a network task:
private suspend fun fetchDataFromNetwork(): Any {
    delay(2000) // simulate a two-second network call
    return Any()
}
Flows are cold, which means the code inside a flow builder does not run until the flow is collected.
GlobalScope.launch {
    getData().collect {
        LogUtils.d("emitted data: $it")
    }
}
Collecting the flow prints the emitted data.
Using a flow with a one-shot callback is easy, but what if we have a multi-shot callback, in other words a callback that is invoked multiple times?
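The cold behaviour can be checked with a small sketch. The counter builderRuns and the function countingFlow are hypothetical names used only for this illustration. Creating the flow does not run the builder, and each collection runs it again from the start.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical counter: how many times the builder body has executed.
var builderRuns = 0

fun countingFlow(): Flow<Int> = flow {
    builderRuns++      // runs once per collection, not at creation time
    emit(builderRuns)
}

fun main() = runBlocking {
    val f = countingFlow()
    println(builderRuns) // 0: nothing has run yet
    f.collect { println("first collector got $it") }  // first collector got 1
    f.collect { println("second collector got $it") } // second collector got 2
    println(builderRuns) // 2: the builder ran once per collector
}
```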
private fun getData() = flow {
    myAwesomeInterface.addListener { result ->
        emit(result) // NOT ALLOWED
    }
}
When we try to call emit here we get an error, because emit is a suspend function and suspend functions can only be called from another suspend function or from a coroutine body. The listener lambda is neither.
At this point callbackFlow comes to the rescue. As the documentation says:
Creates an instance of the cold Flow with elements that are sent to a SendChannel provided to the builder’s block of code via ProducerScope. It allows elements to be produced by code that is running in a different context or concurrently.
Therefore callbackFlow gives us a non-suspending way to pass values out of the callback, using offer():
private fun getData() = callbackFlow {
    myAwesomeInterface.addListener { result ->
        offer(result) // ALLOWED
    }
    awaitClose { myAwesomeInterface.removeListener() }
}
offer() does the same job as emit() or send(). It is just a synchronous (non-suspending) way to hand an element to the flow.
The block passed to awaitClose() is called either when a flow consumer cancels the flow collection or when a callback-based API invokes SendChannel.close manually. It is typically used to clean up resources after completion, e.g. to unregister a callback.
Using awaitClose() is mandatory in order to prevent memory leaks when the flow collection is cancelled. Otherwise the callback may keep running even after the flow collector has completed.
Now we have an idea of how we can use a flow with a multi-shot callback. Let's continue with the other topic, Huawei Cloud DB.
Huawei Cloud DB
Cloud DB is a device-cloud synergy database product that provides data synergy management capabilities between the device and cloud, unified data models, and various data management APIs.
Cloud DB enables seamless data synchronization between the device and cloud, and supports offline application operations, helping developers quickly develop device-cloud and multi-device synergy applications.
After enabling Cloud DB and completing initialization, we can start reading data.
First we need a query that fetches user data for a given accountId:
val query: CloudDBZoneQuery<User> = CloudDBZoneQuery.where(User::class.java).equalTo("accountId", id)
Then we need to execute this query
val queryTask: CloudDBZoneTask<CloudDBZoneSnapshot<User>> = cloudDBZone.executeQuery(query, CloudDBZoneQuery.CloudDBZoneQueryPolicy.POLICY_QUERY_FROM_CLOUD_PRIOR)
When executing a query we have to specify a query policy, which defines where the data is read from first. POLICY_QUERY_FROM_CLOUD_PRIOR means that Cloud DB will try to fetch data from the cloud first. If that fails, it returns cached data if any exists. We can also use POLICY_QUERY_FROM_LOCAL_ONLY or POLICY_QUERY_FROM_CLOUD_ONLY, depending on our use case.
As the last step, add success and failure callbacks for the result:
queryTask
    .addOnSuccessListener {
        LogUtils.i("queryTask: success")
    }
    .addOnFailureListener {
        LogUtils.e("queryTask: failed")
    }
Now let’s combine these methods with callback flow
Resource is a basic sealed class for state management:
sealed class Resource<out T> {
    class Success<T>(val data: T) : Resource<T>()
    class Error(val exception: Exception) : Resource<Nothing>()
    object Loading : Resource<Nothing>()
    object Empty : Resource<Nothing>()
}
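The combination of the query task and callbackFlow could look roughly like the sketch below. It is an illustration under stated assumptions, not the original implementation. FakeQueryTask is a hypothetical stand-in for CloudDBZoneTask so that the example runs without the Cloud DB SDK, and queryAsFlow and describe are made-up helper names. The Resource class is repeated from above to keep the block self-contained. trySend is used instead of offer because offer is deprecated in recent kotlinx.coroutines releases.

```kotlin
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

sealed class Resource<out T> {
    class Success<T>(val data: T) : Resource<T>()
    class Error(val exception: Exception) : Resource<Nothing>()
    object Loading : Resource<Nothing>()
    object Empty : Resource<Nothing>()
}

// Hypothetical stand-in for a Cloud DB query task: it already holds its outcome
// and invokes a listener synchronously as soon as one is attached.
class FakeQueryTask<T : Any>(private val value: T?, private val error: Exception?) {
    fun addOnSuccessListener(listener: (T) -> Unit): FakeQueryTask<T> {
        if (error == null && value != null) listener(value)
        return this
    }
    fun addOnFailureListener(listener: (Exception) -> Unit): FakeQueryTask<T> {
        if (error != null) listener(error)
        return this
    }
}

// Wraps the one-shot task in a flow: Loading first, then Success or Error.
fun <T : Any> queryAsFlow(task: FakeQueryTask<T>): Flow<Resource<T>> = callbackFlow {
    trySend(Resource.Loading)
    task
        .addOnSuccessListener { data ->
            trySend(Resource.Success(data))
            close() // one-shot: complete the flow after the single result
        }
        .addOnFailureListener { e ->
            trySend(Resource.Error(e))
            close()
        }
    awaitClose { } // nothing to unregister for a one-shot task
}

// Hypothetical helper that turns a Resource into readable text.
fun describe(r: Resource<*>): String = when (r) {
    is Resource.Success<*> -> "Success(${r.data})"
    is Resource.Error -> "Error(${r.exception.message})"
    Resource.Loading -> "Loading"
    Resource.Empty -> "Empty"
}

fun main() = runBlocking {
    val results = queryAsFlow(FakeQueryTask("Alice", null)).toList()
    println(results.map { describe(it) }) // [Loading, Success(Alice)]
}
```

With the real SDK, the task would come from cloudDBZone.executeQuery as shown earlier, and the success listener would receive a snapshot rather than a plain value.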
To make this easier and more readable, we use the liveData builder instead of mutableLiveData.value = newValue in the ViewModel:
val userData = liveData(Dispatchers.IO) {
    getUserData("10").collect {
        emit(it)
    }
}
In the Activity, observe the LiveData and handle the result:
viewModel.userData.observe(this, Observer {
    when (it) {
        is Resource.Success -> {
            hideProgressDialog()
            showUserInfo(it.data)
        }
        is Resource.Loading -> {
            showProgressDialog()
        }
        is Resource.Error -> {
            // show alert
        }
        is Resource.Empty -> {}
    }
})
Just like the one-shot request above, it is possible to listen for live data changes with Cloud DB. To do that we have to subscribe to a snapshot.
The snapshot callback is called every time the data changes.
Let's combine this with callback flow again.
From now on we can listen for data changes on the cloud and show them in the UI.
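A snapshot subscription wrapped in callbackFlow might be sketched as follows. FakeSnapshotSource and FakeHandle are hypothetical stand-ins, only loosely modelled on a subscribe call that returns a removable handle, so the example runs without the Cloud DB SDK. The point is the shape: register the listener, forward every change into the flow, and remove the listener in awaitClose.

```kotlin
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical handle returned by a subscription; remove() unregisters the listener.
class FakeHandle(private val onRemove: () -> Unit) {
    fun remove() = onRemove()
}

// Hypothetical data source that pushes every change to a single listener.
class FakeSnapshotSource {
    private var listener: ((String) -> Unit)? = null
    val hasListener: Boolean get() = listener != null

    fun subscribeSnapshot(onSnapshot: (String) -> Unit): FakeHandle {
        listener = onSnapshot
        return FakeHandle { listener = null }
    }

    fun push(value: String) { listener?.invoke(value) }
}

// Multi-shot callback exposed as a flow; the listener lives as long as the collection.
fun FakeSnapshotSource.snapshots(): Flow<String> = callbackFlow {
    val handle = subscribeSnapshot { value -> trySend(value) }
    awaitClose { handle.remove() } // runs when the collector cancels or the channel closes
}

fun runDemo(): Pair<List<String>, Boolean> = runBlocking {
    val source = FakeSnapshotSource()
    val received = mutableListOf<String>()
    val job = launch { source.snapshots().take(2).collect { received += it } }
    while (!source.hasListener) yield() // wait until the flow has registered its listener
    source.push("v1"); source.push("v2"); source.push("v3")
    job.join() // take(2) cancels the flow after two values
    received.toList() to source.hasListener
}

fun main() {
    val (received, stillSubscribed) = runDemo()
    println(received)        // [v1, v2]
    println(stillSubscribed) // false: awaitClose removed the listener
}
```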
Important Note
Since offer() can throw when the channel is closed (the channel can close before the block within awaitClose() runs), we have to call offer in a safer way:
@ExperimentalCoroutinesApi
fun <E> SendChannel<E>.offerCatching(element: E): Boolean {
    return runCatching { offer(element) }.getOrDefault(false)
}
By doing this we can catch the error if offer throws. You can check this issue for more information.
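In kotlinx.coroutines 1.5.0 and later, offer is deprecated in favour of trySend, which reports failure through its ChannelResult return value instead of throwing. The sketch below is one way to observe that behaviour on a plain channel. The function name demoTrySend is made up for the example.

```kotlin
import kotlinx.coroutines.channels.Channel

// Hypothetical demo: returns what trySend reports in three situations.
fun demoTrySend(): List<Boolean> {
    val channel = Channel<Int>(capacity = 1)
    val first = channel.trySend(1)  // fits into the buffer
    val second = channel.trySend(2) // buffer is full and nobody is receiving
    channel.close()
    val third = channel.trySend(3)  // channel is closed: no exception, just a closed result
    return listOf(
        first.isSuccess,
        second.isFailure && !second.isClosed,
        third.isClosed
    )
}

fun main() {
    println(demoTrySend()) // [true, true, true]
}
```

Inside a callbackFlow, calling trySend(result) from the listener therefore needs no runCatching wrapper. The returned result can be inspected or ignored.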
Additional Notes
- Keep in mind that Cloud DB is still in beta, but it works pretty well.
- For upsert requests, authentication is mandatory. Without authentication, the upsert returns false. Huawei offers Account Kit and Auth Service for easy authentication.
In this article we talked about how we can use Kotlin Flows with Huawei Cloud DB.