Working with Node.js Stream API

Darko Milosevic
Nov 19

Introduction

The word `Stream` is used in computer science to describe a chunked collection of data that is not available all at once but arrives over time. A stream is essentially a collection of values, similar to an array, but laid out along a temporal axis rather than a spatial one.

In Node.js, `Stream` is the name of a module that implements an API for working with streaming data.

The Node.js stream API has evolved significantly since it was first introduced in 2009. The constantly evolving API has created confusion around the various ways to implement streams and the ability to mix different interfaces.

We are going to focus on the latest `Stream 3` implementation, along with the new, useful APIs introduced in Node v10+.


Stream Basics

All streams are instances of EventEmitter — which means that they emit events that can be used to read and write data.

Stream types

There are four fundamental stream types within Node.js.

Readable: streams from which data can be read, for example `fs.createReadStream()` or `process.stdin`.

Writable: streams to which data can be written, for example `fs.createWriteStream()` or `process.stdout`.

Duplex: streams that are both readable and writable, for example a TCP socket (`net.Socket`).

Transform: duplex streams that modify or transform the data as it is written and read, for example `zlib.createGzip()`.

Stream modes

There are two modes that Node.js streams operate in:

Standard mode: the stream exchanges data as `Buffer` objects (or strings, if an encoding is set).

Object mode: the stream can push arbitrary JavaScript values (except `null`, which has a special meaning).

Buffering

Every stream has an internal buffer that will be used for storing data. Readable and writable streams have one each, and it can be retrieved using `readable.readableBuffer` and `writable.writableBuffer`.

Duplex and Transform streams have two separate buffers, allowing each side to operate independently.

The size of the buffer is set by the `highWaterMark` option. For streams operating in standard mode, it specifies the buffer size in bytes; for streams in object mode, it specifies the number of objects.
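
For example (the path and sizes here are illustrative):

```javascript
const fs = require('fs');
const { Readable } = require('stream');

// Standard mode: highWaterMark is a number of bytes (here, 64 KiB).
const fileStream = fs.createReadStream('./source.txt', { highWaterMark: 64 * 1024 });

// Object mode: highWaterMark is a number of objects.
const objectStream = new Readable({ objectMode: true, highWaterMark: 16, read() {} });
```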

Backpressure

Backpressure is a concept that is difficult to grasp for people who are just starting to work with the Stream API, which makes it a common source of bugs. It is also one of the most important properties of streams; without backpressure, streams would not be nearly as efficient.

Backpressure is the signal that a writable stream sends back to a readable stream. The signal is sent when the readable stream is delivering data too fast and the writable stream’s internal buffer (whose size is set by the `highWaterMark` option) fills up faster than it can be processed.

The signal alerts the readable stream to pause before sending more data. Backpressure allows reliable, pull-based transfer of data between readable and writable streams.

A few things may happen if a backpressure system isn’t taken into consideration while transferring data:

The writable stream’s internal buffer can grow without bound, and memory usage climbs with it.

The garbage collector has to work harder, slowing the whole process down.

In the worst case, the process exhausts its memory and crashes.

Backpressure enables reliable, lossless, and memory-efficient transfer of data, which is the primary purpose of the Node.js Stream API.


API for Stream Consumers

Many Node.js applications use streams. Having familiarity with the API for Stream Consumers allows you to use and consume streams properly.

Consuming Writable Streams

Every writable stream instance has these methods:

writable.write(chunk[, encoding][, callback]): writes a chunk of data to the stream; returns `false` when the internal buffer is full, signalling that you should wait for the `drain` event before writing more.

writable.end([chunk][, encoding][, callback]): signals that no more data will be written, optionally writing one final chunk.

writable.cork(): forces all written data to be buffered in memory until `uncork()` or `end()` is called.

writable.uncork(): flushes all data buffered since `cork()` was called.

writable.destroy(): destroys the stream, optionally emitting an `error` event.

The next snippet of code represents a simple use of a writable stream instance, without handling backpressure, which is most likely NOT what you want to do:
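
A minimal sketch of such a snippet, assuming the destination is a file stream created with `fs.createWriteStream()` (the path and contents are illustrative):

```javascript
const fs = require('fs');

// Hypothetical destination file; any writable stream would do.
const writable = fs.createWriteStream('./output.txt');

// Write a million lines without ever checking the return value of write().
// The internal buffer can grow unbounded here, because backpressure is ignored.
for (let i = 0; i < 1e6; i++) {
  writable.write(`line ${i}\n`);
}

writable.end('done\n');
```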

These are the events that can be emitted by a writable instance:

drain: emitted when it is safe to resume writing after `write()` has returned `false`.

error: emitted if an error occurred while writing or piping data.

finish: emitted after `end()` has been called and all data has been flushed to the underlying system.

close: emitted when the stream and any of its underlying resources have been closed.

pipe: emitted when a readable stream is piped into this writable stream.

unpipe: emitted when a readable stream is unpiped from this writable stream.

A simple example of writing to a writable stream manually (without `readable.pipe()`), taking backpressure into consideration:
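
A sketch of that pattern, again assuming an `fs.createWriteStream()` destination; the sentence and the iteration count are illustrative:

```javascript
const fs = require('fs');

const writable = fs.createWriteStream('./output.txt');
const sentence = 'Streams are awesome!\n'; // illustrative content

function writeSentences(count, callback) {
  let i = count;

  function writeNext() {
    let ok = true;
    while (i > 0 && ok) {
      i--;
      if (i === 0) {
        // Last write: pass the callback so we know when it has been flushed.
        writable.write(sentence, callback);
      } else {
        // write() returns false when the internal buffer is full.
        ok = writable.write(sentence);
      }
    }
    if (i > 0) {
      // Buffer is full: wait for 'drain' before writing more.
      writable.once('drain', writeNext);
    }
  }

  writeNext();
}

writeSentences(1e6, () => writable.end());
```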

This is a simple example, since it only writes the same sentence in a loop. The most important aspect is managing backpressure.

Backpressure, among other things, is conveniently handled by the `readable.pipe()` method, which looks something like this:
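
A minimal usage sketch, assuming a simple file copy (the paths are illustrative):

```javascript
const fs = require('fs');

const readable = fs.createReadStream('./source.txt');
const writable = fs.createWriteStream('./destination.txt');

// pipe() pauses and resumes the readable stream for us,
// based on how fast the writable stream can accept data.
readable.pipe(writable);
```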

We will go into more detail on the `readable.pipe()` method further in the article.

In addition to focusing on backpressure when writing manually to a writable stream instance, it is also important to listen for errors while you write.

Here is a complete example of manually writing to a writable stream, taking into consideration backpressure, proper error handling, and post-write operations (logging, in this case):
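
A sketch of what such a complete version could look like; the file name and log messages are illustrative:

```javascript
const fs = require('fs');

const writable = fs.createWriteStream('./output.txt');
const sentence = 'Streams are awesome!\n';
let i = 1e6;

writable.on('error', (err) => {
  // Always listen for errors; otherwise they are thrown and crash the process.
  console.error('Write failed:', err);
});

writable.on('finish', () => {
  // 'finish' fires after end() has been called and all data has been flushed.
  console.log('All writes are now complete.');
});

function writeNext() {
  let ok = true;
  while (i > 0 && ok) {
    i--;
    ok = writable.write(sentence);
  }
  if (i > 0) {
    // Wait for the buffer to drain before writing more.
    writable.once('drain', writeNext);
  } else {
    writable.end();
  }
}

writeNext();
```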

Consuming Readable Streams

Readable streams can operate in two modes:

Flowing: data is read from the underlying source automatically and provided to the application as quickly as possible, via events.

Paused: `readable.read()` must be called explicitly to pull chunks of data from the stream.

(Object mode is also mentioned in the docs, but it is a separate feature; both flowing and paused readable streams can be in object mode or not.)

All readable streams start in paused mode. To switch from paused to flowing mode you must do one of the following, which will be covered extensively in the next section:

Attach a `data` event handler.

Call `readable.resume()`.

Call `readable.pipe()` to send the data to a writable stream.

To switch back to paused mode you must do one of the following:

Call `readable.pause()`, if there are no pipe destinations.

Remove all pipe destinations by calling `readable.unpipe()`.


There are four ways of consuming readable streams. Developers should choose one of these methods of consuming data. Mixing APIs can lead to unexpected behavior and should never be done while consuming data from a single stream.

  1. Using `readable.pause()`, `readable.resume()` and the `data` event:

`data` event: emitted whenever the stream passes a chunk of data to a consumer; attaching a listener switches the stream into flowing mode.

`readable.pause()`: switches a stream in flowing mode back to paused mode; no more `data` events are emitted until the stream is resumed.

`readable.resume()`: switches the stream back into flowing mode, so `data` events are emitted again.

Here is an example of a readable stream being consumed, with the data written to stdout. Nothing very useful, but it will serve well as a demonstration:
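
A sketch of that demonstration, assuming the source is a file read stream and using an artificial delay to simulate slow processing:

```javascript
const fs = require('fs');

const readable = fs.createReadStream('./source.txt');

// Attaching a 'data' listener switches the stream into flowing mode.
readable.on('data', (chunk) => {
  // Pause the stream while we "process" the chunk.
  readable.pause();

  process.stdout.write(chunk);

  // Resume after a short, artificial delay to simulate slow processing.
  setTimeout(() => readable.resume(), 100);
});

readable.on('end', () => {
  console.log('\nNo more data.');
});
```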

2. Using `readable.read()` and the `readable` event:

`readable` event: emitted when there is data available to be read from the stream, or when the end of the stream has been reached.

`readable.read([size])`: pulls data out of the internal buffer and returns it; returns `null` when there is no more data available.

This is a similar example to the one above, but uses the second way of consuming a readable stream:
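
Again a sketch, assuming the same illustrative file source:

```javascript
const fs = require('fs');

const readable = fs.createReadStream('./source.txt');

// 'readable' is emitted when there is data available (or the stream has ended).
readable.on('readable', () => {
  let chunk;
  // read() returns null once the internal buffer has been drained.
  while ((chunk = readable.read()) !== null) {
    process.stdout.write(chunk);
  }
});

readable.on('end', () => {
  console.log('\nNo more data.');
});
```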

3. Using `readable.pipe()`:

`readable.pipe(writable[, options])`: attaches the writable stream to the readable stream, switching the readable stream into flowing mode and pushing all of its data to the attached writable; returns the destination stream, so pipes can be chained.

This is the most convenient way of consuming a readable stream, since it is not verbose, and backpressure and closing of the streams are handled automatically when finished.

A simple example, copied from one of the previous code snippets:
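
As before, assuming a simple file copy:

```javascript
const fs = require('fs');

const readable = fs.createReadStream('./source.txt');
const writable = fs.createWriteStream('./destination.txt');

// Backpressure is handled for us.
readable.pipe(writable);
```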

One thing that is not automatically managed is error handling and propagation. For example, if we want to close each stream when an error occurs, we have to attach error event listeners.

Here is a complete version of consuming a readable stream with `pipe()`, including proper error handling:
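
A sketch of such a version; destroying the other stream on error is one reasonable cleanup choice, not the only one:

```javascript
const fs = require('fs');

const readable = fs.createReadStream('./source.txt');
const writable = fs.createWriteStream('./destination.txt');

readable.on('error', (err) => {
  console.error('Read failed:', err);
  // pipe() does not destroy the destination on error, so do it manually.
  writable.destroy(err);
});

writable.on('error', (err) => {
  console.error('Write failed:', err);
  readable.destroy(err);
});

readable.pipe(writable);
```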

4. Using Async Iteration / Async Generators:

Async generators are officially available in Node v10+. They are a mix of async functions and generator functions: they implement the `[Symbol.asyncIterator]` method and can be used for async iteration. Readable streams also expose `[Symbol.asyncIterator]`, and since streams are, in general, a chunked collection of data across time, async iteration fits them perfectly. Here’s an example:
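
A sketch, assuming a UTF-8 text file as the source:

```javascript
const fs = require('fs');

async function printFile(path) {
  const readable = fs.createReadStream(path, { encoding: 'utf8' });

  // Readable streams implement Symbol.asyncIterator,
  // so they can be consumed with for await...of.
  for await (const chunk of readable) {
    process.stdout.write(chunk);
  }
}

printFile('./source.txt').catch(console.error);
```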

Consuming Duplex and Transform Streams

Duplex streams implement both the readable and writable interfaces. One kind of duplex stream is the `PassThrough` stream. This type of stream is useful when some API expects a readable stream as a parameter, but you also want to write some data to it manually.

To accomplish both needs:

Create a `PassThrough` instance.

Pass it to the API that expects a readable stream.

Write your data to it manually; everything written to the writable side comes out of the readable side unchanged.

This process is shown below:
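
A sketch of that process; the `saveToFile()` helper stands in for any API that expects a readable stream:

```javascript
const { PassThrough } = require('stream');
const fs = require('fs');

// Imagine some API that only accepts a readable stream.
function saveToFile(readable, path) {
  readable.pipe(fs.createWriteStream(path));
}

const passThrough = new PassThrough();

// Hand the readable side to the API...
saveToFile(passThrough, './output.txt');

// ...while writing to the writable side manually.
passThrough.write('some manually written data\n');
passThrough.end('and a final chunk\n');
```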

Transform streams are duplex streams. They have both a readable and a writable interface, but their main purpose is to transform the data passing through them.

The most common example is compressing data with the built-in transform stream from the `zlib` module:
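
For example, gzipping a file (the paths are illustrative):

```javascript
const fs = require('fs');
const zlib = require('zlib');

const source = fs.createReadStream('./source.txt');
const gzip = zlib.createGzip(); // a built-in Transform stream
const destination = fs.createWriteStream('./source.txt.gz');

// Data flows through the transform stream and comes out compressed.
source.pipe(gzip).pipe(destination);
```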

Useful class methods (Node v10+)

`Stream.finished(stream, callback)`

This method is useful for error handling or performing further actions after the stream is consumed. An example:
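
A sketch, assuming we only care about knowing when a file read stream has ended:

```javascript
const fs = require('fs');
const { finished } = require('stream');

const readable = fs.createReadStream('./source.txt');

readable.resume(); // drain the stream; we only care about completion here

finished(readable, (err) => {
  if (err) {
    console.error('Stream failed:', err);
  } else {
    console.log('Stream is done reading.');
  }
});
```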

`Stream.pipeline(...streams[, callback])`

This method is the cleanest and least verbose way of building stream pipelines. In contrast to `readable.pipe()`, everything is handled automatically, including error propagation and cleanup of resources after the process has ended. An example:
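
A sketch, reusing the gzip example from above:

```javascript
const fs = require('fs');
const zlib = require('zlib');
const { pipeline } = require('stream');

pipeline(
  fs.createReadStream('./source.txt'),
  zlib.createGzip(),
  fs.createWriteStream('./source.txt.gz'),
  (err) => {
    if (err) {
      console.error('Pipeline failed:', err);
    } else {
      console.log('Pipeline succeeded.');
    }
  }
);
```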


API for Stream Implementers

The Stream API is extensible and provides an interface for developers to create their own streams. There are two ways to implement your own stream.

  1. Extend the appropriate parent class:

The new stream class must implement one or more specific methods, depending on the type of stream being created (they will be listed as we go through an implementation of each type of stream).

Those methods are prefixed with an underscore, and they are only meant for use while implementing new streams. If they are invoked while consuming a stream, they will cause unexpected behavior.

2. Extend streams in a simplified way by directly creating instances and providing the appropriate methods as constructor options:

A good thing to remember is that, in this case, the required methods are not prefixed with an underscore.
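
A sketch of the simplified approach, building a writable stream that just logs whatever is written to it:

```javascript
const { Writable } = require('stream');

// Simplified construction: pass write() (no underscore) as a constructor option.
const writable = new Writable({
  write(chunk, encoding, callback) {
    console.log('Received:', chunk.toString());
    callback();
  }
});

writable.write('hello\n');
writable.end();
```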

Implementing Writable Streams

In order to implement a writable stream, we need to provide a `writable._write()` method that sends data to the underlying resource:

`writable._write(chunk, encoding, callback)`: called internally with each chunk of data to be written; call `callback` when the chunk has been handled, passing an error if one occurred.

One simple writable stream implementation:
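
A sketch of such an implementation; the class name and the forward-slash rule below are assumptions made for the sake of the example:

```javascript
const { Writable } = require('stream');

class StdoutWritable extends Writable {
  _write(chunk, encoding, callback) {
    if (chunk.toString().includes('/')) {
      // Signal an error through the callback instead of throwing.
      return callback(new Error('Forward slashes are not allowed!'));
    }
    process.stdout.write(chunk);
    callback();
  }
}

const stdoutWritable = new StdoutWritable();

stdoutWritable.on('error', (err) => {
  console.error(err.message);
  process.exit(1);
});

process.stdin.pipe(stdoutWritable);
```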

This stream pipes standard input to standard output, except that when the input contains a forward slash the stream emits an error. This example serves purely as a demonstration.

Implementing a Readable Stream

To implement a custom readable stream, we must call the `Readable` constructor and implement the `readable._read()` method (other methods are optional), and inside of it call `readable.push()`:

`readable._read(size)`: called when the stream wants to pull more data from the underlying resource; the `size` argument is advisory.

`readable.push(chunk[, encoding])`: pushes a chunk of data into the internal buffer to be consumed; pushing `null` signals the end of the stream.

The implementation of a readable stream below generates random integers from 1 to 10 every second for a minute; then the stream finishes generating data and closes itself.
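
A sketch of that implementation; the class name is illustrative:

```javascript
const { Readable } = require('stream');

class RandomIntStream extends Readable {
  constructor(options) {
    super(options);
    this.startTime = Date.now();
  }

  _read() {
    if (Date.now() - this.startTime > 60 * 1000) {
      // After a minute, push null to signal the end of the stream.
      this.push(null);
      return;
    }
    setTimeout(() => {
      const n = Math.floor(Math.random() * 10) + 1; // random integer from 1 to 10
      this.push(`${n}\n`);
    }, 1000);
  }
}

new RandomIntStream().pipe(process.stdout);
```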

Implementing a Duplex Stream

A duplex stream implements both the readable and writable interfaces, independently of one another. The `Duplex` class prototypically inherits from `stream.Readable` and parasitically from `stream.Writable` (JavaScript does not have support for multiple inheritance).

To create a custom implementation of a duplex stream you have to implement every required method for writable and readable streams, which are `readable._read()` and `writable._write()`.

The stream below logs everything from stdin (the writable side) and pipes random smileys to stdout (the readable side) until a sad smiley comes up, at which point the readable side is terminated.
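
A sketch of that stream; the smiley set and class name are illustrative:

```javascript
const { Duplex } = require('stream');

const smileys = [':)', ':D', ';)', ':('];

class SmileyDuplex extends Duplex {
  // Writable side: log everything that is written to the stream.
  _write(chunk, encoding, callback) {
    console.log('Received on writable side:', chunk.toString().trim());
    callback();
  }

  // Readable side: push random smileys until the sad one shows up.
  _read() {
    const smiley = smileys[Math.floor(Math.random() * smileys.length)];
    if (smiley === ':(') {
      this.push(null); // terminate the readable side
      return;
    }
    this.push(`${smiley}\n`);
  }
}

const duplex = new SmileyDuplex();

process.stdin.pipe(duplex);  // feed the writable side
duplex.pipe(process.stdout); // consume the readable side
```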

Implementing a Transform Stream

A transform stream is similar to a duplex stream (it is a type of duplex stream), but with a simpler interface. The output is computed from the input. There is no requirement that the output be the same size as the input, contain the same number of chunks, or arrive at the same time.

Only one method is required to implement a transform stream, and that is the `transform._transform()` method (`transform._flush()` is optional).

`transform._transform(chunk, encoding, callback)`: called with each chunk of written data; transform the chunk, push the result with `this.push()`, and call `callback` when done.

`transform._flush(callback)`: optional; called just before the stream finishes, giving you a chance to push any remaining data.
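
A minimal sketch of a transform stream implementation, upper-casing incoming text (the class name and behavior are illustrative):

```javascript
const { Transform } = require('stream');

class UppercaseTransform extends Transform {
  _transform(chunk, encoding, callback) {
    // Transform the chunk and push the result to the readable side.
    this.push(chunk.toString().toUpperCase());
    callback();
  }

  _flush(callback) {
    // Called just before the stream ends; flush any remaining data.
    this.push('\n[done]\n');
    callback();
  }
}

process.stdin.pipe(new UppercaseTransform()).pipe(process.stdout);
```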


Summary

In this article we learned how to consume all of the Node.js Stream types. We also learned how to implement our own streams, and use their powerful features.

Node.js streams have a reputation for being hard to work with, but with an understanding of their distinct APIs they become an invaluable tool.
