Process streaming JSON with Node.js

Jake Burden
4 min read · Apr 9, 2018

I’m going to show you how to process an infinite amount of streaming JSON.

Lately, in my free time, I’ve been automating tasks with various services that offer JSON APIs. The programs I’ve written to handle data from those services all turned out to be “shaped” the same way. In an abstract sense:

  • GET data from a JSON API
  • Parse chunks of JSON as they arrive
  • Do something with each row of parsed JSON
  • Write the processed data somewhere

In code it tends to look something like this:
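
Roughly, using the modules we’ll cover below (hyperquest, ndjson, and through2):

hyperquest(source)            // GET data from a JSON API
  .pipe(ndjson.parse())       // parse chunks of JSON as they arrive
  .pipe(through.obj(write))   // do something with each row
  .pipe(process.stdout)       // write the processed data somewhere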

We’ll start by making a basic http server that responds to every request with an infinite stream of data. As long as the client request stays open, this server will continue responding with ndjson.
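
Something like this works as a minimal sketch (the { value: n } row shape is an assumption here, chosen to match the client code later in this post):

const http = require('http')

http.createServer(function (req, res) {
  res.writeHead(200, { 'Content-Type': 'application/json' })
  let n = 0
  // write the next row of ndjson every 100ms
  const timer = setInterval(function () {
    res.write(JSON.stringify({ value: n++ }) + '\n')
  }, 100)
  // stop writing once the client goes away
  req.on('close', function () { clearInterval(timer) })
}).listen(9090)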

The response header is set to “application/json” so that I can see the streaming ndjson in Firefox by visiting http://localhost:9090. The next number in the sequence is written to the browser every 100ms. Here is what that looks like:
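
{"value":0}
{"value":1}
{"value":2}
{"value":3}
…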

How might we process this infinite stream of data? It’s not a regular JSON file that we can loop through or map over. We have to think about processing this data over time rather than in space, as it’s so eloquently articulated in the pull-stream docs.

Streaming HTTP

I tend to use hyperquest, but you can use request or something else that implements a Stream interface.

hyperquest(source)

Streaming JSON parser

To parse JSON over a stream from most API services you should use JSONStream. It works great for big, valid JSON documents. In this example I’ll be using ndjson instead, because my server is responding with an infinite stream of newline-delimited JSON.

hyperquest(source)
  .pipe(ndjson.parse())

Processing JSON in a duplex stream

Each row of JSON will be parsed after this point in the pipeline. We can now process each row as a regular object. Since the value in each object is a number, let’s do a basic example of squaring it.

We’ll use through2 to create a duplex stream that transforms the data flowing through this pipeline. Sometimes I’ll use to2 when the transformation is the last step of the pipeline, but for this example, we’ll stick with through2.

We can use the through.obj method so the parsed ndjson objects can be piped directly into the through stream. To keep the pipeline short, we’ll create a named write function to process each row and pass it to the through stream.

There are two ways to pass data along in through2. You can call this.push(data) and then explicitly call next() when you’re ready for the through stream to receive its next chunk of data. Or you can call next(null, data), as we do below, which passes your data through the stream and calls for the next chunk in one step.
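
Here’s a quick sketch of both forms side by side (squareAndPush and squareViaNext are just illustrative names):

const through = require('through2')

// form 1: push the data, then explicitly ask for the next chunk
const squareAndPush = through.obj(function (row, enc, next) {
  this.push(row.value * row.value)
  next()
})

// form 2: hand the data to next(), which also asks for the next chunk
const squareViaNext = through.obj(function (row, enc, next) {
  next(null, row.value * row.value)
})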

Unless the next stream in the pipeline is also going to be in objectMode, we’ll have to make sure the Number we’re passing to next is cast to a String. To make the output easy to parse, we’ll add a newline to the end.

hyperquest(source)
  .pipe(ndjson.parse())
  .pipe(through.obj(write))

function write (row, enc, next) {
  next(null, String(row.value * row.value) + '\n')
}

Write the data

The pipeline is 3/4 of the way finished. We are reading ndjson from a source, parsing it as JSON objects, and processing each row of JSON. Now we have to write this transformed data somewhere. We’ll write it to stdout for this example by adding .pipe(process.stdout) to the pipeline.

const hyperquest = require('hyperquest')
const ndjson = require('ndjson')
const through = require('through2')

const source = 'http://localhost:9090'

hyperquest(source)
  .pipe(ndjson.parse())
  .pipe(through.obj(write))
  .pipe(process.stdout)

function write (row, enc, next) {
  next(null, String(row.value * row.value) + '\n')
}

Safe streams with pump()

We’ve got a full pipeline now! It’s also a good habit to use a module called pump to handle the streams. pump closes every stream if the destination closes early or errors, and it lets you pass a callback at the end of the pipeline so you know when it finished, or whether any of the streams errored, without having to listen for an error event on each one. Here is the above code using pump rather than .pipe:
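
const pump = require('pump')
const hyperquest = require('hyperquest')
const ndjson = require('ndjson')
const through = require('through2')

const source = 'http://localhost:9090'

pump(
  hyperquest(source),
  ndjson.parse(),
  through.obj(write),
  process.stdout,
  function (err) {
    // called once the whole pipeline finishes or any stream errors
    if (err) console.error('pipeline failed:', err)
  }
)

function write (row, enc, next) {
  next(null, String(row.value * row.value) + '\n')
}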

If we run our server in one terminal tab, and run the client in another, we should now have an infinite stream of data being processed!

[Screenshot: server and client running to process an infinite stream of data]

To see more examples, check the modules linked at the bottom of this post.

Edit: Since writing this, a new streams API has been released. You can now call require('stream').pipeline for the same functionality as the pump npm module.
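
A sketch of the same client using the built-in version (Node 10+):

const { pipeline } = require('stream')
const hyperquest = require('hyperquest')
const ndjson = require('ndjson')
const through = require('through2')

pipeline(
  hyperquest('http://localhost:9090'),
  ndjson.parse(),
  through.obj(function (row, enc, next) {
    next(null, String(row.value * row.value) + '\n')
  }),
  process.stdout,
  function (err) {
    // the same finished-or-errored callback that pump provides
    if (err) console.error('pipeline failed:', err)
  }
)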
