Enriching Large Data Sets with Akka Streams

IAS Tech Blog
IAS Tech Blog
Published in
5 min readSep 30, 2021

--

by Joshua Yosen

If you’re tired of the long processing times on your not-so-fault-tolerant streaming solution, you know that streaming large data sets can be a headache. At IAS, we’ve found that Akka Streams simplifies the process, offering faster streams and greater system stability.

The Situation

Over the past year, we migrated our on-premise solutions to the Cloud. One of those solutions is a task that classifies video channels based on information such as the title and description. We use an internal system responsible for that text classification using an HTTP endpoint that accepts JSON text as its input. The JSON text comes from batches of files which then categorizes the video based on the text. For example, let’s say there’s a Linus Tech Tips video with associated textual information:

Request Text: “Linus Tech Tips … TDP … AMD … Ryzen … Intel … CPU … 7nm … desktop … Cinebench … tech-blogger”

These exist as JSON objects. We send these JSON objects to our text classifier and receive a response message with classified categories such as:

Response: “categories”: [“Technology & Computing”, “Hobbies & Interests”]

A single streaming solution can send 1.5 million video channels (1.5 million requests) a day, with an average response time of 14 seconds. If the response time exceeds one minute, it times out and retries up to three times. There is also a limitation of 200 open connections at once.

This system is not elastic and does not scale with an increase in the open number of connections. Some forms of text can crash the semantic engine and affect cluster performance, making the system unreliable.

On-premise, we run this task as a PySpark Job. It does a handful of simple transformations although the actual sending of the HTTP requests is the most sophisticated and time consuming step of the process.

The Problem

Unfortunately, our on-premise solution is not effective when running in the Cloud. On-premise uses 150 executors to run this job with a possible maximum execution time of 55 hours. Most of this time is spent by the Spark executors waiting for the HTTP response. With an average response time of 14 seconds, the Spark Cluster can waste CPU cycles and increase costs.

AWS is our Cloud platform where we use transient EMR clusters, which are charged by the hour. With a lift-and-shift migration, this process could be very costly with a maximum possible runtime of 55 hours.

The Solution: Akka To the Rescue!

Before delving into Akka’s solution to this problem, it’s essential to understand Akka Streams and Reactive Streams. In traditional streaming situations (blue arrows), there’s the Producer, the Consumer, and a series of processing steps. The Producer sends data through the line and eventually the Consumer consumes that data. However, it is not uncommon for the Consumer to be unable to handle the amount of data that a Producer sends at a time. That’s where Reactive Streams come in! A Reactive Stream uses a concept called Backpressure to communicate to the Producer that it can only handle so many requests at once. This way, the Consumer controls the flow. This is imaged below:

Akka Streams is an implementation of Reactive Streaming using runnable graphs. It allows streaming with asynchronous and non-blocking code. Akka Streams is available in Scala or Java, and for our purposes we wrote the solution in Scala. Below is the stack we implemented:

Below is the code used to create the Graph. It can be visualized as a Source, a Flow, and a Sink:

// Source

val fut: Future[IOResult] = Source

.fromIterator(() => args.map(Path.of(_)).iterator)

// construct iterator of paths

.flatMapConcat { path =>

FileIO

.fromPath(path)

.via(Framing.delimiter(ByteString(“\n”), maximumFrameLength = inputMaxLength))

.map(_.utf8String)

}

// Flow

.map(x => AdmantxInput.from(x))

.collect{case Success(v) => v}

.filter(ai => supportedLanguages.contains(ai.lang))

.mapAsyncUnordered(parallelism = parallelism)(sendAdmantxRequest)

.map(x => ByteString(s”${x.channelId},${x.statusCode},${x.response}\n”))

// Sink

.runWith(FileIO.toPath(Path.of(“advmantx-output.txt”)))

When working on the proof of concept for this solution, we ran it on a test machine with only 2 Cores and 8 GB of RAM. The performance was on par with our PySpark on-premise solution. We observed less than 2% failure rate without creating any explicit queues or buffers, due to Akka Stream’s Backpressure.

We configured the stream as:

  • Fixed size thread-pool (64) for HTTP requests
  • ~ 100 Total Threads
  • Only 5% of CPU and Memory Usage in `top`
  • Thread pool could be optimized even further, but the CPU and Memory Storage are non-issues

Migrating to the Cloud

We dockerized our application, uploaded its image to AWS Elastic Container Registry (ECR), and deployed it using AWS Elastic Container Service (ECS). ECS is a container orchestration service, and a Kubernetes competitor. Allowing ECS to manage the cluster and container orchestration made the deployment as easy as pointing ECS to our ECR image.

ECS supports 2 launch types:

  • EC2 Launch Type: You can host your tasks on a cluster of Amazon EC2 instances that you manage
  • Fargate Launch Type: You can host your containers on a serverless infrastructure that is managed by Amazon ECS

For us, Fargate was the better choice as it meant we didn’t have to manage hosting EC2 instances. However, Fargate has a disk size limitation of 20 GB (as of version 1.4.0). Our needs required more than this, and in order to increase this limit, we enabled gzip encryption for both input and output data. This turned out to be quite easy with Akka Streams.

Performance Tests

Given this Cloud implementation using Akka Streams and our on-premise PySpark method we ran it against the same dataset on Execution Time and Failure Rate:

In both metrics, Akka Streams was superior. It was approximately 4.5 hours faster and had a significantly lower rate of failure, making our system more stable. We found that:

  • Using Akka Streams, we saved time and reduced costs by replacing our Spark Cluster with just one Docker Container.
  • Akka Streams is very beginner friendly. The documentation is clear and comprehensive, with little upstart time.
  • Akka Streams is an easy-to-use Reactive Streams framework. Check out this article on common pitfalls to learn what can go wrong.
  • The Reactive Streams architecture is widely used.
  • Async & non-blocking code provided performance and resource improvements vital to process 1.5 million daily requests.
  • Using Akka Streams, we saved time and money by replacing our Spark Cluster with just one Docker Container.

--

--