Streaming data from PostgreSQL using Akka Streams, Akka HTTP and Doobie

This article recounts my two-day journey building a streaming application using Akka Streams, a PostgreSQL database and Typelevel's awesome pure functional JDBC library, Doobie.

It all started with my favorite PHP framework, which I used to build an admin backend interface for an online application. The client requested a feature for exporting large sets of personnel data to CSV format for offline evaluation/analysis. At first, being a lazy developer, I went straight for a PHP Excel library. It was pretty straightforward to work with, but in no time I ran into challenges (mainly with memory). These challenges were not easy to resolve, owing to the high volume of data to be exported and the complexity of the queries, which could not be expressed in a single SQL select statement.

I swiftly moved my implementation to my preferred tools for daily programming (the Akka ecosystem). I started with the awesome scala-csv library by @tototoshi. It served its purpose well enough, but the time the application took to move this large data set from the PostgreSQL database into a temporary file on the server, plus the time it subsequently took to download that temporary file through the user's browser, was neither acceptable nor practical for production.

Finally, I opted to stream these records directly from the PostgreSQL database to the user's browser, based on a few search criteria selected by the end user (the client).

My Implementation

The implementation itself is pretty straightforward: data is read from two tables as a stream, which ends up as a downloadable file in the user's browser.

Logical diagram of akka-streams implementation

A few dependencies to add to the project's build.sbt:

lazy val akkaHttpVersion = "10.0.11"
lazy val akkaVersion = "2.5.11"

libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-http"            % akkaHttpVersion,
  "com.typesafe.akka" %% "akka-http-spray-json" % akkaHttpVersion,
  "com.typesafe.akka" %% "akka-http-xml"        % akkaHttpVersion,
  "com.typesafe.akka" %% "akka-stream"          % akkaVersion,
  "org.tpolecat"      %% "doobie-postgres"      % "0.5.3"
)

Let's start by defining the case class models for our PostgreSQL database tables.

case class Applicant(
  id: Int,
  first_name: String,
  surname: String,
  other_names: Option[String],
  position: String) {

  // Convert a database row into a list of strings for CSV output.
  def parse(): List[String] = List(
    first_name,
    surname,
    other_names.getOrElse(""),
    position)
}

case class Hobbies(
  applicant_id: Int,
  hobby: String)

Notice that we added a parse method to the Applicant case class; this eases the conversion of a database row into a list of strings. The relationship between the applicants table and the hobbies table is one-to-many.
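For instance, parse flattens an applicant into its CSV-ready fields, substituting an empty string when other_names is absent (the sample values below are made up):

```scala
// Applicant as defined above, repeated here so the example is self-contained.
case class Applicant(
  id: Int,
  first_name: String,
  surname: String,
  other_names: Option[String],
  position: String) {
  def parse(): List[String] =
    List(first_name, surname, other_names.getOrElse(""), position)
}

// Made-up sample row: a missing middle name becomes an empty CSV field.
val sample = Applicant(1, "Jane", "Doe", None, "Engineer")
sample.parse() // List("Jane", "Doe", "", "Engineer")
```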

As stated earlier, I will be using Doobie for the database connection. Doobie is a dead simple pure functional JDBC layer for Scala and Cats. It is not an ORM, nor is it a relational algebra; it simply provides a principled way to construct programs (and higher-level libraries) that use JDBC. To learn more about how to compose queries with Doobie, visit its website. Now let's write our Doobie queries to fetch these records.
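The queries below run against a Transactor named `xa`, which the snippets assume is already in scope. A minimal sketch, assuming a local PostgreSQL instance and placeholder connection details:

```scala
import cats.effect.IO
import doobie._

// Hypothetical connection details -- substitute your own database,
// user and password. fromDriverManager is fine for a demo; a pooled
// transactor (e.g. HikariTransactor) is preferable in production.
val xa: Transactor[IO] = Transactor.fromDriverManager[IO](
  "org.postgresql.Driver",                  // JDBC driver class
  "jdbc:postgresql://localhost/applicants", // connection URL
  "postgres",                               // user
  "secret"                                  // password
)
```

You will also want `import doobie.implicits._` in scope for the `sql` interpolator and `transact` syntax used below.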

def getApplicantList(): IO[List[Applicant]] =
  sql"select * from applicants"
    .query[Applicant]   // column order must match the case class fields
    .stream
    .compile
    .toList
    .transact(xa)

def getApplicantWithHobbies(id: Int): IO[List[Hobbies]] =
  sql"select * from hobbies where applicant_id = $id"
    .query[Hobbies]
    .stream
    .compile
    .toList
    .transact(xa)

Next, we implement the akka-streams code (Source and Flow) and the akka-http route.

val hobbiesFlow: Flow[Applicant, List[String], NotUsed] =
  Flow[Applicant].map { applicant =>
    val hobbies = getApplicantWithHobbies(applicant.id).unsafeRunSync()
    applicant.parse() :+ hobbies.map(_.hobby).mkString(",")
  }

val source = Source(getApplicantList().unsafeRunSync())
  .via(hobbiesFlow)
  .map(x => ByteString(x.mkString(",") + "\n"))

As you can see, we built an akka-streams Source from our getApplicantList method, plus a single Flow stage that fetches the hobbies of each applicant, appends them to the applicant's fields as a single list of strings, and finally converts each list into a UTF-8 encoded ByteString representing one CSV line.
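One caveat: joining fields with mkString(",") produces broken CSV if a field itself contains a comma, quote or newline. A minimal quoting helper (the name escapeCsv is my own, not part of the original code) could be applied to each field before joining:

```scala
// Quote a field if it contains characters that would break naive CSV
// output, doubling any embedded quotes per the usual CSV convention.
def escapeCsv(field: String): String =
  if (field.exists(c => c == ',' || c == '"' || c == '\n' || c == '\r'))
    "\"" + field.replace("\"", "\"\"") + "\""
  else field
```

It would slot into the last map stage as `.map(x => ByteString(x.map(escapeCsv).mkString(",") + "\n"))`.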

Lastly, we set a custom HTTP header in our akka-http route so that the end user's browser downloads the generated stream of records as a file.

val headers = List(RawHeader("Content-Disposition", "attachment; filename=applicant.csv"))

respondWithHeaders(headers) {
  complete(HttpEntity(ContentTypes.`text/csv(UTF-8)`, source))
}

Summary

The complete akka-http route looks somewhat like this:

pathPrefix("applicant-download") {
  get {
    val source = Source(getApplicantList().unsafeRunSync())
      .via(hobbiesFlow)
      .map(x => ByteString(x.mkString(",") + "\n"))

    val headers = List(RawHeader("Content-Disposition", "attachment; filename=applicant.csv"))

    respondWithHeaders(headers) {
      complete(HttpEntity(ContentTypes.`text/csv(UTF-8)`, source))
    }
  }
}
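For completeness, serving this route needs the usual akka-http wiring. A minimal sketch, assuming the route above is bound to a value named `route` and Akka HTTP 10.0.x (the host and port are placeholders):

```scala
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.stream.ActorMaterializer

implicit val system: ActorSystem = ActorSystem("csv-export")
implicit val materializer: ActorMaterializer = ActorMaterializer()
implicit val ec = system.dispatcher

// Bind the route; the CSV source is materialized anew for each request.
Http().bindAndHandle(route, "0.0.0.0", 8080)
```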

With these few lines of code I was able to stream a few gigabytes of records to the client without being a red-neck developer :)

Big thanks to @Hungai for taking the time to review this post.

Feel free to reach out to me if you need more explanation on this.