Multiprocessing on the JVM & Scala: understanding the benefits of cats-effect
CPUs historically scaled in clock speed, but clock rates plateaued a few years back at around 3–4 GHz. Instead, the number of cores per CPU skyrocketed, and multiprocessing is becoming more and more predominant in software programming. There is a lot of innovation in this field, especially in the Scala ecosystem.
What is a multiprocessing system?
We distinguish two types of multiprocessing systems: preemptive and cooperative.
A preemptive system makes sure every process gets a fair share of processing time by interrupting running processes in order to schedule the other ones. In a cooperative system, by contrast, the processes themselves are in charge of yielding control back to the system.
In practice a preemptive system is considered fair because all the processes will run eventually. In contrast, a cooperative system is considered fast because the processes can optimally yield back the control to the scheduler.
However, in a cooperative system, a process might hog all the resources by never giving control back to the scheduling system.
It can be summed up by this simple formula:
Preemptive = FAIRNESS ➕ AVG PERF (FAIR)➕ CONTROL
Cooperative = PERFORMANCE ➕ CONTROL ➕ DANGER
Multiprocessing for Java & the JVM
The multithreading primitive in Java is the Thread. Creating a thread spawns an OS thread, and it is expensive: memory has to be allocated, a system call has to be made, and the JVM has to readjust internally to register this new thread.
In Java and the JVM, creating too many threads leads to degraded performance due to context switching. A context switch occurs when a running thread is paused to give some computing time to another thread; to do so, the local state of the running Thread has to be saved so that it can continue its processing later on. This is very expensive.
Luckily, thread pools are here to help with this problem. The idea behind thread pools is to create the threads once in order to have a controlled allocation cost and to reuse them as we go.
In Java, thread pools are quite common and they are materialised by the Executor + ExecutorService + ThreadPoolExecutor “framework”. Here is an excerpt of the javadoc:
Thread pools address two different problems: they usually provide improved performance when executing large numbers of asynchronous tasks, due to reduced per-task invocation overhead, and they provide a means of bounding and managing the resources, including threads, consumed when executing a collection of tasks.
There are several types of thread pools built into the standard Java library, such as cached thread pools, fixed thread pools and scheduled thread pools, to name a few. Cached thread pools are used for short-lived tasks. Fixed thread pools are good for CPU-intensive tasks (CPU time is a scarce resource). Scheduled thread pools are useful for time-sensitive tasks, such as recurring tasks.
Scheduling a task in the thread pool is as simple as calling the method execute on the ThreadPoolExecutor:
public void execute(Runnable command)

interface Runnable {
    public abstract void run();
}
The interface is pretty straightforward, and multiple calls allow us to run several computations in parallel. One caveat is that it’s not possible to define a continuation in an elegant and composable fashion. This is where Scala futures kick in and shine.
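To make the reuse of threads concrete, here is a minimal sketch that submits tasks to a fixed thread pool from Scala. The names ThreadPoolSketch and runTasks are made up for this illustration, and the collection converter assumes Scala 2.13 or later.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, Executors, TimeUnit}
import scala.jdk.CollectionConverters._

// Hypothetical helper, for illustration only.
object ThreadPoolSketch {
  // Submits `taskCount` tasks to a pool of `poolSize` reusable threads and
  // returns the names of the threads that actually ran them.
  def runTasks(taskCount: Int, poolSize: Int): Set[String] = {
    val pool = Executors.newFixedThreadPool(poolSize)
    val seen = new ConcurrentLinkedQueue[String]()
    try {
      (1 to taskCount).foreach { _ =>
        // A Scala lambda is converted to a Runnable (Scala 2.12+).
        pool.execute(() => { seen.add(Thread.currentThread().getName); () })
      }
    } finally {
      pool.shutdown()                             // stop accepting new tasks
      pool.awaitTermination(10, TimeUnit.SECONDS) // wait for the queued tasks
    }
    seen.asScala.toSet
  }
}
```

Calling ThreadPoolSketch.runTasks(100, 4) runs one hundred tasks, yet at most four distinct thread names come back: the threads are created once and reused.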
The Scala stdlib implementation: Future
Scala comes with its own multiprocessing abstraction & implementation: Future. From the official documentation:
Futures provide a way to reason about performing many operations in parallel– in an efficient and non-blocking way. A Future is a placeholder object for a value that may not yet exist
Futures are backed by an execution context, itself backed by traditional java thread pools. Schematically it would look like this:
With futures it is possible to launch parallel computations in a non-blocking fashion and to define continuations with map, flatMap, etc., which bind callbacks. Callbacks are eventually executed.
Creating a Future is an eager operation: its body is immediately sent to an execution context to be executed. On each future creation or call to map, flatMap, etc., a new task is sent to an execution context to be scheduled. This even shows in its interface:
// When a future is created
def apply[T](body: => T)(implicit executor: ExecutionContext): Future[T]

def map[S](f: T => S)(implicit executor: ExecutionContext): Future[S]

def flatMap[S](f: T => Future[S])(implicit executor: ExecutionContext): Future[S]
In this code snippet we can see that an ExecutionContext is needed for each of those operations. It allows the developer to control on which ExecutionContext the computation runs. It is quite convenient because you get fine-grained control over which task is sent to which thread pool. On the other hand, you always need to have an implicit ExecutionContext in scope, which creates some visual clutter.
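As a rough illustration of that control, the sketch below passes the execution context explicitly instead of implicitly. The pool names ("io", "cpu") and the object ExplicitContexts are assumptions made for the example, not part of any library.

```scala
import java.util.concurrent.{Executors, ThreadFactory}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical example object, for illustration only.
object ExplicitContexts {
  // Thread factory that gives every thread of a pool the same readable name.
  private def named(name: String): ThreadFactory =
    (r: Runnable) => new Thread(r, name)

  // Returns the names of the threads that ran the Future body and the map callback.
  def demo(): (String, String) = {
    val ioEc  = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(1, named("io")))
    val cpuEc = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(1, named("cpu")))
    try {
      val result: Future[(String, String)] =
        Future(Thread.currentThread().getName)(ioEc)                       // body runs on the "io" pool
          .map(ioName => (ioName, Thread.currentThread().getName))(cpuEc)  // callback runs on the "cpu" pool
      Await.result(result, 5.seconds)
    } finally {
      ioEc.shutdown()
      cpuEc.shutdown()
    }
  }
}
```

Here ExplicitContexts.demo() returns ("io", "cpu"): each step ran on the pool it was handed, at the cost of threading the context through every call.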
Creating a Future is straightforward: we can use the apply method:
Future { println("1") }
Future { println("2") }
Future { println("3") }
Future { println("4") }
In this example, we create four parallel tasks to be executed, so the order of execution is not guaranteed. It might print “1, 2, 3, 4” or “2, 1, 4, 3”, etc. If we want those futures to be executed sequentially, we have to use the flatMap operator:
Future { println("1") }
  .flatMap(_ => Future { println("2") })
  .flatMap(_ => Future { println("3") })
  .flatMap(_ => Future { println("4") })

// or with a nice for-comprehension
for {
  _ <- Future { println("1") }
  _ <- Future { println("2") }
  _ <- Future { println("3") }
  _ <- Future { println("4") }
} yield ()
If we have a list, we can also run one Future per element:
// print each number of the list
Future.sequence(List(1, 2, 3, 4).map(num => Future(println(num))))

// or more elegantly
Future.traverse(List(1, 2, 3, 4))(num => Future(println(num)))
We can also return the first future completed. It might be useful when you need to have a result as soon as possible from multiple sources. For instance:
Future.firstCompletedOf {
  List(
    getPriceFromApiA(),
    getPriceFromApiB(),
    getPriceFromApiC(),
    getPriceFromApiD(),
  )
}
Note: Future does not implement cancellation. It means that even if the first result is returned, let’s say from API C, futures A, B and D will continue their execution. It is wasteful because in this case we might parse some JSON for results A, B and D even though their results are going to be discarded.
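The lack of cancellation can be observed with a small sketch. The three "sources" below are hypothetical stand-ins that just sleep for a while, and a counter records how many of them ran to completion.

```scala
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical example object, for illustration only.
object NoCancellationSketch {
  // Returns the name of the fastest source and how many sources ran to the end.
  def demo(): (String, Int) = {
    val pool = Executors.newFixedThreadPool(3)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
    val finished = new AtomicInteger(0)

    // Stand-in for a remote API call: sleeps, records completion, returns its name.
    def source(name: String, delayMs: Long): Future[String] = Future {
      Thread.sleep(delayMs)
      finished.incrementAndGet()
      name
    }

    try {
      val all    = List(source("A", 400), source("B", 50), source("C", 250))
      val winner = Await.result(Future.firstCompletedOf(all), 5.seconds)
      // Wait for the losers too, only to observe that they were never stopped.
      Await.result(Future.sequence(all), 5.seconds)
      (winner, finished.get())
    } finally pool.shutdown()
  }
}
```

The winner is "B", but the counter still reaches 3: the two slower futures did all their work even though nobody reads their result.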
At this point, you might be wondering if futures fit the preemptive or the cooperative model. Well, it’s quite a tricky question. Strictly speaking, and by definition, a Future fits the cooperative model, since it’s up to the developer to give control back to the system (at the end of a Future block). So we are at risk of hogging all the resources. However, in practice it can be considered preemptive because the idiomatic way of using Future is to chain the computations with many calls to map and flatMap, which also makes the code more readable.
Example of non-idiomatic code:
Future {
  val users: List[User] = getUsersFromDb()
  val filteredUsers: List[User] = users.filter(_.role == ADMIN)
  // map (not foreach) so that the notification results are collected
  val results: List[NotificationResult] =
    filteredUsers.map(adminUser => sendNotification(adminUser))
  val savedResult = saveResultsInDb(results)
  savedResult
}
In this code snippet, many things are being done: we retrieve users from the database, we search for the admin users, we send them a notification one by one and finally save the result in the database. This kind of code is not so idiomatic and kind of dangerous in the sense that it does not yield back the control to the scheduling system until the very end. Instead, you might want to do it this way (idiomatic):
getUsersFromDb()
  .map(users => users.filter(_.role == ADMIN))
  .flatMap { adminUsers =>
    Future.traverse(adminUsers)(admin => sendNotification(admin))
  }
  .flatMap(results => saveResultsInDb(results))
This rewritten example assumes that getUsersFromDb(), sendNotification() and saveResultsInDb() themselves return futures. This code is way more “fair” because for every step we take, a Future is sent to the ExecutionContext to be eventually executed. Simply put, whenever a Future is created (and sent to an execution context), you leave other tasks the opportunity to be executed. In this example, you also have the chance to decide where each Future will be executed. So for instance, if sendNotification() is network-intensive but does not require a lot of CPU, it could be executed in a dedicated thread pool, in order not to waste scarce CPU time.
Note: we need to keep in mind that Future does not make our code any faster. In fact, the non-idiomatic code will be faster than the idiomatic Future code, because we pay a small price every time a Future is sent to a thread pool (known as the trampoline effect). However, we make better use of our resources, since more tasks can run in parallel.
To sum up, Future can be considered quite fair when written in the idiomatic style. It offers a well-rounded API that works for most cases. It lacks some important features, such as cancelation. Its eager nature might also bite you from time to time. Performance-wise, it will be more than enough for most use cases; under intensive use, however, it might really suffer from the trampoline effect. This is where cats-effect comes into play.
Cats-effect & IO
cats-effect is a fully fledged runtime platform to execute parallel and concurrent computations. It is a very specialised piece of software that has one goal: maximise resource utilisation by being smart about scheduling computations. It also gives a lot of power to the developer with its cooperative scheduling system, and with power comes responsibilities.
In cats-effect, the equivalent of Future is IO. So how does it differ from Future, and what is special about it?
IO is lazy, Future is eager
As opposed to Future, creating an IO does not trigger any computation, as IO is just a value. It describes a chain of computations that has yet to be executed. So creating an IO does not send any task to a thread pool. Think of it as a case class that encapsulates a computation to be run eventually (and that can be run any number of times). When it has to be run is left to the runtime to decide (in the most optimal fashion). Basic operations of IO include:
def apply[A](f: => A): IO[A]

def map[B](f: A => B): IO[B]

def flatMap[B](f: A => IO[B]): IO[B]
Manipulating IO means we are manipulating values, while manipulating Future means we are manipulating computations that are already running (or at least already enqueued to be processed). For instance:
val printIO = IO(println("Hello")).flatMap(_ => IO(println(" io")))

val printFuture =
  Future(println("Hello")).flatMap(_ => Future(println(" future")))
printFuture is already running and will complete once, while printIO is the description of a computation that has yet to be run and can be run any number of times, so the following computations have different outcomes:
val resultFuture = for {
  firstHelloFuture <- printFuture
  _ <- printFuture
} yield ()
// will print "Hello future" once when run

val resultIO = for {
  firstHelloIo <- printIO
  secondHelloIo <- printIO
} yield ()
// will print "Hello io" twice when run
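To make the “IO is just a value” idea tangible without any library, here is a toy sketch of a lazy computation wrapped in a case class. LazyIO is a made-up name and is in no way how cats-effect implements IO (there is no scheduling, cancelation or stack safety here); it only illustrates laziness and re-runnability.

```scala
// Toy model, for illustration only: NOT cats-effect's IO.
object LazySketch {
  // A value that wraps a computation; nothing runs until unsafeRun() is called.
  final case class LazyIO[A](unsafeRun: () => A) {
    def map[B](f: A => B): LazyIO[B] =
      LazyIO(() => f(unsafeRun()))
    def flatMap[B](f: A => LazyIO[B]): LazyIO[B] =
      LazyIO(() => f(unsafeRun()).unsafeRun())
  }

  object LazyIO {
    // Captures `body` by name, so it is not evaluated at construction time.
    def delay[A](body: => A): LazyIO[A] = LazyIO(() => body)
  }

  // Returns the counter before any run, and after running the program twice.
  def demo(): (Int, Int) = {
    var counter = 0
    val increment = LazyIO.delay { counter += 1; counter }
    val program = for {
      _ <- increment
      n <- increment
    } yield n
    val before = counter // still 0: building the program ran nothing
    program.unsafeRun()  // runs both increments
    program.unsafeRun()  // runs both increments again
    (before, counter)
  }
}
```

LazySketch.demo() returns (0, 4): nothing happens while the program is being described, and each run replays the whole description.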
IO supports cancellation
Now, imagine you are building a system that needs to retrieve prices as fast as possible from multiple sources, keeping only the fastest answer and discarding the other ones. The built-in IO.race is helpful.
def getPricesFromJsonSource: IO[List[Price]] =
  makeHttpRequest(url1)
    .flatMap(response => IO(json.parse(response)))
    .flatMap { productList =>
      IO(println(s"got ${productList.size} products from json api")).map(_ => productList)
    }

def getPricesFromXmlApi: IO[List[Price]] =
  makeHttpRequest(url2)
    .flatMap(response => IO(xml.parse(response)))
    .flatMap { productList =>
      IO(println(s"got ${productList.size} products from xml api")).map(_ => productList)
    }

IO.race(getPricesFromJsonSource, getPricesFromXmlApi)
Now let’s say that two times out of three, the JSON code finishes before the XML API has even finished replying. Being able to cancel an IO implies that the XML parsing and printing won’t even be executed: we are saving precious CPU resources. With Future, both computations would always run to completion, only for the fastest result to be kept.
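A runnable variant of this race could look like the sketch below. It assumes cats-effect 3 is on the classpath; the source function and its delays are hypothetical stand-ins for real HTTP calls, and onCancel is only there to make the cancelation of the loser visible.

```scala
import cats.effect.{IO, IOApp}
import scala.concurrent.duration._

// Hypothetical example, assuming cats-effect 3.
object RaceSketch extends IOApp.Simple {
  // Stand-in for a price API: waits `delay`, then answers with `price`.
  def source(name: String, delay: FiniteDuration, price: BigDecimal): IO[BigDecimal] =
    IO.sleep(delay)
      .flatMap(_ => IO.println(s"$name answered"))
      .map(_ => price)
      .onCancel(IO.println(s"$name was canceled")) // runs only if this IO loses the race

  // race returns an Either telling which side won; the loser is canceled.
  val fastest: IO[BigDecimal] =
    IO.race(
      source("json", 100.millis, BigDecimal(10)),
      source("xml", 1.second, BigDecimal(11))
    ).map(_.merge)

  def run: IO[Unit] =
    fastest.flatMap(price => IO.println(s"fastest price: $price"))
}
```

With these delays the json source wins, and the xml source is canceled while it is still sleeping, so the rest of its chain never runs.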
At this point you might wonder how cancelation works. In fact, IO is a high-level concept in cats-effect. It runs on top of Fiber, a concurrency primitive for cooperative multiprocessing. Fibers are the fundamental building block of cats-effect; they are like green threads, as opposed to native threads. Green threads are lightweight threads, and you can theoretically spawn thousands of them without worrying about your application’s performance. Internally, they keep some state that allows them to decide whether a computation should be run or cancelled.
cats-effect performance
cats-effect has its own highly optimised computing thread pool, which is an implementation of a work-stealing pool. It is inspired by the Rust Tokio library, and the underlying idea is simple: when a thread is idle, it tries to steal tasks from other threads in a random fashion. cats-effect also comes with a dedicated thread pool for blocking operations (operations that wait for an event in the system before continuing, and do nothing in the meantime). Natively splitting the computing and the blocking pools has several deep implications:
- since the blocking operations are delegated to a dedicated pool, the computing pool has near the optimum number of threads running (ideally 1:1 with CPU number)
- the system has optimum performances when all the threads are busy (since they only try to steal tasks when they are idle)
- there is no single point of contention for fetching tasks as opposed to a single concurrent queue
Also, a chain of computations (an IO chain composed of map, flatMap, etc.) will continue to run on the same thread, so there is no trampoline effect, as opposed to Future. Because of that, cats-effect is able to maximise thread affinity, improving the performance of the system.
Fairness
cats-effect 3 is a cooperative multiprocessing system, but it introduced a new auto-yielding mechanism to ensure that some fairness is still guaranteed. Usually, we have to explicitly give control back to the system so that the cats-effect runtime can schedule something else. For instance:
def loop(): IO[Unit] =
  IO(println("Hello world"))
    .flatMap(_ => IO.sleep(1.seconds))
    .flatMap(_ => loop())
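will print “Hello world” every second, forever. A fiber keeps its thread until it reaches an asynchronous boundary or until the auto-yielding mechanism comes into play (every 1024 flatMap by default). Note that IO.sleep suspends the fiber without blocking the underlying thread, so the real risk comes from long chains of purely CPU-bound steps. If we want to reintroduce a bit of fairness, in other words give other processes the chance to run at some point before the auto-yielding mechanism kicks in, we can use the cede operation explicitly, such as: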
def loop(): IO[Unit] =
  IO(println("Hello world"))
    .flatMap(_ => IO.sleep(1.seconds))
    .flatMap(_ => IO.cede)
    .flatMap(_ => loop())
cede is a hint for the cats-effect runtime that it may take this opportunity to run another process.
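cede is most useful in CPU-bound work. As a rough sketch, assuming cats-effect 3 is available, a long summation can be cut into chunks with a cede between them; sumWithCede and the chunk size are illustrative choices, not a recommendation from the library.

```scala
import cats.effect.IO

// Hypothetical example, assuming cats-effect 3.
object CedeSketch {
  // Sums 1..n chunk by chunk, offering the thread back to the scheduler between chunks.
  def sumWithCede(n: Long, chunk: Long = 10000L): IO[Long] = {
    def go(from: Long, acc: Long): IO[Long] =
      if (from > n) IO.pure(acc)
      else {
        val until = math.min(from + chunk - 1, n)
        IO((from to until).sum)    // one CPU-bound chunk
          .flatMap { partial =>
            IO.cede                // fairness hint: other fibers may run here
              .flatMap(_ => go(until + 1, acc + partial))
          }
      }
    go(1L, 0L)
  }
}
```

The result is the same as a plain loop; the only difference is that other fibers get regular opportunities to run while the sum is being computed.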
Conclusion
Through this article we have seen the major differences between the Future and cats-effect multiprocessing systems. cats-effect is a highly efficient multiprocessing system that comes with a lot of interesting features such as cancelation, pure functional programming enablement, fine-grained execution control, etc.
We have barely scratched the surface of this amazing library, and have not covered some of its most powerful features, such as Resource, Ref, Defer… Using cats-effect also means embracing a rich ecosystem of carefully designed libraries such as http4s, doobie and fs2, to name just a few.
Finally, should you rewrite all your current codebase to IO? Definitely not. Should you give it a try? Definitely, yes!
Going further
- cats-effect website
- Why are fibers fast. An amazing article explaining the internals of cats-effect
- Memory management & cancelation in cats-effect