A fondo con Flows & Channels — Parte 2: Flows

Julián Ezequiel Picó
Droid LATAM
Published in
9 min readJun 22, 2020

English version.

El texto está basado en la versión 1.3.7 de la biblioteca kotlinx.coroutines

Ya sabemos que son los cold y hot streams (y si todavía no lo sabes, mirá el artículo anterior, el link está más abajo). Ahora, vamos a hablar sobre nuestra primer implementación de un stream en Kotlin: los Flows.

De nuevo, podés ver algunos ejemplos revisando el repositorio de GitHub. Vas a encontrar unos casos muy simples y chiquitos que te van a ayudar a entender como se usan los Flows y algunas otras cosas más.

Recuerden que este artículo pertenece a una serie de los mismos:

Arranquemos con la parte dos: Flows!

Flows

Qué es un Flow?

Un Flow se utiliza para representar una emisión de valores en forma secuencial que, en algún punto, termina (naturalmente o porque pasó algo malo).

Todas las operaciones en un Flow son ejecutadas de forma secuencial dentro del mismo bloque de código (y, por esto, dentro del mismo Coroutine Scope).

Una característica importante de Flow es que sólo comienza a emitir valores cuando alguien los pide. Antes de que alguien se suscriba (o realice una recolección), el Flow no ejecuta su código.

Entonces, tenemos un cold stream, no?

Bueno, si miramos a la documentacion:

The Flow interface does not carry information whether a flow truly is a cold stream that can be collected repeatedly and triggers execution of the same code every time it is collected, or if it is a hot stream that emits different values from the same running source on each collection. However, conventionally flows represent cold streams.

No son tan definitivos como nos gustaría. Podemos asumir que si, que son cold streams, pero vamos a revisar esto más adelante.

Detrás de escena, un Flow es una interfaz que expone un método para recolectar los valores emitidos:

Upstream y Downstream Flows

Los conceptos de upstream y downstream nos van a ayudar a definir “sobre qué Flow voy a operar” y “qué Flow voy a producir”.

Todo depende de que operador consideremos. Los dos conceptos son muy, podríamos decir, poderosos, ya que todos los operadores y las propiedades de un Flow están fuertemente relacionadas con los mismos.

Flow Operators

Hay dos tipos de operadores (operators oficialmente): intermediate y terminal.

Intermediate operators

Son funciones que se aplican al upstream flow y retornan un downstream flow, donde podemos seguir aplicando otros operadores.

Estos operadores no ejecutan código contra el Flow directamente, en su lugar, utilizan el flow que reciben para aplicar operaciones y retornar otra información (en otras palabras, retornan su versión del flow recibido).

Algunos operadores no retornan información, sino que solo le agregan una característica al flow. No vamos a hablar de estos, pero es bueno que sepan que existen. Un ejemplo, por ejemplo, podría ser el operador cancellable.

Los intermediate operators no son funciones suspendable, pero pueden trabajar con funciones suspendable dentro. Podemos decir que son utilizados para crear una cadena de operaciones.

Algunos de ellos pueden ser: map, filter, take, takeWhile, catch, y muchísimos más. No voy a explicar operadores en este artículo porque no es el objetivo del mismo, pero se puede encontrar mucha documentación por ahí.

Terminal operators

Estos operadores son funciones suspendable y su propósito, en general, es el de recolectar los valores recibidos del upstream flow.

A su vez, funcionan como un trigger de una ejecución de un flow: si no se aplica un terminal operator al mismo, el flow no empieza la emisión de valores.

Como su nombre menciona, son terminal porque no se puede aplicar otro operador al flow (de ningún tipo) si alguno de ellos es aplicado.

Algunos de ellos pueden ser: collect, single, toList, y más. De nuevo, no voy a explicar operadores, pero vayan a la documentación que hay muchísima información!

Propiedades de un Flow

Todas las implementaciones de Flow tienen que adherir a dos propiedades, que voy a describir debajo.

1. Context preservation

Un flow encapsula su contexto de ejecución y nunca lo propaga a sus recolectores (los que están utilizando los valores que el emite).

