Entendendo os principais conceitos em Streams no NodeJS

Gleydson Jose
Mercafacil
Published in
10 min readJul 6, 2023

Problemas com perfomance em aplicações usando NodeJS podem acontecer em contextos mais complexos com certa frequência, ainda mais se a aplicação trabalhar com leitura de grandes arquivos ou realizar queries em banco de dados para solicitar enormes quantidades de dados.

Felizmente há bons caminhos para seguir e resolver esses tipos de problemas com o próprio NodeJS utilizando suas funcionalidades mais avançadas, que além de ter opções como as Worker Threads e Cluster para ajudar na perfomance no uso de processador, existe também Streams e Web Streams para ajudar no uso de memória.

Contudo, aqui nesse artigo eu irei falar mais sobre Streams e um pouco de Web Streams, sobre como essas funcionalidades do NodeJS podem ajudar com problemas de performance em sua aplicação.

Desde o começo das streams até o dia em que escrevo esse artigo, onde o NodeJS se encontra na versão 20 LTS, as streams já receberam várias melhorias que evoluiram a usabilidade da ferramenta. Essas melhorias são super bem vindas, porque além de refinar e otimizar o uso dessa ferramenta, isso mostra que essa funcionalidade é importante de manter e ir melhorando aos poucos.

No entanto, apesar de ser uma ótima funcionalidade, as streams também podem se tornar mais difíceis de entender em fluxos maiores, porque ela pode ser bastante customizada em cada etapa desse fluxo. Tudo em programação tem trade-offs e com streams não é diferente, mas caso você entenda os principais fundamentos das streams, o uso delas se torna muito mais fácil.

Sabemos até aqui que as streams são bem úteis para solucionar problemas com grandes armazenamentos de dados em memória, mas como as streams conseguem resolver esses tipos de problemas? Vamos supor que você trabalha com uma aplicação web de um supermercado que é bastante frequentado diariamente, onde existem opções de exportar dados de compras dos consumidores.

Com isso, um funcionário do supermercado solicita uma exportação de todos os dados de vendas dos últimos 2 anos pela aplicação, o que será que vai acontecer logo em seguida? Bom, caso sua aplicação não use streams ou uma técnica eficiente de armazenamento de dados em memória, ela irá guardar todos esses dados na memória e dependendo da quantidade de informações por venda na lista de vendas, sua aplicação pode cair, porque o NodeJS tem um limite padrão de consumo de memória, e caso esse limite seja quebrado, vai ser lançado um erro e sua aplicação irá ser derrubada.

Claro que existem várias maneiras de contornar esse tipo de problema, uma delas seria aumentar o limite do NodeJS, mas isso pode aumentar seus custos com recursos no serviço Cloud contratado por permitir sua aplicação usar mais memória. Além disso, uma outra solução poderia ser enviar um email para o funcionário/solicitante da exportação com o arquivo exportado enviado como anexo. Assim tornando possível fazer sua aplicação ser assincrona, dando a chance do script que envia o email trabalhar com ferramentas de filas/queues e processar o envio em paralelo com o outro script que recebe a request do client e solicita o envio de email.

Caso você implemente streams na solução que você escolheu, tudo fica ainda mais leve, porque você consegue manipular todos esses dados de vendas retornandos eles por chunks/pedaços e ir escrevendo no arquivo aos poucos sem manter todos os dados em memória ao mesmo tempo, e sim somente o pedaço recebido na iteração do processo. Mais adiante eu irei dar exemplos de escrita e leitura de arquivos e outros detalhes com streams.

Tipos de stream

Contudo, como as streams realmente funcionam? As streams são normalmente separadas por 3 etapas em todo fluxo, algo bem semelhante com a ETL(Extract, Transform and Load) usadas para processamento de dados, porque as streams são separadas por Readable(Leitura), Transform(Transformação) e Writable(Escrita). Apesar de existir a stream Transform, não é obrigatório usar ela sempre, apenas com a Readable e Writable você já consegue ter um fluxo completo. Com a stream de leitura você consegue extrair os dados de uma fonte de dados e mover eles para uma saída, onde essa saída pode ser uma stream de escrita que irá receber os dados e escrever em um arquivo ou em qualquer outro lugar que você escolher, mas caso você precise modificar esse dados no meio do fluxo, a stream de transformação será necessária.

