CodeX
Published in

CodeX

ARTICLE

Asking in Akka

From Akka in Action, Second Edition by Francisco Lopez-Sancho Abraham

This article jumps into the basics of how “asking” in Akka works.

Take 35% off Akka in Action, Second Edition by entering fccabraham2 into the discount code box at checkout at manning.com.

Asking. Or expecting a reply.

Asking, in Akka lingua, is sending a message and providing the means to handle the response. Two things can happen after sending a message. Either you get back a response indicating the message has been processed, or you don’t. If you don’t, then you need to decide what do you do about it — the same as sending a postal letter, like in the good old times. If you need a response and you don’t get it, how long are you willing to wait before you take action?

The answer is formulated in terms of time and two possible outcomes. In Akka, when we ASK for something, we’ll also have to provide how long we’re willing to wait and what we’re going to do, whether or not we receive an answer. Both things are straightforward to set up. For the timing, we use the class Timeout, and for the fact, we can get or not get an answer. This is pattern matching over both options, Success and Failure. These are the only two instances of the abstract class Try[T] where T is the type of object we expect to get if the answer gets back in time, and therefore, consider successful. We’ll see a sample right away when we look at our new implementation of the Manager, but before that, we can have a look at Figure 1 to get our heads around it.

Figure 1 Possible outcomes of the Manager asking the Worker

Here, we depict a manager asking a worker for a task inside some time boundaries before it stops waiting for a response. Depending on whether the answer comes before or after this timeout, the manager processes a Success or a Failure.

The last idea before we dive into the implementation is to bear in mind that we’re going to see two cases of this idea of asking: a simpler one, where the message sent as the question doesn’t contain anything inside, and a more complete one, where the object we sent contains some information the worker uses.

In both cases, we’ll have the same App and a guardian that passes the list of tasks the Manager gets. In Listing 1, we can see the initial input at the start of the program.

Listing 1 ManagerWorkerApp and Guardian

import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.{ ActorRef, ActorSystem, Behavior }
import scala.concurrent.duration.SECONDS
import akka.util.Timeout
import scala.util.{ Failure, Success }

object ManangerWorkerApp extends App {

val system: ActorSystem[Guardian.Command] =
ActorSystem(Guardian(), "example-ask-without-content")
system ! Guardian.Start(List("task-a", "task-b", "task-c", "task-d")) #A
}

object Guardian {

sealed trait Command
case class Start(tasks: List[String]) extends Command

def apply(): Behavior[Command] =
Behaviors.setup { context =>
val manager: ActorRef[Manager.Command] =
context.spawn(Manager(), "manager-1")
Behaviors.receiveMessage {
case Start(tasks) =>
manager ! Manager.Delegate(tasks)
Behaviors.same
}
}
}

#A input, list of tasks

#B passing down the input

The simpler question

If we run the App in the previous Listing 1, we can see two types of results. The first type is here, in Listing 2.

Listing 2 Output when response gets back in time

[Worker$] - My name is 'worker-task-c'. And I've done my task
[Manager$] - task-c has been finished by worker-task-c

Not much can be added here. In this case, a Worker had finished the task in time, and the Manager registers that fact. This is a Success.

On the other hand, we can see the other sort of output we can get in Listing 3.

Listing 3 Output when response does NOT gets back in time

