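If you’re doing functional programming in Scala, there is a high chance you’ve been struggling (or you’re going to) because you have to deal with code that doesn’t respect the fundamental principles of FP: totality, determinism and purity. In case this is blurry for you, here is a quick reminder: a total function returns a value for every possible input, a deterministic function returns the same value for the same input, and a pure function has no effect other than computing its return value.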
The JVM ecosystem is huge: you can find a library for almost anything, but most of them do not adhere to these principles, even among the ones in Scala. Maybe you also have some legacy code that’s not functional either. Sooner or later, the need to wrap non-functional code arises. I will show in this post how to deal with various kinds of impure code using the ZIO library.
The examples will be illustrated with real-life code from libraries I’ve been working on (zio-akka-cluster) as well as various Scala or Java dependencies I’ve been using at work. This post assumes some basic knowledge of ZIO, so you might want to look at the documentation and resources if you’re completely new.
The One that throws
Let’s start with an easy one: you want to call some legacy code, whether it’s Scala or Java, and this code might throw exceptions. Sometimes those are documented, sometimes you don’t even know if the code will throw or not. Throwing exceptions breaks totality, because your function is not returning a value for every possible input.
If you’re using ZIO, you should not throw exceptions within map/flatMap, because it will result in the fiber being killed. Instead, you should use IO.effect: this effect constructor will catch exceptions for you and return an IO[Throwable, A] (aka Task). You can then use catchAll or other combinators to deal with this exception.
Note: you can also use Task.apply, which does the same thing.
It is a good rule of thumb to use this whenever you’re not sure if the code you’re calling might throw exceptions (or if you’re sure it will, of course). That way, you transform partial code into an effect that explicitly states that it might fail with an exception.
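As a minimal sketch of this, assuming the ZIO 1.x API (where IO.effect and catchAll are available) and a made-up legacyParse function standing in for the legacy code:

```scala
import zio._

object ThrowingExample {
  // Hypothetical legacy function: partial, because it throws on non-numeric input.
  def legacyParse(s: String): Int = Integer.parseInt(s)

  // IO.effect catches the exception and moves it to the error channel.
  def parse(s: String): Task[Int] = IO.effect(legacyParse(s))

  // catchAll handles the failure, so the resulting effect can no longer fail.
  def parseOrZero(s: String): UIO[Int] = parse(s).catchAll(_ => IO.succeed(0))
}
```

The signature of parse now states that it may fail with a Throwable, while parseOrZero states that every failure has been handled.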
The One that blocks
Now, how about some legacy code which not only throws exceptions, but also blocks the current thread until completion? If you run it within a regular IO, you will block a thread from your application’s main thread pool and potentially cause thread starvation. Instead, it is better to run such tasks on another thread pool dedicated to blocking tasks.
ZIO has a solution for that, which is to wrap your code within effectBlocking. This will ensure that the code is run on a dedicated thread pool. The return type is ZIO[Blocking, Throwable, A], which means that it requires a “blocking environment” (= the thread pool to use) and that it catches exceptions. Don’t know how to create that blocking environment? No worries, simply use zio.App as your main function and one with sensible defaults is provided for you.
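Here is a small sketch of that wrapping, assuming the ZIO 1.x zio.blocking package. The legacyRead function is a hypothetical stand-in for any call that blocks on I/O:

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Paths}

import zio._
import zio.blocking._

object BlockingExample {
  // Hypothetical legacy call: blocks the calling thread on file I/O and may throw.
  def legacyRead(path: String): String =
    new String(Files.readAllBytes(Paths.get(path)), StandardCharsets.UTF_8)

  // effectBlocking runs the call on the blocking thread pool and catches exceptions.
  def read(path: String): ZIO[Blocking, Throwable, String] =
    effectBlocking(legacyRead(path))
}
```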
By the way, never wrap Thread.sleep; use the non-blocking ZIO.sleep instead.
The One that calls back
It’s becoming more and more common these days for Java libraries to require some kind of callback instead of blocking. For example, the AWS SDK for Java recently got a 2.0 makeover that replaced the blocking functions with new ones returning a CompletableFuture, which itself has a handle method taking a callback that will be executed once the API call returns.
How to deal with such a function with ZIO? By wrapping it with effectAsync. It gives you a function that you can call when the callback is triggered, and that will complete the effect with either a failure or a value.
With effectAsync, you are given a function cb, which you call when the callback from CompletableFuture is triggered. cb takes an effect, so we can call IO.fail if there was an error, or just IO.succeed in case of success (or IO.unit, which is equivalent to IO.succeed(()), when there is no value to return).
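The sketch below shows one way to do this, assuming the ZIO 1.x signature of IO.effectAsync. The fromCompletable helper is a name made up for this illustration, and it works on a plain java.util.concurrent.CompletableFuture rather than a real AWS SDK call:

```scala
import java.util.concurrent.CompletableFuture

import zio._

object CallbackExample {
  // Hypothetical helper: turns any CompletableFuture into a Task.
  def fromCompletable[A](make: => CompletableFuture[A]): Task[A] =
    IO.effectAsync[Throwable, A] { cb =>
      // handle invokes our callback exactly once, with either a value or an error.
      make.handle[Unit] { (value: A, error: Throwable) =>
        if (error != null) cb(IO.fail(error))
        else cb(IO.succeed(value))
      }
      ()
    }
}
```

Depending on how the future was completed, the error may arrive wrapped in a CompletionException, so you may want to unwrap it before failing.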
Note that you should only use effectAsync when the callback is called once. I’ll describe later in this post how to deal with callbacks called multiple times.
The One living in the Future
You can also use effectAsync in combination with a Scala Future, but there’s an even easier way for that: IO.fromFuture. The wrapped code is given an ExecutionContext, which can be marked implicit and used to create the Future, or a custom one can be used.
Take Akka’s ask method (?), which returns a Future: wrapped this way, the resulting effect will return once the Future completes, but it won’t block the current thread the way Await.result does. It returns a Task, because a Future can either succeed or fail with an exception.
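To keep the illustration free of Akka dependencies, here is a sketch using a made-up lookup function that returns a plain Scala Future. It assumes the ZIO 1.x IO.fromFuture, which takes a function from ExecutionContext to Future:

```scala
import scala.concurrent.{ExecutionContext, Future}

import zio._

object FutureExample {
  // Hypothetical Future-based API, standing in for something like Akka's ask.
  def lookup(id: Int)(implicit ec: ExecutionContext): Future[String] =
    Future {
      if (id > 0) s"user-$id"
      else throw new NoSuchElementException(s"no user $id")
    }

  // The ExecutionContext supplied by ZIO is marked implicit and picked up by lookup.
  def lookupZIO(id: Int): Task[String] =
    IO.fromFuture { implicit ec => lookup(id) }
}
```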
The One with the clean stop
Some APIs give you an object that needs to be closed explicitly after usage in order to free some resources. For example, let’s take Akka’s ActorSystem. It has a method terminate that performs a “clean stop”, which is very important if you have a cluster of nodes running Akka: it will tell the other nodes that this actor system is shutting down, so that routers, shards and other distributed components stop sending messages to it. If you stop your node without a proper terminate, it will take some time for the cluster to detect the node is unreachable, possibly resulting in messages being lost.
ZIO has a data type that encapsulates initialization and closing logic: Managed. Its constructor make takes 2 functions: one to create the object, and one to release it. ZIO will make sure the release function is called whatever happens while you’re using it, a bit like try/finally in Java.
Wrapping an ActorSystem this way combines IO.effect (because ActorSystem creation might throw) and IO.fromFuture (because the terminate function returns a Future).
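A sketch of that combination, assuming the ZIO 1.x Managed.make. FakeSystem is a hypothetical class that mimics only the two relevant traits of an ActorSystem, a constructor that may throw and a terminate method returning a Future:

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.Future

import zio._

object ManagedExample {
  // Hypothetical resource standing in for an ActorSystem.
  final class FakeSystem(val name: String) {
    val terminated = new AtomicBoolean(false)
    def terminate(): Future[Unit] = Future.successful(terminated.set(true))
  }

  def system(name: String): Managed[Throwable, FakeSystem] =
    Managed.make(IO.effect(new FakeSystem(name)))(sys =>
      // The release action cannot fail, so errors from terminate() are ignored here.
      IO.fromFuture(_ => sys.terminate()).ignore
    )
}
```

You would then write something like ManagedExample.system("test").use(sys => ...), and terminate is called once the function passed to use completes, whether it succeeds or fails.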
The One with the loop
Let’s tackle a more complex case now. Sometimes you have an API for polling data that will return 0 or more elements, and that you have to call in a loop to keep getting more elements. That sounds like a good case for using ZIO Stream.
To illustrate this, let’s consider the AWS SDK for SQS again. There is an API you can call to receive elements from a given queue. The first thing we need to do is to wrap it with IO.effectAsync as seen before, so we get a Task[List[Message]]. Then, we can convert it into a stream by calling Stream.fromEffect, and we now have a Stream[Throwable, List[Message]]. We want to process items one by one, so we need to flatten this list into our stream: this can be done with Stream.fromIterable and a simple flatMap to transform the Stream[Stream[Message]] into a simple Stream[Throwable, Message].
One more thing: once we consumed all the messages, we want to repeat this to get more messages. This can be done by simply adding .forever to repeat the cycle infinitely.
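Put together, the steps above can be sketched as follows, assuming the ZIO 1.x zio.stream API. The poll parameter is a hypothetical stand-in for the wrapped SQS receive call, with String in place of Message:

```scala
import zio._
import zio.stream._

object PollingExample {
  // poll is assumed to return the next batch of messages (possibly empty) on each run.
  def messages(poll: Task[List[String]]): Stream[Throwable, String] =
    Stream
      .fromEffect(poll)                // Stream[Throwable, List[String]]
      .flatMap(Stream.fromIterable(_)) // flatten each batch: Stream[Throwable, String]
      .forever                         // run poll again once the batch is consumed
}
```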
This way, you won’t pull elements from the queue faster than you can consume them. There are plenty of combinators for Streams (and the list is growing), which makes them super easy to use. I encourage you to take a look!
The One that’s pushy
We just handled the case of pulling data, but what about when the data is pushed? We can’t use effectAsync because its callback must be called one time only. No worries: we’ll call ZIO Queue to the rescue.
Let’s consider Akka Distributed PubSub for this example. You can subscribe to a particular topic and you will start receiving all messages published to this topic from nodes in your cluster.
Our approach will be the following: in order to subscribe to a topic, we will first create a Queue, which is done easily with Queue.unbounded or one of its bounded variants (bounded, dropping, sliding). Then, we’ll create an Akka Actor that will subscribe to the topic, and from within the actor we will call queue.offer to enqueue every message we receive. The actor will have to call unsafeRunSync in order to run this offer effect. This is one of the rare cases where we need to call this outside of our main function.
Because you should only have a single Runtime in your application, you can use IO.runtime to get the current runtime and pass it to the actor, so that the actor can use it to call unsafeRunSync.
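The same pattern can be sketched without Akka, assuming the ZIO 1.x Queue and IO.runtime. The Source trait below is hypothetical and plays the role of the actor: its listener is invoked once per message, on a thread that ZIO does not control:

```scala
import zio._

object PushExample {
  // Hypothetical push-style API standing in for the subscribing actor.
  trait Source {
    def subscribe(listener: String => Unit): Unit
  }

  def messages(source: Source): UIO[Queue[String]] =
    for {
      queue   <- Queue.unbounded[String]
      runtime <- IO.runtime
      _       <- IO.effectTotal(source.subscribe { msg =>
                   // We are outside of ZIO here, so the offer effect must be run explicitly.
                   runtime.unsafeRunSync(queue.offer(msg))
                   ()
                 })
    } yield queue
}
```

The returned queue can then be consumed with queue.take, or turned into a stream with Stream.fromQueue if you prefer the combinators from the previous section.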
You can find a fully featured implementation of this pattern in the zio-akka-cluster library.
We’ve seen that ZIO provides a number of constructors to easily wrap patterns common in the non-functional world. For more complex cases, you can combine the power of ZIO data structures such as Stream and Queue to achieve purity.
I hope this has been helpful, but let me know if you encounter cases that I did not cover, or (even better) head to the ZIO Gitter channel to get some help.