Wrapping impure code with ZIO

Pierre Ricadat
6 min read · Jul 28, 2019


If you’re doing functional programming in Scala, chances are you have struggled (or will struggle) with code that doesn’t respect the fundamental principles of FP: totality, determinism and purity. In case this is blurry for you, let’s start with a reminder: a total function returns a value for every possible input, a deterministic function returns the same value for the same input, and a pure function has no effect other than computing its return value.

The JVM ecosystem is huge: you can find a library for almost anything, but most of them do not adhere to these principles, even among the ones in Scala. Maybe you also have some legacy code that’s not functional either. Sooner or later, the need to wrap non-functional code arises. I will show in this post how to deal with various kinds of impure code using the ZIO library.

The examples will be illustrated with real-life code from libraries I’ve been working on (zio-sqs, zio-akka-cluster) as well as various Scala or Java dependencies I’ve been using at work. This post assumes some basic knowledge of ZIO, so you might want to look at the documentation and resources if you’re completely new.

[Image: Scala developer discovering ZIO]

The One that throws

Let’s start with an easy one: you want to call some legacy code, whether it’s Scala or Java, and this code might throw exceptions. Sometimes those are documented, sometimes you don’t even know if the code will throw or not. Throwing exceptions breaks totality, because your function is not returning a value for every possible input.

If you’re using ZIO, you should not throw exceptions within IO.succeed or map/flatMap, because it will result in the fiber being killed. Instead, you should use IO.effect: this effect constructor will catch exceptions for you and return an IO[Throwable, A] (aka Task). You can then use mapError, catchAll or other combinators to deal with this exception.

Note: you can also use Task.apply which does the same thing.
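
As a rough illustration of the idea (a minimal sketch, assuming the ZIO 1.0.x API; the object and method names are mine), here is how a throwing Java NIO call could be wrapped. `copyOrMessage` is a hypothetical follow-up showing one way to use mapError on the result.

```scala
import java.nio.file.{Files, Path, StandardCopyOption}
import zio.{IO, Task}

object FileCopy {
  // Files.copy can throw (for example an IOException when the source is missing).
  // IO.effect captures non-fatal exceptions in the error channel of the Task.
  def copy(source: Path, target: Path): Task[Path] =
    IO.effect(Files.copy(source, target, StandardCopyOption.REPLACE_EXISTING))

  // Hypothetical helper: turn the Throwable into a readable error message.
  def copyOrMessage(source: Path, target: Path): IO[String, Path] =
    copy(source, target).mapError(e => s"Could not copy $source: ${e.getMessage}")
}
```

Nothing runs until the effect is executed, so callers can decide how to handle the failure before anything touches the file system.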

Wrapping Java NIO file copy

It is a good rule of thumb to use this whenever you’re not sure if the code you’re calling might throw exceptions (or if you’re sure it will, of course). That way, you transform partial code into an effect that explicitly states that it might fail with an exception.

The One that blocks

Now, how about some legacy code which not only throws exceptions, but also blocks the current thread until completion? If you run it within a regular IO, you will block a thread from your application’s main thread pool and potentially cause thread starvation. Instead, it is better to run such tasks on another thread pool dedicated to blocking tasks.

ZIO has a solution for that: wrap your code within effectBlocking. This ensures that the code runs on a dedicated thread pool. The return type is ZIO[Blocking, Throwable, A], which means that it requires a “blocking environment” (= the thread pool to use) and that it catches exceptions. Don’t know how to create that blocking environment? No worries, simply use zio.App as your main function and one with sensible defaults is provided for you.
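
A minimal sketch of what this can look like, assuming the ZIO 1.0.x API where effectBlocking lives in the zio.blocking package. The `Resources` object and `load` method are names I made up for illustration.

```scala
import scala.io.Source
import zio.ZIO
import zio.blocking.{Blocking, effectBlocking}

object Resources {
  // Reading a classpath resource blocks the calling thread, so the whole
  // read is wrapped in effectBlocking to run on the blocking thread pool.
  // Any exception (such as a missing resource) ends up in the error channel.
  def load(name: String): ZIO[Blocking, Throwable, String] =
    effectBlocking {
      val source = Source.fromResource(name)
      try source.mkString
      finally source.close()
    }
}
```

The Blocking requirement in the type makes it visible to callers that this effect should not be treated like a cheap, non-blocking one.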

Wrapping resource loading

By the way, never wrap Thread.sleep; use the non-blocking IO.sleep instead.

The One that calls back

It’s becoming more and more common these days for Java libraries to require some kind of callback instead of blocking. For example, the AWS SDK for Java recently got a 2.0 makeover that replaced the blocking functions with new ones returning a CompletableFuture, which itself has a handle method taking a callback that will be executed once the API call returns.

How to deal with such a function with ZIO? By wrapping it with effectAsync. It gives you a function that you can call when the callback is triggered, and that will complete the effect with either a failure or a value.

In the following snippet, you can see that effectAsync provides a function cb, which we call when our callback from CompletableFuture is triggered. cb takes an effect, so we can call IO.fail if there was an error, or just IO.succeed in case of success (here IO.unit, which is equivalent to IO.succeed(())).
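
Here is a hedged sketch of that pattern, assuming the ZIO 1.0.x API. To keep it runnable without AWS credentials, `sendAsync` is a hypothetical stand-in for an SDK call returning a CompletableFuture; it is not part of the AWS SDK.

```scala
import java.util.concurrent.CompletableFuture
import zio.{IO, Task}

object Publisher {
  // Hypothetical stand-in for an async SDK call: fails on an empty body,
  // otherwise completes with a made-up message id.
  def sendAsync(body: String): CompletableFuture[String] = {
    val future = new CompletableFuture[String]()
    if (body.isEmpty) future.completeExceptionally(new IllegalArgumentException("empty body"))
    else future.complete(s"id-for-$body")
    future
  }

  // effectAsync hands us cb; we call it exactly once from the Java callback.
  def publish(body: String): Task[Unit] =
    IO.effectAsync[Throwable, Unit] { cb =>
      sendAsync(body).handle[Unit] { (_, err) =>
        if (err != null) cb(IO.fail(err)) else cb(IO.unit)
      }
      ()
    }
}
```

The effect suspends until the callback fires, without holding a thread in the meantime.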

Wrapping AWS SDK 2.0 for publishing to SQS

Note that you should only use effectAsync when the callback is called once. I’ll describe later in this post how to deal with callbacks called multiple times.

The One living in the Future

You can also use effectAsync in combination with a Scala Future, but there’s an even easier way for that: IO.fromFuture. The wrapped code will be provided an implicit ExecutionContext which can be used to create the Future, or a custom one can be used.

The following example shows how to wrap Akka’s ask method (?), which returns a Future. The resulting effect will return once the Future completes, but it won’t block the current thread the way Await.result does. It returns a Task, because a Future can either succeed or fail with an exception.
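
To keep things runnable without an actor system, the sketch below (assuming the ZIO 1.0.x API) uses a hypothetical `legacyLookup` function as a stand-in for any Future-returning call such as ask.

```scala
import scala.concurrent.{ExecutionContext, Future}
import zio.{IO, Task}

object Lookup {
  // Hypothetical Future-based API: succeeds for positive ids, fails otherwise.
  def legacyLookup(id: Int)(implicit ec: ExecutionContext): Future[String] =
    Future {
      if (id > 0) s"user-$id"
      else throw new NoSuchElementException(s"no user $id")
    }

  // fromFuture passes in an ExecutionContext, made implicit here so that
  // the Future is created on it. A failed Future becomes a failed Task.
  def lookup(id: Int): Task[String] =
    IO.fromFuture(implicit ec => legacyLookup(id))
}
```

Because the Future is created inside the function passed to fromFuture, it only starts when the effect is run, and it starts again on every run.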

Wrapping Scala Future from Akka ask

The One with the clean stop

Some APIs give you an object that needs to be closed explicitly after usage in order to free some resources. For example, let’s take Akka’s ActorSystem. It has a method terminate that performs a “clean stop”, which is very important if you have a cluster of nodes running Akka: it will tell the other nodes that this actor system is shutting down, so that routers, shards and other distributed components stop sending messages to it. If you stop your node without a proper terminate, it will take some time for the cluster to detect the node is unreachable, possibly resulting in messages being lost.

ZIO has a data type that encapsulates initialization and closing logic: Managed. Its constructor make takes 2 functions: one to create the object, and one to release it. ZIO will make sure the release function is called whatever happens while you’re using it, a bit like try/finally in Java.

The following example combines Managed with Task (the ActorSystem creation might throw) and Task.fromFuture (the terminate function returns a Future).
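
The same shape can be sketched without Akka on the classpath. In the snippet below (assuming the ZIO 1.0.x API), `Connection` is a hypothetical resource whose creation might throw and whose terminate method returns a Future, just like the ActorSystem described above.

```scala
import scala.concurrent.Future
import zio.{Managed, Task}

object Lifecycle {
  // Hypothetical resource standing in for something like an ActorSystem.
  final class Connection(val name: String) {
    @volatile var closed = false
    def terminate(): Future[Unit] = { closed = true; Future.successful(()) }
  }

  def connection(name: String): Managed[Throwable, Connection] =
    Managed.make(Task(new Connection(name)))(c =>
      // The release action is not allowed to fail, so a failure of the
      // Future is turned into a value with either (and then ignored).
      Task.fromFuture(_ => c.terminate()).either
    )
}
```

A caller would then write something like `Lifecycle.connection("main").use(c => Task(c.name))`, and terminate runs once the function passed to use completes, whether it succeeded or failed.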

Wrapping Akka ActorSystem

The One with the loop

Let’s tackle a more complex case now. Sometimes you have an API for polling data that will return 0 or more elements, and that you have to call in a loop to keep getting more elements. That sounds like a good case for using ZIO Streams.

To illustrate this example, let’s consider the AWS SDK for SQS again. There is an API you can call to receive elements from a given queue. The first thing we need to do is to wrap it with IO.effectAsync as seen before, so we get a Task[List[Message]]. Then, we can convert it into a stream by calling Stream.fromEffect, and we now have a Stream[Throwable, List[Message]]. We want to process items one by one, so we need to flatten this list into our stream: this can be done with Stream.fromIterable and a simple flatMap to transform the Stream[Stream[Message]] into a simple Stream[Message].

One more thing: once we have consumed all the messages, we want to repeat this to get more messages. This can be done by simply adding .forever to repeat the cycle infinitely.
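
The steps above can be sketched as follows, assuming the ZIO 1.0.x streams API. The `receive` function is a hypothetical stand-in for the wrapped SQS call: it pops the next batch from an in-memory Ref so that the example runs without AWS.

```scala
import zio.{Ref, Task}
import zio.stream.Stream

object Polling {
  // Hypothetical polling call: each invocation returns the next batch,
  // or an empty list when nothing is available.
  def receive(batches: Ref[List[List[String]]]): Task[List[String]] =
    batches.modify[List[String]] {
      case head :: tail => (head, tail)
      case Nil          => (Nil, Nil)
    }

  def messages(batches: Ref[List[List[String]]]): Stream[Throwable, String] =
    Stream
      .fromEffect(receive(batches))                  // one poll, one batch
      .flatMap(batch => Stream.fromIterable(batch))  // emit items one by one
      .forever                                       // poll again when drained
}
```

Since streams are pull-based, the next poll only happens when the consumer asks for more elements, so something like `messages(ref).take(3)` stops polling after the third message.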

Wrapping AWS SDK 2.0 for streaming SQS messages

This way, you won’t pull elements from the queue faster than you can consume them. There are plenty of combinators for Streams (and the list is growing), which makes them super easy to use. I encourage you to take a look!

The One that’s pushy

We just handled the case of pulling data, but what about when the data is pushed? We can’t use effectAsync because its callback must be called one time only. No worries: we’ll call ZIO Queue to the rescue.

Let’s consider Akka Distributed PubSub for this example. You can subscribe to a particular topic and you will start receiving all messages published to this topic from nodes in your cluster.

Our approach will be the following: in order to subscribe to a topic, we will first create a Queue, which is done easily with Queue.unbounded or one of its bounded variants (bounded, which applies back-pressure, dropping or sliding). Then, we’ll create an Akka Actor that will subscribe to the topic, and from within the actor we will call queue.offer to enqueue every message we receive. The actor will have to call unsafeRunSync in order to run this offer effect. This is one of the rare cases where we need to call this outside of our main function.

Because you should only have a single Runtime in your application, you can use IO.runtime to get the current runtime and pass it to the actor, so that the actor can use it to call unsafeRunSync.
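
Here is a minimal sketch of the approach, assuming the ZIO 1.0.x API. Instead of a real actor, it uses a hypothetical `Topic` class that pushes messages to a plain Scala listener, which plays the role of the actor’s receive method.

```scala
import zio.{Queue, UIO, ZIO}

object Subscription {
  // Hypothetical push-based API: calls the listener once per message,
  // from outside of ZIO (like an actor receiving PubSub messages).
  final class Topic {
    @volatile private var listener: String => Unit = _ => ()
    def subscribe(f: String => Unit): Unit = { listener = f }
    def publish(message: String): Unit = listener(message)
  }

  def subscribe(topic: Topic): UIO[Queue[String]] =
    for {
      runtime <- ZIO.runtime[Any]       // reuse the application's runtime
      queue   <- Queue.unbounded[String]
      _       <- UIO(topic.subscribe { msg =>
                   // We are outside of ZIO here, so the offer has to be run
                   // explicitly with the runtime captured above.
                   runtime.unsafeRunSync(queue.offer(msg))
                   ()
                 })
    } yield queue
}
```

The consumer side stays purely functional: it only sees a Queue and can call take on it, or turn it into a stream.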

Wrapping Akka Distributed PubSub

You can find a fully featured implementation of this pattern in the zio-akka-cluster repository.

We’ve seen that ZIO provides a number of constructors to easily wrap patterns common in the non-functional world. For more complex cases, you can combine the power of ZIO data structures such as Managed, Stream and Queue to achieve purity.

I hope this has been helpful, but let me know if you encounter cases that I did not cover, or (even better) head to the ZIO Gitter channel to get some help.
