Playing with Akka Stream and chunks of bytes

Facundo Viale
3 min readJun 3, 2015

--

I’m a very big fan of the electronics. So, a couple of months ago, I started a project in which I needed to develop a program that could process commands from an IO device.

After some time researching and playing with the device in question, I could describe the situation like this:

  • The device uses a serial bus and sends me a sequence of commands.
  • Because this is a low level device, the commands are packages of bytes of differents sizes.
  • Like most of the low level IO devices, this has a buffer. The buffer holds the output bytes from the device and when it’s full it sends all the content to the computer.

In the end I have a chunked stream of bytes that contains the sequence of commands, like this:

Each cell represents a byte and each row a chunk

After this introduction, here comes the interesting part. Since I’m a Scala developer, I took the challenge of writing my “commands processor” in Scala. For this purpose I chose to use flow, which is a Reactive serial communication library for Akka and Scala. I initially try to implement the reading of the commands by using play-iteratee. But after some coding I didn’t feel that it was the best tool for this. So, I searched for other options and I ended up looking at the Akka Stream library.

First, I needed an actor that could receive the states and data messages from flow and was able to be the source of the stream. To make this integration I only needed to make this actor inherit from the trait ActorPublisher. Like this:

class SerialSourceActor(port: String, settings: SerialSettings) extends ActorPublisher[ByteString] with ActorLogging with Stash {

override def preStart {
self ! Start
}
override def postStop {
self ! Stop
}
def receive = {
case Start =>
IO(Serial) ! Serial.Open(port, settings)
context.become(initializing())
case _ => stash()
}
def initializing(startTime: Long = System.currentTimeMillis): Receive = {
case Serial.CommandFailed(cmd: Serial.Open, reason) =>
onError(new IOException(s”Connection failed, stopping terminal. Reason: ${reason}”))
context stop self
case Serial.Opened(port) =>
log.debug(s”””Serial port “$port” opened in ${System.currentTimeMillis — startTime}ms”””)
context.become(initialized(sender))
context watch sender
unstashAll()
case _ => stash()
}
def initialized(operator: ActorRef): Receive = {
case Serial.Received(data) =>
onNext(data)
case Serial.Closed =>
log.info(“Operator closed normally, exiting terminal.”)
onComplete()
context stop self
case Stop =>
onComplete()
operator ! Serial.Close
case Terminated(`operator`) =>
onError(new IOException(s”Operator crashed unexpectedly.”))
context stop self
case other =>
onError(new IOException(s”Unexpected:” + other))
context stop self
}
}object SerialSourceActor {
def props(port: String, settings: SerialSettings) = Props(new SerialSourceActor(port, settings))
case object Start
case object Stop
}

Now I need to define the logic to extract the commands from the chunk of bytes. Luckily, Akka Stream has PushPullStage, which is a type of stream transformation that works like this:

Long story short, with this I can control the flow in a way that I can accumulate the incoming data and decide when I want to push to the downstream. Here is what i did:

class CommandEmitter extends PushPullStage[ByteString, CommandResponse] {  private var buffer = ByteString.empty  override def onPush(elem: ByteString, ctx: Context[CommandResponse]): SyncDirective = {    buffer ++= elem    extractor(buffer) match {
case (Some(cmd), remaining) =>
buffer = remaining
ctx.push(cmd)
case (None, _) =>
ctx.pull()
}
}
override def onPull(ctx: Context[CommandResponse]): SyncDirective =
ctx.pull()
private def extractor(buffer: ByteString): (Option[CommandResponse], ByteString) = ...}object CommandEmitter {
val flow = Flow[ByteString].transform(() => new CommandEmitter())
}

Now it’s time to put all together by simply defining the stream using the ActorFlowMaterializer to materialize the flow:

implicit val materializer = ActorFlowMaterializer()Source.actorPublisher(SerialSourceActor.props(port, settings)).via(CommandEmitter.flow).runWith(Sink.foreach(println))

It’s a piece of cake. ☺

--

--