The hidden power of Node.js streams: reactive programming
This post is a small coding exercise in which we create the basic components needed to implement the reactive programming paradigm using only Node.js streams. There are a lot of libraries out there for reactive programming, such as RxJS, Bacon and Callbags. These libraries don't use Node.js streams, although it is possible to wrap regular streams as a source. Another good library is Highland, which builds Node.js-like streams. Rather than explaining how to use these libraries, in this article we want to write a simple one, to understand which problems it solves and which advantages it gives us.
Starting from a real-world problem, we'll first build some basic tools that will remind us of the basic blocks of functional programming, such as the map, filter and reduce functions. Then we'll see how to combine these bricks to write neat, clear and effective code.
You can find all the code present in this article on repl.it.
The case
Let's define the problem we want to solve. We want to fetch a list of financial transactions from a database. Then we want to convert each transaction amount from US dollars to euros, keep only the transactions with an amount higher than 100€ and compute the sum of their amounts.
Never mind that most databases out there can probably do these operations themselves: for the sake of the experiment, let's say we can only retrieve the list as an array or as a stream.
Here is a simple implementation:
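As a minimal sketch, assuming a hypothetical in-memory `DB.getAsArray` and an illustrative exchange rate of 0.75 (both are placeholders for this example, not a real database client or a real quote), it could look like this:

```javascript
// Hypothetical stand-in for a real database client (an assumption of this sketch).
const DB = {
  getAsArray: () => [
    { id: 1, amount: 200 }, // amounts are in US dollars
    { id: 2, amount: 50 },
    { id: 3, amount: 1000 },
  ],
};

// Illustrative exchange rate, not a real quote.
const USD_TO_EUR = 0.75;

function sumLargeTransactionsInEuro() {
  return DB.getAsArray()
    // convert each amount from dollars to euros
    .map((tx) => ({ ...tx, amount: tx.amount * USD_TO_EUR }))
    // keep only the transactions above 100€
    .filter((tx) => tx.amount > 100)
    // sum the remaining amounts
    .reduce((total, tx) => total + tx.amount, 0);
}

console.log(sumLargeTransactionsInEuro()); // 900
```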
The code should be clear enough. First we retrieve the data as an array, then we map each value to convert the amount from dollars to euros, then we keep only the transactions whose amount is more than 100€ with the filter function, and lastly we reduce the array to get the sum of the amounts. We return this sum.
What’s wrong with it
Let's look at some possible problems with this approach. Everything stays in memory: the array returned by the database is stored in your application's memory. If there are millions of transactions, the memory footprint can be huge.
map, filter and reduce are handy functions, but each of them cycles through our dataset once. This means the dataset is cycled three times in this example (reduce actually cycles over a subset). Cycling without a bike is just losing time. A nice alternative approach to this problem is to use transducers. Transducers are really powerful and worth reading about; this article by @roman01 gives a great introduction.
We want to change this implementation to use streams to demonstrate how these problems are solved.
Streams
In this article I assume that you already know Node.js streams. I also know that streams are one of the most underestimated parts of Node.js, so here is a little recap:
Streams can be of three types:
- Readable
- Writable
- Duplex/Transform
A Transform stream is a Duplex stream that can transform the data it receives.
null has a particular meaning among streams. If the null value is fed to a stream, the stream ends. It's like saying that null means "end of communications".
Streams usually work with buffers, but they can be configured to work with objects. In practice this means that every chunk read or written by the stream is an object. For simplicity we'll work in object mode.
Streams can be piped to produce chains that look like Readable → Transform → Transform → Writable. It's written like this:
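A small sketch of such a chain in object mode follows. The streams `source`, `double`, `increment` and `sink` are made up for this illustration, and `Readable.from` is used as a convenient way to get a Readable out of an array:

```javascript
const { Readable, Transform, Writable } = require('stream');

// Readable: emits 1, 2, 3 as objects, then ends.
const source = Readable.from([1, 2, 3]);

// Transform: doubles each chunk.
const double = new Transform({
  objectMode: true,
  transform(chunk, encoding, callback) {
    callback(null, chunk * 2);
  },
});

// Transform: adds one to each chunk.
const increment = new Transform({
  objectMode: true,
  transform(chunk, encoding, callback) {
    callback(null, chunk + 1);
  },
});

// Writable: collects whatever reaches the end of the chain.
const received = [];
const sink = new Writable({
  objectMode: true,
  write(chunk, encoding, callback) {
    received.push(chunk);
    callback();
  },
});

// Readable → Transform → Transform → Writable
source.pipe(double).pipe(increment).pipe(sink);

sink.on('finish', () => console.log(received)); // [ 3, 5, 7 ]
```

Each `pipe` call returns its destination, which is what makes the chaining possible.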
For anything else you don’t know about streams, read the official documentation or the article Node.js Streams: Everything you need to know.
Our first Transform stream
Let's imagine we have a function called DB.getAsStream which gives the same results as DB.getAsArray, but in the form of an object stream.
Let's write a transform stream that changes each transaction, returning it with the amount converted to euros. We will use the simplified constructor for streams, but the same can be achieved with the class-style constructor.
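A minimal sketch with the simplified constructor could look like this. The factory name `createToEuroStream` and the 0.75 rate are assumptions made for the example:

```javascript
const { Transform } = require('stream');

const USD_TO_EUR = 0.75; // illustrative rate, not a real quote

// A factory, so that every pipeline gets a fresh stream
// (a stream instance can be consumed only once).
const createToEuroStream = () =>
  new Transform({
    objectMode: true, // chunks are transaction objects, not buffers
    transform(transaction, encoding, callback) {
      // First argument: error (none). Second argument: the value to emit.
      callback(null, { ...transaction, amount: transaction.amount * USD_TO_EUR });
    },
  });
```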
Let’s write the entire code
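Put together, and assuming a hypothetical `DB.getAsStream` backed by an in-memory array, the whole thing could be sketched as follows (names, data and rate are placeholders):

```javascript
const { Readable, Transform } = require('stream');

// Hypothetical data source: the rows DB.getAsArray would return,
// exposed as an object stream (an assumption of this sketch).
const DB = {
  getAsStream: () =>
    Readable.from([
      { id: 1, amount: 200 },
      { id: 2, amount: 50 },
      { id: 3, amount: 1000 },
    ]),
};

const USD_TO_EUR = 0.75; // illustrative rate, not a real quote

const createToEuroStream = () =>
  new Transform({
    objectMode: true,
    transform(transaction, encoding, callback) {
      callback(null, { ...transaction, amount: transaction.amount * USD_TO_EUR });
    },
  });

// Readable → Transform: every transaction flows through the conversion.
const transactionsInEuro = () => DB.getAsStream().pipe(createToEuroStream());

transactionsInEuro().on('data', (transaction) => console.log(transaction));
```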
As you can imagine, it's also possible to write the filter and reduce functions as transforms, but is this needed? Must we really write each business function as a Transform implementation? What if we could focus on our business code and hide the Transform instantiation somewhere else? Well, we can…
Providing the building blocks
In our first implementation we focused on our business logic without caring how the map, filter and reduce functions work. They are part of the standard JavaScript library and we simply use them. All we wrote were the mapping function, the filter function and the reduce function.
Let's isolate them:
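One possible way to isolate them is sketched below. The names `toEuro`, `isAbove100` and `sumAmounts`, the sample data and the rate are all made up for this example:

```javascript
const USD_TO_EUR = 0.75; // illustrative rate, not a real quote

// Mapping function: convert a transaction amount from dollars to euros.
const toEuro = (tx) => ({ ...tx, amount: tx.amount * USD_TO_EUR });

// Filter function: keep transactions above 100€.
const isAbove100 = (tx) => tx.amount > 100;

// Reduce function: add a transaction amount to the running total.
const sumAmounts = (total, tx) => total + tx.amount;

// The array version now reads as a list of named steps.
const transactions = [
  { id: 1, amount: 200 },
  { id: 2, amount: 50 },
  { id: 3, amount: 1000 },
];
const total = transactions.map(toEuro).filter(isAbove100).reduce(sumAmounts, 0);

console.log(total); // 900
```

None of these functions knows whether it will be fed by an array or by a stream, which is exactly what lets us reuse them.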
Now we're going to implement map, filter and reduce for streams, so we can reuse our business logic. In the end our code will be a chain of pipes: the source stream piped through map, filter and reduce, each one wrapping one of our business functions.
Map
Here is an implementation of map using a transform stream. We want to let the user provide a mapping function and apply it to each chunk.
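A minimal sketch follows. Treating the second parameter as an `options` object forwarded to the Transform constructor is an assumption of this example:

```javascript
const { Readable, Transform } = require('stream');

// map for streams: wraps a plain mapping function in a Transform.
// `options` (assumed here) lets the caller override stream settings.
const map = (fn, options = {}) =>
  new Transform({
    objectMode: true,
    ...options,
    transform(chunk, encoding, callback) {
      let result;
      try {
        result = fn(chunk);
      } catch (err) {
        return callback(err); // a throwing fn becomes a stream error
      }
      callback(null, result); // emit the mapped value
    },
  });

// Usage with a hypothetical business function and an illustrative rate.
const toEuro = (tx) => ({ ...tx, amount: tx.amount * 0.75 });

Readable.from([{ id: 1, amount: 200 }])
  .pipe(map(toEuro))
  .on('data', (tx) => console.log(tx)); // { id: 1, amount: 150 }
```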
This should do the trick. Every chunk passing through the stream is mapped to a new value using the user's mapping function fn, exactly as map does for arrays. The second parameter is included in this implementation for completeness, but it's useless in our case.
Great. Now we can pipe our source stream into map, passing it our conversion function.
Filter
Filter exploits a peculiarity of transform streams: they are not forced to emit one output value for each input value. Inputs and outputs are unrelated: there can be as many output values as input values, or fewer, or more. Here is the filter for streams:
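A minimal sketch, again assuming an optional `options` object as second parameter:

```javascript
const { Readable, Transform } = require('stream');

// filter for streams: lets a chunk through only when fn(chunk) is truthy.
const filter = (fn, options = {}) =>
  new Transform({
    objectMode: true,
    ...options,
    transform(chunk, encoding, callback) {
      let keep;
      try {
        keep = fn(chunk);
      } catch (err) {
        return callback(err);
      }
      // Calling back with undefined emits nothing for this chunk.
      callback(null, keep ? chunk : undefined);
    },
  });

Readable.from([1, 2, 3, 4])
  .pipe(filter((n) => n % 2 === 0))
  .on('data', (n) => console.log(n)); // 2, then 4
```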
Notice that if the filter function returns false, we call the callback with undefined, so nothing is emitted for that chunk. We never push null to skip a chunk, because pushing null ends the stream!
Reduce
Reduce is probably the most complex of the three, but not by much.
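A minimal sketch, where the accumulator lives in a closure and the trailing `options` parameter is an assumption of this example:

```javascript
const { Readable, Transform } = require('stream');

// reduce for streams: accumulates every chunk and emits one value at the end.
const reduce = (fn, initial, options = {}) => {
  let accumulator = initial;
  return new Transform({
    objectMode: true,
    ...options,
    transform(chunk, encoding, callback) {
      try {
        accumulator = fn(accumulator, chunk);
      } catch (err) {
        return callback(err);
      }
      callback(); // nothing is emitted while the stream is ongoing
    },
    flush(callback) {
      // Called once the input is exhausted: emit the reduced value.
      callback(null, accumulator);
    },
  });
};

Readable.from([1, 2, 3, 4])
  .pipe(reduce((total, n) => total + n, 0))
  .on('data', (total) => console.log(total)); // 10
```

Because the accumulator is captured when `reduce` is called, each call creates an independent stream; an instance should not be reused across pipelines.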
As you can see, reduce must also implement the flush method. This method is called at the end, when all the chunks have been transformed and just before the stream emits its "end" event. At that point we want to emit the accumulated value.
We should note a key difference between reduce and the other functions. While map and filter emit values while the stream is ongoing, reduce emits only at the end. This is because, with streams, we can't know beforehand how many chunks there will be, so we must wait for the stream to end before emitting the reduced value.
Using the building blocks
As you can see, map, filter and reduce have the same signature as their array counterparts, except for the last parameter, which is ignored anyway. Let's see what our code looks like using the new functions:
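Here is a self-contained sketch of the full pipeline. The building blocks are repeated in compact form (without error handling) so the snippet runs on its own, and `DB.getAsStream`, the business function names and the rate are hypothetical:

```javascript
const { Readable, Transform } = require('stream');

// Compact building blocks (error handling omitted for brevity).
const map = (fn) => new Transform({
  objectMode: true,
  transform(chunk, encoding, callback) { callback(null, fn(chunk)); },
});
const filter = (fn) => new Transform({
  objectMode: true,
  transform(chunk, encoding, callback) { callback(null, fn(chunk) ? chunk : undefined); },
});
const reduce = (fn, initial) => {
  let accumulator = initial;
  return new Transform({
    objectMode: true,
    transform(chunk, encoding, callback) { accumulator = fn(accumulator, chunk); callback(); },
    flush(callback) { callback(null, accumulator); },
  });
};

// Hypothetical data source and business functions.
const DB = {
  getAsStream: () => Readable.from([
    { id: 1, amount: 200 },
    { id: 2, amount: 50 },
    { id: 3, amount: 1000 },
  ]),
};
const USD_TO_EUR = 0.75; // illustrative rate, not a real quote
const toEuro = (tx) => ({ ...tx, amount: tx.amount * USD_TO_EUR });
const isAbove100 = (tx) => tx.amount > 100;
const sumAmounts = (total, tx) => total + tx.amount;

// The business code: same steps as the array version, one chunk at a time.
const totalStream = () =>
  DB.getAsStream()
    .pipe(map(toEuro))
    .pipe(filter(isAbove100))
    .pipe(reduce(sumAmounts, 0));

totalStream().on('data', (total) => console.log(total)); // 900
```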
So, we focused on our business code. Cool. We also solved all our initial problems. Memory is under control, since streams never bloat it: they hold only the chunks currently in flight (the one being processed plus a small, bounded internal buffer). Also, we never cycle the dataset more than once. Each chunk passes through the map and filter functions once and then goes on to the reduce function, which also accesses the chunks one by one. It's very similar to what transducers do.
This is a great achievement because, without changing the way we write the code too much, we gain a lot and are protected against problems that would only show up at runtime.
Bonus 1: compose
The code is neat, quick and has a low memory footprint, but we can give it a better style. Having each map/filter/reduce function wrapped by a pipe call makes the code a bit ugly. We have two choices: make the code chainable (.map().filter().reduce()), as it is for arrays, or use a point-free code style.
To make the code chainable we could wrap the stream object and this is what a library like Highland does. Since that is already a good example, let’s go for the point-free version.
Point-free style is a popular way of writing code, especially in the functional programming world. What we want is to group all the transforms passed to the pipes in order to remove all those .pipe() calls: we want to compose them, so that the source stream and all the transforms are handed to a single function.
This is very similar to compose for functions. The only difference is that it takes a stream and returns a stream. Here is the implementation of compose:
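A minimal sketch of compose, together with the call shape it enables. The signature `compose(source, ...transformers)` is an assumption of this example, and the building blocks are repeated in compact form so the snippet runs on its own:

```javascript
const { Readable, Transform } = require('stream');

// compose: start from the source and pipe it into each transformer in turn.
const compose = (source, ...transformers) =>
  transformers.reduce((stream, transformer) => stream.pipe(transformer), source);

// Compact building blocks (error handling omitted for brevity).
const map = (fn) => new Transform({
  objectMode: true,
  transform(chunk, encoding, callback) { callback(null, fn(chunk)); },
});
const filter = (fn) => new Transform({
  objectMode: true,
  transform(chunk, encoding, callback) { callback(null, fn(chunk) ? chunk : undefined); },
});
const reduce = (fn, initial) => {
  let accumulator = initial;
  return new Transform({
    objectMode: true,
    transform(chunk, encoding, callback) { accumulator = fn(accumulator, chunk); callback(); },
    flush(callback) { callback(null, accumulator); },
  });
};

// Hypothetical data source and business functions.
const getTransactions = () => Readable.from([
  { id: 1, amount: 200 },
  { id: 2, amount: 50 },
  { id: 3, amount: 1000 },
]);
const toEuro = (tx) => ({ ...tx, amount: tx.amount * 0.75 }); // illustrative rate
const isAbove100 = (tx) => tx.amount > 100;
const sumAmounts = (total, tx) => total + tx.amount;

// No explicit .pipe() calls left in the business code.
const totalStream = () =>
  compose(
    getTransactions(),
    map(toEuro),
    filter(isAbove100),
    reduce(sumAmounts, 0)
  );

totalStream().on('data', (total) => console.log(total)); // 900
```

Unlike the usual compose for functions, which applies its arguments right to left, this sketch pipes them left to right, so the steps are listed in the order the data flows.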
It's a simple reduce that starts with a stream and, at each step, returns the stream piped into the next transformer.
Bonus 2: async
Map, filter and reduce accept only synchronous functions. Stream transformers, however, are inherently asynchronous because, like many Node.js functions, they accept a callback as their last parameter. Here is a version of map that accepts a promise as the return value of the mapping function and waits until the promise is fulfilled:
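A minimal sketch, where the name `mapAsync` and the `slowDouble` helper are made up for the example:

```javascript
const { Readable, Transform } = require('stream');

// map for streams whose mapping function may return a promise.
const mapAsync = (fn, options = {}) =>
  new Transform({
    objectMode: true,
    ...options,
    transform(chunk, encoding, callback) {
      // Starting from a resolved promise turns plain return values into
      // promises and synchronous throws into rejections.
      Promise.resolve()
        .then(() => fn(chunk))
        .then(
          (result) => callback(null, result), // emit once the promise settles
          (err) => callback(err)
        );
    },
  });

// Hypothetical slow operation: doubles a number after 10 ms.
const slowDouble = (n) =>
  new Promise((resolve) => setTimeout(() => resolve(n * 2), 10));

Readable.from([1, 2, 3])
  .pipe(mapAsync(slowDouble))
  .on('data', (n) => console.log(n)); // 2, 4, 6 in order
```

Order is preserved because a Transform does not start on the next chunk until the callback for the current one has been called.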
This implementation also accepts functions that don't return promises, and wraps their return value in a promise.
Bonus, some more
There are other little advantages, e.g.
- easy step-by-step inspection
- incomplete map
- error handling
- multiple flows (stream fork)
- streams as promises
but maybe these will find place in another article :)
Conclusion
We saw how we can solve some common problems with a nice use of Node.js streams. Together we wrote some basic blocks, the functions map, filter and reduce, to build everything else with them. Usually, libraries that offer similar functionality have useful functions such as throttle, debounce, group etc. We defined only three basic functions because they can be used to build any other fancy functionality. To be honest, we could have defined only reduce, because it's the basic component of any other function: map and filter can actually be written as reduce implementations. If you like the approach, I suggest you have a look at the libraries I mentioned at the beginning of this article, because they usually offer many interesting features (e.g. laziness and asynchronicity).
Originally published at www.ramielcreations.com on July 8, 2018.