Como trabalhar com grande volumes de dados sob demanda em NodeJs

Nathan Vieira Marcelino
Lett
Published in
7 min readOct 25, 2021
Photo by Blake Connally on Unsplash

Na Lett utilizamos, em muitas plataformas, o NodeJS como principal ferramenta de back-end, possibilitando com essa tecnologia, muitas conexões simultâneas, atendendo clientes de toda a América Latina. Trabalhamos com dados de mais de 400 varejos, monitorando imagens de produtos, preços, ranking dos produtos por busca, etc.

Com essa breve introdução, dá pra ter uma ideia do tanto de dados que processamos diariamente. Entretanto, existem análises que são dinâmicas, que precisamos fazer de acordo com filtros ou especificações, mas qual a maneira de fazer isso com um volume imenso de dados com NodeJS, um dos runtime javascript mais popular atualmente ?

Características do Node

A característica que diferencia o Node.js de outras plataformas é exatamente a execução single-thread, em que uma única thread executa o código Javascript. Em outras linguagens, ela é multi-thread, ou seja, a cada solicitação (input) são requisitados recursos computacionais até a obtenção de uma resposta (output).

A quantidade de threads é limitada ao volume desses recursos computacionais disponíveis. Então, elas não podem ser criadas em grande volume, pois quando esse limite é atingido, as novas solicitações precisam aguardar para ser executadas.

Um sistema com capacidade de 8 GB de memória RAM, em que cada thread tem 2 MB de memória, está limitado à quantidade máxima de 4.000 conexões simultâneas. Isso, sem computarmos os gastos de recursos com a alternância dos encadeamentos, por exemplo.

Com o Node.js, esse cenário não existe: ele possibilita executar milhões de conexões simultâneas, porque não aguarda o processamento da resposta, ou seja, é altamente escalável.

É lógico que os desenvolvedores precisam cuidar para que todas essas solicitações do usuário não sejam bloqueadas até que o cálculo seja concluído. Além disso, devem criar um processo de recuperação em cada instância travada, pois pode haver exceções que alcancem o loop de eventos e travem o programa.

Caso tenha interesse, recomendo muito a apresentação de Philip Roberts sobre o funcionamento do loop de eventos no Node.

Mas que diabos é o loop de eventos? | Philip Roberts

Node JS Streams

“Javascript não foi criado para processar grandes cargas de dados.”

Quem nunca ouviu essa frase ? Mas ela é realmente verdade ?

Pode-se dizer que ela é uma meia verdade. Javascript (e consequentemente o Node) não foi feito para carregar grande quantidade de dados em memória de uma só vez, mas alternativamente podemos processar esses dados em partes (chunks), e é ai que entra o módulo stream do NodeJs .

Node JS Streams são funções pra processar grandes cargas de dados sob demanda transformando-os em Buffers .

Existem essencialmente existem três tipos de streams:

  • Readable stream: fornece os dados para o processamento. Conexões com banco de dados, leitura de arquivos, requisições Web.
  • Transform stream: como o nome sugere, é aqui que a mágica acontece, onde limpamos, formatamos, removemos, calculamos.
  • Writable stream: quem gera o produto final. Escrever um JSON, enviar o dado parcial pela requisição, etc.

Todas as etapas juntas, são chamados de pipelines. Analogicamente pode-se dizer que esses 3 processos assemelham-se a uma linha de produção*.

Existe uma quarta Node Stream chamada Duplex, que nada mais é do que uma Readable e Writable stream juntas, como exemplo as bibliotecas de socket.*Linha de produção: A linha de produção, também conhecida como linha de montagem, é um modelo de fabricação de mercadorias em série. Uma sequência de ações específicas e repetitivas é realizada, de forma contínua e bem divididas.

Na prática:

(use node 16 para seguir os exemplo)

Já usamos Streams mesmo sem saber. O módulo nativo de HTTP possui NodeJs Streams (request é uma readable stream, response é writable), como citado anteriormente o módulo socket é uma duplex stream e do módulo zlib a função createDeflate é uma transform stream.

