Flujos de datos con ‘SharedFlow’

Glenn Sandoval
Kotlin en Android
Published in
12 min readOct 24, 2020

Un “StateFlow con esteroides” como alternativa a los Subjects de RxJava y más

📖 Si recién estás aprendiendo a aplicar flujos de datos con Flow te sugiero leer mis artículos Flow: I — Estructura, dinámica, restricciones y operadores y Flujos de datos con ‘StateFlow’ antes de continuar.

Dado que un Flow es un flujo de datos “en frío” debido a que los elementos son producidos y emitidos bajo demanda cuando un operador terminal es aplicado sobre él, algunas veces tenemos que recurrir a otros métodos o estructuras, como por ejemplo los canales, para poder emitir elementos sin que haya observadores o suscriptores, esto con tal de no perderlos y que puedan ser consumidos tan pronto alguien observe o se suscriba al flujo de datos.

SharedFlow viene a cubrir ese vacío que tanto necesitábamos para que por fin dejemos de dar mil vueltas con implementaciones complejas o estructuras que no nos convencen del todo para que finalmente obtengamos algo que en la práctica debería ser muy simple.

El flujo de datos SharedFlow, como excepción a la regla con respecto a lo que es un Flow en sí, siempre está activo y no necesita de suscriptores para comenzar a emitir y continuar emitiendo elementos. Esto lo convierte en un flujo de datos “caliente”.

Toma en cuenta que un flujo de datos SharedFlow nunca se completa de manera normal y por lo tanto suscribirse a él por medio de operadores terminales que inician una coroutine, como launchIn, o que esperan un elemento final, como toList, nunca finalizarían.

Por otra parte, cualquier suscriptor de un SharedFlow se pude cancelar por medio de la cancelación del ámbito de la coroutine suscrita. No debes preocuparte por hacer que la coroutine suscriptora sea cooperativa con la cancelación ya que cada suscriptor verifica su cancelación por cuenta propia antes de que cada elemento sea emitido.

Como mencioné en el subtítulo de este artículo, un SharedFlow es como un StateFlow pero con esteroides y vamos a ver por qué a continuación.

⚙️ Para hacer uso de SharedFlow es necesario que incluyas en tu proyecto Kotlin la librería org.jetbrains.kotlinx:kotlinx-coroutines-core versión 1.4.0-M1 o superior.

SharedFlow vs StateFlow

Los flujos de datos SharedFlow y StateFlow no son diferentes. De hecho, StateFlow es una implementación especializada de SharedFlow que almacena solamente el elemento más reciente. Si has trabajado con Subjects de RxJava, el comportamiento de un StateFlow se corresponde con un BehaviorSubject.

Antes de iniciar con los ejemplos de este artículo, vamos a valernos de una función que imprima en pantalla los mensajes precedidos por el nombre del hilo que se encuentra en ejecución:

Vale decir que para actualizar los valores de un SharedFlow o un StateFlow es necesario usar sus correspondientes estructuras que permiten modificaciones, éstas son MutableSharedFlow y MutableStateFlow respectivamente. Veamos un ejemplo sencillo aplicando ambas estructuras:

  • Con MutableStateFlow:

🔗 Encuentra el código anterior completo aquí.

Resultado:

  • Con MutableSharedFlow:

🔗 Encuentra el código anterior completo aquí.

Resultado:

Haciendo una comparación entre ambas versiones, lo primero que se puede observar es que a un MutableStateFlow se le debe especificar un valor inicial. Un MutableSharedFlow, por el contrario, no requiere de un valor inicial, sin embargo, para conseguir el mismo comportamiento de un StateFlow es necesario asignarle a su parámetro replay el valor de 1 y a su parámetro onBufferOverflow el valor de BufferOverflow.DROP_OLDEST. Dado que MutableSharedFlow no emite un valor inicial, es necesario emitir un valor inmediatamente después de su creación usando su función tryEmit. Otra diferencia que se puede observar es la omisión de elementos repetidos en un MutableStateFlow. Para conseguir el mismo comportamiento debemos aplicar el operador intermedio distinctUntilChanged al recibir los elementos de un SharedFlow. Además, nótese que en la versión del MutableStateFlow la emisión del elemento se da al realizar una asignación a su propiedad value, mientras que en un MutableSharedFlow la emisión se hace por medio de una llamada a su función emit.

El resultado obtenido al suscribirse a un MutableStateFlow, o en su defecto a un MutableSharedFlow con su propiedad replay en 1 y su parámetro onBufferOverflow igual a BufferOverflow.DROP_OLDEST, es el de recepción del elemento más reciente y todos los elementos subsecuentes. Claramente en el caso del MutableSharedFlow al no necesitar de un elemento inicial para su creación, si se dan suscripciones y no se ha emitido ni un solo elemento el suscriptor no recibirá nada al momento de su suscripción, pero se mantendrá observando y recibiendo todos los elementos que luego sean emitidos.

