Node + streams + objects. Is it possible?

Fernando Natalino
Hexacta Engineering
7 min read · Sep 22, 2020

When the flow of information is small, a simple request is often enough to obtain it. But when we have large volumes of data, perhaps coming from several sources, streams are probably a good alternative, since they help us build an efficient application with good response times. But what happens when we also want to manipulate objects within our stream? Let’s see what happens…

Let’s set some context

Let’s imagine a frontend that displays information sent by a backend. It is a fairly simple architecture, but let’s also assume that this backend collects information from various sources, parses it, and serves it to our frontend. I prefer to call this backend a gateway. The sources have different response times: some respond very fast and others quite slowly.

We have two possible problems.

  1. The response from our gateway can be quite large and heavy.
  2. The response time of our gateway will be determined by the source that takes the longest.

There are several solutions for this. We could use pagination, change the architecture, and so on. In this case we will use streams and check whether they are really efficient and effective.

But what are streams?

Basically, we can say that it is a way of transmitting data as sequences. We can read from and write to these sequences, and the most interesting thing is that data keeps flowing while we do so. That is fundamental, and it already gives us an idea of what streams can do for us.

Let’s apply this concept to the scenario above. One of our two problems was that the response time of the gateway was tied to the slowest source. Streams solve this easily because, as we mentioned, we can send data while we are still receiving it. As soon as the first source responds, we can send its information to the frontend and display it right away. As the other sources respond, and since the stream has not been closed yet, we simply write their data to it and it is sent to the frontend. Sounds good!

A bit of code

The idea is to return information from various sources, each with a different response time. Even this simple example already shows some interesting points.
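As a rough sketch of that idea, the gateway could look something like the code below. It uses Node's core `http` module instead of Express so that it has no dependencies, and the source names, the delays and the helpers `fetchSource`, `streamSources` and `createGateway` are assumptions made up for illustration; only the port 1002 comes from the request shown in this article.

```javascript
const http = require('http');

// Hypothetical sources: each one "responds" after a different delay.
const sources = [
  { name: 'source-1', delayMs: 1000 },
  { name: 'source-2', delayMs: 2000 },
  { name: 'source-3', delayMs: 4000 },
  { name: 'source-4', delayMs: 6000 },
  { name: 'source-5', delayMs: 8000 }
];

// Simulates a request to one source.
function fetchSource({ name, delayMs }) {
  return new Promise((resolve) => {
    setTimeout(() => resolve(`data from ${name}\n`), delayMs);
  });
}

// Writes each result to `out` as soon as it is ready,
// and closes the stream once every source has answered.
async function streamSources(out, sourceList) {
  await Promise.all(
    sourceList.map(async (source) => {
      out.write(await fetchSource(source));
    })
  );
  out.end();
}

// The HTTP response is a writable stream, so it can be passed as `out`.
function createGateway() {
  return http.createServer((req, res) => {
    res.setHeader('Content-Type', 'text/plain');
    streamSources(res, sources);
  });
}

// To try it out: createGateway().listen(1002);
```

The important detail is that `out.write` is called once per source, in the order in which the sources answer, and `out.end` is called only after the slowest one.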

Now we need to consume this endpoint from our frontend. One might think that simply making a request to that endpoint is enough…

fetch('http://localhost:1002/streams')

But I’m sorry to say it’s not that easy. Consumed this way, we would wait until the response is complete before seeing anything, and that is not what we want: we would wait 8 seconds, the time taken by the slowest source. That is, we send the data as a stream from our gateway, but we do not consume it as one. So let’s add streams to our frontend too!!

We use the response body, which the Fetch API already exposes as a stream, and read what arrives. Well, we tested it and it works!
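A minimal sketch of that reading loop could be the following. The helper name `readChunks` and its callback are my own assumptions; `getReader`, `read` and `TextDecoder` are standard web APIs.

```javascript
// Reads a response body chunk by chunk and hands each chunk over as text.
async function readChunks(body, onChunk) {
  const reader = body.getReader();
  const decoder = new TextDecoder();
  while (true) {
    const { done, value } = await reader.read();
    if (done) break; // the gateway closed the stream
    // `stream: true` keeps multi-byte characters intact across chunks.
    onChunk(decoder.decode(value, { stream: true }));
  }
}

// Usage in the browser:
// const response = await fetch('http://localhost:1002/streams');
// await readChunks(response.body, (text) => console.log(text));
```

Each call to `onChunk` happens as soon as a chunk arrives, so the frontend no longer waits for the slowest source.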

What about the buffer and the objects?

We have already sent and received streams, which is not complicated, but we still haven’t seen anything about the buffer or object manipulation. Let’s look at that now.

If we look at what our frontend prints, we will see that it does not necessarily print 5 times, which is what we would expect since our backend sent data from 5 sources. It depends on how much data each source returns: if a block of data exceeds the buffer size, it is divided into N smaller blocks that fit the buffer. This is rarely a problem when we work with plain text, but in our case we want to work with objects, and cutting an object in half doesn’t sound good, so we have to do something about it.
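To see why a split object is a problem, here is a small illustration. The message and the cut position are invented for the example; the point is only that neither half can be parsed on its own.

```javascript
// A serialized object, cut in two as if it had arrived in two chunks.
const message = JSON.stringify({ source: 'orders', items: [1, 2, 3] });
const half = Math.floor(message.length / 2);
const chunks = [message.slice(0, half), message.slice(half)];

// Tries to parse a piece of text and reports whether it worked.
function tryParse(text) {
  try {
    return { ok: true, value: JSON.parse(text) };
  } catch (err) {
    return { ok: false, error: err.message };
  }
}

// Each half on its own is not valid JSON...
const results = chunks.map(tryParse);
// ...only the reassembled text can be parsed.
const whole = tryParse(chunks.join(''));

console.log(results.map((r) => r.ok), whole.ok);
```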

Objects in streams

Let’s configure our stream to understand objects. For this we have two properties:

  1. objectMode. Makes the stream work with objects instead of strings or buffers.
  2. highWaterMark: 1. In object mode the buffer threshold is counted in objects, so this sets it to a single object.

Let’s adjust our gateway
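A minimal sketch of a read stream configured this way, using only Node's core `stream` module. The helper `createObjectStream` and the sample object are assumptions for illustration.

```javascript
const { Readable } = require('stream');

// A read stream that carries objects and buffers one object at a time.
function createObjectStream() {
  return new Readable({
    objectMode: true,
    highWaterMark: 1,
    read() {} // nothing to do here: data is pushed manually below
  });
}

const objectStream = createObjectStream();

// Each source result is pushed as a whole object, never as a partial string.
objectStream.push({ source: 'source-1', payload: 'fast data' });
```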

We tested it and it turns out that it doesn’t work 🤔

TypeError [ERR_INVALID_ARG_TYPE]: The first argument must be one of type string or Buffer. Received type object.

The first thing we might think is that our newly created read stream was not configured properly. But watch out!! What is failing here is the output stream, that is, our response.

When we pipe the read stream, which is in object mode, into the response stream, we get a type failure. Remember that the response is provided by Express: it is not in object mode, and we cannot reconfigure it either.
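The same failure can be reproduced without Express. In the sketch below, `byteSink` is a plain writable stream that stands in for the response (an assumption for illustration), and `tryWrite` is a hypothetical helper that reports the error code. On recent Node versions the write of an object throws with the code `ERR_INVALID_ARG_TYPE`; the exact wording of the message differs from the one Express shows.

```javascript
const { Writable } = require('stream');

// A byte-oriented sink: it is NOT in object mode, just like the response.
const byteSink = new Writable({
  write(chunk, encoding, callback) {
    callback(); // discard the data, we only care about the type check
  }
});

// Attempts a write and returns 'ok' or the error code.
function tryWrite(sink, chunk) {
  try {
    sink.write(chunk);
    return 'ok';
  } catch (err) {
    return err.code;
  }
}

const textResult = tryWrite(byteSink, 'plain text');
const objectResult = tryWrite(byteSink, { source: 'source-1' });

console.log(textResult, objectResult);
```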

But why does this happen?

Originally, streams in Node were designed to make I/O processing more efficient. They carried strings or buffers, and that is all Node core needs. Internally, an object representation is not necessary, but we know well that for our business logic and our own understanding it is convenient to use one. For this reason we can handle objects in streams, but at the end of our information flow Node core needs us to do a small transformation back to strings or buffers so that it can handle the data internally.

Transforming our streams

There are several ways to do this, all of them based on another type of stream: the transform. We can implement the transform stream ourselves, or use a library that creates and configures it for us and only asks for the transformation function. Very simple! Let’s go for that one. In my case I use through2, but there are several.
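For reference, this is roughly what such a transform looks like when written by hand with Node's core `Transform` class. It is a stand-in for the through2 version, not the library's own code, and the helpers `createStringifier` and `sendObjects` are names I made up for the sketch.

```javascript
const { Readable, Transform, Writable, pipeline } = require('stream');

// Accepts objects on its writable side and emits JSON text on its readable side.
function createStringifier() {
  return new Transform({
    writableObjectMode: true,
    transform(object, encoding, callback) {
      callback(null, JSON.stringify(object));
    }
  });
}

// Pipes objects -> text -> destination and resolves once everything is flushed.
function sendObjects(objects, destination) {
  return new Promise((resolve, reject) => {
    pipeline(
      Readable.from(objects), // object-mode read stream
      createStringifier(),
      destination,            // e.g. the HTTP response
      (err) => (err ? reject(err) : resolve())
    );
  });
}

// With an existing object-mode read stream and a response `res`,
// the equivalent chain is: objectStream.pipe(createStringifier()).pipe(res);
```

The destination only ever receives text, so it no longer matters that it is not in object mode.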

Seeing these two chained pipes, you can imagine the power of transformations and of chaining several pipes.

Was it really necessary to use a transform stream in our example?

Clearly not, because the example is very simple. Still, it is very practical: we convert our objects to text in a single place. Otherwise we would have to do it in every write to our read stream, as we did when we started the example.

Testing testing…

When we test all our code from the frontend, we are almost there. But one detail is missing, and it can become a problem if we are not clear about it. When the data to send is really big, the frontend does not show 5 messages, it shows many more. 😵

And there are not only more messages, they are also unreadable. Let’s decode the messages and see what is going on. 🤓

The messages are cut off!! Sure, in our frontend we have the following

const reader = fetchedResource.body.getReader();

That is, a read stream provided by the Fetch API, which evidently delivers the data in chunks much smaller than the buffer we set at the gateway. There are many possible solutions for this scenario. I propose one that is really easy to use and is based on a widely used convention.

Incorporating can-ndjson-stream

The idea is very simple: use a terminator character to mark the end of each message. This convention is called newline-delimited JSON (NDJSON), where the separator is ‘\n’.

Let’s modify our code in the gateway

In other words, by simply adjusting our transform function we conform to the standard. If we hadn’t been using a transform, we would have to adjust every write.
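A sketch of that adjustment, again written with Node's core `Transform` as a stand-in for the through2 version. The names `toNdjsonLine` and `createNdjsonStringifier` are assumptions for illustration.

```javascript
const { Transform } = require('stream');

// One object per line. JSON.stringify escapes line breaks inside strings,
// so a raw '\n' can only ever mean "end of message".
function toNdjsonLine(object) {
  return JSON.stringify(object) + '\n';
}

// Same transform as before, now appending the separator to every message.
function createNdjsonStringifier() {
  return new Transform({
    writableObjectMode: true,
    transform(object, encoding, callback) {
      callback(null, toNdjsonLine(object));
    }
  });
}
```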

And on the frontend…

The read stream is handed to the library, which splits the incoming data at the separator and gives us back whole objects.
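To make the idea concrete, here is a hand-rolled sketch of that splitting step. This is not the code of can-ndjson-stream, only an illustration of the technique, and `createNdjsonParser` is a hypothetical helper.

```javascript
// Collects text chunks and emits one parsed object per complete line.
function createNdjsonParser(onObject) {
  let pending = ''; // text received so far that does not end in '\n' yet

  return function push(textChunk) {
    pending += textChunk;
    const lines = pending.split('\n');
    pending = lines.pop(); // the last piece may be an incomplete message
    for (const line of lines) {
      if (line.trim() !== '') onObject(JSON.parse(line));
    }
  };
}

// Usage: feed it every decoded chunk read from the response body.
// const push = createNdjsonParser((object) => console.log(object));
// push('{"id":1}\n{"i'); // emits { id: 1 }, keeps '{"i' for later
// push('d":2}\n');       // emits { id: 2 }
```

It no longer matters where the chunks are cut: an object is only parsed once its closing ‘\n’ has arrived.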

Final score

Very good!! 😎

Memory usage

Let’s analyze a bit what happens as we process and send the messages.

Without streams and large volume of data

With streams and large volume of data

I do not need to say any more. Without a doubt, streams handle our buffer, and therefore our memory usage, very well. Without streams, the entire response is allocated in memory before it is sent. With streams, data is sent in pieces, which empties our buffer and therefore frees our memory.

Watch out!! It often happens that we write to our streams much faster than the other side can read or send. The mechanism for dealing with this is called backpressure. When it is not handled, we will notice it as higher memory usage than normal.

Solution: adjust the configuration of our streams, particularly the highWaterMark, until we find an optimal configuration.
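Besides tuning the highWaterMark, when we call `write` by hand we can also respect its return value: `write` returns `false` once the buffer has reached the highWaterMark, and the stream emits `drain` when it is ready for more. A minimal sketch, where `writeAll` is a hypothetical helper (note that `pipe` already does this for us):

```javascript
const { once } = require('events');

// Writes every chunk, pausing whenever the destination asks us to slow down.
async function writeAll(writable, chunks) {
  for (const chunk of chunks) {
    const canContinue = writable.write(chunk);
    if (!canContinue) {
      // Buffer is full: wait until it has been flushed before writing more.
      await once(writable, 'drain');
    }
  }
  writable.end();
}
```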

Conclusion

Without a doubt, streams work, and work very well. Still, they are clearly not necessary in every scenario and could even bring more problems than solutions. Therefore, my advice is to analyze your scenario: if it is simple, that is, with little information flow or without strict demands on response times, do not overuse streams, because you will not gain much. Otherwise, go ahead with streams 🤠🤠🤠

Thank you for reading!!

Fernando Natalino
Hexacta Engineering

Senior Developer at Hexacta. Always focused on simple, creative and elegant solutions.