Além desses 3 tipos de streams, ainda existe um 4° tipo que é uma stream de leitura e gravação ao mesmo tempo, mas ela não é tão amplamente utilizada por ser útil apenas em alguns casos bem específicos, essa stream é chamada de Duplex.

Pipeline

Até aqui aprendemos que as streams tem seu fluxo separado por etapas, mas como essas etapas são interligadas para montar o fluxo todo? Se você é um usuário de distros Linux, você já deve ter visto a funcionalidade de pipes no terminal, onde são combinados vários comandos e o output de um é transferido para o input do outro:

cmd1 | cmd2 | cmd3 | cmd4

Com as streams as pipes funcionam de uma maneira bem semelhante, onde podemos usar o método .pipe() de uma stream ou a função pipeline do pacote node:streams:

.pipe():

stream1
.pipe(stream2)
.pipe(stream3)
.pipe(stream4)

pipeline():

pipeline(
stream1,
stream2,
stream3,
stream4,
(error) => {
if (error) {
console.error(error);
} else {
console.log('Pipeline concluída com sucesso.');
}
}
)

O legal da função pipeline é a possibilidade de tratar erros no fluxo de streams usando callback.

Ainda sobre o uso de pipes com cada tipo de stream, existe alguns pontos importantes de saber antes de conectar uma stream na outra, por exemplo:

- Uma stream de leitura pode ser conectada com uma stream de escrita, transformação ou duplex, mas a duplex precisa ter a função de escrita definida.

- Uma stream de transformação pode ser conectada com uma stream de escrita ou duplex, mas a duplex precisa aqui também ter a função de escrita definida.

- Uma stream de escrita é normalmente considerado a etapa final de um fluxo.

Além disso, os dados que são trafegados entre as streams normalmente estão como Buffers, que seria uma sequência de bytes que são armazenados na memória e manipulados de uma forma mais performática por trabalhar em mais baixo nível, normalmente cada buffer segue um tamanho padrão em bytes, e caso você exceda esse limite, um novo buffer é gerado internamente. Um ponto legal é que você consegue manipular o tamanho dos buffers internos modificando o highWaterMark de uma stream, e claro, esse atributo de configuração vem com um valor padrão nas streams. Para entender onde normalmente é usado esse atributo, veja esse exemplo:

// Stream de escrita para um arquivo
createWriteStream('./items.txt', { highWaterMark: 1024 })

// Stream de escrita com a classe padrão
new Writable({
highWaterMark: 1024,
write(chunk, enconding, callback) {
console.log('Escrevendo de uma stream de escrita', chunk.toString())
callback()
}
})

Usei apenas stream de escrita no exemplo, mas outros tipos de stream também tem esse atributo.

Além do tipo Buffer em chunks de dados, também é possível trabalhar com objetos, mas nessa opção a perfomance é menor. E um detalhe importante é o uso do highWaterMark nesse modo, porque o uso desse atributo com esse modo vai definir o número de objetos trafegados por pedaços.

Veja abaixo um exemplo com alguns atributos existentes para configurar e usar objetos:

// Alguns desses atributos também existem em outros tipos de streams
new Transform({
readableObjectMode: true,
writableObjectMode: true,
objectMode: true,
transform(chunk, encoding, callback) {
console.log(chunk.toString())
}
}),

Existe também alguns pacotes que trabalham com streams retornando chunks como objeto, um bom exemplo disso é o pacote mongodb. Caso você use esse tipo de pacote e precise modificar os dados com uma stream Transform, é recomendável configurar a stream para aceitar esse formato:

const indexes = [1, 2, 3]
const readableStream = await db.collection('items').find({
index: { $in: indexes }
}).stream()
readableStream.pipe(
new Transform({
readableObjectMode: true,
writableObjectMode: true,
transform(chunk, encoding, callback) {
console.log(chunk.toString())
}
}),
)