Si has trabajado con Subjects de RxJava, ya puedes reemplazar tus BehaviorSubject con alguna de estas dos implementaciones — preferiblemente con StateFlow ya que es una estructura especialmente diseñada para ello — , pero ¿será que los otros 3 Subjects que nos ofrece RxJava pueden ser reemplazados por estructuras SharedFlow? Antes de continuar y poner a prueba a la estructura SharedFlow es necesario que conozcas y comprendas los parámetros replay, extraBufferCapacity y onBufferOverflow.

Los parámetros ‘replay’, ‘extraBufferCapacity’ y ‘onBufferOverflow’

El comportamiento de un MutableSharedFlow depende enteramente del valor de sus parámetros durante su creación. Aunque todos son opcionales, cuando no establecemos sus valores éstos toman un valor por defecto. A continuación, veremos qué es cada uno y cómo afectan el comportamiento del flujo de datos.

  • replay

El parámetro replay indica el tamaño que tendrá la caché de elementos más recientes y por lo tanto indicará el número de elementos que serán retransmitidos al suscriptor en el momento mismo de la suscripción.

El valor por defecto de este parámetro es 0 provocando entonces que cada suscriptor reciba solamente los elementos subsecuentes a su suscripción.

  • extraBufferCapacity

Este parámetro establece un espacio extra en el que se almacenarán elementos en caso de haber algún suscriptor lento. Al momento de la suscripción, el suscriptor recibirá los elementos más recientes que se encuentren en la caché de retransmisión establecida por el parámetro replay, pero si el suscriptor recién iniciado consume los elementos más lento de lo que son producidos, éstos nuevos elementos se almacenarán en este buffer para dar tiempo a que el suscriptor los consuma.

El valor por defecto de este parámetro es 0, es decir, este espacio no será utilizado a menos que especifiquemos lo contrario estableciendo el tamaño explícitamente.

  • onBufferOverflow

Con este parámetro se define la estrategia que el flujo de datos utilizará para lidiar con suscriptores lentos, o en términos más técnicos, establece la estrategia de backpressure. A este parámetro se le puede asignar 3 diferentes valores definidos con las siguientes constantes:

  1. BufferOverflow.SUSPEND: El emisor se suspenderá al llamar a la función emit si ya no hay más espacio en el buffer que le permita depositar el nuevo elemento. El elemento será depositado en el buffer tan pronto se libere un espacio. En caso de no haber un buffer, es decir, en caso de que el parámetro extraBufferCapacity sea 0, el emisor se suspenderá al llamar a la función emit si la caché establecida por el parámetro replay está llena. Y si se diera el caso de que no existe una caché de retransmisión, es decir, en caso de que el parámetro replay también sea 0, el emisor se suspenderá al llamar a la función emit hasta que todos los suscriptores reciban el elemento emitido. En este último caso la dinámica sería entonces la que en el contexto de concurrencia se conoce como Rendevouz.
  2. BufferOverflow.DROP_OLDEST: El emisor nunca se supenderá al llamar a la función emit, sino que en caso de que el buffer se encuentre lleno se eliminará el elemento más antiguo del buffer para depositar el nuevo elemento que pasará entonces a ser el más reciente dentro del buffer. Es decir, estableciendo este valor en el parámetro onBufferOverflow se mantendrá una dinámica de descarte estilo FIFO en la cual el elemento candidato de descarte será aquel que haya ingresado primero al buffer de entre todos los elementos presentes en él.
  3. BufferOverflow.DROP_LATEST: El emisor nunca se suspenderá al llamar a la función emit, sino que en caso de que el buffer se encuentre lleno el nuevo elemento será inmediatamente descartado dejando el buffer intacto hasta que su elemento más antiguo sea consumido por todos los suscriptores para luego ser removido liberando así espacio para almacenar nuevos elementos. Cualquier nuevo elemento emitido será almacenado en el espacio libre que exista en el buffer hasta que éste se encuentre lleno nuevamente repitiendo entonces la dinámica de descarte. Con respecto a esta dinámica, a diferencia del caso anterior en el que la dinámica FIFO se le aplica al elemento por ser descartado, en este caso se le aplica la dinámica FIFO al elemento candidato por ser consumido y el descarte se le aplica a cada elemento nuevo que no encuentre espacio en el buffer.

El valor por defecto del parámetro onBufferOverflow es BufferOverflow.SUSPEND.

Para que tengas una idea más concreta y visual acerca de la estructura o interacción que se da entre la caché de retransmisión y el buffer quiero que observes la siguiente imagen:

