Catching errors in NodeJS stream pipes

MrManafon
Homullus

--

If you use Google to search for how to handle errors that happen within streams and their intermediate pipes, you’ll get a taste of something well known in the Java and PHP communities: historical inertia.

Basically, these languages have been around for so long that the most popular answers to questions are actually ancient. Even though we do have more appropriate stdlib methods nowadays, we collectively still reach for the 20-year-old ones like .pipe. Let’s raise some awareness.

An asynchronous pipe walks into a bar

Now, there is nothing fundamentally wrong with the .pipe method on streams. We use it all the time and it works flawlessly.

In a more modern context, though, it lives inside an await-heavy NodeJS TypeScript application. The async model is what made NodeJS an accepted solution, but it’s the ability to freely mix in awaits without blocking the event loop that made it palatable.

Otherwise we’d still be writing callback pyramids, or building npm modules to solve them.

So, .pipe offers a fully asynchronous, fire-and-forget style of execution, the way things used to be. And this is great for a lot of use cases. It has worked fine for years, and it still does.

stream.pipe(b).pipe(c);

It has a slight problem: you can’t catch errors that get thrown inside it unless you do some syntax gymnastics.
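To see why, here is a minimal sketch (b and c below are hypothetical stand-in transforms, not anything from a real codebase). The try-catch never fires, because the failure surfaces later as an 'error' event on the stream rather than as a thrown exception:

import { Readable, Transform } from 'stream';

// Hypothetical stand-ins for `b` and `c`.
const b = new Transform({
  objectMode: true,
  transform(_chunk, _encoding, callback) {
    callback(new Error('boom')); // simulate a failure inside the pipe
  },
});
const c = new Transform({
  objectMode: true,
  transform(chunk, _encoding, callback) {
    callback(null, chunk);
  },
});

try {
  Readable.from([1, 2, 3]).pipe(b).pipe(c);
} catch (e) {
  // Never reached: the error is emitted asynchronously on `b` as an
  // 'error' event, and with no listener attached it crashes the process.
  console.error('caught', e);
}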

What is the proper way to handle errors in a pipe? To handle them at every pipe individually. Each .on('error') handler applies only to its immediate upstream neighbour. Stolen from mshell_lauren on Stack Overflow:

stream
  .on('error', (e) => handleError(e))
  .pipe(b)
  .on('error', (e) => handleError(e))
  .pipe(c)
  .on('error', (e) => handleError(e));

function handleError(e) { throw e; } // Maybe also some logging or smth.

In fact, that’s precisely what popular libraries like multipipe do!

Now, for most use cases you wouldn’t do that yourself; it’s way too verbose. Theoretically, I can understand the idea that each pipe presents a context of its own and may have a separate error handler. But it sounds like whoever designed that API without a sane default never used it in a real product with lots of layers. It’s not unusable, but it gets quite confusing and error prone (pun intended).

If you don’t do that ☝🏻, the error is emitted outside of your execution scope, cannot be caught, and crashes the whole process. Forget one handler, somewhere, and you are screwed.

Using pipeline instead of pipe

So, we wish to have a very similar API, but one that:

  • Can assign transformers to the stream.
  • Can be composed and piped into other pipes.
  • Can be awaited or iterated upon.
  • Propagates errors, so they can be caught.
  • May have custom handlers on intermediate steps.
  • Hopefully doesn’t hurt the eyes.

Node’s standard library already has a stream function named pipeline; in fact it has multiple of them, a promisified and a normal version. They’ve been around for a long time and are stable to use; it’s just that the Stack Overflow answers from 10 years ago pop up as the first search results, so you have to dig a bit.
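For reference, and assuming a reasonably modern Node version, the two flavours live here:

// Callback flavour: the last argument is a completion callback.
import { pipeline } from 'stream';

// Promisified flavour: no callback, returns a promise you can await.
import { pipeline as pipelinePromise } from 'stream/promises';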

BTW, contrary to what the internet will tell you, I’ve personally found the non-promisified version to be the more sane one. Let’s see what the above code looks like as a pipeline:

import { pipeline } from 'stream';
pipeline(stream, b, c, handleError);

Yeah, that’s more compact, but that wasn’t the goal here. The real goal is that now we can await it and wrap it in a try-catch block, and the error will actually get caught, as you would expect it to. Here’s a more complex example from my own code.

import { Duplex, PassThrough, Readable, pipeline } from 'stream';

const stream = pipeline(
  Readable.from(Repo.findRecentlyArchivedObjects(message)),
  DeduplicateTransformation.get(['id']),
  getBatchTransform(associationBatchSize),
  Duplex.from((source: Readable) => this.getAssociationsBatch(source)),
  new PassThrough({ objectMode: true }),
  handleError,
);

try {
  await stream;
} catch (e: any) {
  this.logger.log('omg, error, lol', e);
}

Furthermore, the stream variable can be piped into another pipeline, no problem, and we can assign a different callback as the last argument to each pipeline if we wish (but I’ve found that to be a rare occurrence, only when I want to silence errors).
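As a rough sketch of that kind of composition (the NDJSON transform and the output path are made up here, just to show the shape), the PassThrough returned by the pipeline above can feed a second pipeline:

import { Transform, pipeline } from 'stream';
import { createWriteStream } from 'fs';

// Hypothetical second stage: serialize the object-mode output of the
// first pipeline into newline-delimited JSON.
const toNdjson = new Transform({
  writableObjectMode: true,
  transform(obj, _encoding, callback) {
    callback(null, JSON.stringify(obj) + '\n');
  },
});

pipeline(
  stream, // the PassThrough returned by the previous pipeline
  toNdjson,
  createWriteStream('./archived-objects.ndjson'), // hypothetical path
  handleError, // or a different callback for this pipeline
);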

Final word on callback and re-throws

You may have noticed that I’ve avoided talking about the handleError function that we have to give to the pipeline. For some weird reason, the stdlib decided that a callback is mandatory, even if it doesn’t do anything. Again, no sane default, wtf node?

So, what we ended up using is literally an empty callback most of the time:

/**
 * An empty function for callbacks, commonly used
 * as the last argument within a `pipeline()`.
 */
export function emptyCallback(): void {}
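In a pipeline it simply takes the spot where handleError sat before. A tiny self-contained sketch of that usage (the streams here are arbitrary placeholders):

import { PassThrough, Readable, pipeline } from 'stream';

// Any source/transform/destination combination works the same way;
// the no-op callback just satisfies the mandatory last argument.
pipeline(
  Readable.from(['hello', 'world']),
  new PassThrough(),
  process.stdout,
  emptyCallback,
);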

An empty callback will, by default, just re-throw whatever was thrown and thus ensure that errors get propagated.

Unlike the .pipe syntax, this callback is mandatory, and compilation will fail if you don’t pass it, which makes the whole thing less error prone.

P.S. You could wrap the pipeline function into your own helper that has an optional callback, which would shorten the code. I’ve found that to be burdensome when it comes to typing, and really the only benefit is a couple of characters fewer; nobody cares.
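For completeness, a rough sketch of what such a wrapper could look like; pipelineWithDefault is a made-up name, and the loose any typing is exactly the kind of burden mentioned above:

import { pipeline } from 'stream';

// Appends an empty callback when the caller didn't supply one, so the
// mandatory last argument is always satisfied.
export function pipelineWithDefault(...streams: any[]) {
  const last = streams[streams.length - 1];
  const args = typeof last === 'function' ? streams : [...streams, () => {}];
  return (pipeline as any)(...args);
}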
