Streams, Piping, and Their Error Handling in Node.js

Explained with examples

Kunal Tandon
Nov 18 · 6 min read

In the last article, we learned the basics of streams and buffers in Node.js. We also learned about readable streams in Node.js.

If you haven’t read the last article on streams and buffers, read it here:

In this article, we will learn about writable streams, stream piping, and the events and functions available on a writable stream in Node.js.

A Writable Stream

A writable stream is a destination to which data can be written chunk by chunk. For example, we can create a write stream to write some streaming data to a text file.

Let’s consider the following example for creating a writable stream in Node.js.

I am using TypeScript instead of JavaScript.

import { createWriteStream } from 'fs';

const writeStream = createWriteStream('./dump.txt');

In the above code, we created a write stream to write some streaming data to a file named dump.txt.

Running the above two lines creates a file named dump.txt, but without any data inside it.

To actually write some data to the file, we need to call the write function of the write stream.

writeStream.write('hello\n');
writeStream.write('world\n');

Running the above two write calls on the writeStream variable creates a file named dump.txt and inserts the following text into it.

dump.txt

hello
world
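To know when the data has actually been written, we can end the stream and wait for its finish event. The following is a minimal sketch of that idea, not code from the original example: the writeLines helper and the temporary file location are assumptions made for illustration.

```typescript
import { createWriteStream, readFileSync } from 'fs';
import { tmpdir } from 'os';
import { join } from 'path';

// Hypothetical helper (not part of Node.js): writes each line to `filePath`
// and resolves once the stream reports that all data has been flushed.
function writeLines(filePath: string, lines: string[]): Promise<void> {
  return new Promise((resolve, reject) => {
    const writeStream = createWriteStream(filePath);
    writeStream.on('error', reject);            // e.g. the directory does not exist
    writeStream.on('finish', () => resolve());  // every chunk has been written
    for (const line of lines) {
      writeStream.write(`${line}\n`);
    }
    writeStream.end(); // signal that no more data will be written
  });
}

// Usage: write to a file in the OS temp directory, then read it back.
const target = join(tmpdir(), 'dump.txt');
writeLines(target, ['hello', 'world']).then(() => {
  console.log(readFileSync(target, 'utf8'));
});
```

Calling end is what tells the stream that we are done; without it, the finish event is never emitted.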

Stream Piping and Unpiping

In an Express application, the req (request) and res (response) for a request handler are streams. A req is a readable stream of data whereas res is a writable stream of data.

Suppose that, for an HTTP request, we have to serve a really large file. We can do so by using streams.

The following code in Node represents a similar functionality:

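import { createReadStream } from 'fs';
import express from 'express';

const app = express();

app.get('/', (req, res) => {
  const readStream = createReadStream('./data.txt');
  // Forward every chunk read from the file to the response.
  readStream.on('data', (data) => {
    res.write(data);
  });
  // The end event carries no arguments. The status defaults to 200,
  // so we only need to end the response here.
  readStream.on('end', () => {
    res.end();
  });
});

app.listen(3000);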

When we open http://localhost:3000 in the browser, the request handler is triggered. The handler creates a readStream for the file data.txt.

For the data event of readStream, we call the write method of the res write stream. On the end event of readStream, we end the response, which completes with a 200 OK status.

This code works, but there is a shortcut to this problem.

Instead of listening to the data and the end events, we can simply pipe these two streams.

Piping simply means that the two streams are connected: the data read from the source stream is written to the destination stream it is piped to.

A shorter implementation for the get method using piping is:

app.get('/', (req, res) => {
  createReadStream('./data.txt').pipe(res);
});

With stream piping, the code size is reduced to only one line of code.

In the above code, we piped the read stream from the file to the write stream of the response. This simply means the streaming data from the readStream will be piped and passed through the res write stream.

Similar to the pipe function, there is also an unpipe function on a stream. We can call source.unpipe(destination) at any time to stop passing data from the source stream to the destination stream.

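import { createReadStream } from 'fs';
import express from 'express';

const app = express();

app.get('/', (req, res) => {
  // Create the read stream per request; a stream can only be consumed once.
  const readStream = createReadStream('./data.txt');
  readStream.pipe(res);
  setTimeout(() => {
    // Stop forwarding data and end the response with whatever was sent so far.
    readStream.unpipe(res);
    readStream.destroy(); // release the file handle
    res.end();
  }, 10);
});

app.listen(3000);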

In the code, we unpiped the stream after 10 milliseconds. Only the data that was piped from the read stream to the res write stream within that time is sent to the client.

After 10 ms, the res stream is unpiped from the readStream and we end the response.

As a result of running this code, we do not get the complete content of the data.txt file in the browser.

In my case, data.txt had 16,000 lines of text, but I received only approx. 3,800 lines on the client side because the stream was unpiped while the file was still being sent.
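The same effect can be observed without Express. The sketch below is an illustration under assumptions, not code from the original article: it uses a PassThrough stream as the source and a small in-memory writable as the destination, and the unpipeDemo name is hypothetical.

```typescript
import { PassThrough, Writable } from 'stream';

// Hypothetical demo: pipes a source into a collecting sink, unpipes it,
// and reports which chunks actually reached the sink.
function unpipeDemo(): Promise<string[]> {
  return new Promise((resolve) => {
    const received: string[] = [];
    const source = new PassThrough();
    const destination = new Writable({
      write(chunk, _encoding, callback) {
        received.push(chunk.toString());
        callback();
      },
    });

    source.pipe(destination);
    source.write('first'); // flows through the pipe to the destination

    setImmediate(() => {
      source.unpipe(destination); // disconnect the two streams
      source.write('second');     // stays buffered inside the source
      setImmediate(() => resolve(received));
    });
  });
}

unpipeDemo().then((received) => console.log(received));
```

Only the chunk written before the unpipe call should show up in the logged array; the second chunk never reaches the destination.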

Error Handling for Normal and Piped Streams

In streams, we handle the errors by creating an error event listener on the stream. The listener gets triggered as soon as an error comes up in the stream.

myStream.on('error', (err) => {
  console.log(err);
});

For the case of error handling in piped streams, let us consider the following code snippet:

import { createReadStream } from 'fs';
import express from 'express';

const app = express();

app.get('/', (req, res) => {
  const readStream = createReadStream('./data.txt');
  readStream.pipe(res);

  readStream.on('error', (err) => {
    console.log('Error in read stream...');
  });
  res.on('error', (err) => {
    console.log('Error in write stream...');
  });

  // Simulate the read stream ending prematurely, 20 ms into the transfer.
  setTimeout(() => {
    readStream.emit('end');
  }, 20);
});

app.listen(3000);

For an HTTP request, we created a read stream for a file and we piped it to the HTTP response write stream.

In the code, we manually emitted the end event on the read stream while it was still streaming. The pipe reacted by ending the response, but the read stream kept producing data, and the next chunk written to the already-ended response triggered an error on the write stream.

Running the above code and creating an HTTP request to the endpoint generates the following output:

Error in write stream... 
Error [ERR_STREAM_WRITE_AFTER_END]: write after end
at write_ (_http_outgoing.js:572:17)
at ServerResponse.write (_http_outgoing.js:567:10)
at ReadStream.ondata (_stream_readable.js:709:20)
at ReadStream.emit (events.js:198:13)
at ReadStream.EventEmitter.emit (domain.js:448:20)
at ReadStream.Readable.read (_stream_readable.js:504:10)
at flow (_stream_readable.js:973:34)
at ServerResponse.pipeOnDrainFunctionResult (_stream_readable.js:777:7)
at ServerResponse.emit (events.js:198:13)
at ServerResponse.EventEmitter.emit (domain.js:448:20)

If we have a series of piped streams in Node.js, we have to do the error handling for each of the streams individually.

a.pipe(b).pipe(c)

In Node, pipe does not forward errors from one stream to the next.

To handle errors in the above case of piped streams, we have to add an error handler on each of the streams like this:

a.on('error', err => console.log(err));
b.on('error', err => console.log(err));
c.on('error', err => console.log(err));
a.pipe(b).pipe(c)

As a result, if any of the streams encounters an error, its corresponding error handler will be triggered and the process will not exit due to unhandled errors.
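To see that an error stays on the stream where it occurred, here is a small sketch. It is an illustration only: the three PassThrough streams stand in for a, b, and c, and the error is simulated by emitting it manually on the middle stream.

```typescript
import { PassThrough } from 'stream';

// Three pass-through streams stand in for a, b, and c.
const a = new PassThrough();
const b = new PassThrough();
const c = new PassThrough();

// Record which handlers are called.
const seen: string[] = [];
a.on('error', (err) => seen.push(`a: ${err.message}`));
b.on('error', (err) => seen.push(`b: ${err.message}`));
c.on('error', (err) => seen.push(`c: ${err.message}`));

a.pipe(b).pipe(c);

// Simulate a failure in the middle stream only.
b.emit('error', new Error('boom'));

console.log(seen); // [ 'b: boom' ]
```

Only the handler registered on b is called. Neither a nor c is notified, which is why each stream in the chain needs its own error handler.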

Events Available on a Writable Stream

The following are the events we can listen for on a writable stream.

drain

Consider a scenario where the stream's internal buffer is full (the write function returns false) and we want to know when the buffer has some space to continue writing. In such a scenario, we listen to the drain event of the stream.

The drain event is triggered as soon as it is appropriate for the stream to resume writing data.

myStream.on('drain', () => {
  console.log('Stream writing can be resumed now...');
});
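A common way to use the drain event is to stop writing when write returns false and to continue once drain fires. The following is a sketch of that pattern under assumptions: writeAll is a hypothetical helper, and the deliberately slow sink with a tiny highWaterMark exists only to make the buffer fill up quickly.

```typescript
import { Writable } from 'stream';

// Hypothetical helper: writes every chunk, pausing whenever write() reports
// a full buffer. Resolves with the number of times it waited for 'drain'.
function writeAll(stream: Writable, chunks: string[]): Promise<number> {
  return new Promise((resolve, reject) => {
    let index = 0;
    let drains = 0;
    stream.on('error', reject);

    const writeNext = (): void => {
      while (index < chunks.length) {
        const ok = stream.write(chunks[index++]);
        if (!ok) {
          // Buffer is full: wait for 'drain' before writing more.
          drains++;
          stream.once('drain', writeNext);
          return;
        }
      }
      stream.end(() => resolve(drains));
    };
    writeNext();
  });
}

// Usage with a slow, in-memory destination.
const received: string[] = [];
const slowSink = new Writable({
  highWaterMark: 4, // tiny buffer so that it fills up quickly
  write(chunk, _encoding, callback) {
    received.push(chunk.toString());
    setImmediate(() => callback()); // pretend the destination is slow
  },
});

writeAll(slowSink, ['aaaa', 'bbbb', 'cccc']).then((drains) => {
  console.log(`wrote ${received.length} chunks, waited for drain ${drains} time(s)`);
});
```

Respecting the return value of write keeps memory usage bounded when the destination is slower than the source.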

close

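The event is triggered once the stream and its underlying resource (for a file write stream, the file descriptor) have been closed, for example after calling the stream.close() function on a file write stream.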

finish

The event is triggered after stream.end() has been called and all the data has been flushed to the underlying destination.
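The order of the finish and close events can be checked with a file write stream. This is a minimal sketch, assuming a writable temp directory; the recordLifecycle helper and the file name are made up for the example.

```typescript
import { createWriteStream } from 'fs';
import { tmpdir } from 'os';
import { join } from 'path';

// Hypothetical helper: writes one line to `filePath` and resolves with the
// lifecycle events in the order in which they were emitted.
function recordLifecycle(filePath: string): Promise<string[]> {
  return new Promise((resolve, reject) => {
    const events: string[] = [];
    const stream = createWriteStream(filePath);
    stream.on('error', reject);
    stream.on('finish', () => events.push('finish')); // all data flushed
    stream.on('close', () => {                        // file descriptor closed
      events.push('close');
      resolve(events);
    });
    stream.write('some data\n');
    stream.end();
  });
}

recordLifecycle(join(tmpdir(), 'lifecycle.txt')).then((events) => {
  console.log(events); // [ 'finish', 'close' ]
});
```

The finish event tells us the data has been written, while close tells us the file itself has been released.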

pipe/unpipe

The events are triggered on a writable stream as soon as a readable stream is piped to it or unpiped from it. The listener receives the source readable stream as its argument.

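// The pipe and unpipe events are emitted on the destination (writable) stream,
// so these two listeners on the read stream are never called.
readStream.on('pipe', source => {
  console.log('ReadStream: Piped on Stream...\n', source);
});
readStream.on('unpipe', source => {
  console.log('ReadStream: Unpiped on Stream...\n', source);
});
// The listener receives the source readable stream as its argument.
writableStream.on('pipe', source => {
  console.log('WriteStream: Piped on Stream...\n', source);
});
writableStream.on('unpipe', source => {
  console.log('WriteStream: Unpiped on Stream...\n', source);
});
readStream.pipe(writableStream);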

Running the above code will give the output as:

WriteStream: Piped on Stream...
ReadStream {
_readableState:
ReadableState {
objectMode: false,
... (The complete Read Stream instance)

Functions Available on a Write Stream

cork/uncork

Consider a scenario where the data arrives at a really slow speed and we want some of it to be buffered in the stream before it is used. We can do so using the cork method on a writable stream.

After the cork method is called, the stream does not write data to the destination and holds it in the buffer instead.

To flush the buffered data to the destination, we need to call the uncork method.

writableStream.cork();
writableStream.write('1');
writableStream.write('2');
writableStream.write('3');
process.nextTick(() => {
  writableStream.uncork();
});

In the above code, we corked the stream. As a result, the stream will stop the data flow until it is uncorked.

When the stream is uncorked in the nextTick callback, the buffered chunks 1, 2, and 3 are flushed to the destination.

Note: If we call the cork() function twice to cork a stream, we have to call the uncork() function twice to enable the data flow from the stream.
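The double cork behaviour can be sketched with an in-memory writable that records how chunks arrive. The sink below is an assumption made for illustration; it implements writev so that buffered chunks are delivered in a single batch.

```typescript
import { Writable } from 'stream';

// Each entry is one delivery to the destination.
const batches: string[][] = [];

// A sink that records how chunks arrive. When several chunks are buffered,
// Node hands them to writev() in one call.
const sink = new Writable({
  write(chunk, _encoding, callback) {
    batches.push([chunk.toString()]);
    callback();
  },
  writev(chunks, callback) {
    batches.push(chunks.map(({ chunk }) => chunk.toString()));
    callback();
  },
});

sink.cork();
sink.cork(); // corked twice
sink.write('1');
sink.write('2');
sink.write('3');

sink.uncork(); // still corked once: nothing is flushed yet
console.log(batches.length); // 0

sink.uncork(); // second uncork flushes the buffer
console.log(batches); // [ [ '1', '2', '3' ] ]
```

The three writes reach the destination as one batch, and only after uncork has been called as many times as cork.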

To read about process.nextTick, read the following article.

write

This function is used to write some data into the buffer of the stream.

  • If the stream is corked, the data will stay in the buffer.
  • If the stream is not corked, the data will be passed to the destination.

We used the write function in the cork/uncork example.

close

This function is used to close the stream. It is available on file write streams created with createWriteStream. Once the close function has been called and the file has been closed, the close event listener is triggered on the stream.

For more such articles, visit www.knowledgescoops.com

Want to read more articles like this?
Follow me on medium @kunaltandon.kt
Connect with me on LinkedIn:
https://www.linkedin.com/in/kunal-tandon/
