Arquitecturas Concurrentes, Episodio 6: Observables

En el capítulo sobre promises vimos que cosificar el resultado de una computación puede ser una idea interesante por varios motivos:

  • Evitamos otorgar el control del flujo de nuestro programa en forma de continuación.
  • La interpretación de la promise como contenedor nos permite razonar y operar en términos de funciones comunes sobre valores (map, flatMap, etc.).
  • El API resultante nos permite manejar declarativamente cómputos asincrónicos y con excepciones.

Con todo esto, cabe preguntarse si cualquier uso de CPS puede promisificarse. Fácil, ¿no? Es sólo cuestión de agarrar algo así:

… y envolverlo de esta forma:

En principio parecería que esta API es suficiente para codificar cualquier cómputo que tenga dos flujos posibles de resultado (éxito o error). ¿Se nos estará escapando algo?

Multiplicidad

Cuando todo esto comenzó mostramos cómo representar utilizando CPS una computación no determinística. Por esto nos referimos a un proceso que, dada una entrada, puede no tener un único resultado válido. Si bien nuestro hardware ejecuta de forma estrictamente determinística, podemos simular estos procesos del siguiente modo:

Si pasamos el código anterior por nuestra máquina de promisificación obtenemos:

¡Pan comido! Desgraciadamente este código no hace lo que queremos... al ejecutarlo veremos que sólo se imprime en pantalla el nombre del primer resultado. El motivo es que, como indica la mismísima especificación A+, el valor de la promise no cambia una vez resuelta.

¿Mala decisión de diseño? No, las promises son una abstracción súper útil que captura cierta clase de cómputos que podemos hacer con CPS. Sin embargo, si lo que queremos es una abstracción que capture computaciones con más de un resultado vamos a tener que buscar por otro lado.

Observables sabor Rx

Las Reactive Extensions son una herramienta que surgió hace pocos años y casualmente nos permiten aprovechar las ventajas de utilizar promises pero sobre fuentes de datos más generales. Originalmente surgieron para .NET, pero hoy cuenta con implementaciones en varios lenguajes. En este artículo mostramos RxJS, la utilizada en Javascript.

La abstracción principal de la biblioteca es el observable, que puede interpretarse como un cómputo asincrónico que emite posiblemente múltiples datos. ¡Parece justo lo que necesitamos! Veamos cómo se puede arreglar nuestro intento anterior para obtener efectivamente un resultado multivaluado:

Bien, punto para los observables. Descompongamos un poco el ejemplo anterior:

  • Utilizamos rx.Observable.create para crear un observable.
  • En cada nueva subscripción se ejecuta la función que le pasamos a create para emitir los valores del cómputo.
  • Utilizamos el método subscribe para hacer algo cuando el observable genera algún elemento.

Además de esto, vemos que mediante onComplete indicamos a un observer que ya no hay más resultados. En su encarnación más completa, la interfaz de subscribe nos permite enterarnos de esto y también manejar errores de forma similar a como hacíamos con promises:

Hasta acá bien… tenemos que los observables son una especie de promise que se puede resolver muchas veces. ¿Tanto lío solo por esto? Si y no.

Recordarán tal vez que las promises podían interpretarse no sólo como cómputos asincrónicos sino también como contenedores de valores. A esta altura el lector atento podrá preguntarse si el mismo razonamiento puede aplicarse sobre los observables, y qué ventajas nos traerá.

Colecciones “push”

En el episodio anterior dijimos que las promises se podían pensar como cajas con (posiblemente) un elemento adentro. No es sorprendente que, en la misma línea, podamos pensar a los observables como cajas que pueden contener cero o más valores. Además, dado que los valores no se emiten en simultáneo, podemos decir también que están ordenados (más allá de si en algún caso en especial no nos importa el orden en que fueron generados). Cuando sea conveniente, entonces, nos vamos a permitir pensar a los observables como algo parecido a una lista.

No somos nosotros los primeros notar esta correspondencia. Los creadores de RxJS nos hicieron bastante fácil crear un observable a partir de una lista:

Acá vemos que el observable sería una contraparte push de la lista, en el sentido de que procesamos los elementos a medida que la fuente de datos nos notifica.

¿De qué nos sirve pensar a los observables como colecciones? Resulta que podemos programar de forma declarativa usando APIs de alto orden estándar, como se ve a continuación:

Tal vez ya estén acostumbrados a trabajar con este tipo de transformaciones sobre listas y no se sorprendan mucho. Lo que quizás no sea evidente es que estamos trabajando sobre un stream de datos generado asincrónicamente. En el fondo nuestro observable podría estar encargándose de reintentar pedidos fallidos al API, cancelar algunos por timeout, servir datos de una caché cuando es posible, o incluso emitir elementos de una lista en memoria como vimos antes.

La clave es que esto no nos importa: a la hora de manipular los datos nos preocupamos por transformaciones de valores llanos, y toda la maquinaria de atrás se encarga de manejar el asincronismo.

Marble diagrams