Eventos

No NodeJS sabemos que o uso de eventos é super comum, porque com eventos podemos executar trechos de códigos após o evento ser acionado. Felizmente, as streams não estão de fora disso, porque além do fluxo de pipes que as streams tem, também existe um outro que possibilita usar eventos em cada tipo de stream e fazer alguma coisa caso alguns deles sejam acionados. Atualmente as streams de leitura e gravação tem esses eventos abaixo:

Readable: data, end, error, close, readable, resume e pause

Writable: drain, finish, error, close, pipe e unpipe

Eu não irei colocar os eventos das streams Transform e Duplex, porque elas são bem semelhantes a Readable.

Podemos usar o evento data para receber os dados que estão chegando na stream, e o evento end para fazer qualquer coisa quando a stream finalizar. Os eventos pause e resume são acionados quando métodos com esses mesmos nomes são utilizados na stream de leitura, e para entender melhor onde eles são úteis, vamos usar o seguinte exemplo, imagine que temos um fluxo dentro do evento data onde em algum momento o fluxo tem que ser pausado até finalizar uma request, para isso podemos parar o fluxo com o .pause e ativar novamente usando o .resume após a request ser finalizada. Veja um pequeno exemplo disso usando resume e pause logo abaixo:

readable.on('data', async (chunk) => {
readable.pause()
const response = await fetch(`http://qualquerurl/user/${chunk.toString()}`)
const data = await response.json()
readable.resume()
console.log(data)
})

readable.on('pause', () => {
console.log('Pausando stream')
})

readable.on('resume', () => {
console.log('Voltando a usar a stream')
})

Se caso a stream não for pausada enquanto alguma request demora para finalizar, o resultado final do seu código pode ficar bagunçado por conta das Queues do NodeJS.

Exemplos mais completos usando as streams

Para criar qualquer tipo de stream podemos importar elas do pacote node:stream e criar cada uma assim:

import { Readable, Transform, Writable } from 'node:stream'

const readable = new Readable({
read() {
this.push('Usuário 1')
this.push('Usuário 2')
this.push('Usuário 3')
this.push(null)
}
})
const transform = new Transform({
transform(chunk, enconding, callback) {
const user = chunk.toString().concat(' foi aprovado')

// aqui pode ser assim:
this.push(user)
callback()

// ou assim:
// callback(null, user)
}
})
const writable = new Writable({
write(chunk, enconding, callback) {
console.log(chunk.toString())
callback()
}
})

readable.pipe(transform).pipe(writable)

Para ler e escrever arquivos podemos usar o createReadStream e o createWriteStream respectivamente:

import { pipeline, Transform } from 'node:stream'
import { createReadStream, createWriteStream } from 'node:fs'

const readableStream = createReadStream('./usuarios.txt')
const transformStream = new Transform({
transform(chunk, encoding, callback) {
const newChunk = chunk.concat(' foi aprovado')
this.push(newChunk)
callback()
}
})
const writableStream = createWriteStream('./usuarios-aprovados.txt')

pipeline(
readableStream,
transformStream,
writableStream,
(error) => {
if (error) {
console.error('Ocorreu um erro na pipeline:', error);
} else {
console.log('Pipeline concluída com sucesso.');
}
}
)

Você sabia que a request e response HTTP também são streams no NodeJS? Cada uma delas podem ter tipos diferentes de stream caso seja o lado do client ou servidor. Portanto, caso seja um cliente, a request que ele faz para o servidor é uma stream de escrita e o response que ele recebe é uma de leitura. No entanto, no lado do servidor a request que ele recebe do cliente é de leitura e a response que ele envia é de escrita.

Veja esse exemplo com as streams no lado do servidor com MongoDB:

import { MongoClient } from 'mongodb'
import { createServer } from 'http'

const uri = 'mongodb://root:root@localhost:27017'
const client = new MongoClient(uri)
const port = 3000
let db