Mas além dessas funções prontas, podemos criar as nossas próprias pipelines e ETLs (Extract Transform and Load são outro nome para as streams) com Node.

Exemplo 1:

O terminal pode ser uma readable stream e uma writable stream também. Com esse pequeno exemplo, podemos fazer a string ir do input do usuário no terminal para o output (também no terminal hehe)

Esse exemplo podemos comentar a linha 2 e fazer as coisas fluírem de uma stream para outra ou com o .on('data') podemos substituir o comportamento padrão e fazendo modificações na string de saída.

process.stdin.pipe(process.stdout)
.on('data',msg=>console.log('data',msg.toString()))

Exemplo 2:

Como falei anteriormente, os módulos do node já utilizam por baixo dos panos o recurso de streams. O código abaixo por exemplo, mesmo não tendo importado os módulos relacionados a streams funciona de forma parecida. Entretanto o código carrega em um Buffer todo o arquivo antes de escreve-lo na response, e caso algum descuido seja feito e acabemos por transformar esses dados (como no .toString()) podemos travar a máquina simplesmente por carregar em memoria uma string muito grande.

Por isso, carregar em memoria um arquivo muito grande, mesmo que em Buffer, pode ser considerado uma prátia ruim.

import http from 'http'
import {readFileSync} from 'fs'
// node -e "process.stdout.write(crypto.randomBytes(1e9))" > big.file

http.createServer((req,res)=>{
const file = readFileSync('big.file')//.toIString()

res.write(file)
res.end()
}).listen(3000, ()=>{
console.log('<http://localhost:3000>')
})
// curl localhost:3000 --output out.txt

Os trechos comentados são para utilizar de fato o código. Primeiro crie o arquivo big.file com o primeiro comando, rode o código a seguir com node e em seguida faça a requisição http com o curl.

O jeito “menos errado” de implementar o exemplo anterior seria com o próximo código, pois assim cada chunk já vai para a writable stream (nesse caso a response) enquanto o processo de leitura acontece, não sendo necessário carregar em memoria o Buffer todo, para em seguida enviar a resposta. Nesse modelo ainda é possível fazer modificações no chunk no meio do caminho, mas vamos tratar sobre isso nos próximos exemplos.

import http from 'http'
import { createReadStream } from 'fs'
// node -e "process.stdout.write(crypto.randomBytes(1e9))" > big.file

http.createServer((req,res)=>{
createReadStream('big.file').pipe(res)

}).listen(3000, ()=>{
console.log('<http://localhost:3000>')
})
// curl localhost:3000 --output out.txt

Usar a função .pipe() nativa do NodeJS como demonstrado nos exemplos anteriores também não é uma boa prática. Ela é útil como em alguns casos dos exemplos demonstrados quando não há grandes transformações de dados.

Mas no mundo real, onde erros acontecem, arquivos são corrompidos ou mal formatados, a função pipe não se comporta muito bem, pois ela não trata grande parte das exceções fazendo com que seu processo de read continue executando, mesmo que o write tenha parado, causando assim o conhecido memory leak. Portanto, alternativamente a gente usa a pipeline do módulo stream que tem o mesmo funcionamento, mas com algumas funções a mais para lidar com erros.

Exemplo 3:

No próximo exemplo, utilizaremos a função pipeline como promise, não é obrigatório, mas acredito que é mais fácil de trabalhar do que com callbacks. Se quiser dar uma olhada na documentação sobre como utilizar a pipeline com callbacks clique nesse link.

É muito difícil criarmos de fato uma readableStreams no mundo real. Geralmente, nessa parte utilizamos módulos que já fazem esse trabalho para nós, buscando dados de um bando de dados ou um arquivo, mas no exemplo estaremos simulando isso.

Nesse código, a cada push simplesmente iremos imprimir o dado no console, perceba que ao comentar o trecho .toString() veremos um console com Buffer <bytes>, isso acontece pois as streams só trabalham com bytes, nós desenvolvedores precisamos fazer o encoding correto da informação para podermos trabalhar com cada chunk.

