Streaming large data sets

Michał Bogacz
Jun 20, 2017


Building REST microservices requires dealing with many restrictions. One of these restrictions is limited memory. But don’t assume that you can’t pass data sets bigger than your microservice’s memory. With Akka HTTP and Akka Streams, it’s not only efficient but also simple. In this blog post, I will show you how to do it.

Introduction

Let’s start from the beginning. When you build a REST microservice, one of the most important endpoints is the GET method on a group of resources. Its purpose is to expose the resources gathered under a given path.

Example: “GET” method on endpoint “/books” — returns all books.

At first sight, there is nothing special about it. Just returning a list of resources. But what happens if this list contains many results? So many that loading them into memory can harm your microservice’s stability?

Let’s consider two different solutions.

Note: Code samples for this blog post can be found on GitHub.

Solution 1: Client controls the size — Pagination

The first approach is simple: the user knows best what size of response the microservice should return. How can they achieve that? By specifying in the request the maximum number of resources in the response, and the number of resources to skip from the beginning. This way, the user splits the resource list across many requests and can get them one after another.

This concept is called pagination.

The benefit of this approach is clear: strict control of returned payload size. That can eliminate possible memory issues for the client and (with proper validation of pagination parameters) can eliminate memory problems in microservice too.

Now let’s see how this can be implemented in Scala.

Case class for Pagination parameters:

case class PageParams(pageNumber: Option[Int],
                      pageSize: Option[Int]) {
  require(pageNumber.forall(number ⇒ number > 0), "page number must be greater than 0")
  require(pageSize.forall(size ⇒ size > 0 && size < 1000), "page size must be greater than 0 and less than 1000")

  private val DefaultPageNumber = 1
  private val DefaultPageSize = 16

  val limit: Int = pageSize.getOrElse(DefaultPageSize)
  val skip: Int = (pageNumber.getOrElse(DefaultPageNumber) - 1) * limit
}

This is one of the possible ways to implement pagination parameters.

The code presented above defines two optional fields:

  • pageSize — the number of elements returned in one response/page
  • pageNumber — the number of a page to be returned

As you can see above, we are not skipping single resources, but groups of resources called “pages”. When the pagination parameters are not given, the default first page (with 16 elements) will be used. The “skip” and “limit” fields will be used later.
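To make the arithmetic concrete, here is how “skip” and “limit” come out for a couple of hypothetical parameter combinations:

val params = PageParams(pageNumber = Some(3), pageSize = Some(50))
params.limit // 50  -> passed to .limit(...)
params.skip  // 100 -> (3 - 1) * 50, passed to .skip(...)

val defaults = PageParams(None, None) // no parameters in the request
defaults.limit // 16 -> DefaultPageSize
defaults.skip  // 0  -> the first page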

To get all resources from the endpoint, the user specifies a pageSize and makes GET requests with an incrementing pageNumber until an empty response occurs.
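As a sketch (the resource name and page size here are just examples), the client-side iteration looks like this:

GET /resources?pageNumber=1&pageSize=50
GET /resources?pageNumber=2&pageSize=50
GET /resources?pageNumber=3&pageSize=50
... and so on, until a response with an empty list “[]” comes back.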

The directive to use this case class is shown below:

trait ServiceDirectives extends Directives {

  val pageParams: Directive1[PageParams] =
    parameters(('pageNumber.as[Int].?,
                'pageSize.as[Int].?)).as(PageParams)
}

And the routing:

val route =
  path("resources") {
    pageParams { pageParams =>
      get {
        complete(getData(coll, pageParams)
          .map(HttpEntity(ContentTypes.`application/json`, _)))
      }
    }
  }

Finally, the most important function:

def getData(coll: MongoCollection[Document], pageParams: PageParams): Future[String] =
  Source.fromPublisher(coll.find().skip(pageParams.skip)
    .limit(pageParams.limit))
    .map(_.toJson)
    .intersperse("[", ",", "]")
    .runFold("")((acc, e) ⇒ acc + e)

In the code above, I’m using the “mongodb-driver-reactivestreams” driver. It exposes a Reactive Streams Publisher for many different operations.

To get a Publisher with documents from the MongoDB collection, I’m using “coll.find().skip(pageParams.skip).limit(pageParams.limit)” (now you see why I needed the “skip” and “limit” fields in “PageParams”).

The “Source.fromPublisher” constructor turns the Publisher into an Akka Streams Source. I used “.map(_.toJson)” to simplify marshalling, as it converts documents to Strings (in the MongoDB Extended JSON format).

The last two steps are “intersperse("[", ",", "]")” and “.runFold("")((acc, e) ⇒ acc + e)”. The first creates a correct JSON list response by adding the start, end and separator elements. The second concatenates all results into one String.
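If you haven’t used “intersperse” before, here is a minimal, self-contained sketch of what these two steps do (the two JSON strings are made up just for this illustration):

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source

implicit val system = ActorSystem("intersperse-example")
implicit val materializer = ActorMaterializer()
import system.dispatcher

// two hypothetical documents, already rendered as JSON strings
val docs = Source(List("""{"x":1}""", """{"x":2}"""))

docs
  .intersperse("[", ",", "]")      // emits: "[", "{"x":1}", ",", "{"x":2}", "]"
  .runFold("")((acc, e) ⇒ acc + e) // concatenates everything into one String
  .foreach(println)                // prints: [{"x":1},{"x":2}]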

Note: In my example, I used MongoDB as the database. But this solution is not limited to MongoDB. Actually, it’s very simple to change the storage: you just need a database driver which exposes a Reactive Streams Publisher. For example, Slick has a Publisher too: http://slick.lightbend.com/doc/3.2.0/dbio.html#streaming
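For illustration, a minimal sketch with Slick 3.x could look like the following (the “books” table and the “mydb” config entry are hypothetical):

import akka.NotUsed
import akka.stream.scaladsl.Source
import slick.jdbc.PostgresProfile.api._

// a hypothetical table with two columns
class Books(tag: Tag) extends Table[(Long, String)](tag, "books") {
  def id    = column[Long]("id", O.PrimaryKey)
  def title = column[String]("title")
  def *     = (id, title)
}

val books = TableQuery[Books]
val db = Database.forConfig("mydb") // assumes a "mydb" section in application.conf

// db.stream returns a DatabasePublisher, which is a Reactive Streams Publisher,
// so it plugs into Akka Streams exactly like the MongoDB one
val source: Source[(Long, String), NotUsed] =
  Source.fromPublisher(db.stream(books.result))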

After running the code from the example (in GitHub, run the object “ApiReturningList”) and making a GET request to “localhost:8080/resources”, you get a response with a body similar to the one below:

[
  {
    "_id": {
      "$oid": "593fc5f3964652a1ffc9836d"
    }
  },
  {
    "_id": {
      "$oid": "593fc5f4964652a1ffc9836e"
    }
  }
]

Of course, the response depends on your MongoDB collection content.

It’s pretty simple, right? But unfortunately, this solution has a few drawbacks. And for one of our users, those were blockers.

Different requirements

As I mentioned, the services that I develop with my team have many REST API users. As you can guess, different users have potentially different use cases. Pagination works well for most of them, but not for all. It turned out that for one user the service responses were slow and timeouts could sometimes occur.

My team investigated it and found that:

  • His use case was cache synchronization, and he wanted to iterate over all the data
  • The user had a lot of data: more than 200,000 JSON documents
  • We encountered timeouts when the database operation took too long

But this still does not explain why the timeouts occurred, right? Well, not exactly. MongoDB’s skip and limit are not suitable for paginating over large data sets. In the MongoDB documentation, you can read: “The cursor.skip() method is often expensive because it requires the server to walk from the beginning of the collection or index to get the offset or skip position before beginning to return results.” [link]. This means that responses for the first pages were fast, but responses for the last pages were so slow that timeouts could occur. And that is definitely bad.

We needed a better solution: one that could pass all the data through our service without memory allocation peaks, and that would be easy for us and for our current API users.

And we found it: HTTP Chunked Encoding!

Solution 2: Chunking

The idea is simple: the data is sent in the response as a series of “chunks”. Elements can be sent one by one, without storing the whole response in memory and without closing the connection until every chunk is passed. This way you can send an unlimited amount of data in response to just one request!

It’s not a new idea; it was introduced in the HTTP/1.1 specification. To inform the HTTP client about a chunked-encoding response, the “Transfer-Encoding: chunked” header is added. And best of all, it’s really simple to use with Akka HTTP and Akka Streams!
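On the wire, a chunked response looks roughly like this: each chunk is preceded by its size in hexadecimal, and a zero-length chunk marks the end. The JSON content and chunk boundaries below are made up purely for illustration:

HTTP/1.1 200 OK
Content-Type: application/json
Transfer-Encoding: chunked

12
[{"name":"book-1"}
13
,{"name":"book-2"}]
0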

Let’s see the examples. First, we create a new getData function:

def getData(coll: MongoCollection[Document]): Source[ByteString, NotUsed] =
  Source.fromPublisher(coll.find())
    .map(_.toJson)
    .map(ByteString(_))
    .intersperse(ByteString("["), ByteString(","), ByteString("]"))

Differences from the first solution:

  1. String is changed to ByteString.
  2. The return type is Source[ByteString, NotUsed]: we are not folding the results anymore.
  3. “skip” and “limit” are removed (not necessary in this example, but they can still be included if you need them).

Now let’s check the route:

val route =
  path("resources") {
    get {
      complete(HttpEntity(ContentTypes.`application/json`,
        getData(coll)))
    }
  }

In the code sample above, the response is completed with an HttpEntity and the ByteString Source. You can try it out by running “AppReturningChunking” from my GitHub examples.

The response will be the same, but it will contain the additional header “Transfer-Encoding: chunked”.
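A quick way to see it (assuming the example service is running locally on port 8080) is to inspect the response headers with curl; the output below is abbreviated:

$ curl -v http://localhost:8080/resources
...
< HTTP/1.1 200 OK
< Transfer-Encoding: chunked
< Content-Type: application/json
...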

Using HTTP chunking can give you many benefits:

  • better memory management (we are not folding results in memory, just passing them through)
  • pagination is still possible! (we can add “skip” and “limit” if needed)
  • we are returning the whole MongoDB collection with one database cursor, which is faster and more consistent than pagination
  • speed: depending on the JSON size and page size, chunking can speed up getting all the data even a few times!

One more piece of advice: in your production application, don’t forget to adjust your timeouts. Even if chunking is much faster than the pagination from Solution 1, there can still be a user trying to get gigabytes of data.
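As a sketch, the relevant Akka HTTP server settings live in application.conf; the values below are placeholders, not recommendations:

akka.http.server {
  # how long the server may take to produce a response for a request
  request-timeout = 60 s
  # how long a connection may stay open without any data flowing before it is closed
  idle-timeout = 120 s
}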

Summary

I didn’t present all the implications of using chunking, such as:

  • Can chunking gigabytes of data result in a memory overflow in the user’s service? (short answer: not necessarily)
  • In REST, the status is sent at the beginning of the response, so what about errors that occur after the response has started being sent?
  • Is it possible to not only get but also send data to the service in a similar way?

Answers to these questions would make this blog post too long. But if you are interested, I can write more posts about chunking.

From my experience, chunking is worth implementing when you are dealing with large data sets. In my team, we added chunking even to endpoints with pagination. This really improved memory consumption in our microservices (and users didn’t even notice the difference: all responses are backward compatible).

My advice for developers: explore different possibilities and features! Technologies like Akka Streams, Akka HTTP, and Reactive Streams database clients can create a powerful and elastic combination. Don’t take for granted that all you did is enough and that users have to deal with all your API limitations. Sometimes it doesn’t take much work to make your users happy.


Michał Bogacz

Senior Software Developer at VirtusLab. Scala and Akka enthusiast.