Using TestControl to Test FS2 Streams

Farooq Qaiser
5 min read · Mar 21, 2022


Introduction

Recently I was working with an FS2 stream and wanted to write some unit tests to ensure my events were being emitted at the expected rate. I also wanted these tests to be deterministic and fast. Since FS2 is built on top of Cats Effect, I figured I could use the recently released TestControl feature to do this, which is exactly what I'll be demonstrating in this blogpost. It's not particularly complicated, but since it took me a little while to understand, I figured it deserved a write-up.

Some quick notes before we get started:

  • The solution I will be sharing in this blogpost is largely inspired by this comment from SystemFW (PS: I highly recommend their blog!) in the FS2 Discord channel.
  • Also, this article assumes readers already have some familiarity with Cats Effect and FS2.
  • Lastly, if you're looking to follow along, you can find all the code we'll be using in this GitHub repo. (The imports the snippets assume are sketched right after this list.)
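Speaking of which, here's roughly the set of imports the snippets in this post assume (a sketch: I'm assuming munit-cats-effect for the assertEquals syntax and the separate cats-effect-testkit module for TestControl, so your exact set may differ):

import cats.effect.IO
import cats.effect.testkit.TestControl
import cats.syntax.all._ // for tupleRight
import fs2.Stream
import munit.CatsEffectSuite // extend this in your test class for .assertEquals on IO
import scala.concurrent.duration._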

Non-deterministic, Inconsistent, and Slow Tests

Let’s assume we have the following stream:

val helloStream: Stream[IO, String] =
  Stream.constant("hello").covary[IO].metered(5.second).take(2)

This code describes a stream that will output “hello” every 5 seconds, two times in total, before terminating. To make sure this stream behaves as we expect, we’ll want to write a test for it.
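As an aside, if you want to observe this behaviour for yourself before we get to testing, one way is to run the stream under the default runtime (a quick sketch; HelloStreamDemo is just a hypothetical name for illustration):

// Prints "hello" twice, roughly 5 seconds apart, then exits.
object HelloStreamDemo extends cats.effect.IOApp.Simple {
  val helloStream: Stream[IO, String] =
    Stream.constant("hello").covary[IO].metered(5.second).take(2)

  val run: IO[Unit] =
    helloStream.evalTap(s => IO.println(s)).compile.drain
}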

To that end, we’ll first try to write a general assertion function which takes as inputs the Stream[IO, A] that we want to test and our expectation, a Vector[(FiniteDuration, A)] describing which elements the stream should emit and at what times. If the results of running the stream match our expectations, the function will return Unit; otherwise it will raise an exception. A first and naive attempt at writing this function might look something like this:

def assertStreamOutputsWithTime[A](
    stream: Stream[IO, A],
    expected: Vector[(FiniteDuration, A)]
): IO[Unit] = {
  val streamWithTime: Stream[IO, (FiniteDuration, A)] =
    Stream.eval(IO.monotonic).flatMap { t0 =>
      stream.evalMap(IO.monotonic.map(t1 => t1 - t0).tupleRight)
    }

  val ioProgram: IO[Vector[(FiniteDuration, A)]] =
    streamWithTime.compile.toVector

  ioProgram.assertEquals(expected)
}

It’s a fairly simple function but let’s break it down bit by bit:

  • First, we create an intermediate stream streamWithTime, which simply transforms every element of stream so that it’s tupled with the time elapsed since the start of the stream (t1 - t0); if the point-free style here looks dense, see the desugared sketch after this list.
  • We then compile the stream down into an ioProgram of type IO[Vector[(FiniteDuration, A)]].
  • Finally, we assert that ioProgram matches our expected.
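Here’s that first step again with the eta expansion of tupleRight written out explicitly (a sketch; withElapsed is just a hypothetical name for the streamWithTime logic):

def withElapsed[A](stream: Stream[IO, A]): Stream[IO, (FiniteDuration, A)] =
  Stream.eval(IO.monotonic).flatMap { t0 =>
    // stream.evalMap(IO.monotonic.map(t1 => t1 - t0).tupleRight) desugars to:
    stream.evalMap { a =>
      IO.monotonic.map(t1 => (t1 - t0, a))
    }
  }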

Using this assertion function, testing our stream becomes a piece of cake. The following test is checking that our stream returns one “hello” after exactly 5 seconds and another “hello” after exactly 10 seconds:

test("return hello every 5 seconds for 2 iterations") {
val helloStream: Stream[IO, String] =
Stream.constant("hello").covary[IO].metered(5.second).take(2)

val expected = Vector(
5.second -> "hello",
10.second -> "hello"
)

assertStreamOutputsWithTime(helloStream, expected)
}

Now, if we run this test, we will likely observe three things:

  • The test fails. Our events are being emitted ever-so-slightly later than we expect. Here are the results for one test run:
    Expected :Vector((5 seconds,hello), (10 seconds,hello))
    Actual :Vector((5285798458 nanoseconds,hello), (10265032994 nanoseconds,hello))
  • The time at which our events are being emitted changes on each run. Here’s another failed test run:
    Expected :Vector((5 seconds,hello), (10 seconds,hello))
    Actual :Vector((5357956494 nanoseconds,hello), (10327396702 nanoseconds,hello))
  • The test takes a long time to run. This might not seem surprising, since our helloStream program itself only terminates after 10 seconds, but it’s not ideal for a unit test to take this much time.

So what’s going on? First things first, there is nothing wrong with our helloStream program; it should just work. The problem is with the way we’re testing it. Time is an extremely difficult thing to measure precisely, and despite how fast modern-day computers are, it still takes some non-negligible amount of time for a computer to do things. As a result, we see that our events are being emitted a fraction of a second later than we expect them to be.

To work around these problems, we could add some tolerance to our tests (e.g. by rounding measured times to the nearest second, as sketched below) and make the metered rate in our helloStream configurable (so we could set it to something low like 1.second during testing). However, these kinds of changes are fairly invasive, in the sense that they complicate our program and test code unnecessarily and distract from the core focus. And even with these changes, your tests might still be a bit flaky. How do we solve this problem then?
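For completeness, the tolerance part of that workaround might look something like this (a sketch only; roundedToSecond is a hypothetical helper, and it’s not the approach we’ll take):

// Round each measured duration to the nearest second before comparing,
// so small scheduling delays don't fail the assertion.
def roundedToSecond(d: FiniteDuration): FiniteDuration =
  math.round(d.toMillis / 1000.0).seconds

// e.g. inside assertStreamOutputsWithTime, before the comparison:
// streamWithTime.map { case (t, a) => (roundedToSecond(t), a) }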

TestControl

The solution is to use TestControl. You can read about TestControl in detail in the Cats Effect docs but, to summarize, TestControl is a mock runtime for Cats Effect IO which fully implements IO’s functionality, including fibers, asynchronous callbacks, and, most importantly for us, time support. Where TestControl differs significantly from the default IORuntime (which is what we’ve been using so far) is in its treatment of time:

… within a TestControl universe, IO’s notion of time is artificially controlled and limited to only what the external controller dictates it to be. All three time functions — realTime, monotonic, and sleep — behave consistently within this control system. In other words, if you sleep for 256.millis and measure the current time before and after, you will find the latter measurement will be at least 256.millis beyond the former. The only caveat here is that TestControl cannot simulate the natural passage of time which occurs due to CPU cycles spent on calculations. From the perspective of a TestControl universe, all computation, all side-effects (including things like disk or network), all memory access, all everything except for sleep is instantaneous and does not cause time to advance.

- Cats Effect documentation

This is all great news for testing because this means we will not only get deterministic and consistent results, but also fast results.
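To make the quoted behaviour concrete, here’s a tiny standalone example (a sketch): a program that sleeps for 256.millis and measures the elapsed monotonic time. Under TestControl it completes instantly in real time, and since nothing else advances the mock clock, the measured duration should be exactly 256.millis.

val measureSleep: IO[FiniteDuration] =
  for {
    t0 <- IO.monotonic
    _ <- IO.sleep(256.millis)
    t1 <- IO.monotonic
  } yield t1 - t0

// Runs instantly in real time and yields 256.millis of mock time.
val measured: IO[FiniteDuration] = TestControl.executeEmbed(measureSleep)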

So how do we actually use this mock runtime? All we need to do is make one small change to our helper function:

def assertStreamOutputsWithTime[A](
    stream: Stream[IO, A],
    expected: Vector[(FiniteDuration, A)]
): IO[Unit] = {
  val streamWithTime: Stream[IO, (FiniteDuration, A)] =
    Stream.eval(IO.monotonic).flatMap { t0 =>
      stream.evalMap(IO.monotonic.map(t1 => t1 - t0).tupleRight)
    }

  val ioProgram: IO[Vector[(FiniteDuration, A)]] =
    streamWithTime.compile.toVector

  TestControl.executeEmbed(ioProgram).assertEquals(expected)
}

The only thing that’s changed here is the second-last line: instead of running ioProgram on the default IORuntime, we now pass it to TestControl.executeEmbed, which runs it on the mock runtime.

We can leave the rest of our code and the test itself unchanged:

test("return hello every 5 seconds for 2 iterations") {
val helloStream: Stream[IO, String] =
Stream.constant("hello").covary[IO].metered(5.second).take(2)

val expected = Vector(
5.second -> "hello",
10.second -> "hello"
)

assertStreamOutputsWithTime(helloStream, expected)
}

And now, if we run our test again, we’ll see that it passes deterministically, consistently, and almost instantaneously, solving all of our problems!

Parting thoughts

One great piece of advice I got when I was struggling to use TestControl was to just write tests as if only the default IORuntime were available, and only after that to switch to the mock runtime using the TestControl.executeEmbed method to make the tests deterministic. This is pretty much the same approach we used in this blogpost. And that’s all I wanted to share in this article.
