Unmarshalling in Akka-HTTP: caveats of streaming

Herve Eichwald
Published in disney-streaming
Apr 9, 2019

Abstract

The default streaming nature of Akka-HTTP may lead to complications when unmarshalling (deserializing) a request, especially when the request entity needs to be unmarshalled more than once. In addition, some problems only appear for large payloads and, as a result, are easy to miss during everyday testing on your local machine. In this post, we will unearth some of these potential issues and provide a handful of strategies so that you can go to production more serenely.

Streaming by nature

Akka-HTTP streams request and response entities. This behavior allows for back-pressure and makes better use of resources such as RAM, because the entity does not have to be fully buffered in memory. It helps us in situations like:

  • deferring consumption when the server cannot make immediate use of the entity because of heavy load
  • deciding from the start of the stream that we are not interested in this request
  • protecting ourselves from excessively large requests

A possible source of headaches

While powerful, streaming by design may also hide issues that do not always manifest obviously.

Consider the following scenario: supporting multiple unmarshallers for a given HTTP endpoint. While not ideal, sometimes you need to support legacy applications in a less-than-perfect ecosystem. In this scenario, the same endpoint must accept, under the same content-type, both plain JSON payloads and stringified JSON (a JSON string whose content is itself an escaped JSON document).

Debugging may be difficult

Streaming the request entity means that it is only available to you once. As a result, if unmarshalling fails and you would like to access the request entity again, perhaps to unmarshall it into something more user-friendly such as a String, you may be out of luck.

Testing an example

We set up a simple Akka-HTTP project with a JSON deserialization library. We choose circe as it is a popular choice, but feel free to pick your preferred library.

build.sbt

name := "akka-http-unmarshallers"

version := "0.1"

scalaVersion := "2.12.8"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-http" % "10.1.7",
  "com.typesafe.akka" %% "akka-stream" % "2.5.19",
  "io.circe" %% "circe-core" % "0.11.1",
  "io.circe" %% "circe-generic" % "0.11.1",
  "io.circe" %% "circe-parser" % "0.11.1",
  "de.heikoseeberger" %% "akka-http-circe" % "1.25.2"
)

Our expected payload is a JSON array, so let’s first define a server and deserialize the request into List[Json]:

package example

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer
import de.heikoseeberger.akkahttpcirce.FailFastCirceSupport._
import io.circe._

import scala.concurrent.ExecutionContext
import scala.io.StdIn

object WebServer {

  def main(args: Array[String]): Unit = {

    implicit val system: ActorSystem = ActorSystem("my-system")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    implicit val executionContext: ExecutionContext = system.dispatcher

    // POST /test expects a JSON array in the request entity
    val route =
      path("test") {
        post {
          entity(as[List[Json]]) { _ =>
            complete("Ok")
          }
        }
      }

    val bindingFuture = Http().bindAndHandle(route, "localhost", 8080)

    println(s"Server online at http://localhost:8080/\nPress RETURN to stop...")
    StdIn.readLine() // let it run until user presses return
    bindingFuture
      .flatMap(_.unbind()) // trigger unbinding from the port
      .onComplete(_ => system.terminate()) // and shutdown when done
  }
}

Now, start the server with sbt run and send a request to validate that everything is fine.

curl -v -X POST \
  http://localhost:8080/test \
  -H 'Content-Type: application/json' \
  -H 'cache-control: no-cache' \
  -d '[{
    "field1": "value1"
  }]'

You should get a 200. Now let’s try sending a String:

curl -v -X POST \
  http://localhost:8080/test \
  -H 'Content-Type: application/json' \
  -H 'cache-control: no-cache' \
  -d '"[{\"field1\": \"value1\"}]"'

We get a 400: The request content was malformed.

Let’s see if we can use the previous unmarshaller for List[Json] together with a String unmarshaller. To achieve this, let’s write a small function that tries multiple unmarshallers until it finds one that works.

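// Additional imports needed at the top of the file
import akka.http.scaladsl.unmarshalling.Unmarshaller
import akka.http.scaladsl.util.FastFuture
import akka.http.scaladsl.util.FastFuture._
import scala.concurrent.Future
import scala.util.control.NonFatal

// Inspired by `firstOf` in
// akka.http.scaladsl.unmarshalling.Unmarshaller
private def firstOf[A, B](unmarshallers: Unmarshaller[A, B]*)(implicit materializer: ActorMaterializer): Unmarshaller[A, B] =
  Unmarshaller { implicit ec => a =>
    // Try each unmarshaller in order and fall through to the next one on failure
    def rec(ix: Int): Future[B] =
      if (ix < unmarshallers.size) {
        unmarshallers(ix)(a).fast.recoverWith {
          case NonFatal(e) =>
            e.printStackTrace()
            rec(ix + 1)
        }
      } else {
        FastFuture.failed(new Exception("Cannot unmarshall"))
      }
    rec(0)
  }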

Let’s write a simple method to parse the String into a List[Json]:

// Requires: import io.circe.parser.parse
private def strAsListJson(s: String): List[Json] =
  // For simplicity of this example, we use `.get` instead of proper handling
  parse(s).toOption.get.asArray.get.toList
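As a hedged alternative to the `.get` calls above, the sketch below returns an Either instead of throwing. It relies only on circe's `decode`; the names `SafeParsing` and `strAsListJsonSafe` are hypothetical and not part of the original example.

```scala
import io.circe.{Error, Json}
import io.circe.parser.decode

object SafeParsing {
  // Hypothetical variant of strAsListJson: a Left carries the parsing or
  // decoding failure instead of an exception being thrown.
  def strAsListJsonSafe(s: String): Either[Error, List[Json]] =
    decode[List[Json]](s)
}
```

The rest of this post keeps the shorter `.get` version so that the examples stay focused on streaming.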

Update the route:

val route =
  path("test") {
    post {
      entity(
        firstOf(
          as[List[Json]],
          as[String].map(strAsListJson)
        )
      ) { _ =>
        complete("Ok")
      }
    }
  }

Resend the String and it should just work.

Easy enough, right?

You may think that you are done, but there are some caveats with this solution. To expose another behavior, either send a large (around 500 KB to 1 MB) stringified JSON payload or send the data in multiple chunks.

Sending a large payload

curl -v -X POST http://localhost:8080/test \
  -H 'Content-Type: application/json' \
  -H 'cache-control: no-cache' \
  -d @large_string.json
> Expect: 100-continue
>
< HTTP/1.1 100 Continue
< Server: akka-http/10.1.7
< Date: Mon, 11 Mar 2019 15:24:43 GMT
* We are completely uploaded and fine
< HTTP/1.1 100 Continue
< Server: akka-http/10.1.7
< Date: Mon, 11 Mar 2019 15:24:43 GMT
< HTTP/1.1 100 Continue
< Server: akka-http/10.1.7
< Date: Mon, 11 Mar 2019 15:24:43 GMT
< HTTP/1.1 400 Bad Request
< Server: akka-http/10.1.7
< Date: Mon, 11 Mar 2019 15:24:43 GMT
< Content-Type: text/plain; charset=UTF-8
< Content-Length: 52
<
The request content was malformed:

The same is observed when using chunks and a small payload:

curl -v -X POST http://localhost:8080/test \
  -H 'Content-Type: application/json' \
  -H 'Transfer-Encoding: chunked' \
  -H 'cache-control: no-cache' \
  -d @small_string.json
> 22f
* upload completely sent off: 566 out of 559 bytes
< HTTP/1.1 400 Bad Request
< Server: akka-http/10.1.7
< Date: Mon, 11 Mar 2019 15:26:21 GMT
< Content-Type: text/plain; charset=UTF-8
< Content-Length: 52
<
The request content was malformed:

This happens for our second unmarshaller, namely the String unmarshaller.

Sending a genuinely large JSON array, which is handled by the first unmarshaller, does not exhibit any problem.

curl -v -X POST \
  http://localhost:8080/test \
  -H 'Content-Type: application/json' \
  -H 'cache-control: no-cache' \
  -d @large_json.json

or

curl -v -X POST \
  http://localhost:8080/test \
  -H 'Content-Type: application/json' \
  -H 'Transfer-Encoding: chunked' \
  -H 'cache-control: no-cache' \
  -d @large_json.json
> Expect: 100-continue
>
< HTTP/1.1 100 Continue
< Server: akka-http/10.1.7
< Date: Mon, 11 Mar 2019 15:29:19 GMT
* We are completely uploaded and fine
< HTTP/1.1 200 OK
< Server: akka-http/10.1.7
< Date: Mon, 11 Mar 2019 15:29:19 GMT
< Content-Type: application/json
< Content-Length: 4

You may also test reversing the order of the unmarshallers. Notice that this time, sending the large String works and sending the large JSON fails.

firstOf(
  as[String].map(strAsListJson),
  as[List[Json]]
)

This is a result of Akka-HTTP streaming entities by default. Looking at the server logs, we notice:

java.lang.IllegalStateException: Substream Source cannot be materialized more than once
at akka.stream.impl.fusing.SubSource$$anon$13.setCB(StreamOfStreams.scala:756)
at akka.stream.impl.fusing.SubSource$$anon$13.preStart(StreamOfStreams.scala:766)

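The second unmarshaller fails because the entity stream has already been consumed by the first one and is no longer available. Small requests did not show the problem because Akka-HTTP typically delivers a body that arrives together with the headers as a strict, fully in-memory entity, which can be read any number of times.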
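The exception can be reproduced outside of Akka-HTTP. The sketch below is an illustration with plain Akka Streams, not the code path Akka-HTTP itself runs: it assumes that the tail returned by `prefixAndTail` is a substream source of the same kind as a streamed entity, and `SubstreamOnce` and `runTwice` are hypothetical names.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}

import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Try

object SubstreamOnce {
  // Runs the same substream Source twice and returns both outcomes.
  def runTwice(): (Try[Seq[Int]], Try[Seq[Int]]) = {
    implicit val system: ActorSystem = ActorSystem("substream-demo")
    implicit val materializer: ActorMaterializer = ActorMaterializer()
    try {
      // prefixAndTail(0) emits an empty prefix plus the rest of the stream as a substream Source
      val (_, tail) =
        Await.result(Source(1 to 5).prefixAndTail(0).runWith(Sink.head), 3.seconds)

      // First materialization of the tail
      val first = Try(Await.result(tail.runWith(Sink.seq), 3.seconds))
      // Second materialization of the very same Source
      val second = Try(Await.result(tail.runWith(Sink.seq), 3.seconds))
      (first, second)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = {
    val (first, second) = runTwice()
    println(s"first run:  $first")
    println(s"second run: $second")
  }
}
```

The first run should succeed, while the second is expected to fail in the same way as the second unmarshaller did.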

Debugging

It may also be a problem when trying to understand what the client sent. Indeed, if the JSON unmarshaller fails, you may be tempted to unmarshall the request entity as a String (the default .toString would only give a truncated body) and do something like:

Unmarshaller.stringUnmarshaller(requestEntity)
  .map(s => println(s"Cannot unmarshall: $s"))
  .recover { case NonFatal(e) => e.printStackTrace() }

Unfortunately, this fails with the same error, Substream Source cannot be materialized more than once, because the entity stream cannot be consumed twice.

Possible solutions

Buffering

One way to avoid the issue is to buffer the entire request entity in memory, thus making it available for reuse in multiple unmarshallers. This is simply achieved by wrapping the route with toStrictEntity:

// Requires: import scala.concurrent.duration._
val route =
  path("test") {
    toStrictEntity(5.seconds) {
      post {
        entity(
          firstOf(
            as[List[Json]],
            as[String].map(strAsListJson)
          )
        ) { _ =>
          complete("Ok")
        }
      }
    }
  }

As mentioned in the Akka documentation, you can still apply configuration to limit the size of the allowed payload, or to lift that limit.
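One way to bound the buffered payload per route is the `withSizeLimit` directive, sketched below around a buffered route. The 1 MB limit is an arbitrary assumption chosen for illustration, and `SizeLimitedRoute` is a hypothetical name; the server-wide default comes from the `akka.http.server.parsing.max-content-length` setting.

```scala
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route

import scala.concurrent.duration._

object SizeLimitedRoute {
  // Assumed limit for illustration: 1 MB
  val maxBytes: Long = 1024L * 1024L

  val route: Route =
    path("test") {
      post {
        // The limit wraps the buffering step, so it applies to the entity being buffered
        withSizeLimit(maxBytes) {
          toStrictEntity(5.seconds) {
            entity(as[String]) { body =>
              complete(s"Received ${body.length} characters")
            }
          }
        }
      }
    }
}
```

With this nesting, an oversized entity should be rejected rather than buffered in full, which keeps some of the protection that streaming offered.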

A custom single unmarshaller

Another approach is to avoid the use of multiple unmarshallers and write a custom one. This has the benefit of consuming the stream only once but is more tedious to write; indeed it would be implemented at a lower level and you would have to inspect the bytes. An incomplete starting idea may be:

// Requires: akka.http.scaladsl.model.HttpEntity, akka.util.ByteString, io.circe.jawn
implicit val u: Unmarshaller[HttpEntity, List[Json]] =
  Unmarshaller
    .byteStringUnmarshaller
    .map {
      case ByteString.empty => throw Unmarshaller.NoContentException
      case a =>
        a.head match {
          // 34 is ASCII for "
          case 34 =>
            // 92 is ASCII for \
            // Remove first " and last "
            // Naive unescaping
            strAsListJson(a.filter(_ != 92).tail.dropRight(1).utf8String)
          // 91 is ASCII for [
          case 91 => jawn.parseByteBuffer(a.asByteBuffer).fold(throw _, _.asArray.get.toList)
          // Anything else is not supported by this sketch
          case other => throw new IllegalArgumentException(s"Unexpected first byte: $other")
        }
    }

JSON specific

The last solution may actually be the simplest, and is shown here. Keep in mind, though, that it is specific to our example and to this combination of List[Json] and String unmarshallers.

val route =
  path("test") {
    post {
      entity(as[Json]) { j =>
        j.asArray match {
          case None =>
            j.asString match {
              case Some(s) =>
                (for {
                  s1 <- parse(s).toOption
                  s2 <- s1.asArray
                } yield s2) match {
                  case Some(_) => complete("Ok")
                  case _ => complete(400, "Not Ok: payload string is not a json array")
                }
              case None => complete(400, "Not Ok: payload is not a valid json string")
            }
          case Some(_) => complete("Ok")
        }
      }
    }
  }
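The same idea could also be pushed into a circe `Decoder`, so that the calling code only asks for a List[Json]. This is a sketch under assumptions rather than the approach used above: `LenientDecoding`, `lenientListDecoder` and `decodePayload` are hypothetical names, and wiring the decoder into the route (for instance through implicit scope) is left out.

```scala
import io.circe.{Decoder, Json}
import io.circe.parser.decode

object LenientDecoding {
  // Accepts either a JSON array or a JSON string whose content is a JSON array.
  val lenientListDecoder: Decoder[List[Json]] =
    Decoder[List[Json]].or(
      Decoder[String].emap { s =>
        // Parse the inner string; report a failure as a plain message
        decode[List[Json]](s).left.map(_.getMessage)
      }
    )

  // Decodes a raw payload with the lenient decoder passed explicitly.
  def decodePayload(payload: String): Either[String, List[Json]] =
    decode[List[Json]](payload)(lenientListDecoder).left.map(_.getMessage)
}
```

Because the entity is parsed as JSON only once, this variant also consumes the request stream a single time.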

Conclusion

It is always essential to understand the implicit contracts at play under the hood when you use a specific piece of software, be it Akka-HTTP or another.

Problems may manifest only under race conditions or on rare occasions, and debugging is always more challenging and stressful when it has to be done on production systems.

Issues that look like bugs at first may actually be features of the software you are using, designed that way to favor performance or other concerns. Therefore, always test the application thoroughly in both common and corner cases, and read the documentation: it is always a good starting point to understand the design and motivations of the software.
