Assemble messages from Akka Actors

Picture credit: towardsdatascience.com

As our service grows, we reach a point where we need to interact with more than one data source (a database or another service), gather their responses, massage them into a single result, and return it to the user.

A simple use case looks like this:

  • We have an actor, say DataGeneratorActor.
  • We send out 2 actors to collect data for us.

Now here are 2 possibilities:

  • The 2nd actor depends on the 1st actor’s response to act: Sequence
  • The 2nd actor doesn’t care what the 1st actor carries back: Parallel

All code in this post can be found in my git repo.


Sequence

For the sequence case, it’s straightforward:

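import akka.pattern.{ask, pipe}
import akka.util.Timeout
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.{Failure, Success}

// ask (?) needs an implicit timeout (pick one that suits your data sources);
// onComplete and pipeTo need an ExecutionContext, here the actor's dispatcher
implicit val timeout: Timeout = Timeout(5.seconds)
import context.dispatcher

val data1Actor = ... // create actor #1
val data1ObjFuture: Future[Data1Obj] = (data1Actor ? GetData1()).mapTo[Data1Obj]
val data2Actor = ... // create actor #2

// capture sender() now: by the time the callback runs it may point to another actor
val originalSender = sender()
data1ObjFuture.onComplete { // returns Unit, so there is nothing useful to assign
  case Success(x) =>
    val data2ObjFuture = data2Actor ? GetData2(x)
    data2ObjFuture.pipeTo(originalSender)
  case Failure(e) =>
    ... // handle the error, e.g. originalSender ! akka.actor.Status.Failure(e)
}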
Run it.
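If you want to play with this callback style without spinning up actors, here is a minimal sketch that uses only `scala.concurrent`. The names `CallbackSequence`, `getData1` and `getData2` are hypothetical stand-ins for the two asks, and a `Promise` plays the role that `originalSender` plays above: whoever holds `promise.future` receives the final answer, including a failure from the first step.

```scala
import scala.concurrent.{Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}

object CallbackSequence {
  // Hypothetical stand-ins for `data1Actor ? GetData1()` and `data2Actor ? GetData2(x)`.
  def getData1(): Future[String] = Future("data1")
  def getData2(data1: String): Future[String] = Future(s"$data1 -> data2")

  def run(): Future[String] = {
    // The promise receives the final result, much like originalSender does.
    val promise = Promise[String]()
    getData1().onComplete {
      case Success(x) =>
        // The second step is only started once the first one has succeeded.
        promise.completeWith(getData2(x))
      case Failure(e) =>
        // Forward the error instead of silently dropping it.
        promise.failure(e)
    }
    promise.future
  }
}
```

Every further dependent step adds another level of `onComplete` inside the previous callback, which is where the nesting problem comes from.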

The onComplete pattern above works just fine for 2 actors, but what if we have 10?

data1ObjFuture.onComplete {
  data2ObjFuture.onComplete {
    data3ObjFuture.onComplete {
      ...

As you can see, this becomes deeply nested very fast.

The good news is that Scala offers syntactic sugar called “for comprehensions”:

val data1Actor = ... // create actor #1
val data2Actor = ... // create actor #2

// each generator waits for the previous one; no nesting required
val result: Future[Any] = for {
  data1Obj <- (data1Actor ? GetData1()).mapTo[Data1Obj]
  data2Obj <- data2Actor ? GetData2(data1Obj)
} yield data2Obj

The for … yield above is syntactic sugar for this:

(data1Actor ? GetData1()).mapTo[Data1Obj].flatMap { data1Obj =>
  (data2Actor ? GetData2(data1Obj)).map(data2Obj => data2Obj)
}
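To convince yourself that the two forms are the same, here is a small sketch with plain `Future`s standing in for the asks. `Desugar`, `getData1` and `getData2` are hypothetical names; as in the sequence case, the second step depends on the first one’s result.

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

object Desugar {
  // Hypothetical stand-ins for the two asks; step 2 needs step 1's result.
  def getData1(): Future[Int] = Future(20)
  def getData2(data1: Int): Future[Int] = Future(data1 + 1)

  // Written with a for comprehension...
  def withFor(): Future[Int] = for {
    data1 <- getData1()
    data2 <- getData2(data1)
  } yield data2

  // ...and spelled out the way the compiler rewrites it.
  def desugared(): Future[Int] =
    getData1().flatMap(data1 => getData2(data1).map(data2 => data2))
}
```

Awaiting either future yields 21. Note that `getData2(data1)` sits inside the function passed to `flatMap`, so it cannot start before `getData1()` has completed.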

I have to admit, it does make the code easier to read.

In the example above, data2Actor depends on the response of data1Actor. What if it doesn’t? Is it possible to run the two in parallel?

Yes.

Parallel

Let’s try the following:

val data1Actor = ...
val data3Actor = ...
val dataMergeActor = ...

val report = for {
  data1ObjResult <- (data1Actor ? GetData1()).mapTo[Data1Obj]
  data3ObjResult <- (data3Actor ? GetData3()).mapTo[Data3Obj]
  // a third generator (instead of yielding the ask) keeps report a Future[Any],
  // not a Future[Future[Any]]
  merged <- dataMergeActor ? GetMergedData(data1ObjResult, data3ObjResult)
} yield merged

Neither data1Actor nor data3Actor cares what the other returns. Will the code above run in parallel?

In the example below, let’s make each actor sleep for a while before replying.

If data1Actor sleeps a little longer than data3Actor, the latter should get the chance to come back first:

// Data3Actor.scala
class Data3Actor(..., sleepTime: Option[Int]) extends Actor {
  ...

  def receive = {
    case _: GetData3 =>
      ...
      // simulate a slow data source; Thread.sleep blocks the actor's thread (fine for a demo)
      sleepTime match {
        case Some(time) => Thread.sleep(time)
        case None => ()
      }
      ...
      val res: Data3Obj = new Data3Obj(count_Data3)
      sender() ! res
  }
}
Run it, and… nope.

It looks like data3Actor waits for data1Actor to finish before doing anything. But why?

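Well, for … yield is not black magic; it is syntactic sugar. The code above translates into nested flatMap/map calls, and the ask to data3Actor is only sent inside the function passed to flatMap, that is, after data1Actor’s future has completed. Hence it’s still sequential.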
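The ordering can be observed without any actors. In this sketch, plain `Future`s stand in for the asks; the hypothetical `slowFetch` mimics `Data3Actor` with a `sleepTime`, and a hypothetical `EventLog` records when each fetch starts and ends. Because the second `slowFetch` call sits inside the `flatMap` generated by the for comprehension, it only starts after the first one has ended.

```scala
import scala.concurrent.{blocking, Future}
import scala.concurrent.ExecutionContext.Implicits.global

// Thread-safe, append-only record of events in the order they happened.
final class EventLog {
  private var events = Vector.empty[String]
  def add(e: String): Unit = this.synchronized { events = events :+ e }
  def snapshot: Vector[String] = this.synchronized { events }
}

object SequentialDemo {
  // Hypothetical stand-in for an ask to an actor that sleeps before replying.
  def slowFetch(name: String, sleepMs: Long, log: EventLog): Future[String] =
    Future {
      log.add(s"start $name")
      blocking(Thread.sleep(sleepMs)) // `blocking` lets the global pool add a thread
      log.add(s"end $name")
      name
    }

  // The second fetch is only created inside flatMap, i.e. after data1 is done.
  def sequential(log: EventLog): Future[(String, String)] = for {
    d1 <- slowFetch("data1", 300, log)
    d3 <- slowFetch("data3", 50, log)
  } yield (d1, d3)
}
```

The log always reads start data1, end data1, start data3, end data3, and the total time is roughly the sum of both sleeps.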

So what can we do?

A Future in Scala starts running as soon as it is created, so concurrency comes for free; no black magic needed. All we need to do is create the futures first (which sends both asks right away), then combine them in the for … yield.

val data1Actor = ...
val data3Actor = ...
val dataMergeActor = ...

// both asks are sent right here, before the for comprehension, so both actors work concurrently
val data1ObjFuture: Future[Data1Obj] = (data1Actor ? GetData1()).mapTo[Data1Obj]
val data3ObjFuture: Future[Data3Obj] = (data3Actor ? GetData3()).mapTo[Data3Obj]

// the for comprehension now only waits for futures that are already running
val report = for {
  data1ObjResult <- data1ObjFuture
  data3ObjResult <- data3ObjFuture
  merged <- dataMergeActor ? GetMergedData(data1ObjResult, data3ObjResult)
} yield merged
Shall we test it out?

Hey, data3Actor did come back sooner!
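Here is the up-front version as a stdlib-only sketch, with plain `Future`s standing in for the asks; `ParallelDemo`, `slowFetch` and `EventLog` are hypothetical helpers. Both fetches are started before the for comprehension, so data3 starts while data1 is still sleeping and, having the shorter sleep, finishes first.

```scala
import scala.concurrent.{blocking, Future}
import scala.concurrent.ExecutionContext.Implicits.global

// Thread-safe, append-only record of events in the order they happened.
final class EventLog {
  private var events = Vector.empty[String]
  def add(e: String): Unit = this.synchronized { events = events :+ e }
  def snapshot: Vector[String] = this.synchronized { events }
}

object ParallelDemo {
  // Hypothetical stand-in for an ask to an actor that sleeps before replying.
  def slowFetch(name: String, sleepMs: Long, log: EventLog): Future[String] =
    Future {
      log.add(s"start $name")
      blocking(Thread.sleep(sleepMs)) // `blocking` lets the global pool add a thread
      log.add(s"end $name")
      name
    }

  def parallel(log: EventLog): Future[(String, String)] = {
    // Both futures start running as soon as they are created.
    val data1Future = slowFetch("data1", 300, log)
    val data3Future = slowFetch("data3", 50, log)
    // The for comprehension only combines results that are already on their way.
    for {
      d1 <- data1Future
      d3 <- data3Future
    } yield (d1, d3)
  }
}
```

The result is still `("data1", "data3")`, but the total time is now roughly the longer of the two sleeps. With more than two independent sources, the same idea works with `Future.sequence` over a list of futures that were all created before being combined.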


I’m also exploring another way to assemble actors’ responses, one that looks like a state machine. I will probably write another post about it later.