En resumen:

  • El contexto donde un flow está emitiendo valores nunca se filtra (se lekea, en spanglish) al receptor del downstream flow.
  • Los valores son producidos en un único Coroutine Scope.

Algunos flows permiten tener más de una coroutine (por ende, posiblemente más de un Coroutine Scope). Sin embargo, si usamos los builders que provee Kotlin para crear Flows, ellos garantizan que esta propiedad se cumple.

Hay una excepción, una forma de cambiar el contexto, por ejemplo utilizando el operador flowOn, pero no voy a hablar de esto porque podemos estar 3 o 4 horas más.

Si se intenta lanzar otra coroutine en el mismo flow builder, algo así:

Vamos a obtener un hermoso error:

Podemos ver, en la última línea, que existe un approach para usar más de una coroutine, y lo vamos a ver más adelante.

2. Exception transparency

Las implementaciones de flow nunca deben catchear o manejar excepciones que ocurran en el downstream flow. Para manejo de excepciones, tenemos que usar el operador catch, que está diseñado para catchear las excepciones que vienen del upstream flow.

Una cosa importante que podemos ver acá es que nunca debemos hacer un try-catch de las llamadas a emit() o emitAll().

Hay otros operadores que sirven para manejar los casos de error, como retry o retryWhen, y les sugiero que les peguen una mirada.

Qué pasa si, por ejemplo, intentamos hacer algo así?

Bueno, vas a obtener otro hermoso error:

Creación de un Flow

Kotlin nos da varios builders para crear un Flow. Me voy a enfocar en los que yo considero más importantes: flow y channelFlow. Hay más, como flowOf() o asFlow(), pueden ir a ver la documentación que está muy completa.

flow { }

Este builder crea un flow para emitir valores de forma secuencial.

Todas las llamadas a emit() o emitAll() tienen que estar dentro de las llaves (en otras palabras, dentro del block FlowCollector).

Por supuesto, por cada operador terminal aplicado al flow, una nueva ejecución de block va a ejecutarse.

callbackFlow { } y channelFlow { }

Estos builders crean un flow que puede emitir valores de forma concurrente (también permite secuencial, pero no es su principal característica).

La diferencia entre ellos es conceptual:

  • Tenemos que usar callbackFlows cuando necesitamos wrapear un Callback y exponerlo como un Flow.
  • Tenemos que usar channelFlows cuando necesitamos emitir de forma concurrente dentro del Flow.

Existe otra diferencia entre ellos, que voy a mencionar más adelante. Hasta la versión 1.3.4 de la biblioteca coroutines-core, eran lo mismo.

Con estos flows, no podemos emitir usando los métodos emit() o emitAll(), en su lugar tenemos que usan send() o offer(). Cuál es la diferencia entre ellos? Bueno, vamos a tener que esperar a ver Channels 😀.

De nuevo, por cada operador terminal aplicado a los mismos, se va a disparar una nueva ejecución de block.

Ambos flows tienen una peculiaridad: terminan cuando la última línea de código dentro de block es ejecutada, y eso significa que debemos asegurar que ninguna coroutine esté ejecutándose cuando llegamos a esa línea. Pero, hay casos en los que no queremos que el Flow termine ahí, por ejemplo cuando hacemos una llamada a una API y queremos esperar hasta obtener una respuesta. Qué hacemos en estos casos?

Evitando la finalización de channelFlow y callbackFlow

Podemos solucionar esto utilizando la función awaitClose() dentro del block. Esta llamada debe ser la última línea de código del block, obligatoriamente. Haciendo esta modificación, lo que estamos diciendo es “espera a que yo te diga explícitamente que tenés que finalizar para dejar de emitir”.

Podemos cancelar estos flows en cualquier lugar donde queramos usando el método close() (dentro de block, claro).

Recuerdan que mencioné que existía otra diferencia entre channelFlow y callbackFlow? Bueno, acá la tenemos: si no se agrega la llamada al método awaitClose() al final de un callbackFlow, va a fallar.

Pero, esperen un segundo. Qué esta pasando detrás de escena? Cómo un Flow puede, potencialmente, tener más de un contexto de ejecución? Por qué la llamada a los emit() no está permitida? Lo veremos más adelante.