try {
await client.connect()
console.log('Conectado');
db = client.db('test')
} catch (error) {
console.log('Error', error)
}

createServer(async (req, res) => {
let usersEmails = []
for await (let chunk of req) {
usersEmails = usersEmails.concat(JSON.parse(chunk.toString()))
}
const usersReadableStream = await db.collection('users').find({
email: { $in: usersEmails }
}).stream()
usersReadableStream
.map(chunk => `${chunk.name}\n`)
.pipe(res)
}).listen(port).on('listening', () => console.log('Servidor iniciado!'))

Acredito que você tenha percebido que usei o map na pipeline de streams e o for await na request, mas o que esse map e o for await conseguem fazer?

map

O map nas streams é uma feature mais recente que foi lançada junto ao filter e reduce no Node 17 e logo depois no 18 LTS, esses métodos são semelhantes com métodos com nomes iguais no prototype Array.

for await

Lembra do evento data? Com o for await conseguimos ler chunk a chunk de uma stream de leitura igualmente, mas o for await consegue ser útil também em outros cenários além desse, por exemplo, é possível usar ele com um array de promises.

Após um exemplo do lado do servidor, vamos agora para um exemplo usando streams no lado do cliente:

const data = ['usuario2@exemplo.com', 'usuario3@exemplo.com']
const response = await fetch('http://localhost:3000', {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify(data)
})
const readableStream = response.body
readableStream
.pipeThrough(new TextDecoderStream('utf-8'))
.pipeThrough(
new TransformStream({
transform(chunk, controller) {
const names = chunk.trim().split('\n')
for (let name of names) {
controller.enqueue(name)
}
}
})
)
.pipeThrough(
new TransformStream({
transform(chunk, controller) {
controller.enqueue(`Nome: ${chunk}`)
},
flush(controller) {
controller.enqueue('Nome: Usuário Teste')
}
})
)
.pipeTo(
new WritableStream({
write(chunk, controller) {
console.log(chunk)
}
})
)

Esse exemplo funciona no browser graças as Web Streams, eu não usei Streams porque elas funcionam apenas no NodeJS, mas as Web Streams são bem semelhantes as Streams do NodeJS, porque seguem os mesmos conceitos.

Felizmente, as Web Streams além de existir no browser, também existe no NodeJS, mas será que faz sentido usar as Web Streams no lugar de Streams do NodeJS? A resposta para essa dúvida pode depender de como você vai usar as streams, por exemplo:

- Caso você vá usar streams em um código que precise funcionar no NodeJS e Browser, faz sentido usar as Web Streams para funcionar nos dois lados.

- Caso você use um pacote que trabalhe apenas com Web Streams, faz sentido usar as Web Streams também no NodeJS.

No entanto, caso você se encaixe nesses dois casos acima e mesmo assim não quer usar Web Streams, é possível converter Web Streams para Streams no NodeJS, o inverso também é possível. Para conseguir fazer isso é necessário usar os métodos estáticos toWeb e fromWeb das classes Readable, Transform e Writable das Streams com as ReadableStream, TransformStream e WritableStream respectivamente.

As Web Streams são bem semelhantes com as streams, mas você pode ter visto algumas coisas diferentes no exemplo anterior e ter ficado com algumas dúvidas. No caso, eu estou falando da pipeTo, pipeThrough e o TextDecoderStream, vamos entender onde cada um é útil:

pipeTo

O pipeTo é útil para conectar o fluxo de uma stream de origem(leitura ou transformação) com uma stream de destino(escrita)

pipeThrough

O pipeThrough é usado para conectar o fluxo de uma stream de leitura entre múltiplas streams de transformação.

TextDecoderStream

Essa classe é útil para decodificar uma stream de bytes para uma stream de strings/textos.

Para finalizar, as Web Streams não tem Duplex como nas Streams do NodeJS.

Muito obrigado por acompanhar o artigo até aqui, espero que você tenha gostado e também que eu tenha conseguido te ajudar a entender pelo menos um pouco de como funciona as streams.

--

--