Throttle operator with LiveData and Kotlin coroutines

Guilherme Alexandre Sant'Ana
2 min read · Dec 17, 2018

Recently I needed an Observable that emits an item only if a particular timespan has passed without another item with a different value being emitted.

My first thought was to use RxJava for its throttle operator.

The RxJava Throttle operator

"The Sample operator periodically looks at an Observable and emits whichever item it has most recently emitted since the previous sampling."

see http://reactivex.io/documentation/operators/sample.html

Even though this operator would do the job perfectly, it wouldn't be a good idea to add such a large library just to use one operator, right? So I came up with this solution.

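// AndroidX package names are assumed here; older projects use android.arch.* instead
import androidx.lifecycle.MutableLiveData
import javax.inject.Inject
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext

class ThrottleLiveData<T>(
    val offset: Long,
    val coroutineContextProvider: CoroutineContextProvider
) : MutableLiveData<T>() {

    // Since kotlinx.coroutines 1.0, launch needs a CoroutineScope
    private val scope = CoroutineScope(coroutineContextProvider.IO)
    private var tempValue: T? = null
    private var currentJob: Job? = null

    override fun postValue(value: T) {
        // Verify that the value is different from the last emitted one
        if (tempValue == null || tempValue != value) {
            // Cancel the last unfinished job
            currentJob?.cancel()
            // Launch a coroutine in the background
            currentJob = scope.launch {
                // Wait for the offset time; if another item is posted first,
                // this job is cancelled before the delay ends
                delay(offset)
                // If the job was not cancelled, emit the last item on the UI thread
                withContext(coroutineContextProvider.Main) {
                    tempValue = value
                    super.postValue(value)
                }
            }
        }
    }
}

open class CoroutineContextProvider @Inject constructor() {
    // Dispatchers.Main replaces the old UI context
    open val Main: CoroutineContext by lazy { Dispatchers.Main }
    open val IO: CoroutineContext by lazy { Dispatchers.Default }
}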
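To make the rule inside postValue easier to follow, here is a minimal sketch that replays a timeline of posts through the same checks without any coroutines or Android classes. The names Post, Emission and simulateThrottle are hypothetical helpers invented for this illustration, and the tie-breaking (a pending value whose offset ends exactly when a new post arrives is treated as already emitted) is an assumption; the real class depends on thread timing there.

```kotlin
// Hypothetical helper types for illustration only; not part of the class above.
data class Post<T>(val timeMs: Long, val value: T)
data class Emission<T>(val timeMs: Long, val value: T)

// Replays posts (assumed sorted by time) through the same rule as postValue
// and returns what an observer would receive, with the emission time.
fun <T> simulateThrottle(posts: List<Post<T>>, offsetMs: Long): List<Emission<T>> {
    val emissions = mutableListOf<Emission<T>>()
    var lastEmitted: T? = null   // plays the role of tempValue
    var pending: Post<T>? = null // plays the role of currentJob

    for (post in posts) {
        // If the pending value's delay has already elapsed, it was emitted
        val waiting = pending
        if (waiting != null && waiting.timeMs + offsetMs <= post.timeMs) {
            emissions += Emission(waiting.timeMs + offsetMs, waiting.value)
            lastEmitted = waiting.value
            pending = null
        }
        // Same check as in postValue: only values that differ from the last
        // emitted one replace (cancel) whatever is still pending
        if (lastEmitted == null || lastEmitted != post.value) {
            pending = post
        }
    }
    // Nothing else arrives, so the last pending value is emitted after the offset
    pending?.let { emissions += Emission(it.timeMs + offsetMs, it.value) }
    return emissions
}

fun main() {
    // 1 is posted at 0 ms, 2 at 100 ms, with an offset of 500 ms
    val emitted = simulateThrottle(listOf(Post(0L, 1), Post(100L, 2)), offsetMs = 500L)
    println(emitted) // prints [Emission(timeMs=600, value=2)]
}
```

In this timeline the first value never reaches the observer, because the second post cancels it before the 500 ms offset has passed.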

Testing the solution

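// AndroidX package names are assumed here; older projects use android.arch.* instead
import androidx.arch.core.executor.testing.InstantTaskExecutorRule
import androidx.lifecycle.Observer
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.Dispatchers
import org.junit.Assert
import org.junit.Rule
import org.junit.Test
import org.junit.runner.RunWith
import org.mockito.Mock
import org.mockito.Mockito
import org.mockito.junit.MockitoJUnitRunner

class TestCoroutineContextProvider : CoroutineContextProvider() {
    override val Main: CoroutineContext
        get() = Dispatchers.Unconfined
    override val IO: CoroutineContext
        get() = Dispatchers.Unconfined
}

// The runner initialises the @Mock field before each test
@RunWith(MockitoJUnitRunner::class)
class TestThrottleLiveData {

    // JUnit requires rules to be public, hence @get:Rule on a val
    @get:Rule
    val rule = InstantTaskExecutorRule()

    @Mock
    lateinit var observer: Observer<Int>

    @Test
    fun testThrottle() {
        val throttleLiveData = ThrottleLiveData<Int>(500, TestCoroutineContextProvider())
        throttleLiveData.observeForever(observer)
        throttleLiveData.postValue(1)
        Thread.sleep(100)

        // As the offset is 500, the value should still be null at this point
        Assert.assertNull(throttleLiveData.value)
        throttleLiveData.postValue(2)
        Thread.sleep(600)

        // Now the value should be 2 and the observer should have been notified
        Assert.assertEquals(2, throttleLiveData.value)
        Mockito.verify(observer).onChanged(2)
    }
}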

Pretty simple, right?

If you have any suggestions, please leave them in the comments.

If you want to know how to implement other operators, I suggest this post: https://proandroiddev.com/i-exchanged-rxjava-for-coroutines-in-my-android-application-why-you-probably-should-do-the-same-5526dfb38d0e
