Streams and Pipes and Errors; Oh My.

Noah Silas
Published in Hustle Blog
Mar 13, 2017 · 4 min read
Pushing the extended metaphor a step too far.

It all started with a fairly mundane problem: migrate some data from one MongoDB collection into another. Since the NodeJS Mongo driver can provide query results as a readable stream, the problem seemed to have a fairly straightforward solution: I created a Writable stream to receive the objects, perform the conversion, and write them back into Mongo:
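In rough strokes, it looked something like this (a sketch rather than the real thing: db stands in for an already-connected driver handle, and convertWidgetToSprocket and the collection names are placeholders):

const { Writable } = require('stream');

// A Writable (in object mode) that converts each widget document and
// inserts the result into the new collection.
// ("widgets", "sprockets", and convertWidgetToSprocket are stand-in names.)
class SprocketMigrator extends Writable {
  constructor(db) {
    super({ objectMode: true });
    this.sprockets = db.collection('sprockets');
  }

  _write(widget, encoding, callback) {
    const sprocket = convertWidgetToSprocket(widget); // hypothetical conversion
    this.sprockets.insertOne(sprocket)
      .then(() => callback())
      .catch(callback);
  }
}

// The Mongo driver exposes a query cursor as a readable object stream.
const widgetStream = db.collection('widgets').find().stream();
const migrator = new SprocketMigrator(db);
widgetStream.pipe(migrator);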

This worked fine for my small test database, but when I tried to apply it to a large collection (for instance, when using @HustleApp’s Staging or Production database), I wasn’t getting any timely feedback about the progress of the migration. The terminal just seemed hung.

This motivated me to add a fairly common feature: a progress indicator that printed a tick mark after every 1000 rows had been processed. My initial thought was this: “Can I just hang another processor off of the readable stream that tracks progress?”

Readable streams can be piped to many consumers!

Of course I can! This is a nice property of streams; you can “fork” them and process each record in the stream in multiple ways. So, I wrote a quick implementation of a separate progress indicator:
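Sketched out, it was just another object-mode Writable hanging off the same readable stream (reusing the widgetStream and migrator from above):

const { Writable } = require('stream');

// A second Writable that just counts documents and prints a tick
// after every 1000 rows.
class ProgressIndicator extends Writable {
  constructor() {
    super({ objectMode: true });
    this.count = 0;
  }

  _write(doc, encoding, callback) {
    this.count += 1;
    if (this.count % 1000 === 0) {
      process.stdout.write('.');
    }
    callback();
  }
}

const progressIndicator = new ProgressIndicator();

// "Fork" the readable stream into both consumers.
widgetStream.pipe(migrator);
widgetStream.pipe(progressIndicator);

// Report once the source has ended and every row has been counted.
progressIndicator.on('finish', () => {
  console.log(`\ndone: saw ${progressIndicator.count} rows`);
});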

This works out because when a readable stream is flowing to multiple destinations, it will only advance when all of the receivers are ready to accept input. In this case, the progressIndicator should quickly handle each object, but the readable stream will wait to pass it another object until the sprocketMigrator is ready to consume another.

As a nicety, we also attach a handler for the finish event on the progress indicator. That event is emitted once the stream it is reading from has ended and the progress indicator has processed every element.

Handling Stream Errors

A poorly handled “stream” error

This is where things started getting tricky. It turned out that we had a uniqueness constraint on our new collection that our existing data would violate. This caused an error in the sprocketMigrator, and some surprising output: the migrator would stop working, but the progressIndicator kept on chugging, indicating that we were working on every row in the collection. Huh?

Streams “unpipe” on errors

It turns out that when a readable stream is piping into a writable stream, if the writable stream hits an error, the readable stream will “unpipe” itself from that receiver.
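You can actually watch this happen: a writable emits an 'unpipe' event when the readable detaches from it, so a one-line listener makes the behavior visible (a small diagnostic sketch):

// When the migrator errors, pipe() detaches the source, and the
// writable emits 'unpipe' with the readable that just disconnected.
migrator.on('unpipe', (src) => {
  console.log('widgetStream unpiped from the migrator');
});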

For my use-case, I could safely ignore this particular duplicate key error, so I figured I could simply re-connect the streams whenever it occurred:

migrator.on('error', (err) => {
  if (err.code === 11000) { // Mongo Duplicate Key error
    widgetStream.pipe(migrator);
  }
});
widgetStream.pipe(migrator);

This seemed to work at first, but there was another subtle problem. While this picked up most rows, some of the ones hitting the progress indicator weren’t being processed by the migrator.

Streams don’t “pause” when they “unpipe”

In one common case, where a readable stream is piped directly into a single writable stream, breaking that connection effectively “pauses” the reader, because it has nowhere to drain to. Outside of this special case, breaking one pipe from a readable stream doesn’t stop the stream from flowing.

In our scenario, the progressIndicator was able to keep consuming the readable stream while the SprocketMigrator was disconnected, happily printing pips along the way. In hindsight, this seems fairly obvious: when the migrator hit an error and became detached, we entered a race between the progress consumer and our callback re-attaching the migrator consumer.

This wasn’t going to work, so I needed a new strategy.

Transform Streams FTW

The problem here was my initial viewpoint of the stream as a “fork”: one readable stream piping into two writable streams. NodeJS provides another useful tool, though: the “Transform” stream, which we can seat between our readable and writable streams. Here’s our progress indicator, rewritten:
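Roughly (again a sketch), it's the same counter reworked as an object-mode Transform that passes each document along unchanged:

const { Transform } = require('stream');

// The same counter, now as a Transform: it tallies each document and
// then hands it along untouched to whatever is piped downstream.
class ProgressIndicator extends Transform {
  constructor() {
    super({ objectMode: true });
    this.count = 0;
  }

  _transform(doc, encoding, callback) {
    this.count += 1;
    if (this.count % 1000 === 0) {
      process.stdout.write('.');
    }
    callback(null, doc); // forward the document unchanged
  }
}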

This lets us rewrite our pipeline, so that an error in the migrator doesn’t cause a race:

widgetStream.pipe(new ProgressIndicator()).pipe(migrator);

Summing Up

So, I learned that a readable stream can pipe into many writable streams, but that those writables can individually unpipe themselves if they hit an error. I learned that when they unpipe, they don’t pause the stream they are reading from, which can cause the data on the two sides of the fork to diverge. And I found that I can prevent that divergence by using a Transform stream instead of piping one readable into two writables.

For my use cases, chaining transformers seems like a more intuitive pattern than forking the streams, and avoids this confusing “gotcha” when I inevitably need to handle an error downstream.

Doing this investigation added some nice tools to my programmer’s utility belt, but it also raised my awareness of the NodeJS stream APIs in general. I’m still learning my way here, but I’m now starting to look into using stream tools like _writev to batch operations, and I’m likely to consider implementing a Readable for new data sources in the future.
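As a taste of that (purely a sketch, not code from this migration): a Writable that implements _writev gets handed everything that buffered up while a previous write was in flight, which maps nicely onto a bulk insertMany.

const { Writable } = require('stream');

// Illustrative names only; a sketch of batching with _writev.
class BatchingSprocketWriter extends Writable {
  constructor(collection) {
    super({ objectMode: true });
    this.collection = collection;
  }

  // Called with a single document when nothing else is queued.
  _write(doc, encoding, callback) {
    this.collection.insertOne(doc).then(() => callback(), callback);
  }

  // Called with the whole backlog that accumulated behind a slow write;
  // chunks is an array of { chunk, encoding } entries.
  _writev(chunks, callback) {
    const docs = chunks.map(({ chunk }) => chunk);
    this.collection.insertMany(docs).then(() => callback(), callback);
  }
}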

And to those following their own streams, may you have fair weather and smooth sailing!
