Node + streams + objetos ¿Es posible?

Fernando Natalino
7 min readSep 1, 2020

--

En muchas ocasiones, donde tenemos poco flujo de información, con un simple request para obtenerla es suficiente. Pero cuando contamos con grandes volúmenes de datos y quizás con varios orígenes, probablemente sea una buena alternativa el uso de streams ya que nos brinda ciertos beneficios para lograr una aplicación eficiente y con buenos tiempos de respuesta. Pero,¿qué pasa cuando además queremos manipular objetos dentro de nuestro stream? Veamos que sucede…

Pongamos un poco de contexto

Imaginemos que contamos con un frontend el cual visualiza información que es enviada por un backend. Es decir, una arquitectura bastante simple pero pensemos además que este backend recolecta la información de varios orígenes, la parsea, y la da servida a nuestro frontend. A este backend me gusta más llamarlo gateway. Estos orígenes tienen distintos tiempos de respuestas, algunos responden muy rápido y otros bastante lento.

Tenemos dos posibles problemas.

  1. La respuesta de nuestro gateway puede ser bastante grande y pesada.
  2. El tiempo de respuesta de nuestro gateway se verá afectado por el origen que más demore.

Hay varias soluciones para esto. Probablemente podamos usar técnicas de paginado, cambiar la arquitectura, entre otras. En este caso usaremos streams y comprobaremos que es realmente eficiente y efectivo.

Pero qué es Streams?

Básicamente podemos decir que es un método de transmisión de datos por medio de secuencias. Con este método podemos tanto leer como escribir estas secuencias, y lo más interesante es que a medida que lo vamos haciendo, estas siguen su curso. Esto último es fundamental y ya nos da una idea de para que nos puede servir.

Intentemos aplicar este concepto al escenario anterior. Teníamos dos problemas, podemos notar que uno era el tiempo de respuesta, el cual era afectado por el tiempo de respuesta del origen más lento. Este punto con streams se soluciona fácilmente, ya que como mencionamos, mientras vamos recibiendo datos ya podemos ir enviándolos. Esto hace que apenas responda el primer origen ya puedo ir enviando al frontend dicha información y de esa forma visualizarla de manera instantánea. A medida que van respondiendo los otros orígenes, como el flujo todavía no fue cerrado, simplemente voy escribiendo nuestra secuencia y se irán enviando al frontend. Suena bien!

Un poco de código

La idea es retornar información de varios orígenes, pero con distintos tiempos respuesta. Con este simple ejemplo ya podemos ver algunos puntos

Ahora bien, nos faltaría consumir este endpoint recién creado por medio de nuestro frontend. Uno podría pensar en simplemente hacer un request a dicho endpoint y ya…

fetch('http://localhost:1002/streams')

Pero lamento decir que no es tan fácil. Con esta forma estaríamos esperando a que el fetch termine de cerrar el buffer para recién ver la respuesta y no es lo que queremos, pues estaríamos esperando 8 segundos, que es el origen que más demora. Es decir, lo enviamos utilizando streams desde nuestro gateway, pero lo consumimos de forma diferente. Por lo que agreguemos streams a nuestro frontend también!!

Lo que hicimos fue utilizar el body del fetch, el cual es un streams que ya nos proporciona la librería fetch, e ir leyendo lo que va recibiendo. Bien, lo probamos y resulta que funciona!

¿Qué pasa con el buffer y los objetos?

Hasta acá ya enviamos y recibimos streams lo cual no es complicado, pero todavía no vimos nada con respecto al buffer y manipulación de objetos. Sería bueno hacerlo.

Si observamos las respuestas que va imprimiendo nuestro frontend, notamos que no necesariamente imprime 5 veces, que es lo que esperaríamos pues nuestro backend envió datos desde 5 orígenes. Esto depende de la cantidad de datos que tengan nuestros orígenes, si es mucha y supera el buffer entonces ese bloque de datos se parte en N bloques más chicos según el tamaño de nuestro buffer. En principio no parece un problema, mas que nada cuando trabajamos con texto, pero en nuestro caso queremos trabajar con objetos y que se divida un objeto a la mitad no suena bien, por lo que tendríamos que hacer algo al respecto.

Objetos en streams

Configuremos nuestro buffer del stream para que entienda de objetos. Para esto tenemos dos propiedades:

  1. ObjectMode. Setea el streams para que entienda de objetos.
  2. highWaterMark: 1. Setea el buffer para que solo pueda mantener un objeto en el buffer, no más.

Ajustemos nuestro gateway

Lo probamos y resulta que no le gusta 🤔

TypeError [ERR_INVALID_ARG_TYPE]: The first argument must be one of type string or Buffer. Received type object.

