Reduce memory usage with the Node.js Stream API and Generators in IO-intensive applications

Yiqun Rong
5 min read · Sep 13, 2023


There are many cases where we need to handle a large amount of data coming from different data sources. That data is usually paginated, so you can fetch it batch by batch.

There are many ways to process incoming data whose rate of generation you can control. Some people prefer to wait until all of the data is loaded into memory before processing it. That approach is simple, but it can cause huge memory usage. So in this story, I am going to show you how we can start processing the data without waiting for all of it to be loaded into memory.

Before going deeper into the real problem, we need to be familiar with two concepts: Node.js Streams and Node.js Generators.

Node.js Streams

Bottles on the assembly line of a factory

Streams are collections of data, just like arrays or strings. They might not be available all at once, and they don’t have to fit in memory. Streams are powerful when working with large amounts of data, or data that’s coming from an external source one chunk at a time.
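As a quick illustration of the memory difference, compare copying a large file in one go with streaming it chunk by chunk. This is only a small sketch; the file names are placeholders.

const { readFileSync, writeFileSync, createReadStream, createWriteStream } = require("fs");

// Whole-file approach: the entire file is held in memory before the copy starts.
writeFileSync("./copy-a.file", readFileSync("./big-input.file"));

// Streaming approach: data flows through in small chunks, so memory stays flat.
createReadStream("./big-input.file").pipe(createWriteStream("./copy-b.file"));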

We can use a soft drink bottling factory as an analogy. Each bottle goes through three stages on the assembly line before becoming a final product. Each bottle can be seen as an individual data object that can be processed independently. The continuous flow of bottles on the assembly line can be seen as a data stream, and the processing stages each bottle goes through can be seen as the Transform steps inside the stream.

The source of a data stream can be different services or data stores: a paginated DynamoDB table query, the Elasticsearch scroll API, an S3 file, a local file, or any API that supports pagination.

Read more:

  • Node Stream API Docs
  • Node.js Streams: Everything you need to know

Async Generators

Asynchronous iteration allows us to iterate over data that arrives asynchronously, on demand, for instance when we download something chunk by chunk over a network.

Imagine that you want to save or process a list of objects while fetching them from a paginated endpoint using a next token. I have the codebase for this demo on GitHub.

{
  "data": [
    {
      "id": 1,
      "first_name": "Arvy",
      "last_name": "Derisley",
      "email": "aderisley0@4shared.com",
      "gender": "Male",
      "ip_address": "192.49.240.128"
    },
    {
      "id": 2,
      "first_name": "Stanley",
      "last_name": "Arnefield",
      "email": "sarnefield1@flickr.com",
      "gender": "Male",
      "ip_address": "138.131.169.201"
    },
    {
      "id": 3,
      "first_name": "Ruperto",
      "last_name": "Klimes",
      "email": "rklimes2@nature.com",
      "gender": "Male",
      "ip_address": "254.250.229.74"
    }
  ],
  "nextToken": "MOCK_DATA2"
}

How can I turn this fetching process into a stream? We can make use of an asynchronous generator. During its execution, a generator can yield more than one result: each time it is iterated, it returns the next result, until it reaches the end.

const axios = require("axios");

// Yields items one by one while following the nextToken pagination cursor.
async function* fetchItemGenerator() {
  let currentNextToken = null;
  do {
    const {
      data: { data, nextToken },
    } = await axios.get(
      currentNextToken
        ? `http://localhost:3000?nextToken=${currentNextToken}`
        : "http://localhost:3000"
    );
    for (const item of data) {
      yield item; // hand over one item at a time to the consumer
    }
    currentNextToken = nextToken;
  } while (currentNextToken);
}

module.exports = { fetchItemGenerator };
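To see the generator on its own, you can consume it directly with for await...of. This is just a quick check, assuming the demo's mock API from the repo is running on localhost:3000; adjust the require path to wherever you saved the generator.

const { fetchItemGenerator } = require("../generators/fetchItemGenerator");

(async () => {
  // Each iteration pulls exactly one item; the next page is fetched lazily,
  // only after the current page has been fully consumed.
  for await (const item of fetchItemGenerator()) {
    console.log(item.id, item.email);
  }
})();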

Use Async Generators as Readable Streams

How can we turn an async generator into a Node.js stream? The stream.Readable.from() method is a built-in API of the stream module that constructs Readable streams out of iterators and iterables.

stream.Readable.from(iterable[, options])

Parameters: This method accepts two parameters, described below:

  • iterable: an object that implements the Symbol.asyncIterator or Symbol.iterator iterable protocol.
  • options: the options passed to new stream.Readable([options]). By default, Readable.from() sets options.objectMode to true unless it is explicitly set to false.
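A minimal sketch of how this plays out; the array of plain objects here stands in for any iterable or async iterable.

const { Readable } = require("stream");

// Readable.from() wraps any (async) iterable; objectMode defaults to true,
// so every element becomes its own chunk in the stream.
const readable = Readable.from([{ id: 1 }, { id: 2 }, { id: 3 }]);

readable.on("data", (item) => console.log(item)); // logs each object separately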

Example 1: How can we save paginated objects from an API to a local file while fetching the data?

In this case, we can combine async generators and Node.js streams to streamline saving objects to a local JSON Lines file. Writing to the file starts as soon as the first item is fetched from the network. Why save a list of objects in a JSON Lines file? You can find a detailed explanation in JSON line format is better than JSON when storing list of objects in file.

const { Readable, Transform, pipeline } = require("stream");
const { createWriteStream } = require("fs");
const { promisify } = require("util");

const pipelinePromise = promisify(pipeline);
const fetchItemGenerator =
  require("../generators/fetchItemGenerator").fetchItemGenerator;

const fetchItemToFile = async () => {
  // convert the fetchItemGenerator async generator into a Readable stream
  const fetchItem = fetchItemGenerator();
  const readable = Readable.from(fetchItem);

  // serialise each object into one JSON Lines row
  const transformToText = new Transform({
    objectMode: true,
    transform(item, encoding, callback) {
      this.push(JSON.stringify(item));
      this.push("\n");
      callback();
    },
  });

  // stream objects to a local file
  const writable = createWriteStream("./results/people.jsonl");

  // await the pipeline to be finished
  await pipelinePromise(readable, transformToText, writable);
};

fetchItemToFile();
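With the three records shown earlier, the beginning of people.jsonl looks like this: one JSON object per line, with more lines appended as later pages arrive.

{"id":1,"first_name":"Arvy","last_name":"Derisley","email":"aderisley0@4shared.com","gender":"Male","ip_address":"192.49.240.128"}
{"id":2,"first_name":"Stanley","last_name":"Arnefield","email":"sarnefield1@flickr.com","gender":"Male","ip_address":"138.131.169.201"}
{"id":3,"first_name":"Ruperto","last_name":"Klimes","email":"rklimes2@nature.com","gender":"Male","ip_address":"254.250.229.74"}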

Example 2: How can we process or filter data coming from files?

After running the above script, you should have the people.jsonl file in your folder. Now we want to keep only the male entries from the list and save them to a new file. We can load the objects chunk by chunk from the file and filter them one by one as they travel down the stream; from there, the stream flows into a file write stream that produces a new JSONL file.

const { Transform, pipeline } = require("stream");
const { createWriteStream } = require("fs");
const { promisify } = require("util");

const pipelinePromise = promisify(pipeline);
const { getItemsFromFile } = require("./getItemsFromFile");

const fileToFile = async () => {
  // only keep the male people; everything else is dropped from the stream
  const genderFilterTransform = new Transform({
    objectMode: true,
    transform(item, encoding, callback) {
      if (item.gender === "Male") {
        this.push(item);
      }
      callback();
    },
  });

  // serialise each remaining object into one JSON Lines row
  const transformToText = new Transform({
    objectMode: true,
    transform(item, encoding, callback) {
      this.push(JSON.stringify(item));
      this.push("\n");
      callback();
    },
  });

  const writable = createWriteStream("./results/male.jsonl");

  await pipelinePromise(
    getItemsFromFile("./results/people.jsonl"),
    genderFilterTransform,
    transformToText,
    writable
  );
};

fileToFile();
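The getItemsFromFile helper used above is not shown in this snippet; the repo has its own version, but one possible sketch is an async generator built on readline that reads the file line by line and parses each JSON Lines row. stream.pipeline accepts an async iterable as its source, so the generator can be passed in directly.

const { createReadStream } = require("fs");
const readline = require("readline");

// Reads a JSON Lines file lazily: one line is read, parsed and yielded at a time.
async function* getItemsFromFile(filePath) {
  const lines = readline.createInterface({
    input: createReadStream(filePath),
    crlfDelay: Infinity, // treat \r\n as a single line break
  });
  for await (const line of lines) {
    if (line.trim() !== "") {
      yield JSON.parse(line);
    }
  }
}

module.exports = { getItemsFromFile };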

You can use a similar approach to transform objects into CSV rows in a stream and generate a CSV file using node-csv.
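As a rough sketch of that idea, here is a hand-rolled object-to-CSV Transform instead of node-csv, assuming every object has the same flat fields; for real-world quoting and escaping, prefer node-csv's stringifier.

const { Transform } = require("stream");

// Naive object-to-CSV transform: writes a header row first, then one row per object.
const toCsv = (columns) => {
  let headerWritten = false;
  return new Transform({
    objectMode: true,
    transform(item, encoding, callback) {
      if (!headerWritten) {
        this.push(columns.join(",") + "\n");
        headerWritten = true;
      }
      this.push(columns.map((col) => String(item[col] ?? "")).join(",") + "\n");
      callback();
    },
  });
};

// usage sketch:
// await pipelinePromise(
//   getItemsFromFile("./results/people.jsonl"),
//   toCsv(["id", "email"]),
//   createWriteStream("./results/people.csv")
// );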

Conclusion

Use Node.js streams to transform and transfer data across different services

The Node.js Stream API is a very useful tool for processing independent records from a paginated data source. It prevents huge memory usage because it processes items one by one instead of keeping large intermediate results in memory.
