What can ZIO do for me? A Long Polling example with sttp.

Pascal Mengelt
Nov 27 · 6 min read

Requirements: A basic understanding of ZIO and Cats.

This is my first blog post on Medium, so have mercy ;).
I am just a user / learner of ZIO.
If you are only interested in the ZIO part, you can jump to the chapter Long Polling with ZIO.

OK, let us start. After checking out some blogs (see https://github.com/pme123/zio-examples), I wanted to use ZIO to tackle a problem I had at work.

Scenario

In our Play Application we use a REST Service that provides Long Polling for fetching Tasks.

So our first solution was just to poll this Service:

Polling with Futures

Our client asks every second if there are some Tasks.

Annoying, right?

https://gph.is/2ayuQLg

Here is the implementation:

actorSystem.scheduler.schedule(initialDelay = 5.seconds, interval = 1.seconds) {
  fetchAndProcessTasks()
}

def fetchAndProcessTasks(): Future[Unit] = {
  externalTaskApi.fetchAndLock(taskRequest).flatMap { externalTasks =>
    if (externalTasks.nonEmpty)
      processExternalTasks(externalTasks)
    else
      Future.successful({})
  }.recover {
    case NonFatal(ex) =>
      error(s"Unable to fetch external tasks - $ex")
  }
}

Long Polling with Futures

As externalTaskApi.fetchAndLock(taskRequest) already supports Long Polling, we just needed to add an additional attribute to the taskRequest.

Our Client asks for Tasks and waits a defined time for them.

After receiving the response, the Client asks for the next Tasks right away.

This was my first solution:

fetchAndProcessTasks() // start polling

def fetchAndProcessTasks(): Future[Unit] = {
  externalTaskApi.fetchAndLock(taskRequest).map { externalTasks =>
    if (externalTasks.nonEmpty)
      processExternalTasks(externalTasks) // async Task handling
    else
      Future.successful({})
  }.recover {
    case NonFatal(ex) =>
      error(s"Unable to fetch external tasks - $ex")
  }.flatMap(_ => fetchAndProcessTasks())
}

You may spot the problem: Future[Future[_]]. And if the App is restarted, there could now be two fetchAndProcessTasks() loops running. Really bad!
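
To make the Future[Future[_]] part concrete, here is a minimal sketch in plain Scala. The names fetch and process are hypothetical stand-ins for externalTaskApi.fetchAndLock and processExternalTasks; they are not part of the original code.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object NestedFutureSketch {
  // Hypothetical stand-ins for the API call and the task processing.
  def fetch(): Future[List[Int]] = Future.successful(List(1, 2, 3))
  def process(tasks: List[Int]): Future[Int] = Future(tasks.sum)

  // map wraps the inner Future: the outer Future completes as soon as
  // process has been started, not when it has finished.
  def nested: Future[Future[Int]] = fetch().map(process)

  // flatMap chains both steps into one Future that completes
  // only after process is done.
  def flat: Future[Int] = fetch().flatMap(process)

  // Helper that blocks for the result (acceptable in a demo only).
  def flatResult: Int = Await.result(flat, 5.seconds)
  def nestedResult: Int = Await.result(nested.flatten, 5.seconds)

  def main(args: Array[String]): Unit =
    println(flatResult) // prints 6
}
```

With map, whoever waits on the outer Future has no guarantee that the processing is finished. That may be intended (fire and forget), but the type makes it easy to overlook.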

Adding a flag works (because it is a singleton) but it is considered bad Scala style:

@volatile var fetchNextTasks = true // flag to stop the recursive call

fetchAndProcessTasks() // start polling

// Shut-down hook to stop the recursive call
coordinatedShutdown.addTask(CoordinatedShutdown.PhaseServiceUnbind, "Shutdown Engine") { () =>
  Future.successful {
    fetchNextTasks = false
    Done
  }
}

def fetchAndProcessTasks(): Future[Unit] = {
  externalTaskApi.fetchAndLock(taskRequest).map { externalTasks =>
    if (externalTasks.nonEmpty)
      processExternalTasks(externalTasks)
    else
      Future.successful({})
  }.recover {
    case NonFatal(ex) =>
      error(s"Unable to fetch external tasks - $ex")
  }.flatMap(_ =>
    if (fetchNextTasks) // here we check if we should still fetch tasks
      fetchAndProcessTasks()
    else
      Future.unit
  )
}

Long Polling with a Typed Actor

So in the end we introduced a Typed Actor:

...
  val system: ActorSystem[ExternalTasksFetchActor.Message] =
    ActorSystem(ExternalTasksFetchActor(...).stopped(), "ExternalTaskFetchActor")
  system ! ExternalTasksFetchActor.Start
  // Shut-down hook to stop the recursive call
  coordinatedShutdown.addTask(CoordinatedShutdown.PhaseServiceUnbind, "Shutdown Report Engine") { () =>
    Future.successful {
      system ! ExternalTasksFetchActor.Stop
      Done
    }
  }
}

sealed case class ExternalTasksFetchActor(
  ...
)(implicit cred: Cred, ec: ..)
  extends Logging {

  def stopped(): Behavior[Message] =
    Behaviors.setup { context =>
      Behaviors.receiveMessage {
        case Start =>
          debug("Start Fetching ...")
          context.self ! Fetch
          running()
        case other =>
          warn(s"Unexpected Message: $other")
          Behavior.same
      }
    }

  def running(): Behavior[Message] =
    Behaviors.setup { context =>
      Behaviors.receiveMessage {
        case Fetch =>
          debug("Fetch Tasks")
          fetchTasks(context.self)
          Behavior.same
        case Stop =>
          debug("Stop fetching")
          Behaviors.stopped
        case Start =>
          warn("Start received that was not expected")
          Behavior.same
      }
    }
....

This is now safe, but wow, so much code!

Long Polling with ZIO

https://zio.dev

Now let us try to do this with ZIO.

For the REST-call we use sttp.

for {
  numbers <- sttp
    .get(uri"""$url/5""") // long polling of 5 seconds
    .response(asJson[List[Int]])
    .send()
    .map(_.body)
  _ <- console.putStrLn(s"Result: $numbers")
} yield ()

First we have a simple GET request to the defined URL. We log the result to the console (also an Effect).

This will call the service only once. So let’s do this forever:

(for {
...
} yield ()).forever

All we need to add is, of course, forever.

What about some Resilience? With this code, for example, the client dies whenever you restart the server.

sttp
  .get(uri"""$url/5""")
  ...
  .tapError(error =>
    console.putStrLn(s"Failing .. ${error.getMessage}"))
  .retry(ZSchedule.recurs(5) && ZSchedule.exponential(1.second))

Again, we can add this with one line. I chose 5 retries; each of them waits twice as long as the one before. This is done by simply combining the recurs and exponential Schedules.

tapError provides a nice way to log the failing attempts.

Failing attempt (141 s): Connection refused: localhost/0:0:0:0:0:0:0:1:8088
Failing attempt (143 s): Connection refused: localhost/0:0:0:0:0:0:0:1:8088
Failing attempt (147 s): Connection refused: localhost/0:0:0:0:0:0:0:1:8088
Failing attempt (155 s): Connection refused: localhost/0:0:0:0:0:0:0:1:8088
Failing attempt (171 s): Connection refused: localhost/0:0:0:0:0:0:0:1:8088
Failing attempt (203 s): Connection refused: localhost/0:0:0:0:0:0:0:1:8088
There was an exception: Connection refused: localhost/0:0:0:0:0:0:0:1:8088

The log shows nicely how each wait is twice as long as the one before. In the end, the client gives up with the error message of the last failed attempt.
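
The idea behind combining a retry limit with exponentially growing waits can be sketched in plain Scala. This is only an illustration of the technique, not how ZIO implements its Schedules; BackoffSketch, delays and retry are hypothetical names, and the assumption that the first wait equals the base duration is mine.

```scala
import scala.annotation.tailrec
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

object BackoffSketch {

  // Waiting times before each retry: base, base * 2, base * 4, ...
  // (assumption: the first wait equals the base duration)
  def delays(base: FiniteDuration, retries: Int, factor: Long = 2L): List[FiniteDuration] =
    List.iterate(base, retries)(_ * factor)

  // Runs op once, then once more per remaining delay.
  // Gives up with the last error when no delay is left.
  @tailrec
  def retry[A](remaining: List[FiniteDuration])(op: () => A): Either[Throwable, A] =
    Try(op()) match {
      case Success(value) => Right(value)
      case Failure(error) =>
        remaining match {
          case Nil => Left(error)
          case delay :: rest =>
            Thread.sleep(delay.toMillis) // blocking wait, fine for a sketch
            retry(rest)(op)
        }
    }
}
```

Five retries mean six attempts in total, which matches the six failing attempts in the log above. The waits in that log are 2, 4, 8, 16 and 32 seconds, so the starting value of the real schedule may differ from this sketch.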

Next, the result type from sttp is Either[String, Either[DeserializationError[Error], List[Int]]]. I would like the result to be a List in any case; if there was an exception, it should be printed to the console. A Failure, for example ZIO.fail(Error), would stop the program entirely, which we don’t want.

numbers <- sttp
  ...
  .flatMap {
    case Left(msg) =>
      console.putStrLn(s"Problem with the service: $msg") *>
        ZIO.succeed(Nil)
    case Right(Left(errors)) =>
      console.putStrLn(s"Problem deserialise: $errors") *>
        ZIO.succeed(Nil)
    case Right(Right(value)) =>
      ZIO.succeed(value)
  }
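
The same three-way match can be tried out without ZIO or sttp. The following is a small sketch in plain Scala; DecodeError is a hypothetical, simplified stand-in for sttp's DeserializationError, and instead of printing to the console the function returns the log line next to the numbers.

```scala
object EitherSketch {
  // Hypothetical, simplified stand-in for sttp's DeserializationError.
  final case class DecodeError(message: String)

  // Shape of the response body as described above.
  type Body = Either[String, Either[DecodeError, List[Int]]]

  // Always returns a List; a problem yields Nil plus a log line.
  def numbersOrEmpty(body: Body): (List[Int], Option[String]) =
    body match {
      case Left(msg) =>
        (Nil, Some(s"Problem with the service: $msg"))
      case Right(Left(error)) =>
        (Nil, Some(s"Problem deserialise: ${error.message}"))
      case Right(Right(value)) =>
        (value, None)
    }
}
```

Both error cases end in an empty list, so the polling loop keeps running instead of failing.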

To be fair, in all the examples before ZIO, we encapsulated the code that accesses the REST Service and the code that handles the Tasks. Doing the same here, this is, in essence, the code that is left:

fetchNumbers("http://localhost:8088/5")
  .flatMap(handleNumbers)
  .forever

Is this everything? Right, I forgot that handling the Tasks is asynchronous. Adding some debug output to the console:

private def handleNumbers(numbers: Seq[Int]) =
  ZIO.foreach(numbers) { number =>
    (for {
      _ <- ZIO.sleep(1.second)
      t <- clock.currentTime(TimeUnit.SECONDS)
      _ <- console.putStrLn(s"Result (${t % 1000} s): $number")
    } yield ())
  }

This shows that at most one number is handled per second:

Result (470 s): [585]
Result (472 s): [604]
Result (473 s): [333]
Result (474 s): [46]
Result (475 s): [183]

So how can we handle the numbers in parallel?

ZIO.foreachPar(numbers) { number =>
...

The answer is to add the three characters Par for parallel. The result looks better now:

Result (482 s): 434
Result (482 s): 458
Result (482 s): 92
Result (483 s): 265
Result (483 s): 779
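
For comparison, the difference between handling items one after another and handling them in parallel can be sketched with plain Scala Futures. This is not ZIO code; handle is a hypothetical slow handler, and ParallelSketch is only meant to illustrate the two evaluation orders.

```scala
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ParallelSketch {
  // Hypothetical slow handler: sleeps 100 ms, then doubles the number.
  def handle(n: Int): Future[Int] =
    Future(blocking { Thread.sleep(100); n * 2 })

  // One after another: the next handler starts only
  // when the previous one has finished.
  def sequential(numbers: List[Int]): Future[List[Int]] =
    numbers.foldLeft(Future.successful(List.empty[Int])) { (acc, n) =>
      acc.flatMap(done => handle(n).map(result => done :+ result))
    }

  // All handlers are started right away; results keep the input order.
  def parallel(numbers: List[Int]): Future[List[Int]] =
    Future.traverse(numbers)(handle)

  // Helper that blocks for the result (acceptable in a demo only).
  def run(work: Future[List[Int]]): List[Int] =
    Await.result(work, 10.seconds)
}
```

Both variants return the same list. The sequential one needs roughly 100 ms per number, while the parallel one can finish in about the time of a single handler, provided the thread pool runs the handlers concurrently.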

Running the App

To run the Client, extend zio.App and implement its run function:

object HttpClient extends zio.App {

  def run(args: List[String]): ZIO[zio.ZEnv, Nothing, Int] =
    fetchNumbers
      .tapError { ex => console.putStrLn(s"There was an exception: ${ex.getMessage}") }
      .fold(_ => -1, _ => 0)
  ...

We log the error and finish the program either successfully (0) or with a failure (-1).

Or you integrate it into your existing code:

new DefaultRuntime {}
  .unsafeRun(
    HttpClient.fetchNumbers
      .tapError { ex => console.putStrLn(s"There was an exception: ${ex.getMessage}") }
      .fold(_ => -1, _ => 0)
  )

You can find all the client code here: HttpClient.

The Server

The server part simulates the behaviour of providing Numbers at different times.

I tried some aspects of ZIO, like its module pattern. But this is maybe something for another Blog.

Conclusion

I still struggle with ZIO; for example, providing the runtime / environment or interacting with Cats libraries still gives me some headaches.

But this example shows how powerful ZIO is for writing elegant and robust programs. So give it a try if you are not already a Fan! It is really cool stuff.

References

ZIO: https://zio.dev

sttp: https://github.com/softwaremill/sttp

Cats: https://typelevel.org/cats/

Thanks to the reviewer Peti Koch.

Let me know if you have questions or stuff that can be done better!


Written by Pascal Mengelt

Working for finnova.com in the Banking business. Prefer to work with Scala / ScalaJS.
