Streams for Everyone
Last week, I had an idea of creating a supplemental CLI for us to leverage RabbitMQ and BunnyBus without needing to bootstrap an application. My original thought was a quick NodeJs project to profit from some pipe() chaining. It was not until I ventured into the guts of the situation that I realized the NodeJs Stream is an unwieldy beast needing to be tamed with proper documentation and diagrams.
What is a Stream?
Quoted from Wikipedia
a stream is a sequence of data elements made available over time
A stream provides an efficient way of handling a big set of data. Streams are important concepts in most classical languages. We tend to use streams when the size of the data is unknown or varying. The reason is anytime data is handled as a whole in an application, it has to be loaded into memory. So if you have a piece of data that is overly large, it can cause unwanted memory pressure and out of memory exceptions. Streams allow you to handle only what is at hand.
Let’s imagine we have a web server handling HTTP requests without streams. We want to write all of these requests to the file system. Since we are not using streams, we would load all of the data from a request into memory, and then write that entire body of data into the file system. This requires enough memory to accommodate the entire request. Our memory profile would rise and fall with every request, because we must allocate memory for the whole body and then clean up that memory space once we are done.
Now, using streams, we would get a stream handler (object pointer) passed to us from our socket listener or web middleware. With a handle on the stream, we can redirect the output to the file system. The memory footprint for the stream would only be as big as the largest segment of data in transit at that point in time. So instead of needing 1 MB of memory to handle the whole request, you might only need 8 KB or as large as your stream buffer will allow.
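A minimal sketch of that idea, assuming a recent NodeJs version where stream.pipeline() is available. The helper name saveRequestBody and the file path in the usage comment are made up for illustration; req can be any Readable, such as the request object an HTTP server hands you.

```javascript
const fs = require('fs');
const { pipeline } = require('stream');

// Hypothetical helper: stream an incoming body straight to disk.
// Only the chunks currently in transit are held in memory.
function saveRequestBody(req, filePath) {
  return new Promise((resolve, reject) => {
    pipeline(req, fs.createWriteStream(filePath), (err) => {
      if (err) reject(err);
      else resolve(filePath);
    });
  });
}

// Usage inside an HTTP server (not started here):
// require('http').createServer((req, res) => {
//   saveRequestBody(req, '/tmp/upload.bin')
//     .then(() => res.end('saved'))
//     .catch(() => { res.statusCode = 500; res.end('failed'); });
// });
```

pipeline() is used here instead of a bare pipe() because it reports errors from either side through a single callback.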
What are NodeJs Streams?
Streams in NodeJs embody the core concepts from above. There are 3 main components essential for handling streams.
- Readable: Producer of stream data.
- Writable: Consumer of stream data.
- Transform: Used when manipulation of the stream data needs to occur.
We would normally connect the components in this way: a Readable feeds a Transform, which in turn feeds a Writable.
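As a rough sketch of that wiring, here are three tiny in-memory components chained with pipe(). The variable names and the upper-casing step are only for illustration.

```javascript
const { Readable, Transform, Writable } = require('stream');

// Producer: hands out three chunks, then signals the end with null.
const words = ['streams ', 'for ', 'everyone'];
const source = new Readable({
  read() {
    this.push(words.length ? words.shift() : null);
  }
});

// Manipulator: upper-cases every chunk that passes through.
const upperCase = new Transform({
  transform(chunk, encoding, callback) {
    callback(null, chunk.toString().toUpperCase());
  }
});

// Consumer: collects whatever arrives.
const received = [];
const sink = new Writable({
  write(chunk, encoding, callback) {
    received.push(chunk.toString());
    callback();
  }
});

source.pipe(upperCase).pipe(sink);
sink.on('finish', () => console.log(received.join('')));
```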
Interestingly, streams in NodeJs allow for multiple data types. Classical streams normally adhere strictly to transporting only byte arrays. But in NodeJs, we can also specify UTF8 strings and objects. Yes, you heard me right, objects!
The flexibility around type handling is useful. We can build an input pipe that consumes raw byte data, which is the usual format for things serialized with binary formatting. We can then attach transforms to the pipeline to convert the raw bytes to strings, and further transforms to interpret those strings as objects. This allows us to break the work of transformation or stream processing down over several transforms, which is great for dividing up the work.
An example of this is reading data in from a file stream, converting that stream data to valid UTF8 strings, and then interpreting the strings as JSON blobs. Each of those JSON blobs would be deserialized into an object and finally written into Elasticsearch as the destination. Once again, streams allow us to process only what is necessary for the next step.
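Here is a hedged sketch of such a pipeline. To keep it self-contained, it assumes newline-delimited JSON, reads from in-memory buffers instead of a file, and pushes the objects into a plain array standing in for Elasticsearch. The function names bytesToText and textToObjects are hypothetical.

```javascript
const { Readable, Transform, Writable, pipeline } = require('stream');
const { StringDecoder } = require('string_decoder');

// Step 1: raw bytes -> UTF8 strings. StringDecoder holds back incomplete
// multi-byte characters until the remaining bytes arrive.
function bytesToText() {
  const decoder = new StringDecoder('utf8');
  return new Transform({
    readableObjectMode: true, // emit strings as-is on the readable side
    transform(chunk, encoding, callback) {
      const text = decoder.write(chunk);
      if (text) this.push(text);
      callback();
    },
    flush(callback) {
      const rest = decoder.end();
      if (rest) this.push(rest);
      callback();
    }
  });
}

// Step 2: strings -> objects, one JSON document per line.
function textToObjects() {
  let leftover = '';
  const parseLines = (stream, lines) => {
    for (const line of lines) {
      if (line.trim()) stream.push(JSON.parse(line));
    }
  };
  return new Transform({
    objectMode: true,
    transform(text, encoding, callback) {
      const lines = (leftover + text).split('\n');
      leftover = lines.pop(); // the last piece may be an incomplete line
      try { parseLines(this, lines); callback(); } catch (err) { callback(err); }
    },
    flush(callback) {
      try { parseLines(this, [leftover]); callback(); } catch (err) { callback(err); }
    }
  });
}

// Step 3: object sink. An array stands in for the real destination.
const stored = [];
const store = new Writable({
  objectMode: true,
  write(doc, encoding, callback) {
    stored.push(doc);
    callback();
  }
});

// The chunk boundary deliberately falls in the middle of a record.
const bytes = Readable.from([Buffer.from('{"id":1}\n{"i'), Buffer.from('d":2}\n')]);

pipeline(bytes, bytesToText(), textToObjects(), store, (err) => {
  if (err) console.error('pipeline failed', err);
  else console.log(stored);
});
```

Each transform only knows about its own step, so the byte decoding, line splitting and JSON parsing can be tested or swapped out separately.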
As a side note, the NodeJs stream interface has evolved from where it first started. So you have to take special care when looking at documentation and module interfaces to understand the full picture.
Disconnects With NodeJs Stream Documentation
I’m coining this statement for a future meme.
Everyone uses streams, but no one writes the sinks for them
As I was going through Google finding reputable examples and write-ups on how to manage the states of a stream, I found abundant coverage on how to use them. But in the area of how to create stream components for Duplex, it was close to nonexistent. Here are the handful of useful resources which helped me.
- Stream Handbook
- Backpressuring in Streams
- Readable, Writable and Transform Streams in Node.js
- NodeJs Stream API Guide
Most of the web posts I found parroted the NodeJs Stream API Guide. That would not be a problem if the NodeJs documentation were accurate, but there are more than a few disconnects between the definition for the end caller and the definition for component implementations.
One example is when end() gets invoked for a Writable component. It is obvious to me that it is called and should be invoked within the pipe() chain when the Readable component has no more data. But in the implementation example, there is zero coverage of this consumer interface. This really left me hanging as to how a Writable component detects the end of the stream so that we can clean up underlying system resources and not hang the application.
Another example is the confusion around each event emitted by the components. Let’s take Writable as an example again and dive into the close event. The documentation states this event is emitted when the underlying resource is done cleaning up. But does this mean you, as the implementor of the component and the consumer of said resource, are to emit the event manually? Or is there an underlying function to call which will take care of the eventing? Once again, the implementation example is missing this.
All good API specifications have a few things in common. The one most relevant here is expected and predictable behavior in design. When implementing stream components, it is unclear which stream events (finish and so on) are emitted by the underlying super class and which need to be invoked yourself through emit(). I want to impress upon you, our future API providers: please do not fall into similar design flaws. It is a horrible experience for developers implementing against any spec.
Finding How The Spice (Stream) Flows
When I am befuddled, I look towards building experiments to help untangle the web of information. One such experiment let me test the eventing and see the drip of events downstream from one component to another while using pipe(). It allowed me to connect the cause and effect. Here are a few questions I was able to answer.
- When does _read() get called once the pipe is connected?
- When is _write() triggered when data is pumped from a
- Is the drain event managed by the Writable interface or something I need to
- Can I manipulate the drain event through control of
- Are the close events managed by the end() method within the
- Is there a corresponding private _end() function to implement for the consumer facing
- How much data is needed to force _writev() to be called?
It took me several different experimental setups to answer what I needed. But I urge you to try this to verify behaviors. This is a great outlet for bypassing unfinished documentation or can be used to verify the documentation itself is correct.
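A sketch of what such an experiment can look like follows. This is not the original experiment, only an illustration of the approach: a Readable and a Writable that log every internal call and every event, so the order can be read straight off the console. The record helper and the chunk values are made up.

```javascript
const { Readable, Writable } = require('stream');

const log = [];
const record = (message) => { log.push(message); console.log(message); };

// Readable that hands out three chunks and reports every _read() call.
const chunks = ['a', 'b', 'c'];
const source = new Readable({
  read(size) {
    record(`source _read(${size})`);
    this.push(chunks.length ? chunks.shift() : null);
  }
});

// Writable with a tiny buffer that reports every _write() call.
const sink = new Writable({
  highWaterMark: 1,
  write(chunk, encoding, callback) {
    record(`sink _write(${chunk.toString()})`);
    setImmediate(callback); // simulate an asynchronous resource
  }
});

// Only listen to events that do not change how the stream flows
// (a 'data' or 'readable' listener would alter the experiment).
for (const name of ['end', 'close']) {
  source.on(name, () => record(`source event: ${name}`));
}
for (const name of ['pipe', 'drain', 'finish', 'close', 'unpipe']) {
  sink.on(name, () => record(`sink event: ${name}`));
}

source.pipe(sink);
```

The exact ordering you see may differ between NodeJs versions, which is precisely why it is worth running against the version you deploy.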
How It Flows
I was inspired to create a diagram based on the ASCII art I saw in one of the back pressure articles. I added details where they were lacking and also tried to make event causation clearer.
It is important to note pipe contains no magic and only operates within normal means when both the Readable and Writable interfaces are triggered in a timely fashion. For example, if the _read() method is called, but the corresponding this.push() is not invoked, then the piping mechanism will just hang. This is also true with the _write() method and the associated callback() needing to be reconciled.
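To make that contract concrete, here is a small sketch in which both sides answer asynchronously, but always answer. The timers only simulate slow resources, and the chunk contents are arbitrary.

```javascript
const { Readable, Writable } = require('stream');

let remaining = 3;
const source = new Readable({
  read() {
    // Answer later, but always answer: every _read() call leads to
    // exactly one push(). Forgetting the push() stalls the pipe.
    setTimeout(() => {
      this.push(remaining > 0 ? `chunk ${remaining--}\n` : null);
    }, 10);
  }
});

const written = [];
const sink = new Writable({
  write(chunk, encoding, callback) {
    written.push(chunk.toString());
    // Signal completion once the (simulated) slow write is done.
    // Without this callback() no further _write() calls are made.
    setTimeout(callback, 10);
  }
});

source.pipe(sink);
sink.on('finish', () => console.log('wrote:', written.join('')));
```

Commenting out either the push() or the callback() is a quick way to watch the hang described above.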
There are events such as close which are not emitted within the Writable super class. These events are optional for you to emit yourself. I’ve had to use them while closing underlying resources like sockets or other file descriptors. It is very useful to leverage these while unit testing to get a truth on when either end of the pipe is truly no longer able to act.
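Newer NodeJs releases offer a dedicated hook for this kind of cleanup: destroy() on the caller side and a _destroy() implementation on the component side, after which the base class emits close. Whether close fires without any help depends on the NodeJs version, so treat the sketch below as something to verify rather than a guarantee. The resource object is a made-up stand-in for a socket or file descriptor.

```javascript
const { Writable } = require('stream');

// Hypothetical stand-in for a socket or file descriptor.
const resource = { open: true, lines: [] };

const sink = new Writable({
  write(chunk, encoding, callback) {
    resource.lines.push(chunk.toString());
    callback();
  },
  // Runs when the stream is torn down; release the resource here.
  destroy(err, callback) {
    resource.open = false;
    callback(err);
  }
});

// Tear down explicitly once everything is flushed. On versions that
// already destroy the stream automatically this extra call is a no-op.
sink.on('finish', () => sink.destroy());
sink.on('close', () => console.log('close, resource open?', resource.open));

sink.write('hello');
sink.end();
```

In a unit test, waiting for close before asserting gives you the "truly done" signal described above.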
_write() invocations are never guaranteed any data when they are called. Expect none by default. When the pipe() starts up and calls on _read(), the underlying data source for the Readable may not be initialized yet. This problem exists because some data sources may require instantiation of database connections while _read() is invoked. So for an implementation to support delayed initialization, it is necessary to either pass empty data or keep a semaphore counting how many _read() calls have been made, so that an equivalent number of this.push() calls can be invoked.
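The counting approach can be sketched roughly as follows. The connect() function and the LazySource class are hypothetical; a timer plays the part of a slow database connection.

```javascript
const { Readable } = require('stream');

// Hypothetical data source that takes a while to become available.
function connect() {
  return new Promise((resolve) => {
    setTimeout(() => resolve(['row 1\n', 'row 2\n', 'row 3\n']), 20);
  });
}

class LazySource extends Readable {
  constructor(options) {
    super(options);
    this.rows = null;      // not available until connect() resolves
    this.pendingReads = 0; // _read() calls that have not been answered yet
    connect().then((rows) => {
      this.rows = rows;
      this.answerReads();
    }, (err) => this.destroy(err));
  }

  _read() {
    this.pendingReads += 1;
    if (this.rows) this.answerReads();
  }

  // Answer each outstanding _read() with one push().
  answerReads() {
    while (this.pendingReads > 0) {
      this.pendingReads -= 1;
      const row = this.rows.shift();
      if (row === undefined) {
        this.push(null); // source exhausted, end the stream
        return;
      }
      this.push(row);
    }
  }
}

const seen = [];
const lazy = new LazySource();
lazy.on('data', (chunk) => seen.push(chunk.toString()));
lazy.on('end', () => console.log(seen.join('')));
```

The NodeJs documentation states that _read() is not called again until more data has been pushed, so in current versions the counter should rarely exceed one. Keeping it makes the "one push per read" bookkeeping explicit.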
The end() interface in Writable is an optional implementation. There is no private method to implement for this, so you would just override the public interface if needed. The call made to end() is never guaranteed to be the last call within the stream of events. Others and I have seen this execute out of order.
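For readers on NodeJs 8 or later, the Writable base class also accepts a _final() implementation, which is one way to react to the end of input without overriding end(). A minimal sketch, with an events array added purely to show the order of calls:

```javascript
const { Readable, Writable } = require('stream');

const events = [];
const sink = new Writable({
  write(chunk, encoding, callback) {
    events.push(`write ${chunk.toString()}`);
    callback();
  },
  // Called after end(), once all buffered writes have completed,
  // and before 'finish' is emitted.
  final(callback) {
    events.push('final');
    callback();
  }
});

sink.on('finish', () => {
  events.push('finish');
  console.log(events);
});

Readable.from(['a', 'b']).pipe(sink);
```

Flushing or closing the underlying resource inside final() means the finish event only fires after that work is done.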
Writable components, which include Duplex, utilize the highWaterMark option to throttle the Readable component. This is a built-in feature and does not need to be manipulated. It is easy to trigger the throttling behavior when you set the highWaterMark to a smaller value than the size of the data being passed. You should also be able to trigger throttling behavior by returning false within the
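The highWaterMark throttling can be observed directly from the caller's side. In this sketch the sink gets a deliberately tiny highWaterMark of 4 bytes and a timer simulating a slow destination; both values are arbitrary.

```javascript
const { Writable } = require('stream');

const slowSink = new Writable({
  highWaterMark: 4, // bytes; deliberately smaller than the chunk below
  write(chunk, encoding, callback) {
    setTimeout(callback, 10); // simulate a slow destination
  }
});

// write() returns false once the buffered data reaches highWaterMark.
// pipe() reacts to that by pausing the Readable until 'drain' fires.
const accepted = slowSink.write('12345678');
console.log('keep writing?', accepted);

slowSink.once('drain', () => {
  console.log('drain: safe to write again');
  slowSink.end();
});
```

When you write to a stream by hand instead of through pipe(), honoring that return value and waiting for drain is your responsibility.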
About the Author
Lam is a Software Architect for the Locals Squads @ XO Group. He is a seasoned polyglot engineer with over 16 years of professional experience working with startups and multiple Fortune 500 companies. When he is away from the office, he enjoys contributing to OSS projects and dabbling in woodworking projects. Find out more about Lam on LinkedIn