Akka Http Client meets ZIO

Adam Rempter
inganalytics.com/inganalytics
Apr 15, 2020

Recently I have been playing with the GitHub API to parse repository data for a particular organization. Long story short, for some organizations, Netflix for instance, there are a few pages, each containing 30 repositories. Each repository may have one or more URLs to fetch, depending on what I want to get. That leads to producing many concurrent Http requests.

In the end I came up with some logic that was more than just a few lines. That led to the question…

Can I somehow gain more control over Akka Http Client and Scala parallel Futures? Is there any library that could help to simplify my code?

So, I came up with some points to test:

  • I should be able to control parallelism. Run X number of requests at a given time…
  • Each request should have a number of retries, ideally with some exponential backoff
  • If request processing takes too long, I should time it out and stop processing the request

Challenge sounds simple, right?

In this article, I will first quickly show how this can be more or less achieved with the Akka Http client combined with Akka Streams. Both almost always come “bundled” together and they integrate quite well. I will skip the example with plain Scala and the Http client.

Then I will use ZIO to do the same thing…

Many ZIO concepts are already well explained in ZIO Summary. Because of that, I think I will skip explaining ZIO basics here. I recommend reading it as it will help understand this article.

Test environment setup

The diagram below shows the environment used to test both ZIO and Akka Http Client.

Test environment

The source server has two endpoints

  • /status: fails randomly with StatusCodes.InternalServerError
  • /slow: calls Thread.sleep for over 5 seconds before producing a response

The user queries three different endpoints on the proxy Http Server with the curl command… /zioMany runs 100 requests to get the status.

Akka Http Client

Starting with Akka Http client, one option to implement the above points is to use Akka Streams combined with Http Client. It is done with two streams:

  • One stream, to execute a single Source.fromFuture combined with RestartSource
  • A second stream, Source(List[HttpRequest]) with mapAsync, for controlling the number of parallel requests
Akka Http client combined with Akka Streams

With this approach I can indeed try to process an HttpRequest and complete with a value if the response was 200 OK. Otherwise I raise an exception, so the Source is restarted according to the specified parameters.

With the second stream I can control parallelism of execution.
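
The two streams described above could look roughly like the sketch below. It is not the original code of this article, only a minimal reconstruction assuming Akka 2.6 with Akka HTTP 10.1.x. The `send` parameter is a hypothetical hook so the function can be tried without a real server, and the backoff values are placeholders. Note that, according to the RestartSource documentation, `maxRestarts` is counted within a time frame of `minBackoff`, so slow failures may be retried more often than the number suggests.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse, StatusCodes}
import akka.stream.scaladsl.{RestartSource, Sink, Source}
import scala.concurrent.Future
import scala.concurrent.duration._

object StreamsClientSketch {
  // Assumption: with Akka 2.6 the implicit system also provides the materializer.
  implicit val system: ActorSystem = ActorSystem("streams-sketch")

  // First stream: a single request wrapped in a RestartSource.
  // `send` is a hypothetical hook; by default it calls the real Http client.
  def executeRequest(
      request: HttpRequest,
      send: HttpRequest => Future[HttpResponse] = r => Http().singleRequest(r)
  ): Future[HttpResponse] =
    RestartSource
      .withBackoff(minBackoff = 100.millis, maxBackoff = 1.second, randomFactor = 0.2, maxRestarts = 3) { () =>
        Source.fromFuture(send(request)).map { response =>
          if (response.status == StatusCodes.OK) response
          else {
            response.discardEntityBytes() // free the connection before retrying
            throw new RuntimeException(s"Unexpected status ${response.status}")
          }
        }
      }
      .runWith(Sink.head) // completes with the first successful response

  // Second stream: mapAsync bounds how many requests are in flight at once.
  def executeManyRequests(requests: List[HttpRequest], parallelism: Int = 10): Future[Seq[HttpResponse]] =
    Source(requests)
      .mapAsync(parallelism)(r => executeRequest(r))
      .runWith(Sink.seq)
}
```

If all restarts are used up without a 200 OK, the Future returned by `executeRequest` fails instead of producing a response.
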
Cool! Now one thing is still missing here — timeout. So with Akka configuration, we can set different parameters like:

akka.http.client.connecting-timeout
akka.http.client.idle-timeout
akka.http.host-connection-pool.client.idle-timeout

Ok, so which one? :) The Akka documentation actually has a section describing timeouts. If you are interested, check here.

There is also the Circuit Breaker mechanism, which helps in situations where we want to fail requests quickly…

I skipped adding a breaker … but you get the idea.
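
For completeness, a breaker around a single request might look like the sketch below. It is only an illustration, assuming `akka.pattern.CircuitBreaker` from Akka 2.6; the thresholds (5 failures, 3 seconds, 30 seconds) are made-up values, not settings used in this article.

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.pattern.CircuitBreaker
import scala.concurrent.Future
import scala.concurrent.duration._

object BreakerSketch {
  implicit val system: ActorSystem = ActorSystem("breaker-sketch")

  // Arguments: scheduler, maxFailures, callTimeout, resetTimeout (all values are assumptions).
  val breaker: CircuitBreaker =
    CircuitBreaker(system.scheduler, 5, 3.seconds, 30.seconds)

  // While the breaker is open, calls fail immediately without hitting the server.
  def guardedRequest(request: HttpRequest): Future[HttpResponse] =
    breaker.withCircuitBreaker(Http().singleRequest(request))
}
```

Keep in mind that the breaker only counts failed Futures and calls exceeding `callTimeout`. A 500 response is still a successful Future, so it would have to be mapped to a failure first.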

To summarize, this code is readable and concise. Right? There are more things that can be used from Akka Streams, for instance throttling. Streams also add things like backpressure support.

But some people may prefer not to use the Streams API, or they may not be comfortable with it at first.

Also, I think achieving the same thing with plain Scala methods may not be as concise as the above snippet. There are also some “traps” with using Futures, like for instance:

val resp: List[Future[HttpResponse]] = requests.map(executeRequest)

The above will eagerly start the execution of all requests… So it is already too late for controlling things like parallelism.
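
The trap can be shown with the standard library alone. In the sketch below, `executeRequest` is a hypothetical stand-in for the Http call and `runInBatches` is just one simple way to bound parallelism by hand; neither is code from this article.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object EagerFutureSketch {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // Counts how many "requests" have actually been started.
  val started = new AtomicInteger(0)

  // Hypothetical stand-in for an Http call.
  def executeRequest(id: Int): Future[Int] = Future {
    started.incrementAndGet()
    id
  }

  // Eager: every Future is already running as soon as map has been evaluated.
  def eager(ids: List[Int]): List[Future[Int]] = ids.map(executeRequest)

  // Deferred: nothing runs until one of the thunks is called.
  def deferred(ids: List[Int]): List[() => Future[Int]] =
    ids.map(id => () => executeRequest(id))

  // Start at most `n` requests at a time; the next batch waits for the previous one.
  def runInBatches(thunks: List[() => Future[Int]], n: Int): Future[List[Int]] =
    thunks.grouped(n).foldLeft(Future.successful(List.empty[Int])) { (acc, batch) =>
      acc.flatMap(done => Future.sequence(batch.map(_.apply())).map(done ++ _))
    }

  def main(args: Array[String]): Unit = {
    val thunks = deferred((1 to 10).toList)
    println(s"started before running: ${started.get}")
    println(Await.result(runInBatches(thunks, 3), 5.seconds))
  }
}
```

This works, but it is exactly the kind of hand-written plumbing that the Streams version and, later, the ZIO version avoid.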

ZIO to the rescue?

Note: I have been using ZIO for around two weeks now and I am still learning its concepts.

Some assumptions, before introducing ZIO:

  • Since quite often you cannot just rewrite everything from scratch, I decided to try to integrate ZIO with an existing Akka Http REST server and Http Client, ideally without changing the program structure massively
  • I also want to use one ActorSystem and reduce passing it as a class parameter or implicit parameter. So I skipped creating ZLayer from managed like below:
ZLayer.fromManaged(Managed.make(Task(ActorSystem("ZIOTest")))(sys => Task.fromFuture(_ => sys.terminate()).either))

The “problem” with the above code is that Task(ActorSystem("ZIOTest")) will start a new ActorSystem each time it is used and will terminate it after use. That is OK, but it forces a particular usage pattern.

Instead I attempted creating ActorEnv, so I could access it either via Runtime(ActorEnvLive) or from ZIO[ActorEnv, …].

Implementing the above worked, with one exception. This could be related to the approach I took or to my lack of experience in using ZIO. Nevertheless, I still think it makes sense to share it…

ZIO based Akka Http

The Akka Http Server has three endpoints that users can query. They are described at the beginning of this article. As a reminder, those are /zio, /slow and /zioMany.

Also, there are two abstract methods in Routes, which when implemented will run request(s) to source server:

protected def runRequest(request: HttpRequest): HttpResponse
protected def runManyRequests(requests: List[HttpRequest]): List[HttpResponse]

I skipped the processing of results received from the source server and I just complete the route with string response. I guess it is not that important for this example.

Those methods are then mirrored by the abstract Service trait in the ZIO HttpClient module:

object HttpClient {
  type HttpClient = Has[Service]

  trait Service {
    def executeRequest(request: HttpRequest): ZIO[ActorEnv, Throwable, HttpResponse]
    def executeManyRequests(requests: List[HttpRequest]): ZIO[ActorEnv, Throwable, List[HttpResponse]]
  }
}

So the implementation should return either HttpResponse or List[HttpResponse] and will require ActorEnv. Also, note that the return type here is not HttpResponse but ZIO.

Now the interesting part is the actual implementation provided by HttpClientImpl.

In order to add ZIO superpowers to Http client we need to lift it to ZIO Task.

ZIO provides several options to integrate with the existing code. Here I am using Task.fromFuture so Http client can be promoted immediately to Task.

To provide the ActorSystem to the Http Task I use accessM in the following lines. Then, inside the for comprehension, the Task

def getActorSystem: Task[ActorSystem]

becomes ActorSystem:

ZIO.accessM { env =>
  for {
    system <- env.dependencies.getActorSystem

I guess the more proper way to access ActorSystem would be to use something like:

val system: ZIO[Has[ActorSystem], Nothing, ActorSystem] =
  for {
    actorSystem <- ZIO.access[Has[ActorSystem]](_.get)
  } yield actorSystem

Notice that all of the above is just a description and an ActorSystem will be provided later on. For now, all I say is that this part of the program will require ActorEnv and will either complete with HttpResponse or fail with Throwable:

ZIO[ActorEnv, Throwable, HttpResponse]

The code snippet where I check the response is quite similar to the one done earlier with Akka Streams. One difference is that I map the response either to IO.succeed or to IO.fail.
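
A minimal sketch of these two steps, lifting a Future and then mapping the response to success or failure, could look as follows. It assumes ZIO 1.0.x; `FakeResponse` and the `status` parameter are hypothetical stand-ins so the example does not need Akka Http on the classpath.

```scala
import zio._
import scala.concurrent.Future

object LiftFutureSketch {
  // Hypothetical stand-in for akka.http.scaladsl.model.HttpResponse.
  final case class FakeResponse(status: Int, body: String)

  def executeRequest(status: Int): Task[FakeResponse] =
    ZIO
      // fromFuture hands over an ExecutionContext, so the Future runs on ZIO's pool.
      .fromFuture { implicit ec => Future(FakeResponse(status, "payload")) }
      .flatMap { response =>
        if (response.status == 200) ZIO.succeed(response)
        else ZIO.fail(new RuntimeException(s"Unexpected status ${response.status}"))
      }

  def main(args: Array[String]): Unit = {
    val runtime = Runtime.default
    println(runtime.unsafeRun(executeRequest(200)))        // a successful response
    println(runtime.unsafeRun(executeRequest(500).either)) // a Left with the error
  }
}
```

Nothing is sent when `executeRequest` is called; the Future is only created once the Task is run.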

The cool part is actually here because now I have ZIO Task and I can add:

.retry(Schedule.recurs(3) && Schedule.exponential(10.milliseconds))
.timeoutFail("Timeout occurred, interrupted")(3.seconds)

With those two lines I am actually describing what to do in case of a response other than 200 OK, or if the request takes more than three seconds to complete.
In the end, because it can still fail (if more than three retries are needed), I map any error to IO.succeed:

.catchAll(e => IO.succeed(HttpResponse(entity = HttpEntity(ContentTypes.`text/plain(UTF-8)`, s"Too many tries, $e \n"))))

But I think it is quite clear to read, right?
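
To see the retry policy at work without a server, here is a small sketch assuming ZIO 1.0.x. The `flaky` and `alwaysFails` effects are made up for illustration. Unlike the snippet above, the timeout error here is a Throwable rather than a String, so the error type stays Throwable.

```scala
import java.util.concurrent.atomic.AtomicInteger
import zio._
import zio.clock.Clock
import zio.duration._

object RetrySketch {
  private val attempts = new AtomicInteger(0)

  // Up to three retries, with an exponentially growing delay between them.
  val policy = Schedule.recurs(3) && Schedule.exponential(10.milliseconds)

  // Hypothetical effect: fails on the first two attempts, succeeds on the third.
  val flaky: Task[String] = ZIO.effect {
    val n = attempts.incrementAndGet()
    if (n < 3) throw new RuntimeException(s"attempt $n failed")
    else s"ok after $n attempts"
  }

  val resilient: ZIO[Clock, Throwable, String] =
    flaky
      .retry(policy)
      .timeoutFail(new RuntimeException("Timeout occurred, interrupted"))(3.seconds)

  // When the retries are used up, the error is turned into a regular value.
  val alwaysFails: Task[String] = ZIO.fail(new RuntimeException("boom"))

  val withFallback: URIO[Clock, String] =
    alwaysFails
      .retry(policy)
      .catchAll(e => ZIO.succeed(s"Too many tries, ${e.getMessage}"))

  def main(args: Array[String]): Unit = {
    println(Runtime.default.unsafeRun(resilient))    // prints "ok after 3 attempts"
    println(Runtime.default.unsafeRun(withFallback)) // prints "Too many tries, boom"
  }
}
```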

Now if I want to add parallelism, I can, similarly to Streams, define another method:

def executeManyRequests(requests: List[HttpRequest]): ZIO[ActorEnv, Throwable, List[HttpResponse]] =
  ZIO.foreachParN(10)(requests) { r =>
    ZIO.sleep(500.milliseconds).provideLayer(clock.Clock.live) *>
      executeRequest(r)
  }

ZIO.sleep is not needed, but I added it to show where a batch starts and ends. Btw, I do not need to use Thread.sleep anymore. What a cool small feature!

This magic operator *> is, according to the docs:

A variant of flatMap that ignores the value produced by this effect.
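
The effect of the parallelism limit can be checked with a small sketch like the one below, assuming ZIO 1.0.x. `fakeRequest` is a hypothetical stand-in for executeRequest that only records how many calls overlap; it also chains its steps with *>.

```scala
import java.util.concurrent.atomic.AtomicInteger
import zio._
import zio.clock.Clock
import zio.duration._

object ParallelismSketch {
  private val running = new AtomicInteger(0)
  // Highest number of "requests" observed in flight at the same time.
  val maxRunning = new AtomicInteger(0)

  private def enter(): Unit = {
    val now = running.incrementAndGet()
    maxRunning.accumulateAndGet(now, (a: Int, b: Int) => math.max(a, b))
    ()
  }

  private def leave(): Unit = {
    running.decrementAndGet()
    ()
  }

  // Each *> runs the left effect, drops its result and continues with the right one.
  def fakeRequest(id: Int): URIO[Clock, Int] =
    UIO(enter()) *> ZIO.sleep(20.milliseconds) *> UIO(leave()) *> ZIO.succeed(id)

  // At most five fake requests are in flight at any time.
  val program: URIO[Clock, List[Int]] =
    ZIO.foreachParN(5)((1 to 20).toList)(fakeRequest)

  def main(args: Array[String]): Unit = {
    val results = Runtime.default.unsafeRun(program)
    println(s"results: $results, max in flight: ${maxRunning.get}")
  }
}
```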

Task.fromFuture provides the ExecutionContext for the Http client, so the request will run on the zio-default-async thread pool.

One more thing at the end of this part is the way I start the Http Server itself:

val program: ZIO[ZEnv, Throwable, Future[Http.ServerBinding]] =
  (for {
    systemT <- ZIO.access[ActorEnv](_.dependencies.getActorSystem)
    b       <- systemT.flatMap { s => bindTask(s) }
  } yield b).provide(ActorEnvLive)

override def run(args: List[String]): ZIO[ZEnv, Nothing, Int] =
  program.foldM(
    // a non-zero exit code signals that the server did not start
    e => console.putStrLn(s"Server failed to start ${e.getMessage}") *> IO.succeed(1),
    _ => console.putStrLn("Server started...") *> IO.succeed(0)
  )

The first part is actually a program description, which is then executed in run, the main method defined by ZIO. Instead of using a runtime with the ActorEnv dependency, here I use `provide` to supply the ActorEnv that the program requires. That translates the type:

ZIO[ZEnv with ActorEnv, Throwable, Future[Http.ServerBinding]]

to the following one:

ZIO[ZEnv, Throwable, Future[Http.ServerBinding]]

Using program.foldM allows one to provide an effect to execute for the case of an error and one effect to execute for the case of success. Which finally translates to what ZIO requires, that is

ZIO[ZEnv, Nothing, Int]
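
The same two steps, satisfying the environment with provide and then folding both outcomes into an exit code, can be tried in isolation. The sketch below assumes ZIO 1.0.x; `GreetingEnv` is a hypothetical stand-in for ActorEnv.

```scala
import zio._

object ProvideSketch {
  // Hypothetical environment, playing the role of ActorEnv.
  trait GreetingEnv { def greeting: String }

  // Description only: it requires a GreetingEnv and may fail with a Throwable.
  val program: ZIO[GreetingEnv, Throwable, String] =
    ZIO.accessM[GreetingEnv](env => ZIO.effect(env.greeting + ", ZIO"))

  // provide satisfies the requirement, so the environment type becomes Any.
  val provided: ZIO[Any, Throwable, String] =
    program.provide(new GreetingEnv { val greeting = "Hello" })

  // foldM handles both outcomes with an effect, leaving no error behind.
  val exitCode: ZIO[Any, Nothing, Int] =
    provided.foldM(_ => ZIO.succeed(1), _ => ZIO.succeed(0))

  def main(args: Array[String]): Unit = {
    println(Runtime.default.unsafeRun(provided)) // prints "Hello, ZIO"
    println(Runtime.default.unsafeRun(exitCode)) // prints 0
  }
}
```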

I do not see a lot of benefits in running Akka Http itself as ZIO (yet?), but at least it is consistent with the rest of the code.

What did not work?

As mentioned earlier, I provide an ActorSystem via ActorEnv. This is then used to create a custom program runtime, the actual point where a description becomes running code.

private val runtime = Runtime(ActorEnvLive, Platform.default)

So when I implement one of the abstract methods from Routes I do not need to explicitly provide ActorEnv:

override protected def runManyRequests(requests: List[HttpRequest]): List[HttpResponse] =
  runtime.unsafeRun(HttpClient.executeManyRequests(requests).provideLayer(HttpClient.live))

And ActorEnv is implemented as follows:

trait ActorEnvLive extends ActorEnv {
  private val system = ActorSystem("ZIO" + System.currentTimeMillis())

  val dependencies = new Service {
    override def getActorSystem: Task[ActorSystem] = Task(system)
  }
}

Here is where I cheated a bit (or more?). I cannot actually use this here:

ZLayer.fromManaged(Managed.make(Task(ActorSystem("ZIOTest")))

Or even:

Task(ActorSystem("ZIO" + System.currentTimeMillis()))

Because the Task apply method takes a by-name parameter:

def apply[A](a: => A): Task[A]

So it would create a new ActorSystem every time I request it. Evaluating the ActorSystem eagerly in a private val, as I do here, does not really fit the idea of a pure program description, I guess. But this approach still works :) I get one ActorSystem…
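
The difference between the two variants is easy to reproduce with a small sketch, assuming ZIO 1.0.x. `Resource` is a hypothetical stand-in for ActorSystem that only carries a creation counter.

```scala
import java.util.concurrent.atomic.AtomicInteger
import zio._

object ByNameSketch {
  // Counts how often the "expensive" resource gets constructed.
  val created = new AtomicInteger(0)

  // Hypothetical stand-in for ActorSystem.
  final class Resource(val id: Int)

  def newResource(): Resource = new Resource(created.incrementAndGet())

  // By-name argument: the constructor call is repeated on every run of the Task.
  val fresh: Task[Resource] = Task(newResource())

  // Evaluated once, eagerly; the Task only hands out the existing instance.
  private val shared: Resource = newResource()
  val cached: Task[Resource] = Task(shared)

  def main(args: Array[String]): Unit = {
    val runtime = Runtime.default
    println(runtime.unsafeRun(fresh).id == runtime.unsafeRun(fresh).id) // prints false
    println(runtime.unsafeRun(cached) eq runtime.unsafeRun(cached))     // prints true
  }
}
```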

Some other points

Garbage Collection when using ZIO

From ZIO documentation

ZIO’s concurrency is built on fibers, which are lightweight “green threads” implemented by the ZIO runtime system.

Unlike operating system threads, fibers consume almost no memory, have growable and shrinkable stacks, don’t waste resources blocking, and will be garbage collected automatically if they are suspended and unreachable.

So this is how GC looks in a short time period:

In the same period, the standard Akka Http Server ran four GC cycles.

Conclusion

Before I tried ZIO, I tried to use other libraries like Cats. But I admit I really spent just a few hours with Cats. What I did not quite like in Cats was the terminology used. There you find a lot of Monads, Functors, FreeMonads, Kleisli, and others. I do not mind the naming as such, but rather than focusing on what the library provides and where it can be useful, after a while you find yourself googling what a Monad actually is. That draws attention away from real use… But that’s, of course, my opinion.

ZIO has some similarities with other libraries, but I think it focuses on making it understandable for IT people just looking for ways to improve their code.

What is not obvious at first in ZIO is, surprise, ZIO itself :) and its type aliases like:

UIO, RIO, URIO, etc.

But after some time (not long) they become somewhat intuitive…

Some thought should also go into how and at which level to provide program dependencies, because there are quite a few options and this can be confusing at first. On the other hand, the same thing gives quite some freedom in providing them and in translating from one ZIO to another.

This is of course just an introduction, but ZIO has much more to offer:

  • we can create a lazy code description rather than an eager one. That leads to other options like predictable refactoring…
  • we can use fibers as an alternative to Scala Futures, and fibers are interruptible
  • also, gain better control over code execution on thread pools
  • actually, I haven’t tested this so far, but ZIO promises better testing of the code
  • it adds some more options for code composition with modules and layers

I will definitely continue to explore ZIO features…

That’s all, thanks for reading…

Full code for this article can be found in the following repository in GitHub.
