Declarative Pipelines in Kotlin

Attempt to demonstrate usefulness of generators in kotlin..

If you have done any kind of web development, you must have seen how the access logs on the server looks like. If not below is the snapshot of access-log file.

Each in in the file has the below format
Ip address : date: time: HTTP method : End point : Response : Bytes Transferred.

Now lets say that the task given is to find total number of bytes transferred during all the hits to all end points. We must remember the fact that these access-log files can be really large so reading an entire file in memory to read the bytes and then aggregating them to get the total count can put too much of burden on Ram.

Now lets see how we can go about building a pipeline which is memory efficient during the processing of the file.
I would highly recommend this episode of fragmented podcast 
https://fragmentedpodcast.com/tag/sequence/ to get to know more about sequences which form the foundation of the pipeline.
I am attaching Kaushik Gopal’s notes from the podcast episode which are essential to understand and he has done a fantastic job in explaining it.

Here are the important takeaways
1. Sequences are evaluated lazily 
2. Sequences are not useful without a terminating operation. Examples sum / min / max 
3. Intermediate operations on Sequences wont result in temporary lists being created [ This is where the memory saving comes ]

With these two important takeaways pipeline is just chaining of two or more sequences.
To be more specific we build a sequence from first function and feed it as an input to the next function resulting into an efficient data processing pipeline.

Lets look at individual functions forming a stage in pipeline.
getLines() : This function doesn’t read entire file but just returns a sequence using file.bufferedReader().lineSequence() and we use yield(line) to feed that line to the next stage.

getBytesColumn() : This function reads a single line at a time from getLines() function and gets only the last column [ Remember this is sequence and not an intermediate list. so its cheap on memory ].

getBytes(): If you look at the screenshot below, some log lines don’t have bytes at the last column. look at last three lines. We skip those lines while getting bytes. [ Remember this is sequence and not an intermediate list. so its cheap on memory ].

Now since we are at the last stage of pipeline which has sequence of bytes , all that is remaining is we compute the sum. ie the terminating operation. 
#2 point in our important takeaways while working with sequences.
You will be surprised but this is the operation that drivers even the first line that is read from access-log , till this operation is invoked , not a single line is read from the log file.
Basically the entire pipeline is consumer driven.

No matter how big the access-log file is , you now have a way to deal with it in memory efficient way.
Now i could have used fluent api to do all of these in chain operation but declarative way like this communicates the purpose about what we are trying to achieve at any stage and it becomes easier to reason about and debug especially if there are too many stages involved in the pipeline.

I hope you enjoyed the article. Don’t forget to clap if you have :-)
I hope to explore further on sequences and generators in Kotlin and i will share as soon as i have something useful.

Please checkout my other articles and see if you like them. By the way one of them appeared in Kotlin Weekly news letter.