import { pipeline, Readable, Writable } from 'stream'
import { promisify } from 'util'

const pipelineAsync = promisify(pipeline)

const readableStream = Readable({
read:function () {
this.push('test 0') //para cada push ele vai para a proxima etapa
this.push('test 1')
this.push('test 2')
this.push('test 3')
this.push(null)
}
})

const writableStream = Writable({
write (data,encoding,callback) {
console.log(data/*.toString()*/)
callback()
}
})

await pipelineAsync(readableStream,writableStream)

Exemplo 4:

Nesse exemplo, criamos uma readable que cria muitos dados mesmo. Quase 100MiB fazendo modificações, transformando em CSV e em seguida escrevendo em um arquivo com uma writableStream.

Na função transformStream, instanciamos a classe Transform criando o método que de fato irá implementar a transformação no chunk, é uma modificação simples que chama a callback ao fim da execução, assim informando para o próximo passo da pipeline o resultado da modificação.

O primeiro parâmetro da callback é sempre erro, como não fizemos tratativa de erro o valor é nullo.

import { pipeline, Readable, Transform, Writable } from 'stream'
import { promisify } from 'util'
import { createWriteStream } from 'fs'
import crypto from 'crypto'

const pipelineAsync = promisify(pipeline)

const readableStream = Readable({
read() {
for (let i = 0; i < 1e5; i++) {
const chunk = {id: Date.now()+i, name: crypto.randomBytes(10).toString('hex') }
this.push(JSON.stringify(chunk))
}
this.push(null)
}
})

const transformStream = Transform({
transform(chunk,encoding,callback) {
const data = JSON.parse(chunk)
const result = `${data.id},${data.name.toUpperCase()}\\n`
// err, sucesso
callback(null,result)
}
})

const setHeader = Transform({
transform(chunk,encoding,cb) {
this.counter = this.counter ?? 0
if(this.counter > 0){
return cb(null,chunk)
}
this.counter+=1
cb(null,"id,name\n".concat(chunk))
}
})

const writableStream = Writable({
write (data,encoding,callback) {
console.log(data.toString())
callback()
}
})

await pipelineAsync(
readableStream,
transformStream,
setHeader,
createWriteStream('out.csv') // pode substituir pela writableStream
)

Exemplo 5:

Vamos juntar as coisas agora, com o exemplo 5 iremos colocar a pipeline para ser executada no servidor http. Esse por sua vez cria um JSON dinamicamente, transforma em CSV e retorna para o usuário na response. Preste atenção como precisamos setar o status code como 206, indicando que essa requisição lida com arquivos em partes, e que o conteúdo na request pode (e irá) crescer a medida que o tempo passa.

import http from 'http'
import { pipeline, Readable, Transform, Writable } from 'stream'
import { promisify } from 'util'
import crypto from 'crypto'

const readableStream = Readable({
read() {
for (let i = 0; i < 1e5; i++) {
const chunk = {id: Date.now()+i, name: crypto.randomBytes(10).toString('hex') }
this.push(JSON.stringify(chunk))
}
this.push(null)
}
})

const transformStream = Transform({
transform(chunk,encoding,callback) {
const data = JSON.parse(chunk)
const result = `${data.id},${data.name.toUpperCase()}\\n`
// err, sucesso
callback(null,result)
}
})

const setHeader = Transform({
transform(chunk,encoding,cb) {
this.counter = this.counter ?? 0
if(this.counter > 0){
return cb(null,chunk)
}
this.counter+=1
cb(null,"id,name\\n".concat(chunk))
}
})

http.createServer(async(req,res)=>{

// res.statusCode = 206

const pipelineAsync = promisify(pipeline)

await pipelineAsync(
readableStream,
transformStream,
setHeader,
res
)

}).listen(3000, ()=>{
console.log('<http://localhost:3000>')
})

Exemplo 6:

Com o exemplo 6, temos a implementação de um endpoint de streaming de video utilizando Node JS Streams. Muito parecido com o exemplo anterior, mas agora de fato utilizando os headers corretos para transmissão de dados sobre demanda do protocolo HTTP.

--

--