Playing with Akka Stream and chunks of bytes
I’m a big fan of electronics. So, a couple of months ago, I started a project in which I needed to develop a program that could process commands from an IO device.
After some time researching and playing with the device in question, I could describe the situation like this:
- The device uses a serial bus and sends me a sequence of commands.
- Because this is a low-level device, the commands are packets of bytes of different sizes.
- Like most low-level IO devices, this one has a buffer. The buffer holds the output bytes from the device and, when it’s full, sends all of its content to the computer.
In the end I have a chunked stream of bytes that contains the sequence of commands, where a single chunk can end in the middle of a command.
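To make the chunking concrete, here is a small sketch in plain Scala with no device involved. The command bytes and the 4-byte buffer size are made up for illustration; the only point is that chunk boundaries do not line up with command boundaries.

```scala
object ChunkingDemo {
  // Three hypothetical commands of different sizes (the byte values are made up)
  val commands: List[Vector[Byte]] = List(
    Vector[Byte](0x01, 0x10),
    Vector[Byte](0x02, 0x20, 0x21, 0x22),
    Vector[Byte](0x03, 0x30, 0x31)
  )

  // What the device writes into its buffer: the commands back to back
  val wire: Vector[Byte] = commands.flatten.toVector

  // Assumed buffer size of 4 bytes: every full buffer is flushed as one chunk
  val chunks: List[Vector[Byte]] = wire.grouped(4).toList
}
```

Here the first chunk already contains the start of the second command, so the receiver cannot treat a chunk as a unit and has to reassemble the commands itself.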
After this introduction, here comes the interesting part. Since I’m a Scala developer, I took on the challenge of writing my “commands processor” in Scala. For this purpose I chose flow, a reactive serial communication library for Akka and Scala. I initially tried to implement the reading of the commands with play-iteratee, but after some coding I didn’t feel it was the best tool for the job. So I searched for other options and ended up looking at the Akka Stream library.
First, I needed an actor that could receive the state and data messages from flow and act as the source of the stream. For this integration, the actor only needs to mix in the ActorPublisher trait, like this:
import java.io.IOException

import akka.actor.{ActorLogging, ActorRef, Props, Stash, Terminated}
import akka.io.IO
import akka.stream.actor.ActorPublisher
import akka.util.ByteString
// Serial and SerialSettings come from the flow library

class SerialSourceActor(port: String, settings: SerialSettings)
    extends ActorPublisher[ByteString] with ActorLogging with Stash {

  import SerialSourceActor._
  import context.system // IO(Serial) needs an implicit ActorSystem

  override def preStart(): Unit = {
    self ! Start
  }

  override def postStop(): Unit = {
    self ! Stop
  }

  // Waiting for Start: ask flow to open the port and stash everything else
  def receive = {
    case Start =>
      IO(Serial) ! Serial.Open(port, settings)
      context.become(initializing())
    case _ => stash()
  }

  // Port requested but not opened yet
  def initializing(startTime: Long = System.currentTimeMillis): Receive = {
    case Serial.CommandFailed(cmd: Serial.Open, reason) =>
      onError(new IOException(s"Connection failed, stopping terminal. Reason: ${reason}"))
      context stop self
    case Serial.Opened(port) =>
      log.debug(s"""Serial port "$port" opened in ${System.currentTimeMillis - startTime}ms""")
      context.become(initialized(sender))
      context watch sender
      unstashAll()
    case _ => stash()
  }

  // Port open: forward every received chunk to the stream
  def initialized(operator: ActorRef): Receive = {
    case Serial.Received(data) =>
      onNext(data) // note: ActorPublisher only allows onNext while totalDemand > 0
    case Serial.Closed =>
      log.info("Operator closed normally, exiting terminal.")
      onComplete()
      context stop self
    case Stop =>
      onComplete()
      operator ! Serial.Close
    case Terminated(`operator`) =>
      onError(new IOException(s"Operator crashed unexpectedly."))
      context stop self
    case other =>
      onError(new IOException(s"Unexpected:" + other))
      context stop self
  }
}

object SerialSourceActor {
  def props(port: String, settings: SerialSettings) = Props(new SerialSourceActor(port, settings))
  case object Start
  case object Stop
}
Next, I needed to define the logic to extract the commands from the chunks of bytes. Luckily, Akka Stream has PushPullStage, a type of stream transformation driven by two callbacks: onPush is called when an element arrives from upstream, and onPull is called when downstream asks for the next element.
Long story short, this lets me control the flow so that I can accumulate the incoming data and decide when to push a command downstream. Here is what I did:
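import akka.stream.scaladsl.Flow
import akka.stream.stage.{Context, PushPullStage, SyncDirective}
import akka.util.ByteString

class CommandEmitter extends PushPullStage[ByteString, CommandResponse] {

  // Bytes received so far that do not yet form a complete command
  private var buffer = ByteString.empty

  override def onPush(elem: ByteString, ctx: Context[CommandResponse]): SyncDirective = {
    buffer ++= elem
    emitOrPull(ctx)
  }

  // A chunk may carry several commands, so drain the buffer before asking upstream for more
  override def onPull(ctx: Context[CommandResponse]): SyncDirective =
    emitOrPull(ctx)

  // Push the next complete command if there is one, otherwise request more bytes
  private def emitOrPull(ctx: Context[CommandResponse]): SyncDirective =
    extractor(buffer) match {
      case (Some(cmd), remaining) =>
        buffer = remaining
        ctx.push(cmd)
      case (None, _) =>
        ctx.pull()
    }

  private def extractor(buffer: ByteString): (Option[CommandResponse], ByteString) = ...
}

object CommandEmitter {
  val flow = Flow[ByteString].transform(() => new CommandEmitter())
}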
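The body of extractor depends on the device’s protocol, which is not shown here. As a rough sketch only, assume a hypothetical framing in which the first byte of each command is the length of the payload that follows. The example uses Vector[Byte] instead of ByteString so that it runs without Akka on the classpath, and Command is a stand-in for CommandResponse; the decode helper is also made up, to mimic how the stage feeds chunks into the buffer.

```scala
object ExtractorSketch {
  // Stand-in for CommandResponse; the real type is not shown in this post
  final case class Command(payload: Vector[Byte])

  // Hypothetical framing: [length byte][payload of that many bytes]
  def extractor(buffer: Vector[Byte]): (Option[Command], Vector[Byte]) =
    buffer.headOption match {
      case Some(len) if buffer.length >= 1 + (len & 0xff) =>
        val (frame, remaining) = buffer.splitAt(1 + (len & 0xff))
        (Some(Command(frame.tail)), remaining)
      case _ =>
        // Not enough bytes yet: keep everything and wait for the next chunk
        (None, buffer)
    }

  // Feeds the chunks one by one and drains every complete command after each chunk
  def decode(chunks: List[Vector[Byte]]): List[Command] = {
    @annotation.tailrec
    def drain(buf: Vector[Byte], acc: List[Command]): (Vector[Byte], List[Command]) =
      extractor(buf) match {
        case (Some(cmd), rest) => drain(rest, cmd :: acc)
        case (None, rest)      => (rest, acc)
      }

    val (_, reversed) = chunks.foldLeft((Vector.empty[Byte], List.empty[Command])) {
      case ((buf, acc), chunk) => drain(buf ++ chunk, acc)
    }
    reversed.reverse
  }
}
```

Returning the untouched buffer together with None is what lets the caller keep accumulating bytes until a whole command is available.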
Now it’s time to put it all together by defining the stream and using the ActorFlowMaterializer to materialize the flow:
// ActorFlowMaterializer() requires an implicit ActorSystem in scope
implicit val materializer = ActorFlowMaterializer()

Source.actorPublisher[ByteString](SerialSourceActor.props(port, settings))
  .via(CommandEmitter.flow)
  .runWith(Sink.foreach(println))
It’s a piece of cake. ☺