Para razonar sobre las transformaciones de observables suele ser útil visualizar cada uno de los streams intermedios en diagramas como el siguiente:

products  ---p1-----p2----p3--------p4---p5----X
| | | |
filter ---p1-----------p3--------p4---------X
| | | |
map ---100---------230--------40---------X
|
sum ------------------------------------370--X
La operación sum que se ve en el ejemplo va acumulando los valores generados por su observable “fuente” y emite un único valor cuando este último termina, asumiendo que lo hace. Observar, sin embargo, que en el caso general nada nos impide trabajar con streams infinitos.

Es común encontrar diagramas como este (ok, quizás un poco más bonitos) en la documentación de las operaciones sobre observables. El API de RxJS es gigante, y estas visualizaciones suelen ser bastante útiles para entender ciertas sutilezas en transformaciones de uso más específico que las ya conocidas map y filter.

Abriendo cajas

En el ejemplo anterior vimos que los observables también implementan el API de map. Tal como para el caso de listas y promises, esta operación sirve para operar directamente sobre valores abstrayéndonos de ciertas cualidades del objeto con el que estamos trabajando.

En el caso de las listas, map nos abstrae de la multiplicidad de elementos, mientras que en el caso de promises lo que ocultamos es el asincronismo y la posible presencia de errores. Al utilizarlo sobre observables estamos haciendo la vista gorda a todos estos factores a la vez.

Supongamos, retomando el ejemplo anterior, que contamos con otra API que nos provee el listado de preguntas asociadas a cada producto:

¿Cómo obtenemos en base a estos elementos un observable de todas las preguntas? En este caso map no nos va a ayudar:

products                   ---p1--------------p2----------------X
| | |
products.map(questions) ---questions(p1)---questions(p2)-----X

Aquí estamos emitiendo un observable de preguntas por cada producto, cuando en realidad deseamos emitir cada pregunta individualmente.

Momento. En cierto sentido ya sabemos resolver este mismo problema para listas y promises. Resulta que los observables implementan también la operación flatMap, que nos viene como anillo al dedo para casos como este:

products            ---p1----p2---X
| |
questions(p1) `-----|--q-------q---X
| | |
questions(p2) `--|--q----|----q----q---X
| | | | |
| | | | |
products.flatMap(questions) q--q----q----q----q---X

Como habrán visto, esta idea de aplicar una operación y aplanar el resultado puede parecer rebuscada en principio pero termina emergiendo en distintos contextos. Más adelante en la materia vamos a arrojar un poco de luz sobre qué caracteriza estos casos y cómo extrapolar esta idea a otros tipos de “cajas”.

Observables como señales continuas

Hasta ahora venimos esquivando una pregunta que parece trivial: ¿cuándo comienza un observable a emitir elementos? Miremos el siguiente ejemplo:

Al ejecutarlo, veremos que cada elemento se imprime por pantalla dos veces. Esto parece natural si pensamos a los observables como una extensión del concepto de promise: uno nunca “se pierde” el valor de una promise por más de que el llamado a then ocurra después de su resolución. Se dice que un observable es frío si garantiza que cada suscriptor vea la secuencia completa de eventos, como en este caso.

Todo parecería indicar que deben entonces existir también los observables calientes. ¿Por qué querría un observador razonable perderse valores? Antes mencionamos que nada nos impide trabajar con fuentes de datos lógicamente infinitas. Con esto en mente, supongamos que queremos modelar un flujo de eventos continuo como el que vemos a continuación:

Aquí el foco no está en tratar al conjunto de mediciones en su totalidad si no en ir procesando en tiempo real los eventos a medida que se producen. En casos como este tiene sentido manejar observables que emiten datos independientemente de si alguien está escuchando, y que los observadores se enteren sólo de los valores emitidos desde el momento que se suscriben.

¿Ya sabemos reactive?

¡Un segundo! La palabra reactive está bastante sobrecargada en la industria, así que antes de agregarla al CV nos vemos obligados a hacerle un poco de justicia.

Originalmente, el término se utilizó de manera informal para describir programas que reaccionan continuamente a impulsos de su entorno, y cuyo ritmo de trabajo está determinado por la velocidad a la que se producen estos eventos.

Luego vino la definición formal de Conal Elliot (1997), que se basa en esa idea pero hace foco en una forma muy específica de representar el estado de la señal en función del tiempo que casi nadie parece respetar.

Tiempo después, en 2013, un grupo de notables publica el Reactive Manifesto, en el cual marcan el camino de cómo se deben construir sistemas escalables para lidiar con los requerimientos del siglo 21. Tiene ideas interesantes, aunque su uso del término tiene muy pocos puntos de contacto con los que venimos mencionando.

Dejando de lado este último accidente, la mayoría de las herramientas actuales que se cobijan bajo la palabra reactive responden de algún modo a esa noción de programa como máquina de reaccionar a eventos continuos producidos por su entorno, en general aplicando transformaciones funcionales sobre flujos o señales de datos. Ejemplo de estos on Bacon, Elm, y las mismas Reactive Extensions que presentamos hoy.

Material adicional y ejercicios