How to use Kotlin Flows with Huawei Cloud DB

Yusuf Ceylan
Published in Huawei Developers
5 min read · Oct 9, 2020

In this article we will talk about how we can use Kotlin Flows with Huawei Cloud DB.

Since both Kotlin Flows and Huawei Cloud DB are huge topics, we will not cover them in depth. We will only look at general usage and at how the two can be used together.

You can refer to this article about Kotlin Flows and to the Cloud DB documentation for more detailed explanations.

Kotlin Flows

A flow is an asynchronous version of a Sequence, a type of collection whose values are lazily produced. Just like a sequence, a flow produces each value on-demand whenever the value is needed, and flows can contain an infinite number of values.

Flows are based on suspending functions and they are completely sequential, while a coroutine is an instance of computation that, like a thread, can run concurrently with the other code.

We can easily create a flow with the flow builder and emit data:

private fun getData() = flow {
    val data = fetchDataFromNetwork()
    emit(data)
}

fetchDataFromNetwork is a simple function that simulates a network task:

private suspend fun fetchDataFromNetwork(): Any {
    delay(2000) // simulate a 2-second network call
    return Any()
}

Flows are cold, which means the code inside a flow builder does not run until the flow is collected.

GlobalScope.launch {
    getData().collect {
        LogUtils.d("emitted data: $it")
    }
}

Collecting the flow runs the builder, and the emitted data is logged.
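To make the cold behaviour visible, here is a minimal sketch that records when the builder body actually runs. The names events and numbers are hypothetical and exist only for this illustration; the sketch assumes the kotlinx.coroutines library is on the classpath.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Records the order of events so the cold behaviour can be inspected.
val events = mutableListOf<String>()

// Hypothetical flow: the builder body runs once per collection, not at creation time.
fun numbers(): Flow<Int> = flow {
    events += "builder started"
    for (i in 1..3) emit(i)
}

fun main() = runBlocking {
    val flow = numbers()         // only creates the flow, the builder has not run yet
    events += "flow created"
    val first = flow.toList()    // first collection runs the builder
    val second = flow.toList()   // second collection runs the builder again
    println(events)
    println(first + second)
}
```

The "flow created" entry is recorded before any "builder started" entry, and each collection adds its own "builder started" entry, because every collector triggers a fresh run of the builder.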

Using a flow with a one-shot callback is easy, but what if we have a multi-shot callback, that is, a callback that is invoked multiple times?

private fun getData() = flow {
    myAwesomeInterface.addListener { result ->
        emit(result) // NOT ALLOWED
    }
}

When we try to call emit here we get an error, because emit is a suspend function and suspend functions can only be called from another suspend function or from a coroutine body. The listener lambda is neither.

At this point, callback flow comes to our rescue. As the documentation says:

Creates an instance of the cold Flow with elements that are sent to a SendChannel provided to the builder’s block of code via ProducerScope. It allows elements to be produced by code that is running in a different context or concurrently.

Callback flow therefore gives us a non-suspending way to send values from inside the listener: the offer function.

private fun getData() = callbackFlow {
    myAwesomeInterface.addListener { result ->
        offer(result) // ALLOWED
    }
    awaitClose { myAwesomeInterface.removeListener() }
}

offer() serves the same purpose as emit() or send(). It is simply a synchronous, non-suspending way of delivering a value.

awaitClose() is called either when a flow consumer cancels the flow collection or when a callback-based API invokes SendChannel.close manually and is typically used to cleanup the resources after the completion, e.g. unregister a callback.

Using awaitClose() is mandatory in order to prevent memory leaks when the flow collection is cancelled, otherwise the callback may keep running even when the flow collector is already completed.

Now we have an idea of how to use a flow with a multi-shot callback. Let's continue with the other topic, Huawei Cloud DB.
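To see the register and unregister cycle end to end, here is a minimal sketch. TickSource and ticks are hypothetical names standing in for myAwesomeInterface. The sketch uses trySend, which replaced offer in kotlinx.coroutines 1.5 and later, so it assumes that version or newer.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical multi-shot callback API (stand-in for myAwesomeInterface).
class TickSource {
    private var listener: ((Int) -> Unit)? = null
    val hasListener: Boolean get() = listener != null
    fun addListener(l: (Int) -> Unit) { listener = l }
    fun removeListener() { listener = null }
    fun fire(value: Int) { listener?.invoke(value) }
}

fun ticks(source: TickSource): Flow<Int> = callbackFlow {
    // trySend does not suspend, so it can be called from a plain callback.
    source.addListener { value -> trySend(value) }
    // Suspends until the collector stops, then unregisters the listener.
    awaitClose { source.removeListener() }
}

fun main() = runBlocking {
    val source = TickSource()
    val collected = async { ticks(source).take(3).toList() }
    while (!source.hasListener) yield()   // wait until the flow has registered its listener
    (1..3).forEach { source.fire(it) }
    println(collected.await())            // the values delivered through the callback
    println(source.hasListener)           // whether the listener is still registered
}
```

Once take(3) has received its three values the collection ends, which triggers the awaitClose block and removes the listener.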

Huawei Cloud DB

Cloud DB is a device-cloud synergy database product that provides data synergy management capabilities between the device and cloud, unified data models, and various data management APIs.

Cloud DB enables seamless data synchronization between the device and cloud, and supports offline application operations, helping developers quickly develop device-cloud and multi-device synergy applications.

After enabling Cloud DB and completing the initialization, we can start reading data.

First we need a query that fetches user data for a given accountId:

val query: CloudDBZoneQuery<User> = CloudDBZoneQuery
    .where(User::class.java)
    .equalTo("accountId", id)

Then we need to execute this query:

val queryTask: CloudDBZoneTask<CloudDBZoneSnapshot<User>> = cloudDBZone.executeQuery(
    query,
    CloudDBZoneQuery.CloudDBZoneQueryPolicy.POLICY_QUERY_FROM_CLOUD_PRIOR
)

When executing a query we have to specify a query policy, which defines where the data should be read from first.

POLICY_QUERY_FROM_CLOUD_PRIOR means that Cloud DB will try to fetch the data from the cloud and, if that fails, will return cached data if it exists. We can also use POLICY_QUERY_FROM_LOCAL_ONLY or POLICY_QUERY_FROM_CLOUD_ONLY, depending on our use case.

As the last step, add success and failure callbacks for the result:

queryTask
    .addOnSuccessListener {
        LogUtils.i("queryTask: success")
    }
    .addOnFailureListener {
        LogUtils.e("queryTask: failed")
    }

Now let’s combine these methods with callback flow

Resource is a basic sealed class for state management:

sealed class Resource<out T> {
    class Success<T>(val data: T) : Resource<T>()
    class Error(val exception: Exception) : Resource<Nothing>()
    object Loading : Resource<Nothing>()
    object Empty : Resource<Nothing>()
}
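One way the query task and callback flow could be combined is sketched below. This is not the Cloud DB API: FakeTask is a hypothetical stand-in that only mimics the addOnSuccessListener and addOnFailureListener shape of the query task, and resourceFlow and describe are illustrative names. The sketch assumes kotlinx.coroutines 1.5 or later for trySend.

```kotlin
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Same shape as the article's Resource class, repeated so the sketch compiles on its own.
sealed class Resource<out T> {
    class Success<T>(val data: T) : Resource<T>()
    class Error(val exception: Exception) : Resource<Nothing>()
    object Loading : Resource<Nothing>()
    object Empty : Resource<Nothing>()
}

// Hypothetical stand-in for a listener-based task; a real task completes on its own.
class FakeTask<T : Any>(private val items: List<T>? = null, private val error: Exception? = null) {
    private var onSuccess: ((List<T>) -> Unit)? = null
    private var onFailure: ((Exception) -> Unit)? = null
    fun addOnSuccessListener(l: (List<T>) -> Unit) = apply { onSuccess = l }
    fun addOnFailureListener(l: (Exception) -> Unit) = apply { onFailure = l }
    // Calls whichever listener matches the configured outcome.
    fun complete() {
        if (error != null) onFailure?.invoke(error) else onSuccess?.invoke(items.orEmpty())
    }
}

fun <T : Any> resourceFlow(task: FakeTask<T>): Flow<Resource<T>> = callbackFlow<Resource<T>> {
    trySend(Resource.Loading)
    task.addOnSuccessListener { items ->
        val first = items.firstOrNull()
        trySend(if (first != null) Resource.Success(first) else Resource.Empty)
        close() // one-shot request: nothing more will arrive
    }.addOnFailureListener { e ->
        trySend(Resource.Error(e))
        close()
    }
    task.complete() // only needed for the stand-in
    awaitClose { }  // no listener to unregister in this sketch
}

fun describe(r: Resource<*>): String = when (r) {
    is Resource.Success -> "Success(${r.data})"
    is Resource.Error -> "Error(${r.exception.message})"
    is Resource.Loading -> "Loading"
    is Resource.Empty -> "Empty"
}

fun main() = runBlocking {
    val states = resourceFlow(FakeTask(items = listOf("user-10"))).toList()
    println(states.map(::describe))
}
```

A function such as getUserData, used in the ViewModel, could follow the same pattern: send Loading first, then map the task result to Success, Empty or Error, and close the channel because a one-shot query produces nothing further.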

To make the code easier to read, we use the liveData builder in the ViewModel instead of setting mutableLiveData.value = newValue:

val userData = liveData(Dispatchers.IO) {
    getUserData("10").collect {
        emit(it)
    }
}

In the Activity, observe the live data and handle the result:

viewModel.userData.observe(this, Observer {
    when (it) {
        is Resource.Success -> {
            hideProgressDialog()
            showUserInfo(it.data)
        }
        is Resource.Loading -> {
            showProgressDialog()
        }
        is Resource.Error -> {
            // show alert
        }
        is Resource.Empty -> {}
    }
})

Just like the one-shot request above, it is possible to listen to live data changes with Cloud DB. To do that, we have to subscribe to a snapshot.

This callback will be called every time the data changes.

Let’s combine it with callback flow again

From now on we can listen to data changes in the cloud and show them in the UI.
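The subscription case can be sketched in the same way. Again, this is not the Cloud DB API: FakeZone, Registration and userUpdates are hypothetical stand-ins for a snapshot subscription that hands back a handle for cancelling it. The sketch assumes kotlinx.coroutines 1.5 or later for trySend.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

// Hypothetical handle returned by a subscription; remove() cancels it.
fun interface Registration { fun remove() }

// Hypothetical stand-in for a zone that pushes snapshots to subscribers.
class FakeZone {
    private val listeners = mutableListOf<(List<String>) -> Unit>()
    val listenerCount: Int get() = listeners.size
    fun subscribe(listener: (List<String>) -> Unit): Registration {
        listeners.add(listener)
        return Registration { listeners.remove(listener) }
    }
    // Simulates a data change arriving from the cloud.
    fun push(snapshot: List<String>) = listeners.toList().forEach { it(snapshot) }
}

fun userUpdates(zone: FakeZone): Flow<List<String>> = callbackFlow {
    // Every pushed snapshot becomes one element of the flow.
    val registration = zone.subscribe { snapshot -> trySend(snapshot) }
    // Cancel the subscription when the collector goes away.
    awaitClose { registration.remove() }
}

fun main() = runBlocking {
    val zone = FakeZone()
    val updates = async { userUpdates(zone).take(2).toList() }
    while (zone.listenerCount == 0) yield()   // wait for the subscription to be in place
    zone.push(listOf("Ada"))
    zone.push(listOf("Ada", "Grace"))
    println(updates.await())
    println(zone.listenerCount)
}
```

Unlike the one-shot query, the channel is never closed from inside the callback here. The flow stays open until the collector is cancelled, and awaitClose is what releases the subscription.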

Important Note

Since offer() can throw when the channel is closed (the channel can close before the block inside awaitClose() has run), we have to call offer in a safer way.

@ExperimentalCoroutinesApi
fun <E> SendChannel<E>.offerCatching(element: E): Boolean {
    // Returns false instead of throwing if the channel is already closed
    return runCatching { offer(element) }.getOrDefault(false)
}

By doing this we can catch the error if offer throws. You can check this issue for more information.
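In kotlinx.coroutines 1.5 and later, offer is deprecated in favor of trySend, which reports a closed channel through its return value instead of throwing. A minimal sketch of the same safety wrapper on top of trySend, assuming that library version; sendSafely is a hypothetical name:

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.SendChannel

// Returns true if the element was accepted, false if the channel is full or closed.
fun <E> SendChannel<E>.sendSafely(element: E): Boolean = trySend(element).isSuccess

fun main() {
    val channel = Channel<Int>(Channel.UNLIMITED)
    println(channel.sendSafely(1))   // channel is open and has room
    channel.close()
    println(channel.sendSafely(2))   // channel is closed, no exception is thrown
}
```

Because trySend never throws for a closed channel, no runCatching is needed around it.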

Additional Notes

  • Keep in mind that Cloud DB is still in its beta phase, although it works pretty well.
  • For upsert requests, authentication is mandatory. Without authentication, the upsert will return false. Huawei offers Account Kit and Auth Service for easy authentication.
