Akka Streams by example: processing files

Marc Rooding
inganalytics.com/inganalytics
Jun 29, 2020

There are different ways to learn a new craft or skill. Some people prefer reading, others prefer to watch videos, and there are those, like me, who just need to get their hands dirty and learn along the way or by example. For me personally, that works best when I've got an actual problem or challenge that needs to be solved in order to learn a technique. In this article, I'd like to share some Akka Streams techniques and gotchas that I've learned along the way while building a real-life use case.

This article will require a basic understanding of Reactive Streams and Akka Streams in particular.

The use case

What I essentially had to build was a file aggregation stream, which would read multiple (optional and/or required) files from a directory and combine the result into a case class for further processing. In my case, that further processing meant streaming the data to ElasticSearch. For brevity's sake, I'll leave that part out.

Imagine that you have a directory called documents. This directory contains N directories that each represent an identifier of a document. Within each of these directories, you could have up to 2 different files present. Each of those 2 files is the output of a completely separate process. Either of those processes may not have run yet for a specific directory, so we need to consider that the files we're looking for may not always be there.

The following directory structure and files could be a possibility that our stream needs to be able to handle:

documents/
  1/
    metadata.json
    text.json
  2/
    text.json
  3/
    metadata.json
  4/

As for the requirements for the stream:

  1. We want to list all directories within the base directory documents/
  2. Per directory, we want to retrieve all enclosed files
  3. For each known file, we want to extract the JSON content. The stream should fail if the process is unable to parse the contents as the expected JSON
  4. We want to ignore unknown files (any filename other than the two mentioned above)
  5. We only want to emit the final case class Document, containing all data, if both files are present for a specific directory and we're able to construct the case class from the content of both of these files

Listing directories and reading files

To list directories and read files contained in each directory in a streaming fashion, there’s an Alpakka dependency available. If you’re not familiar with Alpakka, it’s an integration library built on top of Akka Streams to provide connectors to a lot of different systems. For interacting with the filesystem, add the following dependency to your build.sbt:

"com.lightbend.akka" %% "akka-stream-alpakka-file" % "2.0.0"

With the dependency set up, listing all directories in a path, and then reading all files within each directory found is quite simple:
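
A minimal sketch of what this could look like (basePath is an assumed value pointing at the documents/ directory):

import java.nio.file.{Path, Paths}

import akka.NotUsed
import akka.stream.alpakka.file.scaladsl.Directory
import akka.stream.scaladsl.Source

// Assumed location of the base directory
val basePath: Path = Paths.get("documents")

// One element per directory in documents/, flattened into one element per file
val files: Source[Path, NotUsed] =
  Directory.ls(basePath)
    .flatMapConcat(directory => Directory.ls(directory))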

We perform the ls command on the base path, which returns a Source[Path, NotUsed] emitting an element for each directory found within the base path. We then use another Directory.ls within a flatMapConcat. The use of flatMapConcat is important to note here. As mentioned before, Directory.ls returns a Source[Path, NotUsed]. If we were to simply use map(Directory.ls), the resulting type would be a Source[Source[Path, NotUsed], NotUsed]. flatMapConcat essentially flattens the sequence of Source[Path, NotUsed] that each Directory.ls returns.

The end result of these 3 simple lines is a single Source that emits an element for every file in every directory under the base path.

Reading file content

Akka Streams allows you to easily modularise your stream into composable and reusable components. Not only will the cleanliness of your code benefit from this, but it allows you to easily compose streams of different ‘capabilities’ while allowing you to unit- and integration test every single part. You can find more information about this in the official documentation.

For the stream discussed in this article, I’ll use the capabilities of defining composable components quite a lot. To start with, I’ll create a flow which takes a Path and returns a String representing the contents of the file:
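
A sketch of such a flow (the name readFileContents is mine), assuming the files are UTF-8 encoded:

import java.nio.file.Path

import akka.NotUsed
import akka.stream.scaladsl.{FileIO, Flow}

val readFileContents: Flow[Path, String, NotUsed] =
  Flow[Path].flatMapConcat { path =>
    FileIO.fromPath(path) // Source[ByteString, Future[IOResult]], emitting the file in chunks
      .reduce(_ ++ _)     // concatenate all chunks belonging to this file
      .map(_.utf8String)  // decode the complete content into a String
  }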

In this flow, we use FileIO.fromPath to stream the contents of the file. The return type of this function is Source[ByteString, Future[IOResult]]. What's important to understand here is that FileIO.fromPath emits the file in chunks. Again, using flatMapConcat, in combination with reducing the stream, makes sure that the resulting stream is a Source[String] in which each item is a file's entire content.

Processing files based on filename

We now have the Source[Path] that's emitting all the individual files, and a flow to read the contents. To decode the file contents to JSON and to construct the case class that our stream will eventually emit, we need to be able to separate processing based on the filename.

Let’s create another Flow that takes a Path and emits a Document. This time, however, we will use Akka’s Graph DSL to construct a more complex flow. For each of the files that we can process, and for the end result, we’ll define types:

sealed trait ProcessableFile
case class Metadata(id: String) extends ProcessableFile
case class Text(text: String) extends ProcessableFile
case class Document(metadata: Metadata, text: Text)

For the flow itself, we'll use a Partition to separate the Source[Path] into 3 separate flows. Partition is one of Akka Streams' fan-out operators. It has 1 input and multiple outputs: one for the Metadata file, one for the Text file and one for unknown files. Each flow will then:

  1. read the file contents
  2. try to parse it to JSON and decode it to one of our ProcessableFile case classes
  3. emit the result to a Merge stage

The Merge stage is one of Akka Streams' fan-in operators. It has multiple inputs and one output. It essentially emits all the items that arrive on any of its inputs (3 in our case), in the order in which they arrive.

Before we move on, let’s see that in code:
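
A sketch of how this graph could be wired up is shown below. readFileContents is the flow we defined earlier, decode stands in for the (omitted) JSON decoding helper, partitionFiles is explained right after, and the name processFiles is mine; for now, the graph simply exposes the merged Option[ProcessableFile] output.

import java.nio.file.Path

import akka.NotUsed
import akka.stream.FlowShape
import akka.stream.scaladsl.{Flow, GraphDSL, Merge, Partition}

val processFiles: Flow[Path, Option[ProcessableFile], NotUsed] =
  Flow.fromGraph(GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._

    // Fan-out: route each Path to the outlet matching its filename
    val partition = builder.add(Partition[Path](3, partitionFiles))
    // Fan-in: collect whatever the three branches emit, in arrival order
    val merge = builder.add(Merge[Option[ProcessableFile]](3))

    // decode is the omitted helper that parses the JSON and fails on invalid content
    val metadataFlow = readFileContents.map(content => Some(decode[Metadata](content)): Option[ProcessableFile])
    val textFlow     = readFileContents.map(content => Some(decode[Text](content)): Option[ProcessableFile])
    val unknownFlow  = Flow[Path].map(_ => Option.empty[ProcessableFile])

    partition.out(0) ~> metadataFlow ~> merge.in(0)
    partition.out(1) ~> textFlow     ~> merge.in(1)
    partition.out(2) ~> unknownFlow  ~> merge.in(2)

    FlowShape(partition.in, merge.out)
  })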

There are a few important things to mention here. First of all, when using the GraphDSL, you’re able to build your own graph using a builder to which you add predefined stages (like Partition or Merge) or your own.

When constructing the Partition and the Merge, we specify the types. The type of Merge is set to Option[ProcessableFile]. ProcessableFile is the trait that every processable file we know of extends. The reason for making it an Option is that we also need to be able to merge in skipped files, which result in a None.

You’ve probably been asking yourself what the partitionFiles reference is that is passed to the Partition. It’s actually the function that contains the logic to determine to which outlet the file will be sent:

To keep this article focused on Akka Streams, I'll omit the implementation of the decode method that's used to transform the file content into one of our ProcessableFile types.

The end result of what we’ve covered so far is a stream of Option[ProcessableFile]. All that’s left to do is convert that to our Document type and we’re done, right?

If we think about it logically, we want to collect the results and combine them into instances of Document. If you're familiar with functional programming, this is exactly what a fold would do: you'd be able to define an accumulator that keeps updating while items flow through. However, a fold in Akka Streams only emits an element once the upstream completes. Since our stream is theoretically endless, that moment will never come, and you're left with a stream that just consumes input and never outputs a single Document.

scan to the rescue! scan is a fold that emits every intermediate update it receives. This means that every update to our accumulator is emitted as a new item, which we still need to filter to keep only the complete Document instances.
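
As a tiny illustration of the difference between the two, outside of our use case:

import akka.stream.scaladsl.Source

val folded  = Source(1 to 3).fold(0)(_ + _) // emits a single element once upstream completes: 6
val scanned = Source(1 to 3).scan(0)(_ + _) // emits the seed and every intermediate result: 0, 1, 3, 6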

Let’s first take a look at our accumulator and Document instances and any helper methods that we’ll need:

The accumulator is essentially a duplicate of Document, except that its fields are optional. We also need an isComplete method that indicates whether the accumulator holds all the data needed to be regarded as complete. Last but not least, we add an apply method to construct a Document from the DocumentAccumulator.

Building upon the graph that we have so far, we can add the logic described above:
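
A sketch of the extended graph, using the same assumed helpers as before (readFileContents, decode, partitionFiles) plus the DocumentAccumulator we just defined:

import java.nio.file.Path

import akka.NotUsed
import akka.stream.FlowShape
import akka.stream.scaladsl.{Flow, GraphDSL, Merge, Partition}

val constructDocumentFromPaths: Flow[Path, Document, NotUsed] =
  Flow.fromGraph(GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._

    val partition = builder.add(Partition[Path](3, partitionFiles))
    val merge = builder.add(Merge[Option[ProcessableFile]](3))

    val metadataFlow = readFileContents.map(content => Some(decode[Metadata](content)): Option[ProcessableFile])
    val textFlow     = readFileContents.map(content => Some(decode[Text](content)): Option[ProcessableFile])
    val unknownFlow  = Flow[Path].map(_ => Option.empty[ProcessableFile])

    partition.out(0) ~> metadataFlow ~> merge.in(0)
    partition.out(1) ~> textFlow     ~> merge.in(1)
    partition.out(2) ~> unknownFlow  ~> merge.in(2)

    // Accumulate every file we see, emit each intermediate state, and keep only
    // the complete accumulators, mapped to Document
    val documentOutlet = merge.out
      .scan(DocumentAccumulator.empty)((accumulator, file) => accumulator.add(file))
      .filter(_.isComplete)
      .map(accumulator => Document(accumulator))

    FlowShape(partition.in, documentOutlet.outlet)
  })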

We take the output of our Merge operator, merge.out, and add the scan operator to it. The results of scan are filtered using our isComplete method and then complete accumulators are mapped to our Document type.

Last but not least, for the graph to function, we need to define the FlowShape. This essentially describes the 1 input and 1 output that this graph has. For us, the input is the Partition's inlet and the output is the stream of Document instances exposed by documentOutlet.outlet.

Hooking this graph up to our stream and running it with Sink.seq will give us the sequence of documents that we’re looking for:
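
A sketch, assuming an ActorSystem is in place to materialize the stream and basePath points at our documents/ directory:

import akka.actor.ActorSystem
import akka.stream.scaladsl.Sink
import scala.concurrent.Future

implicit val system: ActorSystem = ActorSystem("document-processing")

val documents: Future[Seq[Document]] =
  Directory.ls(basePath)
    .flatMapConcat(directory => Directory.ls(directory))
    .via(constructDocumentFromPaths)
    .runWith(Sink.seq)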

This may look like we're finally done, but unfortunately, there's an important aspect we overlooked. A Source[Path] that emits all files in all directories found in our base directory will be connected to this flow. What happens if a specific directory does not contain all the files required to construct a complete accumulator? You probably guessed it: the accumulator will keep its current state and move on to processing the first file of the next directory. We made our stream completely unaware of the relationship between the files and their parent directory.

Substreams to the rescue

Substreams are what you get when a single stream is demultiplexed into a stream of streams. By using Akka's groupBy operator, you're able to determine a key for each element passing through. That key determines to which substream an element goes.

If we change our implementation to first use groupBy based on the directory name, and only then apply our Partition and Merge logic to process files and convert them into a Document, we know for sure that only files from a specific directory will flow through that logic.

Let’s take a look at what this would look like:

We’ve rewritten the implementation of constructDocumentFromPaths to still be a Flow[Path, Document, NotUsed]. However, we’ve moved the Partition and Merge logic to a function called mergeFilesIntoADocument. The flow above now also takes a ProcessingConfig parameter which is a simple case class containing configuration properties like maxSubstreams and timeout. By supplying these configuration properties as an argument, you’ll easily be able to unit test the logic of this flow and also more easily change the behaviour per environment.

The first thing that this function now does to the incoming items is use groupBy to create a substream per directory name. We then pipe that output through our mergeFilesIntoADocument. By default, Akka Streams substreams are never closed automatically. Depending on the number of keys, that could mean a lot of substreams remaining in memory. Akka allows you to define a timeout on your substream, in the form of an idleTimeout or a completionTimeout. The former closes the substream when no new items have arrived for a specific amount of time. The latter closes the substream a specific amount of time after the stream is materialized.

One important thing to know here is that the stream will not be completed. Instead, a TimeoutException is thrown. That’s exactly why we have a recoverWithRetries operator attached to it. That way, we can safely recover from the TimeoutException and make sure that our entire stream doesn’t fail because of a substream that gets closed. In case of receiving a TimeoutException, we simply return a Source.empty to keep everything going.

Another exception that you may come across when using substreams is the TooManySubstreamsException. The maxSubstreams property that we provide to the groupBy operator determines how many substreams there can be at any one time. What is the right number to use? It's hard to say without understanding your use case. How many keys are there? What is your timeout duration? If you expect to only briefly require a substream, like in our case, where each substream only ever sees a handful of files, it makes sense to use a low timeout value, which drastically reduces the number of substreams open at the same time and therefore the maxSubstreams value you need.

Last but not least, we need to merge all of our substreams back into the main stream. This is done by calling mergeSubstreams.

Conclusion

Even though the use case, processing files and combining the results into a case class, seemed quite trivial, the end result involves quite a few different techniques found in Akka's Reactive Streams implementation.

Looking at our list of requirements, our implementation did tick all the boxes. In all honesty, we did skip over the third one, which requires the stream to fail if decoding fails. Just make sure that your decode throws an error if something goes wrong with converting the file contents to JSON and then into the case class, and you're good to go.
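
If you're looking for inspiration, one possible way to implement such a decode helper is with circe (the choice of library here is purely an assumption of this sketch):

import io.circe.Decoder
import io.circe.parser

// Parses the JSON content and decodes it into A, throwing if either step fails
// so that the stream fails as required
def decode[A: Decoder](content: String): A =
  parser.decode[A](content).fold(error => throw error, identity)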

There’s a lot left to optimise in what we’ve built so far. As it is now it’s definitely not scalable to multiple instances since all of them would be processing the entire source directory. As with all software, there’s a balance to strike between it being good enough and over-engineering.

What we’ve covered showcases quite a few different techniques found within Akka Streams and shows how versatile the technology really is.
