Aspects of functional effects execution in Scala runtimes

Nikita Kalinskiy
Published in IT’s Tinkoff · Jun 5, 2023

Hello! My name is Nikita Kalinskiy, I am a developer at Tinkoff Business. Right now I’m working on a product called “Operations Feed” that aggregates all of a company’s operations and allows users to search through them, get a list of operations, or view statistics on them.

I want to talk about the basics of the different effect execution systems in Scala. We’ll break down how effect systems work, how they are implemented in Cats Effect and ZIO, and how they have evolved. We’ll also look at the non-obvious behaviors and pitfalls of the execution environments of such libraries.

This article is based on a talk I gave at a Scala meetup in 2022. Since then, the Typelevel team has written its own documentation about Cats Effect runtime internals. So my article is more of a summary and compilation of that documentation and my own experience, with a comparison of the Cats Effect and ZIO runtimes.

How asynchronous runtime environments work

Suppose a project manager asked you to develop another web server that can handle several hundred RPS, process user requests and access a database.

We decide to use the standard thread model, where each request is received, processed, and the result is returned by a dedicated thread.

Standard thread model (source)

There is a problem with this approach, known in the community as C10k. To solve it, the server must not only handle thousands of concurrent requests, but do so efficiently. With that in mind, creating a separate thread for each request is not a good idea: first, it can exhaust the resources of the operating system, and second, thread creation is an expensive operation requiring kernel calls.

As a workaround, we can introduce a scheduler, the so-called thread pool. Each request will come not directly to a thread, but to the scheduler, which will distribute tasks among threads using a queue. The number of threads is limited, and in most cases they are initialized in advance.
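To make the model concrete, here is a minimal illustration using the JDK’s standard fixed pool (the names and pool size are illustrative):

```scala
import java.util.concurrent.Executors

object PoolExample {
  // Eight pre-created threads; submitted tasks wait in the executor's
  // internal queue until a thread becomes free.
  private val pool = Executors.newFixedThreadPool(8)

  def handle(requestId: Int): Unit =
    pool.execute(() => println(s"request $requestId on ${Thread.currentThread().getName}"))
}
```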

Thread model with pool (source)

However, since the number of threads is limited, it is possible that they will all be blocked at some point. For example, if we use a blocking driver to access the database, all threads may end up waiting for a response from it. Context switching between threads is another expensive operation. So in our case, the asynchronous model with threads and a scheduler only solves part of the problem.

Help comes unexpectedly from Rust. The Tokio asynchronous programming framework was written to solve exactly these problems. Its basic idea is to introduce an additional abstraction on top of threads: the fiber. A fiber is a lightweight (so-called “green”) thread that can be created easily and quickly. Due to the small memory footprint, you can create as many of them as you want; the only limitation is the memory of your service.

Each fiber encapsulates the execution of a single task; in our case, it executes the logic of one request. Fibers cooperatively yield their threads: each has a time budget for execution, after which it voluntarily releases the thread. In addition, a fiber is only ever semantically blocked: when it performs some asynchronous operation and is waiting for a result, it does not block the thread, but simply switches itself to the appropriate state and gives up the thread to be used by another fiber. Unless, of course, a blocking operation such as a JDBC call is performed: then the thread is blocked waiting for a response no matter what thread model is used.

Now our scheduler receives not bare requests, but requests wrapped in fibers, and it is the fiber itself that gets executed on a thread.

Thread model with fibers (source)

A bit about contexts

Often in a production environment, not only do you need to perform tasks asynchronously, but you also need to attach some context to a request to identify it in logs or distributed tracing, for example a user or request ID. If we were working without fibers and just used threads, ThreadLocal would come to the rescue. It allows you to store values local to a particular thread, different for each one. If we spawn another thread inside the first one, we have to pass the context to the child as well.

Although the JVM has the InheritableThreadLocal class for this purpose, other problems remain. Since we are using a scheduler, we have a limited number of threads that get reused, so every ThreadLocal value must be cleared after the request is processed, otherwise contexts will get mixed up. If we use multiple thread pools, we also need to pass the context between them, since each pool runs tasks on its own threads. These are hacks we want to avoid.
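A sketch of the cleanup discipline that plain ThreadLocal forces on us (the handler is hypothetical):

```scala
object RequestContext {
  private val requestId = new ThreadLocal[String]

  def handle(id: String): Unit = {
    requestId.set(id)
    try {
      // ... process the request; e.g. a logger reads requestId.get() ...
    } finally {
      requestId.remove() // mandatory: the pooled thread will be reused for another request
    }
  }
}
```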

A fiber also has a context where we can store the values we need. When a fiber creates a “child” fiber within itself, the parent’s context is automatically copied into the child’s. And since fibers are not reused, they are collected by the garbage collector together with their contexts once their tasks are done. So there is nothing to clean up and no mixing of contexts.
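Jumping ahead a little, this is exactly what ZIO exposes as FiberRef, the fiber-level analogue of ThreadLocal. A minimal ZIO 2 sketch:

```scala
import zio._

// The child fiber inherits a copy of requestId on fork; when both fibers
// finish, their contexts are garbage-collected, with nothing to clean up.
val program: ZIO[Scope, java.io.IOException, Unit] =
  for {
    requestId <- FiberRef.make("unset")
    _         <- requestId.set("req-42")
    child     <- requestId.get
                   .flatMap(id => Console.printLine(s"child sees: $id"))
                   .fork
    _         <- child.join
  } yield ()
```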

What Runtime is in an effect system

A runtime allows you to run a fiber that carries within it an “effect” describing some computation. To achieve this, it needs:

  1. An Executor, that is, a main thread pool on which CPU-bound computations are performed.
  2. Additional thread pools for blocking or delayed calculations (optional).

This is roughly what the main loop of any effect system looks like. When you start the program, it is initialized: for example, all configs are read and connections to the database are established. All of this is wrapped into a main fiber and sent to the thread pool, where a free thread grabs the fiber from the queue and executes everything wrapped in it.

This is what an abstract fiber usually looks like (a toy sketch follows the list):

  1. The first thing we see is the context, which stores the FiberId, the Scope to which the fiber is bound, and an associative array with context values. The Scope can be global, if the task runs periodically, or the parent’s Scope, if the fiber is a subtask inside another fiber.
  2. We also see a computation stack, which is filled as the task wrapped in this fiber runs. IO in this case is a lazily evaluated computation (effect) of the request.
  3. And we see the main run function that knows how to run our lazy IO computations. It usually contains a while loop that iterates up to a given limit, after which the fiber voluntarily gives up the thread. Inside this loop, pattern matching on the effect type takes place, the call stack is filled, and the computation unfolds.
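Here is a toy, self-contained sketch of such a run loop: a tiny lazy IO type, a stack of continuations, and a while loop that matches on the current node. Real runtimes are far more elaborate.

```scala
sealed trait MiniIO[+A]
final case class Pure[A](value: A)                                extends MiniIO[A]
final case class Delay[A](thunk: () => A)                         extends MiniIO[A]
final case class FlatMap[A, B](src: MiniIO[A], f: A => MiniIO[B]) extends MiniIO[B]

def run[A](io: MiniIO[A]): A = {
  val stack = scala.collection.mutable.Stack.empty[Any => MiniIO[Any]]
  var current: MiniIO[Any] = io
  while (true) {
    // A real fiber also counts iterations here and, past a budget
    // (e.g. 2048 in ZIO 2), reschedules itself to yield the thread.
    current match {
      case FlatMap(src, f) =>
        stack.push(f.asInstanceOf[Any => MiniIO[Any]]) // remember the continuation
        current = src
      case Pure(v) =>
        if (stack.isEmpty) return v.asInstanceOf[A]
        current = stack.pop()(v)
      case Delay(thunk) =>
        val v = thunk()
        if (stack.isEmpty) return v.asInstanceOf[A]
        current = stack.pop()(v)
    }
  }
  sys.error("unreachable")
}

// run(FlatMap(Delay(() => 40), (n: Int) => Pure(n + 2))) == 42
```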

We’re done with the theoretical part, let’s move on to looking at the libraries.

ZIO

Runtime

The snippet below shows a sample ZIO 2 runtime. It is parameterized by a type R that is used in ZEnvironment; this environment contains everything needed for dependency injection. There is fiberRefs, an initially empty collection of FiberRef values that holds fibers’ contexts and metadata, plus a set of runtime flags. We also see a run function, which allows us to take an effect description in ZIO and run it synchronously.
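Since the original snippet was an image, here is a condensed reconstruction: the shape of the trait (simplified from the real one) and the standard way to run an effect synchronously on the default runtime.

```scala
import zio._

// Simplified shape of ZIO 2's Runtime:
// trait Runtime[+R] {
//   def environment: ZEnvironment[R] // everything needed for dependency injection
//   def fiberRefs: FiberRefs         // fibers' context values and metadata
//   def runtimeFlags: RuntimeFlags
// }

// Running an effect synchronously:
val answer: Int = Unsafe.unsafe { implicit unsafe =>
  Runtime.default.unsafe.run(ZIO.succeed(40 + 2)).getOrThrowFiberFailure()
}
```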

Under the hood, all fibers run on a separate thread pool for CPU-bound tasks. In addition, there is a ScheduledExecutorService with its own thread pool, which is needed to run delayed effects on a schedule.

For CPU-bound tasks, ZIO 1 had a thread pool with a fixed size equal to the number of available processors multiplied by two. This pool encapsulated a blocking queue to which all fibers for all threads were submitted.

In ZIO 2, the framework creators implemented their own thread pool, which knows how to exchange fibers between threads (aka work stealing) when some of them have no tasks of their own to run. There is one global queue, into which fibers from outside are placed (for example, from an HTTP server backend), and a local queue for each of the threads.

After creating a fiber, the thread puts it in its local queue, and then periodically checks the global queue and takes another batch from there, so that there is a fair distribution of global work. Having a local queue for each thread ensures that the thread does not compete with other threads for tasks, and makes better use of CPU caches. In addition, when a thread has fibers in its internal queue, it alerts the others and offers to take some if their queues are empty.

WorkStealing Thread model with fibers (source)

Working with fibers

Now let’s figure out how to create fibers in ZIO. Imagine we have two calculations that return numbers, and we want to run them in parallel and add up the results. To do this, we call fork on each of them, which returns a fiber wrapped in an effect. To run this fiber, you compose it via flatMap with the main computation flow. To wait for the result of a fiber’s calculation, you call join on it; at that point, semantic blocking happens: first we wait for result1, then for result2. A fiber can also be canceled via an interrupt call.
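A sketch of that flow (calc1 and calc2 are stand-ins for real computations):

```scala
import zio._

val calc1: UIO[Int] = ZIO.succeed(20)
val calc2: UIO[Int] = ZIO.succeed(22)

val sum: UIO[Int] =
  for {
    fiber1  <- calc1.fork   // start both calculations on their own fibers
    fiber2  <- calc2.fork
    result1 <- fiber1.join  // semantic blocking: first wait for result1...
    result2 <- fiber2.join  // ...then for result2
  } yield result1 + result2
```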

But I do not recommend using fiber interfaces directly. If, for example, after creating a fiber via fork, an error occurs in the main computation, the fiber will continue to run until its own task is finished. Eventually the garbage collector will clean it up, of course, but CPU cycles will be wasted on irrelevant computations.
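A hypothetical illustration of the leak (sideTask and mainTask are stand-ins):

```scala
import zio._

// If mainTask fails, join is never reached, yet the already forked fiber
// keeps burning CPU on a result nobody will ever use.
def leaky(sideTask: Task[Int], mainTask: Task[Unit]): Task[Int] =
  for {
    fiber  <- sideTask.fork
    _      <- mainTask   // suppose this fails...
    result <- fiber.join // ...then this line is never reached
  } yield result
```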

Therefore, it is better to use safe functions like zipPar. It lets you “glue” two calculations together with error handling: in case of a problem, all fibers are canceled by the runtime.
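The same summation, written safely:

```scala
import zio._

// If either side fails, the fiber running the other one is interrupted
// by the runtime; no manual fork/join bookkeeping is needed.
def safeSum(calc1: Task[Int], calc2: Task[Int]): Task[Int] =
  calc1.zipPar(calc2).map { case (a, b) => a + b }
```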

Another interesting feature of ZIO’s implementation of fibers is the fork-join identity principle: if you create a new fiber and immediately semantically block while waiting for its result, this should be logically equivalent to not launching the fiber at all.

Consequently, if the main computation is canceled, all fibers launched from it must also be canceled, so as not to violate this principle.

If we want to work around this, we can run fibers on a separate Scope, or create a calculation that runs at regular intervals using delay + forever. In that case, the fibers are bound to the global Scope of the program itself and end only when it ends, as sketched below.
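A small sketch of this (the printed “tick” is just a stand-in for a periodic task):

```scala
import zio._

// forkDaemon attaches the fiber to the global scope, so it survives the
// parent and is interrupted only when the whole program ends.
val heartbeat: UIO[Fiber.Runtime[Nothing, Nothing]] =
  ZIO.succeed(println("tick")).delay(1.second).forever.forkDaemon
```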

Fiber Run Loop

The fiber’s run loop works like this: there is a limit of 2048 iterations, after which the thread is voluntarily released. Inside, pattern matching determines the next action, with a small performance optimization: the matching is done not on the effect’s type, but on its integer tag.

If an error occurs during the calculation, the fiber’s entire computation stack is unwound in an attempt to find a handler for the error. If none is found, the error is sent to the main loop and can cause the program to stop. In addition, it is possible to explicitly release a thread (the Yield command) or move the computation to another thread pool (Shift), for example for blocking operations.

Blocking operations

Another peculiarity concerns blocking operations. By design, they should always run on a separate thread pool so that they do not occupy threads from the main pool. Moreover, they can be canceled (or at least cancelation can be attempted), and they are guaranteed to execute on the thread pool they were launched on, even if an implicit thread pool switch was made or an asynchronous boundary was crossed somewhere inside them.

To create a blocking computation, we can use effectBlocking (uninterruptible) or effectBlockingInterrupt, which under the hood calls Thread.interrupt when the fiber is canceled. Or we can define our own effect to be executed when the fiber is interrupted. In the example below, starting the calculation opens a socket that is guaranteed to close when the calculation is canceled.
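A sketch of that example in ZIO 1 terms, using the effectBlockingCancelable helper from zio.blocking:

```scala
import java.net.{ServerSocket, Socket}
import zio._
import zio.blocking._

// accept() blocks its thread, so we attach a canceler that closes the
// socket; interrupting the fiber then unblocks the call.
def acceptConnection(server: ServerSocket): RIO[Blocking, Socket] =
  effectBlockingCancelable(server.accept())(UIO(server.close()))
```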

In addition, ZIO 2 has a special monitoring system that analyzes running fibers to see if they are involved in blocking operations. If it detects any, it automatically transfers them to the blocking thread pool, even if the user has not explicitly called effectBlocking. But, as is usually the case with such systems, especially soon after release, don’t assume it will work in all cases: always prefer an explicit switch to the blocking thread pool if you are sure the operation in question is blocking.

Quick ZIO recap

Let’s recap the ZIO runtime. Let’s start with the pros:

  • Fibers with semantic blocking.
  • Fibers can be interrupted.
  • They use cooperative yielding to guarantee fairer concurrency.
  • Fork-join identity + ability to attach a fiber to a specific scope.
  • Run loop matches effects not by full type but by their tags.
  • Blocking operations are guaranteed to run on a separate executor.
  • Cancellation of blocking effects.

Cons that were present in the ZIO 1 runtime but fixed in ZIO 2:

  • A single global task queue in a main thread pool can be a source of performance degradation.
  • Suboptimal use of local CPU caches.

Cats Effect 2 & 3

Runtime

In Cats Effect 2, the runtime looked a little different. There were:

  • A run function that describes the whole program.
  • main, which knows how to run the program.
  • ContextShift — an abstraction that allows us to switch between different thread pools.
  • Timer — a wrapper over the ScheduledExecutorService.
  • ExecutionContext — the default thread pool for CPU-bound tasks.

Under the hood there was a fixed-size pool whose size equaled the number of available processor cores, but not less than two. Such a pool had the same performance problems as ZIO 1’s.

In Cats Effect 3, the runtime was reworked and became similar to ZIO 2’s: one global queue, local queues for each thread, and the ability for threads to “steal” tasks from each other’s local queues.

As a result, the IOApp itself changed:

  • All of the thread pools for regular, blocking, and scheduled computations went into a separate IORuntime structure.
  • The ContextShift functionality was moved to Async in CE3. It carries a reference to the main CPU-bound thread pool and allows a computation F[A] to be performed on an explicitly specified executor. After F[A] completes, execution is guaranteed to continue on the main executor (see the sketch below).
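A minimal CE3 sketch of that behavior (the custom executor is illustrative; real code should also shut it down):

```scala
import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext
import cats.effect.{IO, IOApp}

object EvalOnExample extends IOApp.Simple {
  private val customEc =
    ExecutionContext.fromExecutor(Executors.newFixedThreadPool(2))

  // Evaluated lazily, so it reports the thread the effect actually runs on.
  def whereAmI(tag: String): IO[Unit] =
    IO(println(s"$tag: ${Thread.currentThread().getName}"))

  val run: IO[Unit] =
    for {
      _ <- whereAmI("before")
      _ <- whereAmI("on the custom pool").evalOn(customEc)
      _ <- whereAmI("after") // guaranteed to be back on the compute pool
    } yield ()
}
```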

Working with fibers

Fibers in Cats Effect are started in much the same way as in ZIO, but via the start function, and you can cancel a fiber by calling cancel on it. However, fork-join identity was absent in CE2; it was added in CE3. CE also has functions for safer composition of fibers, such as parMapN. They are a bit more difficult to use, because they are not static IO functions as in ZIO, but syntactic extensions over the Parallel typeclass: you have to import them explicitly from the cats.syntax or cats.implicits objects.
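For example:

```scala
import cats.effect.IO
import cats.syntax.all._ // brings parMapN into scope via the Parallel typeclass

val calc1: IO[Int] = IO.pure(20)
val calc2: IO[Int] = IO.pure(22)

// Runs both effects on separate fibers; if one fails, the other is canceled.
val sum: IO[Int] = (calc1, calc2).parMapN(_ + _)
```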

Fiber Run Loop

The fiber run loop also worked differently in CE2: it ran until the entire computation was done and the fiber had finished completely. The developers argued for this decision because they wanted to avoid the cost of putting fibers back into the global queue and to make better use of the local CPU caches. Pattern matching inside was done by effect type rather than by tag, which also made it a bit more “expensive” than in ZIO.

In CE3, the run loop has become very similar to ZIO’s: there is a limit of 1024 iterations per fiber, and pattern matching is done by effect tag instead of by type.

Blocking operations

Blocking operations are likewise performed on a separate thread pool. To switch to it, CE2 had the Blocker wrapper. One of its main disadvantages was the absence of a guarantee that the computation would always run on the blocking thread pool: this mainly manifested itself when there was an asynchronous boundary or an implicit shift to another thread pool inside that computation.

For example, in the sketch below there is a blocking read from a file, but in between the user decides to shift to another thread pool to log a line, so as not to do that on the blocking one. As a result, the file reading continues after the log, but no longer on the blocking thread pool. Although this is a contrived example, imagine that in a program you pass to blockOn an IO that you got by calling some other function. You cannot be sure that somewhere inside that IO a shift isn’t being called, and this strongly violates the principle of local reasoning about what the program does.
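A CE2-style reconstruction of that example (the file source is passed in for brevity):

```scala
import cats.effect.{Blocker, ContextShift, IO}
import scala.io.BufferedSource

// The IO.shift in the middle returns execution to the main compute pool,
// so the second read no longer runs where blockOn promised it would.
def readTwoLines(source: BufferedSource, blocker: Blocker)(implicit cs: ContextShift[IO]): IO[Unit] = {
  val lines = source.getLines()
  blocker.blockOn {
    for {
      l1 <- IO(lines.next())  // blocking read, correctly on the blocking pool
      _  <- IO.shift          // jump back to the main compute pool...
      _  <- IO(println(l1))
      l2 <- IO(lines.next())  // ...so this read now occupies a compute thread
    } yield println(l2)
  }
}
```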

This was an annoying quirk that had to be kept in mind when using CE2. In addition, a blocking effect could not be canceled: you could only wait for it to end, or simply forget about it.

In CE3, the typeclass hierarchy around blocking operations was simplified. The Blocker functionality has been moved to Sync, and fibers performing blocking operations can now be canceled. Different cancelation strategies can be chosen when the blocking IO is created (sketched after the list):

  • Sync[IO].blocking — perform an uncancelable calculation on the blocking thread pool.
  • Sync[IO].interruptible — perform a calculation and try to cancel the fiber once by sending Thread.interrupt to its thread. If that does not cancel it, just continue executing the program.
  • Sync[IO].interruptibleMany — keep trying to cancel until it succeeds.
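The same three strategies are also exposed directly on IO in CE3; a sketch (readFile is a stand-in for any blocking call):

```scala
import cats.effect.IO

def readFile(path: String): String = scala.io.Source.fromFile(path).mkString

// Uncancelable: always runs to completion on the blocking pool.
val uncancelable: IO[String] = IO.blocking(readFile("data.txt"))

// One Thread.interrupt is attempted on cancelation; if the thread does
// not react, the computation simply runs to completion.
val interruptOnce: IO[String] = IO.interruptible(readFile("data.txt"))

// Thread.interrupt is retried until the computation gives up.
val interruptMany: IO[String] = IO.interruptibleMany(readFile("data.txt"))
```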

Runtime changes in upcoming releases

The ZIO developers want to support Project Loom, a native fiber-based runtime implementation in the JVM, in the near future. One of the main goals of Loom support in ZIO, besides native JVM fibers of course, is to simplify the Loom transition for ZIO users. That is, the creators of ZIO want to provide the most seamless and smooth transition possible for everyone who wants it.

Cats Effect, on the other hand, should get rid of the separate ScheduledExecutorService for scheduled operations in the 3.5.0 release. The work will move entirely to the main executor: each thread will periodically, or when there are no tasks in its local queue, check whether the timers of the scheduled operations initialized on this thread have expired. If they have, the thread will execute them; otherwise it will be “parked” until the next timer expires. This should give a performance gain of 15–20%, because the runtime won’t have to transfer computations back to the main thread pool after a timer expires. This is especially relevant for HTTP servers handling a large number of concurrent requests, since when a new connection is opened, a timeout is set on it for subsequent closing.

Conclusion

That’s all, folks! If you are choosing between CE2 and ZIO 1, it is better to pick the latter: you will gain in performance and avoid the problems of suboptimal fiber execution and various implicit switches between thread pools.

But if you are choosing between CE3 and ZIO 2, base your choice solely on what you find more convenient to use: there are practically no dramatic differences in the asynchronous effect execution model or performance between these libraries.

If you want to know more about the difference between the libraries, I suggest a good (more or less honest) benchmark from the creator of Cats Effect.
