Working with Node.js Stream API

Darko Milosevic
Nov 19, 2019

Introduction

In computer science, the word `Stream` describes a chunked collection of data that is not available all at once but arrives over time. A stream is basically a collection of values, similar to an array, but moved from a spatial axis to a temporal one.

In Node.js, `stream` is the name of the module that implements an API for working with streaming data.

The Node.js stream API has evolved significantly since it was first introduced in 2009. Because the API kept changing, there is confusion about the various ways to implement streams and about which interfaces can safely be mixed.

We are going to focus on the latest `Stream 3` implementation, along with useful new APIs introduced in Node v10+.

Stream Basics

All streams are instances of `EventEmitter`, which means they emit events that can be used to coordinate reading and writing data.

Stream types

There are four fundamental stream types within Node.js.

Readable:

  • abstraction for a source which can be read and consumed
  • examples: HTTP responses on the client, HTTP requests on the server, fs read streams, process.stdin etc.

Writable:

  • abstraction for a destination to which data can be written
  • examples: HTTP responses on the server, HTTP requests on the client, fs write streams, process.stdout, process.stderr etc.

Duplex:

  • streams that implement both a readable and a writable interface
  • example: TCP socket (net.Socket)

Transform:

  • streams similar to Duplex streams, with the ability to modify or transform data as it is read and written
  • example: compression stream (zlib.createGzip)
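To make the four types and their shared `EventEmitter` ancestry concrete, here is a small sketch that creates one instance of each built-in class from the `stream` module and checks `instanceof EventEmitter`. The inline `read`/`write`/`transform` functions are placeholders that only make the constructors usable; they do no real work.

```javascript
const { EventEmitter } = require('events');
const { Readable, Writable, Duplex, Transform } = require('stream');

// One minimal instance of each fundamental stream type.
// The implementations are placeholders: they accept or emit nothing useful.
const streams = [
  new Readable({ read() {} }),
  new Writable({ write(chunk, encoding, callback) { callback(); } }),
  new Duplex({ read() {}, write(chunk, encoding, callback) { callback(); } }),
  new Transform({ transform(chunk, encoding, callback) { callback(null, chunk); } }),
];

for (const stream of streams) {
  // Every stream class inherits from EventEmitter, so this prints true.
  console.log(stream.constructor.name, stream instanceof EventEmitter);
}
```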

Stream modes

Node.js streams operate in one of two modes:

Standard mode:

  • this mode is set by default
  • operates on `string` and `Buffer` (or `Uint8Array`) values
  • the only mode used by Node’s internal stream implementations

Object mode:

  • enabled with the `objectMode` option when creating a stream
  • internal buffering algorithm counts objects rather than bytes

Buffering

Every stream has an internal buffer for storing data. Readable and writable streams each have one, which can be retrieved using `readable.readableBuffer` and `writable.writableBuffer` respectively.

Duplex and Transform streams have two separate buffers, allowing each side to operate independently.

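The size of the buffer is controlled by the `highWaterMark` option. For streams in standard mode it specifies a number of bytes, and for streams in object mode it specifies a number of objects. It is a threshold rather than a hard limit: data can still be buffered beyond it, but once it is reached the stream signals that no more data should be added for now.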
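The threshold behaviour can be observed with a deliberately tiny buffer. This sketch assumes a hypothetical slow destination, simulated by completing every write with `setImmediate`, so chunks pile up while the previous one is still in flight.

```javascript
const { Writable } = require('stream');

// Standard-mode writable with a 4-byte highWaterMark.
// Each write finishes asynchronously, simulating a slow destination.
const slow = new Writable({
  highWaterMark: 4,
  write(chunk, encoding, callback) {
    setImmediate(callback);
  },
});

const first = slow.write('ab');  // 2 bytes buffered: below the threshold
const second = slow.write('cd'); // 4 bytes buffered: threshold reached

console.log(slow.writableHighWaterMark); // 4
console.log(first, second);              // true false
console.log(slow.writableLength);        // 4 (the chunk is still accepted)
```

Note that the second chunk is not rejected: `write()` returning `false` is only a request to stop writing until the buffer drains.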

Backpressure

Backpressure is a concept that newcomers to the Stream API often find difficult to grasp, which makes it a common source of bugs. It is also one of the most important properties of streams: without it, streams would not be nearly as efficient.

Backpressure is the signal that a writable stream sends back to a readable stream. The signal is sent when the readable stream produces data too fast and the writable stream’s internal buffer (whose threshold is set by the `highWaterMark` option) fills up faster than the data can be processed.

The signal tells the readable stream to pause before sending more data. In this way, backpressure enables reliable, pull-based transfer of data between readable and writable streams.

A few things may happen if a backpressure system isn’t taken into consideration while transferring data:

  • system’s memory gets used up
  • current processes are slowed down
  • garbage collector gets overworked

Backpressure ensures reliable, lossless and memory-efficient transfer of data, which is the primary purpose of the Node.js Stream API.

API for Stream Consumers

Many Node.js applications use streams, so familiarity with the API for stream consumers is what lets you use and consume them properly.

Consuming Writable Streams

Every writable stream instance has these methods:

writable.write(chunk[, encoding][, callback])

  • Writes some data to the stream
  • Returns `false` if the internal buffer has reached the `highWaterMark` threshold after accepting the chunk, otherwise `true`

writable.end([chunk][, encoding][, callback])

  • Signals that no more data will be written to the writable. An optional final chunk of data can be written before closing

writable.cork()

  • Forces all written data to be buffered in memory until `writable.uncork()` or `writable.end()` is called

writable.uncork()

  • Flushes all data buffered since `writable.cork()` was called

writable.destroy()

  • Destroys the stream
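As a sketch of what `cork()` and `uncork()` buy you, the writable below (built with constructor options, a style covered later in the article) records how chunks reach it. Because it implements the optional `writev()` method, the chunks buffered while the stream is corked are handed over in a single call after `uncork()`. The `sink` and `calls` names are illustrative only.

```javascript
const { Writable } = require('stream');

// Records how many chunks arrive per underlying write operation.
const calls = [];
const sink = new Writable({
  write(chunk, encoding, callback) {
    calls.push([chunk.toString()]); // one chunk at a time
    callback();
  },
  writev(chunks, callback) {
    // Used when several buffered chunks can be flushed together.
    calls.push(chunks.map(({ chunk }) => chunk.toString()));
    callback();
  },
});

sink.cork(); // buffer everything written from now on
sink.write('a');
sink.write('b');
sink.write('c');

// Flush all data buffered since cork(), e.g. at the end of this tick.
process.nextTick(() => {
  sink.uncork();
  console.log(calls); // [ [ 'a', 'b', 'c' ] ]
});
```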

The simplest use of a writable stream instance calls `write()` repeatedly without handling backpressure, which is most likely NOT what you want to do.
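A minimal sketch of that naive pattern, assuming a hypothetical output file in the OS temp directory. The return value of `write()` is ignored, so the loop keeps going no matter how full the buffer is.

```javascript
const fs = require('fs');
const os = require('os');
const path = require('path');

// Hypothetical output file in the OS temp directory.
const file = path.join(os.tmpdir(), 'naive-output.txt');
const out = fs.createWriteStream(file);

// Naive loop: write()'s return value is ignored, so when the destination
// is slower than the loop, every chunk piles up in memory.
for (let i = 0; i < 20000; i++) {
  out.write(`line ${i}\n`);
}
out.end();

// Far more data is buffered than the highWaterMark threshold allows for.
console.log(out.writableLength, 'bytes buffered; threshold is', out.writableHighWaterMark);
```

With small volumes this appears to work, which is exactly why the bug is easy to miss; with large or unbounded inputs, memory usage grows without limit.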

These are events that can be emitted by a writable instance:

drain

  • After `writable.write()` has returned `false` because of backpressure, this event is emitted once the pressure has been relieved and it is appropriate to resume writing data to the stream

error

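  • Emitted if an error occurred while writing or piping data. In Node v10 to v13 the stream is not closed when this event is emitted; since Node v14, the `autoDestroy` option defaults to `true`, so the stream is destroyed after the error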

finish

  • Emitted after the `writable.end()` method has been called, and all data has been flushed

close

  • Emitted when the stream and any of its underlying resources have been closed

pipe

  • Emitted when the `.pipe()` method is called on a readable stream

unpipe

  • Emitted when the `.unpipe()` method is called on a readable stream

A simple example of writing to a writable stream manually (without `readable.pipe()`) while taking backpressure into consideration:
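A minimal sketch of such a loop, assuming a hypothetical file in the OS temp directory as the destination. It writes the same sentence repeatedly, stops as soon as `write()` returns `false`, and resumes on the next `drain` event.

```javascript
const fs = require('fs');
const os = require('os');
const path = require('path');

// Hypothetical destination file in the OS temp directory.
const file = path.join(os.tmpdir(), 'backpressure-output.txt');
const out = fs.createWriteStream(file);

const sentence = 'Streams handle backpressure.\n';
let remaining = 10000; // how many times to write the sentence

function writeMore() {
  let ok = true;
  // Keep writing until we are done or write() signals backpressure.
  while (remaining > 0 && ok) {
    remaining--;
    if (remaining === 0) {
      out.end(sentence); // last chunk: write it and end the stream
    } else {
      ok = out.write(sentence);
    }
  }
  if (remaining > 0) {
    // The buffer is full: continue once it has drained.
    out.once('drain', writeMore);
  }
}

writeMore();
```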

This example is deliberately simple, since it only writes the same sentence in a loop. The most important aspect is how it manages backpressure.

Backpressure, among other things, is conveniently handled by the `readable.pipe()` method, which looks something like this:
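A minimal sketch of `pipe()` usage, assuming two hypothetical files in the OS temp directory (the source file is created first so the example runs on its own):

```javascript
const fs = require('fs');
const os = require('os');
const path = require('path');

// Hypothetical input and output files in the OS temp directory.
const source = path.join(os.tmpdir(), 'pipe-source.txt');
const target = path.join(os.tmpdir(), 'pipe-target.txt');
fs.writeFileSync(source, 'hello from pipe()\n'.repeat(1000));

// pipe() pauses the source whenever the destination's write() returns false
// and resumes it on 'drain', so backpressure is handled for us.
fs.createReadStream(source).pipe(fs.createWriteStream(target));
```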

We will go into more detail on the `readable.pipe()` method further in the article.

When writing to a writable stream instance manually, listening for errors is just as important as handling backpressure.

Here is a complete example of manually writing to a writable stream that takes into consideration backpressure, proper error handling and post-write operations (logging, in this case):
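A minimal sketch of that combination, assuming a hypothetical helper named `writeLines` and a hypothetical file in the OS temp directory. It wraps the write loop in a promise that resolves on `finish` and rejects on the first `error`, and uses `events.once()` (Node v11.13+ / v10.16+) to wait for `drain`.

```javascript
const fs = require('fs');
const os = require('os');
const path = require('path');
const { once } = require('events');

// Hypothetical helper: writes `count` copies of `line`, respecting backpressure.
// Resolves when all data is flushed ('finish'), rejects on the first 'error'.
function writeLines(stream, line, count) {
  return new Promise((resolve, reject) => {
    stream.once('error', reject);
    stream.once('finish', resolve);

    (async () => {
      for (let i = 0; i < count; i++) {
        if (!stream.write(line)) {
          // Buffer is full; once() also rejects if 'error' fires meanwhile.
          await once(stream, 'drain');
        }
      }
      stream.end();
    })().catch(reject);
  });
}

// Hypothetical destination file in the OS temp directory.
const file = path.join(os.tmpdir(), 'complete-output.txt');
const line = 'Writing with backpressure and error handling.\n';
const count = 20000;

writeLines(fs.createWriteStream(file), line, count)
  .then(() => console.log(`Finished writing to ${file}`)) // post-write operation
  .catch((err) => console.error('Write failed:', err.message));
```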

Consuming Readable Streams

Readable streams can operate in two modes:

  • flowing — data is read from the underlying system automatically and provided to an application as quickly as possible
  • paused — the `readable.read()` method must be called explicitly to read chunks of data

(Object mode is also mentioned in the docs, but it is a separate feature: both flowing and paused readable streams may or may not be in object mode.)

All readable streams start in paused mode. To switch from paused to flowing mode you must do one of the following, which will be covered extensively in the next section:

  • Add a `data` event listener
  • Call the `readable.resume()` method
  • Attach a readable to a writable with `readable.pipe()`

To switch back to paused mode you must do one of the following:

  • if there are no `pipe()` destinations, call `readable.pause()`
  • if there are `pipe()` destinations, remove all of them (`readable.unpipe()` can help with that)
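The `readable.readableFlowing` property makes these transitions visible: it is `null` while no consuming mechanism is attached, `true` in flowing mode and `false` once explicitly paused. A minimal sketch, using a readable with a no-op `read()` purely for inspection:

```javascript
const { Readable } = require('stream');

// A readable that never produces data; we only inspect its mode.
const source = new Readable({ read() {} });
const states = [];

states.push(source.readableFlowing); // null: no consumer attached yet
source.on('data', () => {});
states.push(source.readableFlowing); // true: the 'data' listener switched it to flowing
source.pause();
states.push(source.readableFlowing); // false: explicitly paused
source.resume();
states.push(source.readableFlowing); // true: flowing again

console.log(states); // [ null, true, false, true ]
```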

There are four ways of consuming readable streams, and developers should choose only one of them for a given stream. Mixing APIs while consuming data from a single stream can lead to unexpected behavior and should never be done.

1. Using `readable.pause()`, `readable.resume()` and `data` event:

`data` event

  • emitted whenever the stream passes a chunk of data to the consumer (attaching a listener automatically switches the stream to flowing mode)

`readable.pause()`

  • pauses stream, switching it to paused mode

`readable.resume()`

  • switches stream to flowing mode

Here is an example of consuming a readable stream and writing its data to stdout. It is not very useful, but it serves well as a demonstration:
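A minimal sketch, assuming `Readable.from()` (Node v12.3+ / v10.17+) as a convenient in-memory source. It pauses after every chunk and resumes 100 ms later, as a stand-in for a consumer that needs time to catch up.

```javascript
const { Readable } = require('stream');

// Readable.from() builds a readable stream from an iterable.
const source = Readable.from(['first\n', 'second\n', 'third\n']);
const received = [];

// Attaching a 'data' listener switches the stream to flowing mode.
source.on('data', (chunk) => {
  received.push(chunk);
  process.stdout.write(`received: ${chunk}`);

  // Switch to paused mode, then back to flowing mode after 100 ms.
  source.pause();
  setTimeout(() => source.resume(), 100);
});

source.on('end', () => console.log('no more data'));
```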

2. Using `readable.read()` and `readable` event:

`readable` event

  • emitted when there is data available to be read from the stream (attaching a `readable` listener switches the stream to paused mode)

`readable.read([size])`

  • pulls some data out of the internal buffer and returns it. Returns `null` if there is no data left to read. By default, data will be returned as `Buffer` if no encoding is specified.

This is a similar example to the one above, but uses the second way of consuming a readable stream:
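A minimal sketch of the `readable`/`read()` pattern, again assuming `Readable.from()` as the source. Each `readable` event drains the internal buffer by calling `read()` until it returns `null`.

```javascript
const { Readable } = require('stream');

const source = Readable.from(['first\n', 'second\n', 'third\n']);
const received = [];

// 'readable' fires when data is available; read() pulls chunks out of the
// internal buffer until it returns null, then we wait for the next event.
source.on('readable', () => {
  let chunk;
  while ((chunk = source.read()) !== null) {
    received.push(chunk);
    process.stdout.write(`received: ${chunk}`);
  }
});

source.on('end', () => console.log('no more data'));
```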

3. Using `readable.pipe()`:

`readable.pipe(writable[, options])`

  • attaches a writable stream to a readable stream, switching the readable stream to flowing mode and causing it to pass all of its data to the attached writable stream. The flow of data (i.e. backpressure) is handled automatically.

This is the most convenient way to consume a readable stream, since it is not verbose, and both backpressure and ending the destination stream are handled automatically when the source finishes.

A simple example, similar to one of the previous code snippets:
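A minimal sketch along those lines, this time piping an in-memory source (via `Readable.from()`) into `process.stdout`. `pipe()` returns its destination, which makes chaining possible, and it never ends `process.stdout`, so stdout stays usable afterwards.

```javascript
const { Readable } = require('stream');

// pipe() switches the source to flowing mode and handles backpressure.
const dest = Readable.from(['piped ', 'to ', 'stdout\n']).pipe(process.stdout);

// pipe() returns the destination stream, which allows chaining.
console.log(dest === process.stdout); // true
```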

One thing that is not automatically managed is error handling and propagation. For example, if we want to close each stream when an error occurs, we have to attach error event listeners.

A more complete version, consuming a readable stream with `pipe()` and proper error handling:
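A minimal sketch of that approach, assuming a file copy between two hypothetical paths in the OS temp directory. Each stream gets its own `error` listener, and on any error both streams are destroyed so their file descriptors are released.

```javascript
const fs = require('fs');
const os = require('os');
const path = require('path');

// Hypothetical file paths in the OS temp directory.
const input = path.join(os.tmpdir(), 'pipe-errors-input.txt');
const output = path.join(os.tmpdir(), 'pipe-errors-output.txt');
fs.writeFileSync(input, 'copy me with pipe()\n'.repeat(1000));

const source = fs.createReadStream(input);
const destination = fs.createWriteStream(output);

// pipe() does not forward errors between streams, so each stream needs
// its own listener. On any error, destroy both to release resources.
function handleError(err) {
  console.error('Copy failed:', err.message);
  source.destroy();
  destination.destroy();
}

source.on('error', handleError);
destination.on('error', handleError);
destination.on('finish', () => console.log('Copy finished'));

source.pipe(destination);
```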

4. Using Async Iteration / Async Generators:

  • readable streams implement the `[Symbol.asyncIterator]` method, so they can be iterated over with `for await...of`

Async generators are officially available in Node v10+. They are a mix of async functions and generator functions: they implement the `[Symbol.asyncIterator]` method and can be used for async iteration. Since streams are chunked collections of data across time, async generators fit them perfectly. Here’s an example:
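A minimal sketch combining both sides, assuming `Readable.from()` (Node v12.3+ / v10.17+) to turn a hypothetical async generator, `produceChunks`, into a readable stream, which is then consumed with `for await...of`:

```javascript
const { Readable } = require('stream');

// Hypothetical async generator producing a chunk every 100 ms.
async function* produceChunks() {
  for (let i = 1; i <= 3; i++) {
    await new Promise((resolve) => setTimeout(resolve, 100));
    yield `chunk ${i}\n`;
  }
}

const received = [];

async function main() {
  // Readable.from() accepts async iterables such as async generators.
  const source = Readable.from(produceChunks());

  // Readable streams implement [Symbol.asyncIterator].
  for await (const chunk of source) {
    received.push(chunk);
    process.stdout.write(chunk);
  }
}

main().catch((err) => {
  console.error(err);
  process.exitCode = 1;
});
```

Errors thrown by the stream surface as rejections inside the loop, so a single `try`/`catch` (or `.catch()` as above) covers them.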

Consuming Duplex and Transform Streams

Duplex streams implement both the readable and writable interfaces. One kind of duplex stream is the `PassThrough` stream. This type of stream is useful when an API expects a readable stream as a parameter, but you also want to write some data into it manually.

To accomplish both needs:

  • Create an instance of a `PassThrough` stream
  • Send the stream to the API (the API will use the readable interface of the stream)
  • Add some data to the stream (using the writable interface of the stream)

This process is shown below:
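A minimal sketch of those three steps, where `countBytes` is a hypothetical API that expects a readable stream and resolves with the number of bytes it received:

```javascript
const { PassThrough } = require('stream');

// Hypothetical API that consumes a readable stream.
function countBytes(readable) {
  return new Promise((resolve, reject) => {
    let total = 0;
    readable.on('data', (chunk) => { total += chunk.length; });
    readable.on('end', () => resolve(total));
    readable.on('error', reject);
  });
}

// 1. Create a PassThrough stream.
const pass = new PassThrough();

// 2. Hand it to the API, which uses its readable side.
const result = countBytes(pass);

// 3. Write data manually through its writable side.
pass.write('hello ');
pass.write('world');
pass.end();

let receivedBytes = null;
result.then((total) => {
  receivedBytes = total;
  console.log(`API received ${total} bytes`); // API received 11 bytes
});
```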

Transform streams are Duplex streams: they have both a readable and a writable interface, but their main purpose is to transform the data passing through them.

The most common example is compressing data with the built-in transform stream from the `zlib` module:
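A minimal sketch, assuming hypothetical input and output files in the OS temp directory. `zlib.createGzip()` receives uncompressed bytes on its writable side and emits gzip-compressed bytes on its readable side.

```javascript
const fs = require('fs');
const os = require('os');
const path = require('path');
const zlib = require('zlib');

// Hypothetical input/output paths in the OS temp directory.
const input = path.join(os.tmpdir(), 'gzip-input.txt');
const output = `${input}.gz`;
fs.writeFileSync(input, 'compress me\n'.repeat(10000));

// read file -> gzip transform -> write file
fs.createReadStream(input)
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream(output))
  .on('finish', () => {
    const before = fs.statSync(input).size;
    const after = fs.statSync(output).size;
    console.log(`${before} bytes -> ${after} bytes`);
  });
```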

Useful class methods (Node v10+)

`Stream.finished(stream, callback)`

  • allows you to get notified when a stream is no longer readable, writable or has experienced an error or a premature close.

This method is useful for error handling or performing further actions after the stream is consumed. An example:
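A minimal sketch of `finished()`, using an in-memory source from `Readable.from()`. The data is discarded with `resume()` so the stream can reach its end; the `finishedOk` flag is only there to record the outcome.

```javascript
const { finished, Readable } = require('stream');

const source = Readable.from(['a', 'b', 'c']);
let finishedOk = false;

// The callback runs once the stream has ended, errored,
// or been closed prematurely; err is set only on failure.
finished(source, (err) => {
  if (err) {
    console.error('Stream failed:', err.message);
  } else {
    finishedOk = true;
    console.log('Stream is done reading');
  }
});

source.resume(); // consume and discard the data so the stream can end
```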

`Stream.pipeline(...streams[, callback])`

  • pipes between streams, forwarding errors, cleaning up properly, and providing a callback when the pipeline is complete.

This method is the cleanest and least verbose way of building stream pipelines. In contrast to `readable.pipe()`, everything is handled automatically, including error propagation and cleaning up of resources after the process has ended. An example:
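A minimal sketch of the same gzip flow built with `pipeline()`, assuming hypothetical files in the OS temp directory. Errors from any of the three streams reach the single callback, and all streams are destroyed on failure.

```javascript
const { pipeline } = require('stream');
const fs = require('fs');
const os = require('os');
const path = require('path');
const zlib = require('zlib');

// Hypothetical input/output paths in the OS temp directory.
const input = path.join(os.tmpdir(), 'pipeline-input.txt');
const output = `${input}.gz`;
fs.writeFileSync(input, 'pipeline data\n'.repeat(10000));

pipeline(
  fs.createReadStream(input),
  zlib.createGzip(),
  fs.createWriteStream(output),
  (err) => {
    // Called exactly once: with an error from any stream, or without one.
    if (err) {
      console.error('Pipeline failed:', err.message);
    } else {
      console.log('Pipeline succeeded');
    }
  }
);
```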

API for Stream Implementers

The Stream API is extensible and provides an interface for developers to create their own streams. There are two ways to implement your own stream.

1. Extend the appropriate parent class:

The new stream class must implement one or more specific methods, depending on the type of stream being created (they will be listed as we go through an implementation of each type of stream).

These methods are prefixed with an underscore and are only meant to be used when implementing new streams. Invoking them from code that consumes a stream will cause unexpected behavior.
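A minimal sketch of the first approach, with a hypothetical `UpperCaseLogger` class that extends `Writable` and implements `_write()`. Consumers only ever call the public `write()` and `end()` methods.

```javascript
const { Writable } = require('stream');

// Hypothetical writable that stores every chunk upper-cased.
class UpperCaseLogger extends Writable {
  constructor(options) {
    super(options); // always forward the options to the parent constructor
    this.lines = [];
  }

  // Called by the stream machinery; never call it directly as a consumer.
  _write(chunk, encoding, callback) {
    this.lines.push(chunk.toString().toUpperCase());
    callback(); // signal that this chunk has been handled
  }
}

const logger = new UpperCaseLogger();
logger.write('hello');
logger.end('world');
logger.on('finish', () => console.log(logger.lines)); // [ 'HELLO', 'WORLD' ]
```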

2. Create instances directly and provide the appropriate methods as constructor options (a simplified way of extending streams):

A good thing to remember is that in this case, the required methods are not prefixed with an underscore (for example, `write` instead of `_write`).
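The same behaviour as a minimal sketch of the second approach: the implementation is passed as the `write` constructor option, without an underscore. The `lines` array is illustrative only.

```javascript
const { Writable } = require('stream');

const lines = [];

// The method is passed as an option named write, not _write.
const logger = new Writable({
  write(chunk, encoding, callback) {
    lines.push(chunk.toString().toUpperCase());
    callback();
  },
});

logger.write('hello');
logger.end('world');
logger.on('finish', () => console.log(lines)); // [ 'HELLO', 'WORLD' ]
```

This style suits small, one-off streams; a subclass is usually clearer when the stream carries its own state or is reused in several places.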

Implementing Writable Streams

In order for us to implement a writable stream, we need to provide a `writable._write()` method to send data to the underlying resource:

`writable._write(chunk, encoding, callback)`

  • chunk — chunk to be written
  • encoding — needed if chunk is of type `String`
  • callback — must be called to signal that the write either completed or failed

One simple writable stream implementation:

This stream pipes standard input to standard output, except when you enter a forward dash, in which case the stream throws. This example serves demonstration purposes only.
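A minimal sketch of a writable along these lines, with two assumptions: the forbidden character is taken to be `-`, and `Readable.from()` stands in for `process.stdin` so the sketch ends on its own. Instead of throwing inside `_write()`, the failure is passed to the callback, which makes the stream emit `error`. The `EchoWritable` name is hypothetical.

```javascript
const { Readable, Writable } = require('stream');

// Hypothetical writable: echoes chunks to stdout, fails on a '-' character.
class EchoWritable extends Writable {
  _write(chunk, encoding, callback) {
    const text = chunk.toString();
    if (text.includes('-')) {
      // Report the failure through the callback so 'error' is emitted.
      callback(new Error(`Forbidden character in: ${text.trim()}`));
      return;
    }
    process.stdout.write(text);
    callback();
  }
}

const echo = new EchoWritable();
const errors = [];
echo.on('error', (err) => {
  errors.push(err.message);
  console.error(err.message);
});

// In the original idea this would be process.stdin.pipe(echo).
Readable.from(['hello\n', 'world\n', 'a-b\n', 'after error\n']).pipe(echo);
```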

Implementing a Readable Stream

To implement a custom readable stream, we must call the readable constructor and implement the `readable._read()` method (other methods are optional), and inside of it we must call `readable.push()`:

`readable._read(size)`

  • when this method is called, if data is available from the source, the implementation should begin pushing that data into the read queue using the `this.push(dataChunk)` method
  • size — number of bytes to be read asynchronously

`readable.push()`

  • method intended to be called only by readable implementers, and only from within the `readable._read()` method. When called, the chunk of data is added to the internal queue for users of the stream to consume (pushing `null` signals the end of the stream).

The readable stream implementation below generates a random integer from 1 to 10 every second for one minute, then finishes generating data and closes itself.
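A minimal sketch of such a stream. The `intervalMs` and `durationMs` options and the `RandomNumberStream` name are hypothetical; the defaults match the description (one value per second for one minute), and the usage below shortens them so the sketch finishes in about half a second.

```javascript
const { Readable } = require('stream');

// Pushes a random integer from 1 to 10 every intervalMs until durationMs
// has elapsed, then ends the stream.
class RandomNumberStream extends Readable {
  constructor({ intervalMs = 1000, durationMs = 60000 } = {}) {
    super({ encoding: 'utf8' });
    this.intervalMs = intervalMs;
    this.deadline = Date.now() + durationMs;
  }

  _read() {
    // _read() is not called again until push() has been called,
    // so scheduling a single timer per call is enough.
    setTimeout(() => {
      if (Date.now() >= this.deadline) {
        this.push(null); // end of the stream
        return;
      }
      const value = Math.floor(Math.random() * 10) + 1; // 1..10 inclusive
      this.push(`${value}\n`);
    }, this.intervalMs);
  }
}

// Shortened timings for a quick run.
const numbers = new RandomNumberStream({ intervalMs: 50, durationMs: 500 });
let output = '';
numbers.on('data', (chunk) => {
  output += chunk;
  process.stdout.write(chunk);
});
numbers.on('end', () => console.log('done'));
```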

Implementing a Duplex Stream

A duplex stream implements both the readable and writable interfaces, independently of one another. The `Duplex` class prototypically inherits from `stream.Readable` and parasitically from `stream.Writable`, since JavaScript does not support multiple inheritance.

To create a custom duplex stream, you have to implement the required methods of both readable and writable streams, which are `readable._read()` and `writable._write()`.

The stream below logs everything it receives from stdin (writable side) and pipes random smileys to stdout (readable side) until a sad smiley comes up, at which point the readable side is terminated.
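A minimal sketch of a duplex stream in that spirit. The `SmileyDuplex` name and smiley list are hypothetical, and the writable side is fed directly from code rather than from stdin so the sketch terminates by itself.

```javascript
const { Duplex } = require('stream');

const SMILEYS = [':)', ':D', ';)', ':P', ':('];

// Writable side: logs what is written. Readable side: random smileys
// until the sad one (':(') comes up. The two sides are independent.
class SmileyDuplex extends Duplex {
  constructor(options) {
    super(options);
    this.logged = [];
  }

  _write(chunk, encoding, callback) {
    const text = chunk.toString();
    this.logged.push(text);
    console.log(`[writable side] ${text}`);
    callback();
  }

  _read() {
    const smiley = SMILEYS[Math.floor(Math.random() * SMILEYS.length)];
    this.push(`${smiley}\n`);
    if (smiley === ':(') {
      this.push(null); // sad smiley: end the readable side
    }
  }
}

const duplex = new SmileyDuplex();
duplex.pipe(process.stdout);                   // readable side
duplex.write('hello from the writable side');  // writable side
duplex.end();
```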

Implementing a Transform Stream

A transform stream is similar to a duplex stream (it is a type of duplex stream), but with a simpler interface: the output is computed from the input. There is no requirement that the output be the same size as the input, consist of the same number of chunks, or arrive at the same time.

Only one method is required for implementing a transform stream, and that is the `transform._transform()` method (`transform._flush()` is optional).

`transform._transform(chunk, encoding, callback)`

  • handles the bytes being written, computes the output, then passes that output to the readable side using the `readable.push()` method. `push()` may be called several times to generate output from a single input chunk, or not at all. The parameters are:
  • chunk — piece of data to be written
  • encoding — needed for chunks of type `String`
  • callback(err, transformedChunk)

`transform._flush(callback)` — optional.

  • In some cases, a transform operation may need to emit an additional bit of data at the end of the stream, once some computation has been completed. This method is called after all written data has been transformed but before the readable side emits `end`, so it can push that final data.
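A minimal sketch using both methods, with a hypothetical `UpperCaseCounter` transform: `_transform()` upper-cases each chunk, and `_flush()` appends a summary line once all input has been processed.

```javascript
const { Readable, Transform } = require('stream');

// Hypothetical transform: upper-cases text and, at the end,
// appends the total number of characters it has seen.
class UpperCaseCounter extends Transform {
  constructor(options) {
    super(options);
    this.total = 0;
  }

  _transform(chunk, encoding, callback) {
    const text = chunk.toString();
    this.total += text.length;
    callback(null, text.toUpperCase()); // same as this.push(...) then callback()
  }

  _flush(callback) {
    // Runs after all input was transformed, before 'end' is emitted.
    this.push(`\n[${this.total} characters]\n`);
    callback();
  }
}

const output = [];
Readable.from(['hello ', 'streams'])
  .pipe(new UpperCaseCounter())
  .on('data', (chunk) => output.push(chunk.toString()))
  .on('end', () => console.log(output.join(''))); // "HELLO STREAMS" then "[13 characters]"
```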

Summary

In this article we learned how to consume all of the Node.js Stream types. We also learned how to implement our own streams, and use their powerful features.

Node.js Streams have a reputation for being hard to work with, but once you understand their distinct APIs, they become an invaluable tool.

Florence Development

The story behind developing software for Clinical Trials.