Streams In Depth In Node.js

Sitvanit Meltzer
Published in Autodesk TLV
11 min read · Jun 26, 2019

Introduction

Streams are one of the fundamental concepts in programming in general, and in Node.js in particular. You are probably using them already without knowing it (you’ll see where by the time you finish reading this post).

If you would like to read about how we use streams in our service, you are invited to read my blog post: https://medium.com/@sitvanitm/how-can-you-increase-i-o-performance-with-streams-cc85704db108

Streams are objects that let you move data from a source to a destination without waiting for all of the data to be available, because the data is sent asynchronously in chunks.

Hence, streams can significantly improve your time and space efficiency: when you work in chunks, you don’t have to wait until the whole file has been read before you start writing it, and you don’t need to hold the whole file in memory at once.

Before they are consumed, the chunks wait in a waiting area called a buffer, which lives in RAM.

Every stream has its own internal buffer. The amount of data each buffer is meant to hold depends on its size (which we can define in the stream’s constructor, or leave at the default).
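To make the internal buffer a little more concrete, here is a minimal sketch. It assumes a bare Readable with a no-op read() and an arbitrary highWaterMark of 4 bytes, both chosen only for the demo, and pushes data into the buffer by hand.

```javascript
const { Readable } = require('stream');

// A readable stream whose internal buffer threshold is 4 bytes.
// read() is a no-op because we push data manually in this demo.
const readable = new Readable({ highWaterMark: 4, read() {} });

// push() returns true while the buffer is still below the threshold.
const firstPush = readable.push('ab');
// The buffer has now reached the threshold, so push() returns false:
// a hint to the producer to stop pushing for now.
const secondPush = readable.push('cd');

console.log(readable.readableHighWaterMark); // the configured threshold
console.log(readable.readableLength);        // bytes currently waiting in the buffer
console.log(firstPush, secondPush);
```

Nothing consumes the stream here, so the data simply waits in the buffer, which is exactly the "waiting area" described above.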

Stream Object Illustration

In Node.js we have the ‘stream’ core module, and according to its documentation:

“A stream is an abstract interface for working with streaming data in Node.js. The stream module provides a base API that makes it easy to build objects that implement the stream interface… Streams can be readable, writable, or both… All streams are instances of EventEmitter.”

Abstract Interface

In order to use that API, we should implement that interface for our needs.

Node.js provides many stream objects that already implement that interface for common uses.

Usually developers do not use the ‘stream’ module directly, because there is no reason to implement a stream object that is already implemented. So it’s likely that you’ll consume the ‘stream’ module indirectly (e.g. the createReadStream function of ‘fs’ returns an object that implements the stream interface).

You will use the stream module directly only if you have to implement a custom stream for your own specific needs.

Stream Types — Readable, Writable, or Both

There are a few types of streams: readable, writable, or both.

Readable streams are an abstraction for a source from which data is consumed.

Writable streams are an abstraction for a destination to which data is written.

There are also streams that can be both; that kind of stream is called a duplex stream. Duplex streams implement both the Readable and Writable interfaces.

EventEmitter

Stream objects can emit events and listen to events, and that is how they communicate with each other. We’ll discover some of the important events later, in order to understand how streams work.
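As a quick sanity check of the EventEmitter claim, here is a small sketch. The three streams are throwaway instances with no-op implementations, and the ‘hello’ event is a made-up custom event used only for illustration.

```javascript
const { EventEmitter } = require('events');
const { Readable, Writable, Duplex } = require('stream');

// Throwaway streams with no-op implementations, just for the check.
const streams = {
  readable: new Readable({ read() {} }),
  writable: new Writable({ write(chunk, encoding, callback) { callback(); } }),
  duplex: new Duplex({ read() {}, write(chunk, encoding, callback) { callback(); } }),
};

// Every stream is an EventEmitter instance.
const isEmitter = {};
for (const [name, stream] of Object.entries(streams)) {
  isEmitter[name] = stream instanceof EventEmitter;
}
console.log(isEmitter);

// So a stream can emit and listen to any event, including custom ones.
let greeting = null;
streams.readable.on('hello', (name) => { greeting = `hello, ${name}`; });
streams.readable.emit('hello', 'streams');
console.log(greeting);
```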

Before we implement some custom streams ourselves, let’s see how we can use the existing stream objects.

I recommend running the following code snippets. You can write them from scratch, or use my GitHub repository: https://github.com/sitvanit/snippets/tree/master/streams

Read and Write File

As we saw before, we can use the ‘fs’ module to read and write files with stream objects.

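const fs = require('fs');

const readStream = fs.createReadStream('file', { highWaterMark: 1024 });
const writeStream = fs.createWriteStream('file-copy');
// Prefix every chunk with a title to make the chunk boundaries visible.
readStream.on('data', chunk => {
  writeStream.write(`\n\nnew chunk has been received: \n\n${chunk}`);
});
readStream.on('end', () => writeStream.end());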

fs.createReadStream constructs a readable stream object. Its first argument is the path of the file that we’d like to read, and its second argument is the options object that the constructor uses to create the stream object.

In that example, I decided to define the highWaterMark option. We are going to understand what the highWaterMark is later. For now let’s say that it determines the size of the chunk in bytes, so 1024 means chunks of 1 KB (the highWaterMark is not necessarily the size of the chunk, it depends on the implementation of the stream object, but that is how it works in ‘fs’). The default highWaterMark in ‘fs’ is 64 KB.

In the next line I construct the writable stream with the path of the destination that I’d like to write to.

We could connect the two streams with pipe, as we’ll see later. But I decided to listen to the ‘data’ event (which is emitted whenever the stream hands over ownership of a chunk of data to a consumer), in order to write each chunk under the title “new chunk has been received” and prove that the file has been consumed in chunks.

If you run that script in your project, you’ll find that ‘file-copy’ has been created, and if you open it, you’ll find our custom title before every 1024 bytes (our chunk).

So now we can actually believe it, because we saw it with our own eyes: streams read and write data in chunks. How amazing is that!
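If you prefer numbers over titles in a file, the following sketch records the size of every chunk instead. The chunkSizes helper and the temporary demo file are assumptions made for this example; they are not part of the original snippets.

```javascript
const fs = require('fs');
const os = require('os');
const path = require('path');

// Hypothetical helper: read a file as a stream and record the size of every chunk.
function chunkSizes(filePath, highWaterMark) {
  return new Promise((resolve, reject) => {
    const sizes = [];
    fs.createReadStream(filePath, { highWaterMark })
      .on('data', (chunk) => sizes.push(chunk.length))
      .on('end', () => resolve(sizes))
      .on('error', reject);
  });
}

// Demo: a 2500-byte temporary file read with a 1024-byte highWaterMark.
const demoFile = path.join(os.tmpdir(), `chunk-demo-${process.pid}.txt`);
fs.writeFileSync(demoFile, 'x'.repeat(2500));

chunkSizes(demoFile, 1024).then((sizes) => {
  console.log(sizes); // no chunk is larger than the highWaterMark
  fs.unlinkSync(demoFile);
});
```

The sizes add up to the size of the file, and no single chunk exceeds the highWaterMark we asked for.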

Receive Request and Send Response

Now, I’m going to shake your world. Did you know that request and response, those objects that you use on a daily basis, are stream objects as well?

request is a readable stream and response is a writable stream.

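const fs = require('fs');
const http = require('http');

const server = http.createServer((req, res) => {
  const rs = fs.createReadStream('file', { highWaterMark: 1024 });
  rs.on('data', chunk => {
    res.write(`\n\nnew chunk has been received: \n\n${chunk}`);
  });
  // End the response once the file is fully read; otherwise the request never completes.
  rs.on('end', () => res.end());
});
const port = 3000;
server.listen(port, 'localhost');
console.log(`server is listening on port ${port}`);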

In that snippet I created an HTTP server. Inside it I created a readable stream and listened to its ‘data’ event, as in the previous snippet. But this time, instead of writing the data to a file, we send it in chunks on the response.

If you run that server on your computer and browse to ‘localhost:3000’, you’ll find that the response was sent in chunks.
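The snippet above only uses the response as a writable stream. To see the request as a readable stream too, here is a small sketch of an echo server that pipes the request body straight into the response. The helper names createEchoServer and postAndCollect are made up for this example, and the server listens on a random free port.

```javascript
const http = require('http');

// Hypothetical helper: an HTTP server that echoes the request body back.
function createEchoServer() {
  return http.createServer((req, res) => {
    // req is a readable stream and res is a writable stream,
    // so they can be connected directly.
    req.pipe(res);
  });
}

// Hypothetical helper: POST `body` to the server and resolve with the reply.
function postAndCollect(port, body) {
  return new Promise((resolve, reject) => {
    const request = http.request(
      { host: '127.0.0.1', port, method: 'POST', agent: false },
      (response) => {
        let received = '';
        response.setEncoding('utf8');
        response.on('data', (chunk) => { received += chunk; });
        response.on('end', () => resolve(received));
      }
    );
    request.on('error', reject);
    request.end(body); // on the client side the request is a writable stream
  });
}

// Demo: start the server on a free port, send one request, then shut down.
const server = createEchoServer();
server.listen(0, '127.0.0.1', async () => {
  const reply = await postAndCollect(server.address().port, 'hello streams');
  console.log(reply);
  server.close();
});
```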

I think now we are ready to jump into the deep end. Let’s try to implement some custom streams on our own.

Custom Readable Stream

const { Readable } = require('stream');

class MyReadable extends Readable {
  constructor(data, options) {
    super(options);
    this.data = data;
  }

  _read(size) {
    if (this.data.length) {
      const chunk = this.data.slice(0, size);
      this.data = this.data.slice(size, this.data.length);
      this.push(chunk);
    } else {
      this.push(null); // 'end'
    }
  }
}

module.exports = MyReadable;

As I mentioned before, the Readable class is an interface that we must implement in order to use it.

When we implement it, we have to implement the _read method. _read is an internal method that only the Readable class itself should call: we have to implement it, but we must not call it directly. This method may get a ‘size’ parameter. The size is determined by the highWaterMark that we passed to the stream constructor, or by the default (16 KB in the stream module at the time of writing).

We can implement _read so that each chunk has the ‘size’ that was passed to the function (as I did above). But if we ignore it, the data will be pushed to the buffer the moment it arrives, instead of being accumulated up to the size of a chunk first.

After I create the chunk of data, I have to push that chunk to the stream. The push fills the buffer with the data and triggers the readable stream to emit a ‘readable’ event, to let its listeners know that there is data they can read.

When we finish reading all of the data we need to push ‘null’ to the buffer. Then an ‘end’ event is emitted once the buffered data has been consumed, and the listeners know that no more data will arrive.

Now, we would like to consume that custom readable stream that we just implemented.

// Assumes the MyReadable class above is saved as ./my-readable.js (hypothetical path).
const MyReadable = require('./my-readable');

const data = [{ a: 1 }, { b: 2 }, { c: 3 }, { d: 4 }, { e: 5 }];
const readable = new MyReadable(data, { objectMode: true, highWaterMark: 2 });
console.log(readable.readableFlowing); // null

readable.on('data', chunk => {
  console.log(chunk);
});
console.log(readable.readableFlowing); // true

readable.on('readable', () => {
  readable.read();
});
console.log(readable.readableFlowing); // false

readable.on('end', () => console.log('No more data!'));

First, you should know that a stream can consume only strings, buffers, or objects. In that example I chose objects. If we choose objects, we have to set objectMode to true. You can see that I limited the highWaterMark to 2. For byte streams the highWaterMark is measured in bytes, but not here: if we set objectMode to true, the highWaterMark counts objects instead.

A readable stream has three states and two modes.

We can get the current state by reading the readableFlowing property of the stream.

All Readable streams begin in paused mode. At the beginning, in the initial state, readableFlowing is always null, because no mechanism for consuming the stream’s data has been provided. Once the stream starts being consumed, it either switches to flowing mode (readableFlowing is true) or stays in paused mode (readableFlowing is false).

In flowing mode, data is read from the underlying system automatically and provided to an application as quickly as possible using events via the EventEmitter interface.

The stream switches to flowing mode when one of the following occurs:

1. Attaching a listener for the ‘data’ event

2. Calling the readable.pipe() method

3. Calling the readable.resume() method

In paused mode, the stream.read() method must be called explicitly to read chunks of data from the stream.

The stream switches to paused mode when one of the following occurs:

1. Calling readable.pause()

2. Calling readable.unpipe()

3. Receiving backpressure (we are going to delve deeper into this later)
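The switches listed above can be observed directly on the readableFlowing property. This is a minimal sketch that assumes a bare Readable with a no-op read(), so no data is actually produced; only the state changes are recorded.

```javascript
const { Readable } = require('stream');

// A bare readable stream; read() is a no-op because only the state matters here.
const readable = new Readable({ read() {} });
const states = [];

states.push(readable.readableFlowing); // initial state: no consumer yet
readable.resume();                     // switch to flowing mode
states.push(readable.readableFlowing);
readable.pause();                      // switch back to paused mode
states.push(readable.readableFlowing);

console.log(states);
```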

It’s important to remember that Readable will not generate data until a mechanism for consuming or ignoring the data is provided.

If a Readable is switched into flowing mode and nobody consumes it, the data will be lost.

Adding a ‘readable’ event handler automatically makes the stream stop flowing, and the data can then be consumed with the readable.read() function. If the ‘readable’ event handler is removed, the stream will start flowing again if there is a ‘data’ event handler.

Calling readable.read() pulls data out of the internal buffer, and every chunk it returns is also emitted as a ‘data’ event.

In the snippet above I chose to listen to both the ‘readable’ event and the ‘data’ event, just to demonstrate how readableFlowing changes.

When you consume a readable stream (and not with pipe), you should choose whether to consume the data in flowing mode and listen to the ‘data’ event, or in paused mode and listen to the ‘readable’ event.

At the end, don’t forget to listen to the ‘end’ event (which is triggered by readable.push(null), remember?).
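For the paused-mode option, a common pattern is to drain the buffer in a loop inside the ‘readable’ handler, calling read() until it returns null. The sketch below assumes a helper named collectPaused (a made-up name) and uses Readable.from to build a small source stream for the demo.

```javascript
const { Readable } = require('stream');

// Hypothetical helper: consume a readable stream in paused mode.
function collectPaused(readable) {
  return new Promise((resolve, reject) => {
    const chunks = [];
    readable.on('readable', () => {
      let chunk;
      // read() returns null when the internal buffer is empty.
      while ((chunk = readable.read()) !== null) {
        chunks.push(chunk);
      }
    });
    readable.on('end', () => resolve(chunks));
    readable.on('error', reject);
  });
}

// Demo source: Readable.from builds a stream out of an iterable.
collectPaused(Readable.from(['a', 'b', 'c'])).then((chunks) => {
  console.log(chunks);
});
```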

Custom Writable Stream

const { Writable } = require('stream');

class MyWritable extends Writable {
  constructor(options) {
    super(options);
  }

  _write(chunk, encoding, callback) {
    const formattedChunk = this._writableState.objectMode === true ? JSON.stringify(chunk) : chunk;
    console.log(`Writing to stream: ${formattedChunk}`);
    callback(); // signals that this chunk has been handled, so the next one can be written
  }
}

module.exports = MyWritable;

Here again we implement the specific stream by extending the Writable class.

We must implement the _write method. _write gets three parameters:

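1. Chunk: the chunk of data, which may be a string, a buffer, or an object (if we are in objectMode).

2. Encoding: the character encoding of the chunk, if the chunk is a string.

3. Callback: must be called when the chunk has been processed (with an error as its first argument if the processing failed). It lets the stream move on to the next buffered chunk, and if write() had reported a full buffer, the stream emits the ‘drain’ event once the buffer has emptied.

_write is internal, and we must not call it from outside of the Writable class.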
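The callback is also how _write reports failures. Here is a minimal sketch, assuming a made-up PickyWritable class that rejects the chunk ‘bad’; passing an error to the callback makes the stream emit an ‘error’ event instead of finishing normally.

```javascript
const { Writable } = require('stream');

// Hypothetical writable stream that refuses one specific chunk.
class PickyWritable extends Writable {
  _write(chunk, encoding, callback) {
    if (chunk.toString() === 'bad') {
      callback(new Error('rejected chunk')); // report the failure to the stream
      return;
    }
    callback(); // the chunk was handled successfully
  }
}

// Hypothetical helper: write a single value and report how the stream ended.
function writeOnce(value) {
  return new Promise((resolve) => {
    const writable = new PickyWritable();
    writable.on('error', (err) => resolve(`error: ${err.message}`));
    writable.on('finish', () => resolve('finished'));
    writable.end(value);
  });
}

writeOnce('good').then(console.log);
writeOnce('bad').then(console.log);
```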

Now, if we’d like to consume that class:

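// Assumes the MyWritable class above is saved as ./my-writable.js (hypothetical path).
const MyWritable = require('./my-writable');

const data = ['first element', 'second element', 'last element'];
const writable = new MyWritable({ highWaterMark: data[0].length });
const written = writable.write(data[0]);
writable.write(data[1]);
writable.end(data[2]);
console.log(written);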
In that snippet the data is an array of strings, and when I created the instance of MyWritable, I set the highWaterMark to the length of the first string.

In our example the highWaterMark determines the size of the chunk and the buffer.

We consume the writable stream with writable.write(). That function returns a Boolean: true if there is room for more data in the buffer, and false if not.

So now, what do you think, is ‘written’ going to be true or false? Because I set the size of the buffer to the length of the first string, ‘written’ is going to be false.
Does it mean that we are going to lose the rest of the data because the buffer is full?

Not really. You can try it and see that the data is still consumed even though the buffer is full.

What does it mean? It means that the buffer can exceed the limit we set for it, so we have to implement a mechanism that regulates the data flow. If we don’t, we risk filling up the RAM even though we use streams, because we ignore the backpressure.

Backpressure

Backpressure is what happens when the receiving end of a transfer performs complex operations, or is slower for whatever reason, so that data from the incoming source tends to accumulate.

In that case the pipe has to control the data flow.

Luckily, Node.js already implemented that for us: if we use the pipe function, we get that regulation mechanism out of the box and don’t have to take care of it ourselves.
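For comparison, this is roughly what regulating the flow by hand looks like. It is only a sketch under stated assumptions: createSlowWritable and writeAll are made-up helpers, the destination is artificially slowed down with setImmediate, and the highWaterMark of 1 byte is chosen so that the buffer fills up immediately.

```javascript
const { Writable } = require('stream');
const { once } = require('events');

// Hypothetical slow destination with a tiny buffer (1 byte).
function createSlowWritable(received) {
  return new Writable({
    highWaterMark: 1,
    write(chunk, encoding, callback) {
      received.push(chunk.toString());
      setImmediate(callback); // pretend the write takes a while
    },
  });
}

// Hypothetical helper: write all chunks while respecting backpressure.
async function writeAll(writable, chunks) {
  let pauses = 0;
  for (const chunk of chunks) {
    // write() returns false when the buffer is full...
    if (!writable.write(chunk)) {
      pauses += 1;
      // ...so we wait for 'drain' before writing the next chunk.
      await once(writable, 'drain');
    }
  }
  // The callback passed to end() runs when the stream has finished.
  await new Promise((resolve) => writable.end(resolve));
  return pauses;
}

const received = [];
writeAll(createSlowWritable(received), ['a', 'b', 'c']).then((pauses) => {
  console.log(received, pauses);
});
```

The producer never writes into a full buffer, which is the same kind of regulation that pipe gives us for free.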

Pipe

Unless you have a really good reason, always use pipe in order to connect streams.

const data = [{ a: 1 }, { b: 2 }, { c: 3 }, { d: 4 }, { e: 5 }];
const readable = new MyReadable(data, { objectMode: true, highWaterMark: 2 });
const writable = new MyWritable({ objectMode: true });
readable.pipe(writable);

Now we are in the safe zone!

Duplex Stream

We have implemented readable and writable streams, but we mentioned at the beginning that there are streams that are both readable and writable: duplex streams.

We can compare it to a phone call: you can talk to your friend and listen to him, and at the same time he can talk to you and listen to you.

This magic can happen thanks to the fact that each duplex stream has two internal buffers, one for the readable side and one for the writable side, which enables it to read and write data concurrently.
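The two independent sides can be seen in a small sketch. It assumes a throwaway Duplex with a no-op read() and a write() that just records what it receives; the two highWaterMark values are arbitrary and only show that each side has its own buffer settings.

```javascript
const { Duplex } = require('stream');

const written = [];
const duplex = new Duplex({
  readableHighWaterMark: 2, // threshold of the readable side's buffer
  writableHighWaterMark: 8, // threshold of the writable side's buffer
  read() {},                // no-op: we push data by hand below
  write(chunk, encoding, callback) {
    written.push(chunk.toString());
    callback();
  },
});

// Writing goes to the writable side only...
duplex.write('to the writable side');
// ...while pushing fills the readable side's buffer only.
duplex.push('hi');

console.log(duplex.readableHighWaterMark, duplex.writableHighWaterMark);
console.log(duplex.readableLength); // bytes waiting on the readable side
console.log(written);               // what the writable side received
```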

A simple example of a duplex stream is a socket of the ‘net’ module:

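const net = require('net');

// Echo server: everything read from the socket is written straight back to it.
net.createServer(socket => {
  socket.pipe(socket);
}).listen(8001);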

Here I implemented a simple echo server. The socket is a duplex stream. You can see that I piped the socket to itself: the socket that pipe is called on acts as the readable stream, and the one passed as the argument acts as the writable stream.

If we run the server and send some data to it with netcat from another terminal (nc localhost 8001), we’ll see that the server reads our data and writes it back as is.

Transform Stream

A transform stream is a kind of duplex stream. These streams transform the data they get before they pass it to the next stream.

If you’d like to implement a custom transform stream, you should extend the Transform class and implement the _transform method (as we did before with _read and _write).

A trivial example of a transform stream is gzipping a file:

const fs = require('fs');
const zlib = require('zlib');

const readableStream = fs.createReadStream('file');
const transformStream = zlib.createGzip();
const writableStream = fs.createWriteStream('file.gz');
readableStream.pipe(transformStream).pipe(writableStream);
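The gzip stream comes ready-made, but a custom transform stream is not much harder. Here is a minimal sketch, assuming a made-up UpperCase class that upper-cases text, and a made-up upperCaseAll helper that feeds it a few strings and collects the output.

```javascript
const { Transform } = require('stream');

// Hypothetical transform stream: upper-cases every chunk that passes through.
class UpperCase extends Transform {
  _transform(chunk, encoding, callback) {
    // First argument: an error (or null). Second argument: the transformed chunk.
    callback(null, chunk.toString().toUpperCase());
  }
}

// Hypothetical helper: run a few strings through the transform and collect the result.
function upperCaseAll(parts) {
  return new Promise((resolve, reject) => {
    const upper = new UpperCase();
    let output = '';
    upper.on('data', (chunk) => { output += chunk.toString(); });
    upper.on('end', () => resolve(output));
    upper.on('error', reject);
    for (const part of parts) {
      upper.write(part); // writable side
    }
    upper.end();
  });
}

upperCaseAll(['hello ', 'streams']).then(console.log);
```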

Pipeline

If you pipe a few custom streams (not streams that are implemented by Node.js) to each other and one of them fails with an error, that stream will be closed automatically, but the other streams won’t be closed, and you have to take care of them yourself.

The Node.js ‘stream’ core module has a ‘pipeline’ function. ‘pipeline’ allows you to pipe as many streams as you like to each other, and to define the error handling in a callback passed as the last parameter.

const { pipeline } = require('stream');
const fs = require('fs');
const zlib = require('zlib');

pipeline(
  fs.createReadStream('file'),
  zlib.createGzip(),
  fs.createWriteStream('file.gz'),
  (err) => {
    if (err) {
      console.error('Pipeline failed.', err);
    } else {
      console.log('Pipeline succeeded.');
    }
  }
);
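To see the cleanup in action, here is a sketch with a deliberately failing stream in the middle. All three streams and the runFailingPipeline helper are made up for the demo: the source never ends on its own, so if it ends up destroyed, it is pipeline that closed it for us.

```javascript
const { pipeline, Readable, Transform, Writable } = require('stream');

// Hypothetical helper: run a pipeline whose middle stream always fails.
function runFailingPipeline() {
  return new Promise((resolve) => {
    // An endless source: it keeps producing data until someone destroys it.
    const source = new Readable({ read() { this.push('chunk'); } });
    // A transform that fails on the first chunk it receives.
    const failing = new Transform({
      transform(chunk, encoding, callback) { callback(new Error('boom')); },
    });
    const sink = new Writable({
      write(chunk, encoding, callback) { callback(); },
    });

    pipeline(source, failing, sink, (err) => {
      resolve({
        message: err.message,
        sourceDestroyed: source.destroyed,
        sinkDestroyed: sink.destroyed,
      });
    });
  });
}

runFailingPipeline().then(console.log);
```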

When Should You Use Streams?

I believe some of you have dealt with I/O operations until now without using streams, and everything was just fine. You might ask yourself: why and when should I use them now?

I read an interesting article by Duncan Grant: https://medium.com/@dalaidunc/fs-readfile-vs-streams-to-read-text-files-in-node-js-5dd0710c80ea

Grant ran a benchmark that compared a regular file read with reading a file through a stream. His benchmark found that you can always use streams, but if you read a file that is larger than 16 MB, you are better off with the stream approach.

TL;DR

If you need to produce or consume a large amount of data, streams enable you to do it in chunks and thereby improve your performance.

References:

https://nodejs.org/api/stream.html
