Unit testing Kotlin Flow emissions

Seda K
4 min read · Dec 6, 2023

What is Flow?

Kotlin Flow is part of the Kotlin Coroutines library and is designed to handle asynchronous programming in an easy and expressive way. More specifically, a Flow is a type that represents a stream of values that are asynchronously produced and consumed.

There are three main entities involved when dealing with data streams: Producers, Intermediaries and Consumers. The case is no different with flows: a producer emits values into the stream, an intermediary can optionally modify each value or the stream itself, and a consumer collects the values.
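
To make the three roles concrete, here is a minimal sketch. The function names and the album titles are made up for illustration, and runBlocking is used only so the snippet can run outside of a coroutine.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Producer: emits raw values into the stream (hypothetical album titles).
fun albumTitles(): Flow<String> = flow {
    emit("blue")
    emit("kind of blue")
}

// Intermediary: transforms each value on its way to the consumer.
fun upperCased(titles: Flow<String>): Flow<String> = titles.map { it.uppercase() }

// Consumer: a terminal operator (here toList) starts the flow and receives the values.
fun consumeTitles(): List<String> = runBlocking { upperCased(albumTitles()).toList() }
```

Nothing runs until the consumer calls a terminal operator, because flows built with the flow builder are cold.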

Why Flow?

Flow brings with it many advantages over the traditional way of handling streams of data:

  • Asynchronous stream handling: The most obvious advantage is that it allows you to model asynchronous operations as a sequence of values emitted over time.
  • Integration with Coroutines: Kotlin coroutines are the recommended solution for asynchronous programming on Android. Since Flow is built on coroutines, it is easy to combine flows with other coroutine-based code.
  • Declarative Syntax: The syntax for creating and working with flows is declarative, making the code more readable and expressive.
  • Cancellation Support: Flow allows you to cancel the execution of a flow when it’s no longer needed. This is an essential tool when managing resources.
  • Transformations and Operators: It provides rich operators for transforming, combining, and performing various operations on the values emitted by the flow.
  • Backpressure Handling: Flows support backpressure, which means they can handle cases where the producer emits data faster than the consumer can process it. Because emitting and collecting are suspending operations, a slow consumer simply suspends the producer until it is ready. This helps prevent memory and resource-related problems.
  • Testing Support: Finally, flows are straightforward to test: their emissions can be asserted in ordinary unit tests with little boilerplate or setup.
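
The backpressure point is easiest to see by recording the order in which things happen. The sketch below is only an illustration (the function name and the 10 ms delay are arbitrary): a deliberately slow collector holds the producer back, so the producer never runs ahead.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Records the order in which the producer and a deliberately slow consumer run.
fun recordEmissionOrder(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    val numbers = flow {
        for (n in 1..3) {
            events += "emit $n"
            emit(n) // suspends until the collector has finished processing n
        }
    }
    numbers.collect { n ->
        delay(10) // simulate slow processing
        events += "collected $n"
    }
    events
}
```

The recorded order alternates strictly between "emit n" and "collected n", which shows that the producer waits for the consumer instead of buffering values.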

The scenario

Okay, enough talk, let’s get to the code. In this example, the data source (the Producer) is an API service that returns SpotifyAlbums, set up as follows:

interface SpotifyAlbumsApiService {

    @GET("some/json/path")
    suspend fun getSpotifyAlbums(): SpotifyAlbums
}

We assume that the relevant dependency injection, networking and SpotifyAlbums model are set up elsewhere. I also have a ViewState as below:

sealed class ViewState {
    object Loading : ViewState()
    object Error : ViewState()
    data class Success(val albums: List<Album>) : ViewState()
}

I will create a function to call this service and emit one of these states depending on the result of the call.

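class FetchAlbumsFromApi @Inject constructor(private val service: SpotifyAlbumsApiService) {
    fun fetchAlbums(): Flow<ViewState> = flow {
        emit(ViewState.Loading)
        // Only the API call sits inside try, so exceptions thrown by the
        // collector during emit are not swallowed here.
        val result = try {
            ViewState.Success(service.getSpotifyAlbums().albums)
        } catch (exception: CancellationException) {
            throw exception // never swallow coroutine cancellation
        } catch (exception: Exception) {
            ViewState.Error
        }
        emit(result)
    }
}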

You’ll notice that another advantage of flows is that they can emit multiple values sequentially, as opposed to suspend functions, which return only a single value. You can see this clearly in the example above: it emits ViewState.Loading first and then either ViewState.Success or ViewState.Error, depending on the result of the fetch.
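
A quick way to see this sequence without any test library is to collect the flow into a list. The sketch below is a simplified stand-in, not the article's exact classes: Album is a made-up model, and the Retrofit service is replaced by a plain suspend lambda so the snippet has no dependencies beyond kotlinx.coroutines.

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Simplified stand-ins for the article's types (assumptions for illustration only).
data class Album(val name: String)

sealed class ViewState {
    object Loading : ViewState()
    object Error : ViewState()
    data class Success(val albums: List<Album>) : ViewState()
}

// Takes a suspend lambda instead of the API service so the sketch stays dependency-free.
fun fetchAlbums(getAlbums: suspend () -> List<Album>): Flow<ViewState> = flow {
    emit(ViewState.Loading)
    val result = try {
        ViewState.Success(getAlbums())
    } catch (exception: CancellationException) {
        throw exception // never swallow coroutine cancellation
    } catch (exception: Exception) {
        ViewState.Error
    }
    emit(result)
}

// Collects every emission into a list so the whole sequence can be inspected.
fun emissionsOf(getAlbums: suspend () -> List<Album>): List<ViewState> =
    runBlocking { fetchAlbums(getAlbums).toList() }
```

Calling emissionsOf with a lambda that returns albums yields Loading followed by Success, and a lambda that throws yields Loading followed by Error. Note that toList only works here because the flow is finite; it would never return for a flow that does not complete.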

From here, setting up the ViewModel is easy:

class SpotifyAlbumsViewModel @Inject constructor(
    val fetchAlbumsFromApi: FetchAlbumsFromApi
) : ViewModel() {
    private val _state = MutableStateFlow<ViewState>(ViewState.Loading)
    val state: StateFlow<ViewState> = _state

    fun getAlbums() {
        viewModelScope.launch {
            try {
                // fetchAlbums() already emits ViewState values, so each one is forwarded as-is.
                fetchAlbumsFromApi.fetchAlbums().collect { viewState ->
                    _state.value = viewState
                }
            } catch (e: CancellationException) {
                throw e // let coroutine cancellation propagate
            } catch (e: Exception) {
                _state.value = ViewState.Error
                e.printStackTrace()
            }
        }
    }
}

The test

To test FetchAlbumsFromApi we use the MockK library and an external but recommended library called Turbine. We set up the test with a fake data source of albums and make the mocked SpotifyAlbumsApiService return them. Then we can check that the emissions we expect are actually emitted.

class FetchAlbumsFromApiTest {
    @Test
    fun `can get albums from api`() = runTest {
        // given
        val albums = TestDatasource().getAlbums()
        val fetchAlbumsService = mockk<SpotifyAlbumsApiService> {
            coEvery { getSpotifyAlbums() } returns SpotifyAlbums(albums = albums)
        }
        val testSubject = FetchAlbumsFromApi(fetchAlbumsService)

        // when
        testSubject.fetchAlbums().test {
            // then
            assertEquals(ViewState.Loading, awaitItem())
            // Success wraps the same list the mocked service returned
            assertEquals(ViewState.Success(albums), awaitItem())
            cancelAndIgnoreRemainingEvents()
        }
    }
}

The cancelAndIgnoreRemainingEvents() function cancels the collection and discards any events that have not been consumed yet, which in this case is the flow's completion event. Without it, Turbine fails the test at the end of the test block because unconsumed events remain that we don’t actually care about here.
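
The failure path can be checked the same way. The following is a sketch, assuming Turbine, MockK, JUnit 4 and kotlinx-coroutines-test are on the test classpath. The types are compact stand-ins for the ones used in this article (Album in particular is a made-up model), and IOException is just an arbitrary failure. It uses awaitComplete() instead of cancelAndIgnoreRemainingEvents(), which additionally asserts that the flow finishes after emitting the error.

```kotlin
import app.cash.turbine.test
import io.mockk.coEvery
import io.mockk.mockk
import java.io.IOException
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.test.runTest
import org.junit.Assert.assertEquals
import org.junit.Test

// Compact stand-ins for the article's types so the sketch compiles on its own.
data class Album(val name: String)
data class SpotifyAlbums(val albums: List<Album>)

interface SpotifyAlbumsApiService {
    suspend fun getSpotifyAlbums(): SpotifyAlbums
}

sealed class ViewState {
    object Loading : ViewState()
    object Error : ViewState()
    data class Success(val albums: List<Album>) : ViewState()
}

class FetchAlbumsFromApi(private val service: SpotifyAlbumsApiService) {
    fun fetchAlbums(): Flow<ViewState> = flow {
        emit(ViewState.Loading)
        val result = try {
            ViewState.Success(service.getSpotifyAlbums().albums)
        } catch (exception: CancellationException) {
            throw exception
        } catch (exception: Exception) {
            ViewState.Error
        }
        emit(result)
    }
}

class FetchAlbumsFromApiErrorTest {
    @Test
    fun `emits error when the api call fails`() = runTest {
        // given: the mocked service throws instead of returning albums
        val service = mockk<SpotifyAlbumsApiService> {
            coEvery { getSpotifyAlbums() } throws IOException("network down")
        }
        val testSubject = FetchAlbumsFromApi(service)

        // when
        testSubject.fetchAlbums().test {
            // then
            assertEquals(ViewState.Loading, awaitItem())
            assertEquals(ViewState.Error, awaitItem())
            awaitComplete() // the flow should finish after the error state
        }
    }
}
```

If the flow emitted anything after the error state, awaitComplete() would fail, so this variant is a slightly stricter check than ignoring the remaining events.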

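The ViewModel can be tested in a similar way. One extra step is needed: viewModelScope runs on Dispatchers.Main, which is not available in local unit tests, so the test replaces it with a test dispatcher first:

@OptIn(ExperimentalCoroutinesApi::class)
class SpotifyAlbumsViewModelTest {

    @Before
    fun setUp() {
        // viewModelScope uses Dispatchers.Main, so swap in a test dispatcher
        Dispatchers.setMain(StandardTestDispatcher())
    }

    @After
    fun tearDown() {
        Dispatchers.resetMain()
    }

    @Test
    fun `getAlbums can get albums`() = runTest {
        // given
        val albums = TestDatasource().getAlbums()
        val fetchAlbumsService = mockk<SpotifyAlbumsApiService> {
            coEvery { getSpotifyAlbums() } returns SpotifyAlbums(albums = albums)
        }
        val fetchAlbums = FetchAlbumsFromApi(fetchAlbumsService)
        val testSubject = SpotifyAlbumsViewModel(fetchAlbumsFromApi = fetchAlbums)

        // when
        testSubject.getAlbums()

        // then
        testSubject.state.test {
            // the StateFlow's initial value arrives first
            assertEquals(ViewState.Loading, awaitItem())
            assertEquals(ViewState.Success(albums = albums), awaitItem())
        }
    }
}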

In this article, we explored what Kotlin Flows are and some of the advantages they bring to Android development. We also looked at how to use flows in an example using a fake Spotify API and finally how to unit test the emitted values.

If you found this article useful, follow me on Medium or connect with me on LinkedIn.
