Unit Testing Kotlin Flow

Nick Skelton
Google Developer Experts
Feb 26, 2021

“This Job has not completed yet” is the error message that gives me nightmares while writing unit tests for my Kotlin Flows. This article describes the typical way I have tested Reactive Streams (LiveData and RxJava) in the past and how I tried to adapt these methods to Flow with limited success. It goes on to experiment with the Google-recommended method and finally showcases the best solution I have found so far: Turbine. If you’re just looking for the solution, scroll to the end of the article.

The method I’ve always used for testing LiveData and RxJava streams is similar to siphoning petrol from a car: connect a container to the output, play with the inputs and see what comes out.

Source: https://developer.android.com/kotlin/flow/test

Below is a test rig I’ve been using for quite some time; it’s designed for testing LiveData. The container is a LiveDataTestObserver and the bucket is a MutableList. Set up some mocks, poke the structure under test (ViewModel, Repository, Service) and see what observedValues looks like once it’s done.

This idea works well for RxJava, LiveData and Coroutines/Flow. Each has a global dispatcher which you can configure with a TestRule to keep everything running on a single thread while you are running your unit tests. See InstantTaskExecutorRule for LiveData; here is a similar implementation for RxJava:

Coroutines and Flow come equipped with the same kind of dispatcher mechanism as LiveData and RxJava, and it can be used in the same way. A test dispatcher lets you intercept delay() calls and timeouts in your code and either skip them or advance time to speed up your unit tests. The clever way of ensuring that your code uses a TestDispatcher is to inject a DispatcherProvider into any class that explicitly chooses a Dispatcher. I learned how to do this from Craig’s post:

And here is a boiled down version:
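What follows is a minimal sketch of the idea, not the code from the post above. The names `DispatcherProvider`, `SingleDispatcherProvider` and `GreetingRepository` are hypothetical, and `Dispatchers.Unconfined` stands in for a real test dispatcher so that the example only needs kotlinx-coroutines-core.

```kotlin
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Hypothetical abstraction: production code asks this for a dispatcher
// instead of referencing Dispatchers.IO / Dispatchers.Main directly.
interface DispatcherProvider {
    val main: CoroutineDispatcher
    val io: CoroutineDispatcher
    val default: CoroutineDispatcher
}

// Production implementation: delegates to the real dispatchers.
class DefaultDispatcherProvider : DispatcherProvider {
    override val main: CoroutineDispatcher get() = Dispatchers.Main
    override val io: CoroutineDispatcher get() = Dispatchers.IO
    override val default: CoroutineDispatcher get() = Dispatchers.Default
}

// Test implementation: every role resolves to the one dispatcher the test supplies.
class SingleDispatcherProvider(dispatcher: CoroutineDispatcher) : DispatcherProvider {
    override val main: CoroutineDispatcher = dispatcher
    override val io: CoroutineDispatcher = dispatcher
    override val default: CoroutineDispatcher = dispatcher
}

// Hypothetical class under test: it switches to "io" through the provider.
class GreetingRepository(private val dispatchers: DispatcherProvider) {
    suspend fun load(): String = withContext(dispatchers.io) { "hello" }
}

fun loadWithInjectedDispatcher(): String = runBlocking {
    // Assumption: Dispatchers.Unconfined is used here in place of a TestDispatcher.
    val repository = GreetingRepository(SingleDispatcherProvider(Dispatchers.Unconfined))
    repository.load()
}
```

In a real test you would pass the dispatcher owned by your test rule instead, so that the test controls every thread hop the class makes.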

If you start trawling the internet looking for a solution to “This job has not completed yet”, you’ll eventually come across this GitHub issue in the kotlinx.coroutines project. As you scroll, you’ll notice that using the above TestCoroutineRule and TestDispatcher solves the problem for some people, but not for others. Why?

Unit testing a Flow is difficult because a Flow doesn’t have to complete. It’s a philosophical core feature of structured concurrency. In fact, some implementations of Flow (i.e. Hot Flows) are designed to never complete. If you observe a MutableStateFlow, for example, the idea is that it holds its value even when nothing observes it, so it never completes by design. The TestCoroutineRule works for some people and not for others because everyone implements their Flows differently: a test that waits for completion passes with a Flow that finishes and fails with one that doesn’t.
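To make the difference concrete, here is a small sketch using only kotlinx-coroutines-core. The helper `completesWithin` and the 200 ms budget are my own choices for illustration; the point is simply that collecting a finite cold Flow returns, while collecting a MutableStateFlow does not.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// Hypothetical helper: true if collecting the flow finishes within the time budget.
fun completesWithin(flow: Flow<Int>, millis: Long): Boolean = runBlocking {
    withTimeoutOrNull(millis) {
        flow.collect() // suspends until the flow completes
        true
    } ?: false
}

// A cold flow built from a fixed set of values completes after the last one.
fun coldFlowCompletes(): Boolean = completesWithin(flowOf(1, 2, 3), 200)

// A StateFlow keeps its collector suspended, waiting for the next value.
fun hotFlowCompletes(): Boolean = completesWithin(MutableStateFlow(0), 200)
```

Here `coldFlowCompletes()` returns true and `hotFlowCompletes()` returns false: the second collection only ends because the timeout cancels it.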

So how do we test these kinds of Flows?

There are some guidelines in the Android Documentation on how to approach unit testing with respect to Flows. It makes a suggestion:

For data streams that require a more complex collection of items or that don’t return a finite number of items, you can use the Flow API to pick and transform items.

It goes on to advise the use of certain operators like first() and take() for these more complex data streams, which will work, for the most part. I believe there is a better way, but the official way is a good start and involves no third-party dependencies. Below I will demonstrate how to test using the official Android Documentation suggestion, then rewrite the tests using my preferred method for comparison.

Let’s look at some examples, starting with Cold Flows, which are a little easier to test:

Notice that the middle test succeeds, but the implementation of the Flow has ‘changed’. In my eyes, although the code itself is working correctly, the test has failed to detect that the implementation has changed. Imagine that the Flow was representing isLoading, for example. If it emits an extra value, that could mean that the loading indicator is visible instead of invisible. Bad. We’ll come back to this point later.
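The blind spot can be sketched as follows. The two `isLoading…` flows are hypothetical stand-ins for the "before" and "after" implementations, and the checks are plain functions rather than JUnit tests so that the example stays self-contained.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical isLoading stream: loading starts, then stops.
fun isLoadingOriginal(): Flow<Boolean> = flowOf(true, false)

// The "changed" implementation emits one extra value and ends on "loading".
fun isLoadingChanged(): Flow<Boolean> = flowOf(true, false, true)

// take()-style check: only the first two items are ever looked at.
fun passesTakeTwoCheck(flow: Flow<Boolean>): Boolean = runBlocking {
    flow.take(2).toList() == listOf(true, false)
}

// Stricter check: collect everything the (cold, finite) flow emits.
fun passesFullCheck(flow: Flow<Boolean>): Boolean = runBlocking {
    flow.toList() == listOf(true, false)
}
```

`passesTakeTwoCheck` accepts both implementations, so the extra emission slips through. `passesFullCheck` rejects the changed one, but it relies on the Flow completing, which is exactly what we cannot count on with Hot Flows.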

Let’s take a look at testing some Hot Flows:

In the first test, the test itself hangs… time to hit StackOverflow.

In the second test, we use the TestDispatcher — very clever — but now we have our famous “This Job has not completed yet” error message.

In the third test, first() solves our problem the official Android documentation way. Or so it seems… first() works because it cancels the collection of the Flow as soon as one value arrives. take(n) only does so once n values have arrived, and a Hot Flow that emits fewer than that will keep it waiting forever, so we are back at square one if we want to do anything more interesting than first() with Hot Flows…
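A rough sketch of that difference, using only kotlinx-coroutines-core: first() returns as soon as the StateFlow hands over its current value, while take(2) keeps waiting for a second value that never comes. The function names and the 200 ms timeout are assumptions made for the example.

```kotlin
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// first() takes the current value and stops collecting.
fun firstOnStateFlow(): Int = runBlocking {
    val state = MutableStateFlow(42)
    state.first()
}

// take(2) needs two values; this StateFlow only ever holds one,
// so the collection is still suspended when the timeout fires.
fun takeTwoOnStateFlow(): List<Int>? = runBlocking {
    val state = MutableStateFlow(42)
    withTimeoutOrNull(200) { state.take(2).toList() }
}
```

`firstOnStateFlow()` returns 42, while `takeTwoOnStateFlow()` returns null because the timeout, not the Flow, ended the collection.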

Enter Turbine.

Turbine is a nifty little testing library which not only solves these two problems above, but also makes our unit tests more idiomatic. It has a very easy to follow readme, so I won’t repeat its feature set here. What I will do, is refactor our Cold Flow tests from above to use Turbine:

Notice that we not only have much cleaner, idiomatic unit tests, but that we are now catching the implementation change that we previously missed. No more isLoading bug.
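For readers who want the shape of such a test, here is a minimal sketch. It assumes a recent Turbine release, where the calls are named `awaitItem()` and `awaitComplete()` (older releases, as far as I know, used `expectItem()` and `expectComplete()`), and it uses a hypothetical `isLoading()` flow with `check` in place of a JUnit assertion.

```kotlin
import app.cash.turbine.test
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.runBlocking

// Hypothetical flow under test.
fun isLoading(): Flow<Boolean> = flowOf(true, false)

fun isLoadingEmitsTrueThenFalse() = runBlocking {
    isLoading().test {
        check(awaitItem())   // first emission: true
        check(!awaitItem())  // second emission: false
        awaitComplete()      // any other event here, such as a third item, fails the test
    }
}
```

Because the test states every event it expects, including completion, an implementation that starts emitting an extra value no longer passes silently.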

Now, let’s do the same for the Hot Flows:

Notice that we aren’t explicitly using the TestDispatcher and that we are being explicit about how the test should end, using one of these calls: cancelAndConsumeRemainingEvents() or cancelAndIgnoreRemainingEvents().
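A minimal sketch of a Hot Flow test in this style, again assuming a recent Turbine release (`awaitItem()` naming) and a hypothetical function name. The values are gathered into a list only so the result is easy to inspect.

```kotlin
import app.cash.turbine.test
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.runBlocking

fun stateFlowEmitsUpdates(): List<Int> = runBlocking {
    val state = MutableStateFlow(0)
    val seen = mutableListOf<Int>()
    state.test {
        seen += awaitItem()  // the current value, delivered on subscription
        state.value = 1
        seen += awaitItem()  // the update we just pushed
        // The StateFlow will never complete, so we end the test ourselves.
        cancelAndIgnoreRemainingEvents()
    }
    seen
}
```

The function returns `[0, 1]`, and the test finishes because it cancels the collection explicitly instead of waiting for a completion that never arrives.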

To summarise, testing Flows and Coroutines requires a fundamental shift in the way you think about your reactive streams. But if you came here simply looking for an answer to the “Job not completed” error message, here it is: use Turbine to unit test your Flows.

Nick Skelton
Google Developer Experts

Freelance Android Dev. Google Developer Expert. Full Time Remote. Part Time Buzzword Hacker.