Error Handling in Akka Actor (with Future)

image credit to sitepoint

We have been learning & building Scala(Akka) service since last Oct. As promised by many tutorials, Scala looks like Java (plus functional language like Ocaml), and Akka Actor to me, is a mechanism to deal with the concurrency in Scala.

Java is not a foreign language to me at all, neither is concurrency programming. Still, I have to say the learning curve of Akka Actor is not smooth. Among features (or hiccups) that surpised me, error handling is top on the list.

All code in this post are also in my git repo.

Look at a simple actor below:

# ThrowExceptionActor.scala
class ThrowExceptionActor extends Actor {
override def receive: Receive = {
case te: ThrowException =>
sender ! 1
...
  }
}

Then we ask this actor in our main class like this:

# Main_exception.scala
val res1 = throwExceptionActor ? ThrowException()
res1.onComplete {
case Success(x) => println(“throwExceptionActor.ThrowException return Success: ”+x)
case Failure(ex) => println(“throwExceptionActor.ThrowException return Failure: “+ex)
}

In Success case, it should give us 1.

Question:

What if the operation in above actor actually throws some exception ?

Say like this ?

# ThrowExceptionActor.scala
class ThrowExceptionActor extends Actor {
override def receive: Receive = {
case te: ThrowException =>
throw new Exception(“Exception from ThrowExceptionActor.receive.ThrowException”)
sender ! 1
...
}
}

I would expect this exception take us to case Failurein main, and the ex get print out nicely.

But… nope.

So the exception stack trace does get print out on console. The exception we receive inside case Failure in main, however, is not what we throw, but some time-out one:

akka.pattern.AskTimeoutException: Ask timed out on [Actor[akka://Sys/user/throwExceptionActor#259865281]] after [5000 ms]. Message of type [akka.actor.playground.exception.ThrowException]. A typical reason for `AskTimeoutException` is that the recipient actor didn’t send a reply.

What ?

Future

How about Future ? Let’s throw some exception inside a Future:

#ThrowExceptionActor.scala
case tef: ThrowExceptionFromFuture =>
val originalSender = sender()
val fut = Future {
val lst = List(1,2,3)
originalSender ! lst(5)
}

Run it.

#Main_exception.scala
val res2 = throwExceptionActor ? ThrowExceptionFromFuture()
res2.onComplete {
case Success(x) => println(“throwExceptionActor.ThrowExceptionFromFuture return Success: “+x)
case Failure(ex) => println(“throwExceptionActor.ThrowExceptionFromFuture return Failure: “+ex)
}

And… it doesn’t even print out original stack trace at all, just the same stupid time-out error.


The behavior above actually documented in Akka Actor doc.

Warning
To complete the with an exception you need to send an akka.actor.Status.Failure message to the sender. This is not done automatically when an actor throws an exception while processing a message.

So what to do ?

If an actor throw exception while processing a message, it actually restart itself. With that in mind, we could create a trait where every time the actor restart, we send throwable to sender like below:

# FailurePropatingActor.scala
trait FailurePropatingActor extends Actor {
override def preRestart(reason: Throwable, message: Option[Any])
   {
super.preRestart(reason, message)
sender() ! Status.Failure(reason)
}
}

Use it to create out actor.

# ThrowExceptionActorWithErrorHandling.scala
class ThrowExceptionActorWithErrorHandling extends FailurePropatingActor { ... }

And it works great for exception throw directly from actor:

This time, we actually get the exception we throw. 👍


Future

However, this won’t work if the exception is throw from inside a Future. In fact, once we are inside the future, old context is gone (sender is no longer valid), actor will move on to the next message, it doesn’t even know what happened to the future.

One way to fix it, is to pipe back the whole future to sender:

#ThrowExceptionActor.scala
case tef: PipeBackExceptionFromFuture =>
val fut = Future {
val lst = List(1,2,3)
lst(5)
}
fut pipeTo sender()

This works without using new trait FailurePropatingActor above:

Since we already create a trait FailurePropatingActorto deal with this, here is another way.

Send the failure back to actor itself, and deal with it like a message:

#ThrowExceptionActorWithErrorHandling.scala
override def receive: Receive = {
case tef: ThrowExceptionFromFuture =>
val fut = Future {
val lst = List(1,2,3)
lst(5)
}
fut.to(self, sender())
case Status.Failure(ex) =>
throw ex
}

This should trigger the restart of actor.

Works like a charm:

We can even create a trait to do this.

# FailurePropatingActor.scala
trait FailurePropatingActor extends Actor {
override def preRestart(reason: Throwable, message: Option[Any])
   {
super.preRestart(reason, message)
sender() ! Status.Failure(reason)
}
}

Couple open questions though:

  1. We restart the actor everytime there is an exception, is it going to slow it down?

I guess if exception doesn’t happen often, it should be ok.

2. In reality, we might want to process the result of a future, then send the outcome to the next actor/http response/etc. If that’s the case, we still need to go through whole onComplete process:

someFuture.onComplete {
case Success(res) => { ... }
case Failure(ex) => { ... }
}

then the whole fut.to(self, sender()) pattern doesn’t make sense.