La imagen anterior ilustra el buffer y la caché de retransmisión correspondiente a un MutableSharedFlow creado con los parámetros replay = 2y extraBufferCapacity = 3. Imagínate un arreglo o vector con un tamaño definido de n + m donde n es el tamaño del buffer y m es el tamaño de la caché de retransmisión.

Habiendo explicado los parámetros involucrados en la creación de un MutableSharedFlow ya podemos poner manos a la obra y experimentar con diferentes configuraciones para lograr un comportamiento que replique los otros 3 tipos de Subjects que nos ofrece RxJava.

📖 El BehaviorSubject ya fue explicado inicialmente al inicio de este artículo y desarrollado más a fondo en mi artículo anterior Flujos de datos con ‘StateFlow’.

SharedFlow como PublishSubject

Un PublishSubject emite solamente los elementos subsecuentes al momento de la suscripción, por lo tanto, ningún elemento que haya sido emitido antes de la suscripción volverá a ser emitido.

Para lograr un comportamiento análogo a un PublishSubject debemos eliminar la caché de retransmisión, de esta manera no habrá una retransmisión de elementos, sino que los suscriptores serán notificados solamente cuando se emitan nuevos elementos.

🔗 Encuentra el código anterior completo aquí.

Resultado:

SharedFlow como ReplaySubject

Un ReplaySubject retransmite absolutamente todos los elementos que hayan sido emitidos antes del momento de la suscripción, así como los elementos subsecuentes a ésta.

Dado que un ReplaySubject tiene la capacidad de incrementar su buffer cuando éste se encuentra lleno, para lograr un comportamiento análogo usando un MutableSharedFlow necesitaríamos establecer un tamaño de la caché de retransmisión lo suficientemente grande para que nunca sea sobrepasada por los elementos emitidos, es decir, necesariamente debemos conocer el número de elementos que serán emitidos y establecer una caché que permita almacenar cada uno de ellos. De esta manera con un MutableSharedFlow podemos emular con cierta limitación un ReplaySubject.

🔗 Encuentra el código anterior completo aquí.

Resultado:

SharedFlow como AsyncSubject

Un AsyncSubject emite solamente el último elemento y éste elemento es emitido solo si se completa el flujo de datos llamando a la función .onCompleted del Subject.

Como mencioné al inicio de este artículo, un SharedFlow, o en su defecto un MutableSharedFlow, nunca se completa de manera normal, por lo tanto no hay una manera de finalizar y cerrar el flujo de datos a diferencia de un Subject de RxJava que sí cuenta con esa posibilidad, sin embargo podemos crear un mecanismo que emita solamente el “último” elemento. Pongo “último” entre comillas porque sería realmente el penúltimo elemento y lo veremos a continuación.

Básicamente valiéndonos del polimorfismo y de las clases selladas podemos crear dos tipos de elementos que heredan de la misma clase sellada para que, a pesar de ser diferentes, puedan ser emitidos sin problemas:

Usaremos la clase MyToken como “envoltorio” de los elementos que queremos emitir. La emisión de un objeto de la clase MyFinalToken indicará que no quedan más elementos por emitir. Aunque el flujo de datos no se cerrará y de hecho permitirá más emisiones de elementos, la idea es emular el comportamiento de un AsyncSubject por lo que en teoría no deberíamos emitir más elementos después de emitir un elemento de tipo MyFinalToken y de ser posible deberíamos cancelar las coroutines suscritas al flujo de datos.

🔗 Encuentra el código anterior completo aquí.

Para conseguir el comportamiento deseado es necesario establecer la caché de retransmisión con un tamaño de 2 elementos para que se pueda recuperar el penúltimo elemento una vez que el último elemento es emitido.

Dentro de la lógica del flujo de datos del lado del suscriptor usamos el operador intermedio filter para omitir todos los elementos que no sean del tipo MyFinalToken. Una vez recibido el elemento de tipo MyFinalToken usamos el operador intermedio map para cambiarlo por el elemento que fue emitido anterior a éste y que se encuentra en el primer espacio de la caché de retransmisión. Para recuperarlo accedemos a la caché por medio de la propiedad replayCache del flujo de datos: myFlow.replayChache[0]. Para finalizar obteniendo solamente ese “último” elemento nos suscribimos con el operador terminal first el cual se desuscribirá tan pronto reciba 1 elemento.

De lado del emisor de elementos tenemos una lista de números que van del 1 al 10. Para poder emitir estos elementos es necesario que los “envolvamos” en objetos de tipo MyToken. Los emitimos de principio a fin y al finalizar emitimos un objeto de tipo MyFinalToken para indicar que ya no se emitirán más elementos.

Finalmente, al ejecutar el código anterior obtenemos la siguiente salida:

Como has podido observar un SharedFlow te permite suplantar en cierta medida los Subjects de RxJava, y aunque existen algunas limitaciones, principalmente con respecto al ReplaySubject, por lo general para los casos de uso más comunes un SharedFlow puede llevar a cabo la tarea correctamente con todos los beneficios que nos ofrecen las coroutines aunado a flujos de datos Flow.