Consumiendo valores

Podemos consumir las emisiones de un Flow, principalmente, de dos maneras.

Usando terminal operators

Hay varios operadores terminal que pueden ser aplicados al Flow. Los más comunes son collect(), single() y toList(). Cada uno de ellos recolecta los valores de forma diferente pero, como ya dije antes, no voy a hablar de operadores acá, así que vayan a la documentación!

Usando asLiveData()

Podemos consumir el Flow como si fuese un LiveData.

El LiveData que obtenemos tiene el mismo tipo que el Flow. Entonces, si tenemos un Flow de, por ejemplo, valores Int, vamos a obtener un LiveData de valores Int.

Esto es muy útil si no queremos enviar un Flow directo a la View, pero queremos mantenerla actualizada utilizando LiveData.

Una aclaración acá: para usar esta extension function tenemos que incluir la biblioteca androidx lifecycle ktx.

Cancelando un Flow

Funcionan como una coroutine en este aspecto: si el Job padre se cancela, el Flow se cancela también. Pero hay algo más porque, como probablemente sabemos, las coroutines adhieren a la cancelación cooperativa, por lo que los Flows también lo hacen. Si quieren tener un poco más de información sobre la cancelación cooperativa, les recomiendo que lean esta serie de artículos.

Como también probablemente sabemos, la cancelación cooperativa implica muchas cosas, y una de ellas es que debemos chequear si el Job padre está activo o no, y cancelar las emisiones del Flow de acuerdo a esto. Podemos decir que no es muy cómodo hacer esto luego de cada emisión de un valor, y afortunadamente tenemos una solución.

Podemos usar el operador intermedio cancellable. Este operador va a chequear si el Job está activo después de cada emisión. Por default, todos los flows creados con el builder flow {} implementan este operador.

Qué pasa si queremos cancelar el Flow de forma explícita? Bueno, por ahora no hay ningún método para cancelar un flow {}, pero en caso de que estemos usando channelFlow { } (o callbackFlow { }), podemos usar close().

Implementaciones de Flow

Voy a enumerar algunos casos de uso prácticos, pero hay un montón más que pueden encontrar por ahí:

  • Retornar información utilizando múltiple data sources, accediendo a ellas de forma secuencial: por ejemplo: primero me fijo si hay algo en la caché y lo retorno, luego hago una llamada a una API para obtener nuevos resultados, luego los almaceno en una base de datos, y luego retorno la información utilizando esta última.
  • ‘Wrapear’ API Callbacks y exponerlas como streams: utilizando callbackFlow podemos ‘wrapear’ (envolver, transformar) cualquier callback y enviar la información que recibimos del mismo como un Stream.
  • Necesitamos ‘scopear’ las emisiones a un ciclo de vida: estos streams son útiles cuando queremos scopear (limitar) las emisiones al ciclo de vida de un determinado componente. Si, por ejemplo, estamos utilizando MVVM y viewModelScope, todos los Flows que se inicien con ese scope van a ser cancelados cuando el ViewModel se limpie (se llame a onCleared()).
  • Más más más.

Que hay detrás de channelFlow?

Un ChannelFlow puede tener múltiples coroutines ejecutándose dentro de él y puede emitir información que viene desde fuera del stream. Entonces, que hay detrás?

Bueno, si miramos la implementación interna de ChannelFlow, podemos ver que dentro utiliza algo llamado ReceiveChannel.

Como se podrán imaginar, tenemos una implementación similar a un Channel detrás de escena. Entonces, estamos usando un Channel (hot stream?) dentro de un Flow (cold stream?) para darle al Flow (cold stream?) features de un Channel (features de un hot stream?).

Hasta acá llegamos!

Espero que ahora puedan tener un poco más de entendimiento sobre que es un Flow y como funciona. Hay muchos, muchos artículos dando vueltas por ahí, así que pueden seguir explorándolos para obtener más conocimiento.

En el próximo artículo, vamos a empezar a hablar sobre Channels.

Nos leemos! Y, si te gustó, podes compartirlo!

Agradecimientos al equipo Android de MediaMonks por el feedback.

--

--