Akka Streams by example: processing files
There are different techniques for learning a new craft or skill. Some people prefer reading, others prefer watching videos, and there are those, like me, who just need to get their hands dirty and learn along the way, by example. For me, that works best when there's an actual problem or challenge that needs to be solved. In this article, I'd like to share some Akka Streams techniques and gotchas that I learned while building a real-life use case.
This article will require a basic understanding of Reactive Streams and Akka Streams in particular.
The use case
What I essentially had to build was a file aggregation stream, which would read multiple (optional and/or required) files from a directory and combine the result into a case class for further processing. In my case, that further processing meant streaming the data to ElasticSearch. For brevity's sake, I'll leave that part out.
Imagine that you have a directory called documents. This directory contains N directories that each represent an identifier of a document. Within each of these directories, up to 2 different files can be present. Each of those 2 files is the output of a completely separate process. Each of those processes may not have run yet for a specific directory, so we need to consider that the files we're looking for may not always be there.
The following directory structure and files could be a possibility that our stream needs to be able to handle:
documents/
1/
metadata.json
text.json
2/
text.json
3/
metadata.json
4/
As for the requirements for the stream:
- We want to list all directories within the base directory documents/
- Per directory, we want to retrieve all enclosed files
- For each known file, we want to extract the JSON content. The stream should fail if the process is unable to parse the contents as the expected JSON
- We want to ignore unknown files (any filename other than the 2 mentioned above)
- We only want to emit the final case class Document, containing all data, if both files are present for a specific directory and we're able to construct the case class from the content of both files
Listing directories and reading files
To list directories and read files contained in each directory in a streaming fashion, there's an Alpakka dependency available. If you're not familiar with Alpakka, it's an integration library built on top of Akka Streams to provide connectors to a lot of different systems. For interacting with the filesystem, add the following dependency to your build.sbt:
"com.lightbend.akka" %% "akka-stream-alpakka-file" % "2.0.0"
With the dependency set up, listing all directories in a path, and then reading all files within each directory found is quite simple:
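A minimal sketch of those lines (assuming the base directory documents/ is relative to the working directory; the value names are my own):

```scala
import java.nio.file.{Files, Path, Paths}

import akka.NotUsed
import akka.stream.alpakka.file.scaladsl.Directory
import akka.stream.scaladsl.Source

val basePath: Path = Paths.get("documents")

// One element per file, for every file in every subdirectory of basePath
val files: Source[Path, NotUsed] =
  Directory.ls(basePath)            // emits each entry inside documents/
    .filter(Files.isDirectory(_))   // keep only the per-document directories
    .flatMapConcat(Directory.ls)    // flatten each directory listing into one Source
```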
We perform the ls command on the base path, which returns a Source[Path, NotUsed] emitting an element for each directory found within the base path. We then use another Directory.ls within a flatMapConcat. The use of flatMapConcat is important to note here. As mentioned before, Directory.ls returns a Source[Path]. If we were to simply use map(Directory.ls), the resulting type would be Source[Source[Path, NotUsed], NotUsed]. flatMapConcat essentially flattens the sequence of Source[Path, NotUsed] instances that each Directory.ls returns.
The end result of these 3 simple lines is a single Source that emits an element for every single file found within any directory under the base path.
Reading file content
Akka Streams allows you to easily modularise your stream into composable and reusable components. Not only does the cleanliness of your code benefit from this, it also lets you compose streams of different ‘capabilities’ while being able to unit- and integration-test every single part. You can find more information about this in the official documentation.
For the stream discussed in this article, I'll use the capability of defining composable components quite a lot. To start with, I'll create a flow which takes a Path and returns a String representing the contents of the file:
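A sketch of such a flow (the name readFileContents is my own; the article's original snippet may differ):

```scala
import java.nio.file.Path

import akka.NotUsed
import akka.stream.scaladsl.{FileIO, Flow}
import akka.util.ByteString

// Reads every chunk of a file and emits its full contents as one String
val readFileContents: Flow[Path, String, NotUsed] =
  Flow[Path].flatMapConcat { path =>
    FileIO.fromPath(path)               // Source[ByteString, Future[IOResult]]
      .fold(ByteString.empty)(_ ++ _)   // reduce the chunks into a single ByteString
      .map(_.utf8String)
  }
```

Using fold rather than reduce here also handles an empty file gracefully, since reduce fails on a source that emits no elements.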
In this flow, we use FileIO.fromPath to stream the contents of the file. The return type of this function is Source[ByteString, Future[IOResult]]. It's important to understand here that FileIO.fromPath emits chunks of the file. Again, using flatMapConcat in combination with reducing the stream makes sure that the resulting stream is a Source[String] in which each item is a file's entire content.
Processing files based on filename
We now have the Source[Path] that emits all the individual files, and a flow to read their contents. To decode the file contents to Json and to construct the case class that our stream will eventually emit, we need to be able to separate processing based on the filename.
Let's create another Flow that takes a Path and emits a Document. This time, however, we will use Akka's Graph DSL to construct a more complex flow. For each of the files that we can process, and for the end result, we'll define types:
sealed trait ProcessableFile
case class Metadata(id: String) extends ProcessableFile
case class Text(text: String) extends ProcessableFile

case class Document(metadata: Metadata, text: Text)
For the flow itself, we'll use a Partition to separate the Source[Path] into 3 separate flows. Partition is one of Akka Streams' fan-out operators. It has 1 input and multiple outputs: one for the Metadata file, one for the Text file, and one for unknown files. Each flow will then:
- read the file contents
- try to parse it to JSON and decode it to one of our ProcessableFile case classes
- emit the result to a Merge stage
The Merge stage is one of Akka Streams' fan-in operators. It has multiple inputs and one output. It essentially collects all the items that any of the inputs (3 in our case) emit, in the order that they're emitted.
Before we move on, let’s see that in code:
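A sketch of that graph, assuming the readFileContents flow from earlier, the partitionFiles routing function that the article describes, and hypothetical decodeMetadata/decodeText helpers standing in for the JSON decoding:

```scala
import java.nio.file.Path

import akka.NotUsed
import akka.stream.FlowShape
import akka.stream.scaladsl.{Flow, GraphDSL, Merge, Partition}

// Hypothetical decoders; the real ones parse the JSON file contents
def decodeMetadata(content: String): Metadata = ???
def decodeText(content: String): Text = ???

val processFile: Flow[Path, Option[ProcessableFile], NotUsed] =
  Flow.fromGraph(GraphDSL.create() { implicit builder =>
    import GraphDSL.Implicits._

    val partition = builder.add(Partition[Path](outputPorts = 3, partitionFiles))
    val merge     = builder.add(Merge[Option[ProcessableFile]](inputPorts = 3))

    // Outlet 0: metadata.json
    partition.out(0).via(readFileContents).map(c => Option(decodeMetadata(c))) ~> merge.in(0)
    // Outlet 1: text.json
    partition.out(1).via(readFileContents).map(c => Option(decodeText(c)))     ~> merge.in(1)
    // Outlet 2: unknown files are skipped and merged back in as None
    partition.out(2).map(_ => Option.empty[ProcessableFile])                   ~> merge.in(2)

    FlowShape(partition.in, merge.out)
  })
```

The outlet numbering (0 for metadata, 1 for text, 2 for unknown) is an assumption that the routing function has to agree with.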
There are a few important things to mention here. First of all, when using the GraphDSL, you're able to build your own graph using a builder to which you add predefined stages (like Partition or Merge) or your own.
When constructing the Partition and Merge, we specify the types. The type of Merge is set to Option[ProcessableFile]. ProcessableFile is the trait that every processable file we know of extends. The reason for making it an Option is that we also need to be able to merge in skipped files, which result in a None.
You've probably been asking yourself what the partitionFiles reference passed to the Partition is. It's the function that contains the logic to determine to which outlet a file will be sent:
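A plausible implementation of that routing function (the exact outlet numbering is an assumption):

```scala
import java.nio.file.Path

// Decide which Partition outlet a file goes to, based on its filename.
// The numbering (0 = metadata, 1 = text, 2 = unknown) is an assumption.
def partitionFiles(path: Path): Int =
  path.getFileName.toString match {
    case "metadata.json" => 0
    case "text.json"     => 1
    case _               => 2 // unknown files end up in the 'skip' outlet
  }
```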
To keep this article focused on Akka Streams, I'll omit the implementation of the decode method that's used to transform the file content into one of our ProcessableFile types.
The end result of what we've covered so far is a stream of Option[ProcessableFile]. All that's left to do is convert that to our Document type and we're done, right?
If we think about it logically, we want to collect the results and combine them into instances of Document. If you're familiar with functional programming, this is exactly what a fold would do: you'd define an accumulator that keeps updating while items flow through. However, a fold in Akka Streams only emits a record once the upstream completes. Since our stream is theoretically endless, that moment will never come, and you're left with a stream that just takes input and never outputs a single Document.
scan to the rescue! scan is a fold that emits every update it receives. This means that every update to our accumulator is emitted as a new item, which we still need to filter in order to only gather complete Document instances.
Let's first take a look at our accumulator, our Document instances, and the helper methods that we'll need:
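One possible shape for these (the helper names add and empty are my own):

```scala
// Mirrors Document, but with optional fields that fill in as files arrive
case class DocumentAccumulator(metadata: Option[Metadata], text: Option[Text]) {

  // Complete once both required files have been seen
  def isComplete: Boolean = metadata.isDefined && text.isDefined

  // Fold one decoded file into the accumulator
  def add(file: ProcessableFile): DocumentAccumulator = file match {
    case m: Metadata => copy(metadata = Some(m))
    case t: Text     => copy(text = Some(t))
  }
}

object DocumentAccumulator {
  val empty: DocumentAccumulator = DocumentAccumulator(None, None)
}

object Document {
  // Construct a Document from a complete accumulator
  def apply(acc: DocumentAccumulator): Document =
    Document(acc.metadata.get, acc.text.get)
}
```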
The accumulator is essentially a duplicate of Document, except that it has optional types. We also need an isComplete method that indicates whether the accumulator has all the data required to be regarded as complete. Last but not least, we add an apply method to construct a Document from the DocumentAccumulator.
Building upon the graph that we have so far, we can add the logic described above:
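Inside the graph, that addition could look like the following fragment (it assumes the builder and merge stage from the surrounding graph, plus the accumulator with the helper methods described earlier; the helper names are assumptions):

```scala
// Downstream of the Merge stage: accumulate files, emit only complete Documents
val documentOutlet = builder.add(
  Flow[Option[ProcessableFile]]
    .scan(DocumentAccumulator(None, None)) {
      case (acc, Some(file)) => acc.add(file) // fold the decoded file in
      case (acc, None)       => acc           // skipped files leave it untouched
    }
    .filter(_.isComplete) // scan emits every intermediate state; keep complete ones
    .map(Document(_))     // convert the complete accumulator into a Document
)

merge.out ~> documentOutlet.in
```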
We take the output of our Merge operator, merge.out, and attach the scan operator to it. The results of scan are filtered using our isComplete method, and complete accumulators are then mapped to our Document type.
Last but not least, for the graph to function, we need to define the FlowShape. This essentially describes the 1 input and 1 output that this graph has. For us, that means the input goes into the Partition, and the output is the stream of Document instances created by the documentOutlet.outlet.
Hooking this graph up to our stream and running it with Sink.seq will give us the sequence of documents that we're looking for:
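Putting it together, running the stream might look like this (the name documentFlow stands in for the graph-based flow built above and is my own):

```scala
import java.nio.file.Paths

import scala.concurrent.Future

import akka.actor.ActorSystem
import akka.stream.alpakka.file.scaladsl.Directory
import akka.stream.scaladsl.Sink

implicit val system: ActorSystem = ActorSystem("document-processing")

// documentFlow is the Flow[Path, Document, NotUsed] built with the GraphDSL
val documents: Future[Seq[Document]] =
  Directory.ls(Paths.get("documents"))
    .flatMapConcat(Directory.ls)
    .via(documentFlow)
    .runWith(Sink.seq) // collect every emitted Document into a Seq
```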
This may look like we're finally done, but unfortunately, there's an important aspect we've overlooked. A Source[Path] that emits all files in all directories found in our base directory will be connected to this flow. What happens if a specific directory does not contain all the files required to construct a complete accumulator? You probably guessed it: the accumulator will keep its current state and move on to processing the first file of the next directory. We've made our stream completely unaware of the relationship between the files and their parent directory.
Substreams to the rescue
A substream is essentially a single stream that's multiplexed into a stream of streams. By using Akka's groupBy operator, you're able to determine a key for each element passing through. That key determines which substream an element goes to.
If we change our implementation to first use groupBy based on the directory name, and only then apply our Partition and Merge logic to process the files and convert them into a Document, we know for sure that only files from a single directory will flow through that logic.
Let’s take a look at what this would look like:
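A sketch of the rewritten flow, assuming a mergeFilesIntoADocument flow that holds the earlier Partition/Merge logic, as the article describes:

```scala
import java.nio.file.Path
import java.util.concurrent.TimeoutException

import scala.concurrent.duration.FiniteDuration

import akka.NotUsed
import akka.stream.scaladsl.{Flow, Source}

// Simple configuration holder, as described in the article
case class ProcessingConfig(maxSubstreams: Int, timeout: FiniteDuration)

def constructDocumentFromPaths(config: ProcessingConfig): Flow[Path, Document, NotUsed] =
  Flow[Path]
    // One substream per document directory, keyed by the parent directory's name
    .groupBy(config.maxSubstreams, _.getParent.getFileName.toString)
    .via(mergeFilesIntoADocument)
    // Close a substream once no items have come in for the configured duration
    .idleTimeout(config.timeout)
    // The timeout surfaces as a TimeoutException; swap in an empty Source
    // so the main stream keeps running
    .recoverWithRetries(attempts = 1, { case _: TimeoutException => Source.empty })
    .mergeSubstreams
```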
We've rewritten the implementation of constructDocumentFromPaths to still be a Flow[Path, Document, NotUsed]. However, we've moved the Partition and Merge logic to a function called mergeFilesIntoADocument. The flow above now also takes a ProcessingConfig parameter, a simple case class containing configuration properties like maxSubstreams and timeout. By supplying these configuration properties as an argument, you can easily unit test the logic of this flow and more easily change its behaviour per environment.
The first thing this function now does with the incoming items is use groupBy to create a substream per directory name. We then pipe that output through our mergeFilesIntoADocument. By default, Akka Streams substreams are never automatically closed. Depending on the number of keys, that could mean a lot of substreams remaining in memory. Akka allows you to define a timeout on your substream, in the form of an idleTimeout or a completionTimeout. The first closes the substream when no items have come in for a specific amount of time. The latter closes the substream a specific amount of time after the stream is materialized.
One important thing to know here is that the substream will not simply be completed; instead, a TimeoutException is thrown. That's exactly why we have a recoverWithRetries operator attached to it. That way, we can safely recover from the TimeoutException and make sure that our entire stream doesn't fail because a substream gets closed. In case of a TimeoutException, we simply return a Source.empty to keep everything going.
Another exception that you may come across when using substreams is the TooManySubstreamsException. The maxSubstreams property that we provide to the groupBy operator determines how many substreams there can be at one time. What is the right number to use? It's hard to say without understanding your use case. How many keys are there? What is your timeout duration? If you expect to only briefly require a substream, as in our case, where we have at most 3 files per substream, it makes sense to use a low timeout value, which will drastically reduce the required number of maxSubstreams.
Last but not least, we need to merge all of our substreams back into the main stream. This is done by calling mergeSubstreams.
Conclusion
Even though the use case (processing files and combining the results into a case class) seemed quite trivial, the end result involves quite a few different techniques found in Akka's Reactive Streams implementation.
Looking at our list of requirements, our implementation ticks all the boxes. In all honesty, we did skip over the third, in which the stream fails if decoding fails. Just make sure that your decode method throws an error if something goes wrong with converting the file contents to JSON and then to the case class, and you're good to go.
There's a lot left to optimise in what we've built so far. As it stands, it's definitely not scalable to multiple instances, since all of them would be processing the entire source directory. As with all software, there's a balance to strike between being good enough and over-engineering.
What we’ve covered showcases quite a few different techniques found within Akka Streams and shows how versatile the technology really is.