Ahora que sabes cómo reemplazar un Subject de RxJava, es necesario que aprendas a implementar de una manera más formal y correcta el uso de un SharedFlow en tus proyectos.

Implementación correcta de un SharedFlow en un proyecto

A lo largo de todo este artículo hemos visto el uso del MutableSharedFlow y si no tienes mucha experiencia con la programación orientada a objetos (POO) debes estar preguntándote para qué necesitamos un SharedFlow si con un MutableSharedFlow lo podemos hacer todo.

Un objeto de tipo SharedFlow nos permitirá consumir los elementos emitidos, pero no nos permitirá emitir elementos a través de él. Esto es así para que ninguna clase o componente externo pueda cambiar su estado.

Vamos a suponer que estamos trabajando en una aplicación que sigue el patrón MVVM o MVP y aplicaremos el SharedFlow en la capa media, es decir en el ViewModel o el Presenter y llamaremos a esta clase MyComponent. A la vez a esta clase se le inyectará una dependencia correspondiente a un repositorio que simulará acceder a una API. A esta clase la llamaremos MyRepository.

MyRepository:

MyComponent:

En la línea 3 ocultamos el MutableSharedFlow de accesos externos, pero para poder consumir los elementos declaramos en la línea 4 un SharedFlow cuyo get retorna el MutableSharedFlow creado anteriormente.

Los parámetros del MutableSharedFlow tendrán los siguientes valores: replay = 2 , extraBufferCapacity = 3 y onBufferOverflow = BufferOverflow.DROP_OLDEST. Así cuando haya un nuevo suscriptor, el SharedFlow le retransmitirá los dos elementos más recientes. El buffer tendrá una capacidad de 3 elementos y en caso de que esté lleno y se intente emitir un nuevo elemento, el elemento más antiguo será desechado.

La función fetchAndEmitList accederá al repositorio y obtendrá la lista de números que luego emitirá a través del MutableSharedFlow uno por uno cada 250 milisegundos. La función release detendrá todas las coroutines creadas bajo el ámbito de la clase MyComponent.

Para poner en ejecución nuestra clase MyComponent lo haremos con el siguiente código:

🔗 Encuentra el código anterior completo aquí.

La idea es consumir los elementos accediendo al SharedFlow por medio del objeto myComponent con una espera de 1250 milisegundos antes de iniciar para ver el efecto de retransmisión de los 2 elementos más recientes. También ingresamos un delay de 600 milisegundos después de cada elemento recibido para provocar que el suscriptor sea más lento que la producción de elementos y así ver el efecto de la pérdida de los elementos más antiguos que se genera con esta configuración.

Iniciamos la emisión de datos llamando a la función fetchAndEmitList y esperamos a que finalice llamando a la función join del Job devuelto por la llamada anterior.

Finalmente ingresamos un delay de 2500 milisegundos para darle tiempo al suscriptor para que consuma hasta el último elemento emitido y seguidamente cancelamos las coroutines de myComponent (como buena práctica) llamando a la función release.

El resultado de la ejecución del código anterior es el siguiente:

El comportamiento de un SharedFlow es prácticamente el mismo que podemos obtener usando Channels por lo que surge la pregunta ¿es realmente necesario el SharedFlow?. Te lo respondo a continuación.

“Ya uso el ‘BroadcastChannel’, no necesito el ‘SharedFlow’” 🤨

Si has implementado el uso del BroadcastChannel debes tener en cuenta que el equipo de desarrollo de Kotlin tiene la intención de reemplazarlo completamente con SharedFlow, así que lo ideal es que migres hacia SharedFlow lo antes posible.

La documentación oficial establece las siguientes diferencias entre un BroadcastChannel y un SharedFlow:

  • SharedFlow no implementa la API de los canales, lo cual permite una implementación más simple y más rápida.
  • SharedFlow permite que la retransmisión de elementos recientes y la estrategia de backpressure sean configurables.
  • SharedFlow ofrece una clara separación entre la interfaz SharedFlow que es de solo lectura y el MutableSharedFlow.
  • SharedFlow no puede ser cerrado y tampoco puede lanzar un error. Todos los errores y señales de terminación deben ser representados y transmitidos como un tipo de elemento especial en caso de que lo consideres necesario.

Solo me queda advertirte que no se recomienda realizar implementaciones que hereden de la interfaz de SharedFlow ya que está sujeta a cambios y es muy probable que más métodos sean añadidos. En su lugar se recomienda usar el constructor MutableSharedFlow(replay, ...) para ello.

--

--

Glenn Sandoval
Kotlin en Android

I’m a software developer who loves learning and making new things all the time. I especially like mobile technology.