[Manager$] - task 'task-a' has failed with [Ask timed out on [Actor[akka://example-ask-without-content/user/manager-1/worker-task-a#-450068248]] after [3000 ms]. Message of type [ask.simple.Worker$Do]. A typical reason for `AskTimeoutException` is that the recipient actor didn't send a reply.
[Worker$] - My name is 'worker-task-a'. And I've done my task
[DeadLetterActorRef] - Message [ask.simple.Worker$Done$] to Actor[akka://example-ask-without-content/deadLetters] was not delivered. [2] dead letters encountered. If this is not an expected behavior then Actor[akka://example-ask-without-content/deadLetters] may have terminated unexpectedly. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

In this last case, three related things had happened. The manager stopped waiting for the answer and felt back, reporting that task hadn’t been completed. On the other side, the worker took longer than it required, and despite the fact that the Manager gave up waiting, the Worker completed the task nevertheless. This is why we see the second message. The third thing that happened involved two new actors. When the Worker finished its task, it tries to send back an answer, but this message never reaches the Manager. How come? This is because, when asking, an ephemeral intermediating actor gets created for handling the waiting along the predefined Timeout. Eventually this produces a Success or a Failure message in the Manager.

This flow may be a little convoluted at first glance; let’s get some aid from Figure 2. to go over the big picture.

Figure 2. Ephemeral intermediate Actor life span and duties after timeout

From the beginning. When the Manager asks, an ephemeral actor gets in charge of sending the original message to the Worker and sending back Success or Failure to the Manager. Whether it’s one or the other, this actor immediately gets into Behavior.stopped. Once this actor is stopped, its reference stops being valid, and when our Worker sends back the message, it’s already too late—the ActorSystem takes over, making sure that this message gets delivered to deadLetters.

Now when things go south, we’ve at least some information, but this has to be taken with a pinch of salt; sending messages guarantees best effort.

If, for any reason, something goes wrong in the Worker, and it throws an Exception, or the rack where it runs gets on fire, the Manager gets a Timeout. The message from the non-existent Worker isn’t created nor sent to deadLetters, as any Exception in an Actor by default is dealt by its creator through supervision.

In Listing 4, we see the Manager we’ve been talking about. It defines the Timeout and also the countermeasures to Success and Failure.

Listing 4 Manager that asks

object Manager {

sealed trait Command
final case class Delegate(tasks: List[String]) extends Command
final case class Report(description: String) extends Command

def apply(): Behavior[Command] =
Behaviors.setup { context =>
implicit val timeout: Timeout = Timeout(3, SECONDS) #A

Behaviors.receiveMessage { message =>
message match {
case Delegate(tasks) => #B
tasks.map { task =>
val worker: ActorRef[Worker.Command] =
context.spawn(Worker(), s"worker-$task")
context.ask(worker, Worker.Do) { #C
case Success(Worker.Done) => #D
Report(s"$task has been finished by ${worker}")
case Failure(ex) => #D
Report(s"task '$task' has failed with [${ex.getMessage()}")
}
}
Behaviors.same
case Report(description) =>
context.log.info(description)
Behaviors.same
}
}
}
}

#A timeout implicitly passed to ask

#B input of tasks

#C ask the worker

#D handling possible outcome

How we use the ask here is quite straightforward. The manager is delegating the task Worker.Do to whom we’re asking, the worker, waiting no more than the timeout.

We can see in Listing 1, in the Worker itself, how Worker.Do(replyTo: ActorRef[Worker.Response]) has a reference to whom it needs to replay to and on what terms. Let’s see the worker definition in Listing 5.

Listing 5 Worker that answers

object Worker {

sealed trait Command
case class Do(replyTo: ActorRef[Worker.Response]) extends Command

sealed trait Response
case object Done extends Response

def apply(): Behavior[Command] =
Behaviors.receive { (context, message) =>
message match {
case Do(replyTo) =>
doing(scala.util.Random.between(2000, 4000)) #A
context.log.info(
s"My name is '${context.self.path.name}'. And I've done my task")
replyTo ! Worker.Done #B
Behaviors.stopped
}
}

def doing(duration: Int): Unit = {
val endTime = System.currentTimeMillis + duration
while (endTime > System.currentTimeMillis) {}
}
}

#A emulating the delegated task

#B replying back to manager

First, let’s draw our attention at Worker.Do(replyTo: ActorRef[Worker.Response]). You may be asking yourself how this replyTo gets introduced in the message we’re sending. After all, when we used context.ask(worker, Worker.Do), there’s no mention of replyTo in here. How does it end in there?

To answer that, we’ll have to look a bit deeper in Listing 6, in the signature of the ask itself.

Listing 6 Ask signature

// Scala API impl
override def ask[Req, Res](target: RecipientRef[Req], createRequest: ActorRef[Res] => Req)( mapResponse: Try[Res] => T)(implicit responseTimeout: Timeout, classTag: ClassTag[Res]): Unit

If it seems a bit daunting, it means that we need to go more slowly. It’s far from difficult if we go step-by-step.

When we wrote context.ask(worker, Worker.Do), looking at the ask’s signature worker corresponds to the target param. No mystery here, but Worker.Do being the createRequest might not be clear.

We may have thought we were passing an object there, but the signature createRequest: ActorRef[Res] => Req, tells us that this is a function, not an object. The question then is, how are we getting a function out of this Worker.Do object we are passing? If you’re unfamiliar with Scala, this is probably confusing. Objects in Scala can be treated as functions as long they have an apply method, and a case class as we have here is a special class that provides, among other things, this apply method under the hood. In our case, the case class Worker.Do(replyTo: ActorRef[Worker.Response]) produces, thanks to the compiler, a method like def apply(replyTo: ActorRef[Worker.Response]): Worker.Do = new Worker.Do(replyTo).

Because this method gets produced, the compiler now understands that this is what we mean when we put Worker.Do in the ask method. The compiler infers that we’re referring to the apply function of our case class, because this is the only thing that makes sense in the scope of the signature of createRequest. This is it—we’re passing Worker.Do.apply() to that method thanks to the syntactic sugar and the cleverness of the compiler.

Now for the mapResponse. Depending on if it receives the answer in time, the Success or Failure in our code is triggered. We already talked about this, but now let’s have a closer look. We see mapResponse: Try[Res] => T, which reads like the following: mapResponse is a function that has to define two functions—one with input Success[Res] and output T, and another with input Failure and output T. In our case, Res is Worker.Done, and T is Manager.Command.

Finally, the implicit responseTimeout: Timeout is the amount of time we wait for a response. The fact that it’s an implicit argument means that if there’s an implicit value in the scope, it uses it without you needing to pass it explicitly. Many scopes are defined in Scala, but won’t cover that right now. Suffice to say, that in the scope of our method, we have an implicit ‘timeout’.

Regarding classTag, we’ll not pay much attention. It’s there for historical reasons and binary compatibility.

This is all you need to know to use ask, but there’s something else which is useful to pay some attention to. What happens when the Manager sends that request? What’s happening at runtime? We mentioned before that between an actor that asks and the one that answers, there’s an intermediary actor. This middleman actor is the one our Workers replying to and from the signature of the Worker.Do(replyTo: ActorRef[Worker.Response]). We know its type is ActorRef[Worker.Response]. When the Manager asks the Worker, an actor of that type gets created and is used as the input to the method with no syntactic sugar, Worker.Do.apply(replyTo: ActorRef[Worker.Response]). Finally, this Worker.Do(intermediaryActor)is what the Worker receives.

Ask with payload

Sometimes we ask something to an actor, and we need to include some information in that question. In the previous example, the only thing the Worker knew was whom to respond to. Let’s have a look at a case almost equal to the previous one, except for the fact that when ask, we need to pass the worker a Task that contains an id and a description. See Listing 7 for an example of this.

Listing 7 Task Object as payload to pass when asking

final case class Task(taskId: String, taskDescription: String)
sealed trait Command
final case class Do(task: Task, replyTo: ActorRef[Worker.Response])
extends Command

In this case, when we provide the createRequest parameter to ask, we won’t be able to rely on Worker.Do.apply() as we did before. Remember that createRequest was expecting a function with one parameter, but now, our Worker.Do.apply has two parameters, task and replyTo. The framework provides the intermediary for us at runtime, the replyTo, and we need to provide the task. Now we need a method that can accept a Task as input, and we can pass it through to the Worker. This gives us back a function that can pass to the ask. In Listing 8, we have a function with such a signature.

Listing 8 Auxiliary method to create a function of the signature that ask expects

def auxCreateRequest (task: Worker.Task)(replyTo: ActorRef[Worker.Response]): Worker.Do = Worker.Do(task, replyTo)

Let’s have a look at the Manager in Listing 9 to see how this plays out.

Listing 9 Manager that uses the auxCreateRequest

object Manager {

sealed trait Command
final case class Delegate(tasks: List[String]) extends Command
final case class Report(outline: String) extends Command

def apply(): Behavior[Command] =
Behaviors.setup { context =>
implicit val timeout: Timeout = Timeout(3, SECONDS)
def auxCreateRequest(task: Worker.Task)( #A
replyTo: ActorRef[Worker.Response]): Worker.Do = #A
Worker.Do(task, replyTo) #A

Behaviors.receiveMessage { message =>
message match {
case Delegate(tasks) =>
tasks.map { task =>
val worker: ActorRef[Worker.Command] =
context.spawn(Worker(), s"worker-$task")
context.ask(
worker,
auxCreateRequest( #B
Worker.Task(System.currentTimeMillis().toString(), task))) { #B
case Success(Worker.Done(taskId)) =>
Report(s"$taskId has been finished by ${worker}")
case Failure(ex) =>
Report(s"task has failed with [${ex.getMessage()}")
}
}
Behaviors.same
case Report(outline) =>
context.log.info(outline)
Behaviors.same
}
}
}
}

#A aux function to pass a task to the worker

#B usage of task function

If you’re not used to functional programming, it might be difficult to understand straight away how this auxCreateRequest can produce the signature that ask.createRequest requires.

In our case, the Manager takes a Task from the guardian, let’s name it taskX, and passes it to our auxiliary method. This produces a new function we see in Listing 10.

Listing 10 Output after passing ‘taskX’ to auxCreateRequest

def irrelevantName(rsp:ActorRef[Worker.Response]):Worker.Do =  Worker.Do(taskX,rsp)

This is called currying, and this functionality is provided in most functional languages. We can think of it as a partial application of the function, and it works like the following. When having a function like multiplication(x: Int,y: Int) = x * y, you can curry that function by multipication(4), which means that you’re passing only x, not y. In doing this, you get back a new function multiplication Curried(z: Int) = 4 * z—and now you can use it as multiplicationCurried(3) = 12.

In Scala, we need to explicitly state when a function can be curried. This is done by separating those input variables we want to pass individually by their own parentheses. We would do that as def multipication(x: Int)(y: Int) in this case, and we use it as multiplication(4)_.

Coming back to transforming auxCreateRequest in Listing 11, we see how we can use the function createRequest, passing only the task taskX and the output function we’ll get.

Listing 11 Passing only one param to auxCreateRequest

auxCreateRequest(taskX)(replyTo: ActorRef[Worker.Response])                  #A

def irrelevantName(rsp:ActorRef[Worker.Response]): Worker.Do = auxCreateRequest(‘taskX`)(rsp:ActorRef[Worker.Response]) #B

#A currying with taskX
#B output

Now, irrelevantName function is what we can pass into createRequest and that taskX gets included in the message that the worker receives. It’s a bit of a twist if one isn’t used to currying, but you may well need this often. Getting to know the signature of ask is a piece of useful knowledge.

That’s all for this article. If you want to learn more about the book, check it out on Manning’s liveBook platform here.

--

--

--

Everything connected with Tech & Code. Follow to join our 900K+ monthly readers

Recommended from Medium

Learning to Code

It’s why the Centre Pompidou is one of my favorite buildings.

Make It Easy for Your Developers to Use Your Pattern Library

How to create Custom Java Collection by extending from an existing Collection class

Introducing “Upsoft.”​

Install ProM 6.9 tool on macOS

Retrospective Notes

Chaos Engineering with Docker Conducted by Mikolaj Pawlikowski

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store
Manning Publications

Manning Publications

Follow Manning Publications on Medium for free content and exclusive discounts.

More from Medium

Concurrent Sudoku Solver: Part 3 - Using Cats Effect Queue

Kafka Streams: Introduction

Kafka, KRaft and Storage Tiers

Micronaut vs Quarkus: part 2