Rápidamente lo que podríamos pensar es que nuestro stream de lectura recientemente creado no quedó bien configurado. Pero OJO!! Acá lo que está fallando es el stream de salida, es decir, nuestro response.

Al hacer el pipe y direccionar el stream de lectura en modo objetos al streams duplex (response) tenemos una falla de tipos. Como recordaremos, este duplex es proporcionado por express y no está en modo objetos y tampoco es posible redefinirlo.

Pero, ¿Por qué sucede esto?

Originalmente los streams en node fueron diseñados para hacer que los procesamientos de E/S sean más eficientes. Estos streams eran cadenas de caracteres o buffers y al core de node con eso le alcanza. Es decir, internamente no es necesaria la representación de objetos, pero bien sabemos que para nuestro negocio y entendimiento, es conveniente usarlo. Por esta razón es que podemos manejar objetos en los streams, pero el core de node necesita que al final de nuestro flujo de información, hagamos una pequeña transformación para que pueda manejarlo internamente.

Transformando nuestros streams

Tenemos varias formas de hacer esto. Todas ellas mediante un nuevo tipo de stream, el transform. La otra posibilidad es mediante alguna librería que cree ese stream transform por nosotros, lo configure y simplemente nos pida la función de transformación. Muy simple! vamos por esa. En mi caso use through2, pero existen varias.

Viendo ese doble direccionamiento en pipes, ya se imaginarán la potencia de las transformaciones y el uso de pipes anidados.

¿Era realmente necesario usar un stream de transformación en nuestro ejemplo?

Claramente no porque el ejemplo es muy simple. Pero de todos modos resulta muy práctico pues convertimos nuestros objetos a texto en un solo lado, sino lo tendríamos que hacer en cada escritura de nuestro stream de lectura, como teníamos cuando arrancamos con el ejemplo.

Probando probando…

Al probar todo nuestro código desde el frontend ya vemos el horizonte. Pero falta un detalle que puede ser un gran inconveniente si no lo tenemos en claro. Cuando los datos enviados son realmente grandes, el frontend no está mostrando 5 mensajes, sino muchos más. 😵

Y no solo muchos mensajes, sino que además no se entiende. Hagamos un decode del mensaje y veamos de que se trata. 🤓

Los mensajes son cortados!! Claro, en nuestro frontend tenemos lo siguiente

await fetchedResource.body.getReader();

Es decir, un stream de lectura proporcionado por la librería fetch que claramente maneja un buffer mucho más chico que el que nosotros seteamos en el gateway. Hay muchas soluciones posibles para estos escenarios. Les propongo una que realmente es fácil de usar y se considera un estándar.

Incorporando can-ndjson-stream

La idea es muy simple, usar un caracter terminador para indicar el corte del mensaje. Este estándar se llama Newline delimited JSON, donde el separador es ‘\n’.

Acomodemos nuestro código en el gateway

Es decir, simplemente retocando nuestra función de transformación nos ajustamos al estándar. Si no hubiésemos usado transformaciones tendríamos que retocar en todas las escrituras.

Y en el frontend..

Es decir, el stream de lectura se lo entregamos a la librería para que lo manipule y haga el corte por el separador.

Resultado final

Muy bien!! 😎

Uso de memoria

Analicemos un poco que pasa a medida que vamos procesando y enviando los mensajes.

Sin uso de streams y gran volumen de datos

Con streams y gran volumen de datos

Creo que no hace falta decir mucho más. Sin dudas streams hace un muy buen manejo de nuestro buffer y por ende nuestro uso de memoria. Esto claramente sucede porque sin streams se aloca en memoria todo el paquete a enviar. Con streams se van enviando, limpiando nuestro buffer y por ende, liberando nuestra memoria.

OJO!! Muchas veces pasa que intentamos escribir en nuestros streams mucho más rápido de lo que podemos leer o enviar. Esto se llama backpressuring. Lo vamos a notar con un mayor uso de memoria de lo normal.

Solución: Modificar la configuración de nuestros streams, particularmente el highWaterMark hasta encontrar una configuración óptima.

Prueba en vivo

Conclusión

Sin dudas streams funciona y muy bien. De todos modos está claro que no es necesario en todos los escenarios y hasta podría darnos más problemas que soluciones. Por lo tanto, mi consejo es analizar nuestro escenario y si es simple, es decir con poco flujo de información o sin demasiadas exigencias de tiempos de respuesta, entonces no abusen porque no van a obtener ganancias significativas. En otro caso, streams a fondooo 🤠🤠🤠

Gracias por leer!!

--

--

Fernando Natalino

Senior Developer at Hexacta. Always focused on simple, creative and elegant solutions.