A fondo con Flows & Channels — Parte 3: Channels

Julián Ezequiel Picó
Droid LATAM
Published in
11 min readJun 29, 2020

English version

El texto está basado en la versión 1.3.7 de la biblioteca kotlinx.coroutines

Ahora ya sabemos qué es un Stream (y sus tipos) y cómo funcionan los Flows (y, nuevamente, si no es el caso, mirá los artículos anteriores, los links están acá abajo). Ahora, vamos a descubrir a segunda implementación de Streams que ofrece Kotlin: Channels.

De nuevo, podés ver algunos ejemplos yendo al GitHub Repository. Hay casos simples y pequeños que van a ayudarte a entender como usar Channels y algunas otras cosas interesantes.

Recuerden que este artículo pertenece a una serie de los mismos:

Antes de empezar, déjenme advertirles: este artículo va a ser un poco extenso. Si sentís que no estás entendiendo algo, por favor tomate un descanso y retomá la lectura luego. En el texto vas a encontrar todo lo que necesitas para entender como funcionan los Channels, date tiempo para procesar!

Empecemos con la parte 3: Channels!

Channels

Qué es un Channel?

Un Channel se utiliza para establecer una conexión entre un emisor y un receptor. Esta conexión puede ser cerrada o cancelada, y todas las operaciones relacionadas son suspendable.

La primer gran diferencia con Flows es que pueden comenzar la emisión de valores sin importar si alguien los está escuchando o no.

También, como podemos acceder a un Channel a través de su referencia (es decir, un objeto), se pueden emitir y recibir valores donde uno quiera (o donde uno tenga la referencia). Si comparamos esto con los Flows, donde solo podíamos emitir valores dentro del block, acá encontramos una gran diferencia también.

Entonces, con todas estas características, podemos asumir que los Channels son Hot Streams.

Detrás de escena, tenemos una interfaz que implementa otras dos interfaces: SendChannel y ReceiveChannel. En esa implementación es donde vamos a encontrar las claves sobre cómo funcionan los Channels.

Un diagrama que puede explicar a alto nivel qué es un Channel es el siguiente:

Podemos ver que:

  • Existe un Sender, que pone información dentro de un Buffer. Esto se realiza a través de operaciones con el SendChannel.
  • Existe un Receiver, que obtiene información del Buffer. Esto se realiza a través de operaciones con el ReceiveChannel.
  • Existe un Buffer, que nos ayuda a sincronizar al Sender y al Receiver. Recuerdan el artículo de Roman donde mencionaba la sincronización en los Hot Streams? Bueno, esto es a lo que el se refería.

Podemos decir que el diagrama que está arriba representa un Channel y su comportamiento.

Emisor del Channel

La interfaz SendChannel representa al emisor del Channel, como dijimos antes. La misma define el comportamiento del emisor, o en otras palabras, como el Channel emite información.

Revisemos qué tenemos acá:

  • El atributo isClosedForSend, que retorna si el Channel está cerrado y no puede enviar más información. Si intentás enviar algo, vas a recibir una excepción ClosedSendChannelException.
  • Los métodos send() y offer() son utilizados para emitir valores. Y, si recordamos bien, son los mismos que teníamos que usar con los channelFlows, no?
  • El método close() se utiliza para cerrar el emisor del Channel. Y presten especial atención acá: solo al emisor del Channel. Más adelante vamos a ver por qué el énfasis.
  • El método invokeOnClose() es llamado automáticamente cuando el emisor del Channel se cierra o el receptor se cancela.

Y eso es todo lo que necesitamos saber por ahora. Pasemos a hablar un poco más de cómo emitir información.

Emitiendo información — To offer() or to send() ?

Mencionamos que estos métodos son utilizados para la emisión de información. Pero, esperen, por qué tenemos dos métodos para poner info en el buffer? Clarifiquemos:

El método offer() agrega un elemento al buffer inmediatamente si es posible. Al decir esto último, me refiero a “si no viola la capacidad del Channel” (o, en otras palabras, si el buffer existe y no está lleno).

También, si revisamos la definición de SenderChannel, podemos ver que offer() retorna un Boolean: true si el elemento puede ser añadido, false si no puede.

Por el otro lado, tenemos el método send(). Acá, si el buffer no está lleno, el elemento se agrega inmediatamente, tal como hace offer(). Pero, que pasa si el buffer está lleno, o no existe? Bueno, quien llama a send() se suspende.

Entonces, la diferencia principal acá es que send() es una operación suspendable, y ayuda a sincronizar el emisor y el receptor. No tenemos un retorno con este método, sino que nos suspendemos.

En resumen:

  • Si querés enviar un valor (o intentar enviarlo) sin esperar, entonces usa offer(). Tené en cuenta que, tal vez, ese valor nunca llegue al buffer.
  • Si querés que el valor sea recibido por alguien, y podes esperar hasta que ese alguien aparezca, utiliza send().

Receptor del Channel

La interfaz ReceiveChannel representa al receptor del Channel, como mencionamos anteriormente. La misma define el comportamiento de los receptores, o en otras palabras, cómo podemos escuchar las emisiones del Channel.

Revisemos que tenemos acá:

  • El atributo isClosedForReceive retorna si el receptor no puede recibir ningún valor. Cuándo un channel está closed for receive? Lo vamos a discutir más adelante porque no es tan simple. Esperá un poco más!
  • Los métodos receive() y poll() son utilizados para obtener los valores del buffer.
  • El método cancel() se utiliza para detener la recepción de todos los elementos del buffer, eliminando los que no fueron enviados (limpia el buffer).

Hay algunas cosas más en la interfaz, pero no son muy importantes. Si querés investigar un poco más, podés ir a la documentación.

Recibiendo información — To poll() or to receive() ?

Acá, parecido a lo que teníamos para enviar, tenemos dos métodos. Por qué? Tal vez estés pensando en la diferencia entre offer() y send(), y estás en lo correcto porque tenemos algo similar.

El método poll() obtiene un elemento de forma sincrónica. Si el buffer tiene algo dentro, ese algo es retornado. Qué pasa si el buffer está vacío? Simple: retorna null.

Por el otro lado, tenemos el método receive(), y este obtiene un elemento de forma asincrónica. Si el buffer tiene algo dentro, ese algo es retornado. Pero, si el buffer está vacío o no existe, quien llama a receive() se suspende hasta que un nuevo valor sea emitido al buffer.

Entonces, la principal diferencia que tenemos acá es que receive() es una operación suspendable, y este es otro de los mecanismos con los cuales podemos sincronizar al emisor y al receptor.

En resumen:

  • Si querés recibir un valor (o intentar recibir) sin esperar, usa poll(). Tené en cuenta que, posiblemente, no recibas ningún valor.
  • Si querés recibir un valor real, y podés esperar hasta que ese valor aparezca, usa receive().

Cuándo el Channel está closed for receive?

Para poder garantizar que un Channel está closed for receive, existen dos condiciones que tienen que cumplirse simultáneamente:

  • Alguien llamó a SendChannel.close(), por lo que el Channel está closed for send.
  • Todos los elementos enviados al buffer fueron recibidos por un receptor, por lo que el buffer está vacío. Si el buffer no está vacío, el Channel todavía no está closed for receive.

Entonces, si cerramos al emisor (SendChannel.close()) pero el buffer tiene items dentro, el Channel no va a estar closed for receive hasta que esos items hayan sido enviados.

Cerrando o Cancelando?

Si prestamos atención, pudimos notar que tanto el emisor como el receptor tienen métodos para “cerrarse”: SendChannel.close() y ReceiveChannel.cancel(). Pero, por qué? Cuál es la diferencia?

Para mostrar esto, creé una tabla que compara ambos métodos y sus side effects (o efectos de lado). Revisala!

Como podemos ver, hay varias diferencias. Tenemos que tener cuidado al seleccionar que método vamos a utilizar.

En la próxima sección, vamos a ver que puede pasar cuando cerramos un Channel. Porque, por supuesto, esto es tan simple que tiene que haber algo más complejo, no? 🙃

Closed Channels y Failed Channels

Hay algo más cuando llamamos al método SendChannel.close().

Dependiendo de cómo lo llamamos (con una causa o no) podemos tener dos estados diferentes: closed o failed. Para clarificar, cuando digo causa me refiero a una excepción (si miramos la interfaz de SendChannel, podemos ver que el método close() puede recibir una excepción).

De nuevo, preparé una tabla donde se puede ver la diferencia entre los dos estados mencionados:

Creo que la diferencia más importante es lo que sucede cuando llamamos a poll().

Hay una última cosa más que tenemos que saber: si llamamos a ReceiveChannel.cancel() con una causa, también vamos a obtener un Failed Channel, porque esa causa es enviada a SendChannel.close().

Basta de teoría (o no?), vamos a crear algunos Channels!

Creando Channels

Todos los Channels son creados con una factory function

De acuerdo al valor que enviamos en el parámetro capacity, podemos crear distintos tipos de channels. Podemos observarlos en la cláusula when.

Los Channels tienen diferente comportamiento de acuerdo a su tipo. hay dos diferencias principales entre ellos:

  • La capacidad del buffer.
  • Cómo gestionan los valores del buffer.

Para explicar cómo funcionan, voy a utilizar los métodos send() y receive(), ya que suspenden a quienes los utilizan, y de esa forma podemos ver cómo funciona la sincronización en cada Channel.

Rendezvous Channel

Este es el Channel que obtenemos si no especificamos el parámetro capacity, y lo creamos con las siguientes líneas:

La instancia que obtenemos es un RendezvousChannel.

La capacidad de este buffer es de 0. Qué significa eso? Bueno, podemos decir que el Channel no tiene capacidad, o en otras palabras, el buffer no existe.

Como regla, decimos que los elementos son enviados únicamente cuando un emisor y un receptor se encuentran.

Técnicamente:

  • Todos los que llaman a send() se suspenden hasta que aparezca alguien que llame a receive().
  • Todos los que llaman a receive() se suspenden hasta que aparezca alguien que llame a send().

Tengo tarea para ustedes! Qué pasa si usamos únicamente offer() con este Channel? Vayan a probar!

Buffered Channel

Los creamos con la siguiente línea:

La instancia que obtenemos es un ArrayChannel.

La capacidad del buffer de este Channel es de 64 elementos, y esa es la capacidad default configurada en la JVM (podemos cambiarla, pero no lo necesitamos).

Técnicamente:

  • Todos los que llaman a send() se suspenden si el buffer está lleno, y eso significa que hay 64 elementos en él.
  • Todos los que llaman a receive() se suspenden si el buffer está vacío.

Con la ayuda de este Channel, introducimos una regla general:

En todos los Channels, sin importar su tipo, todos los que llaman a receive() se suspenden siempre si el buffer está vacío.

Tal vez es un poco difícil de ver con los Rendezvous Channels, pero con el Buffered lo podemos observar mas explícitamente.

Unlimited Channel

Los creamos con la siguiente línea:

La instancia que obtenemos es un LinkedListChannel.

Con este Channel, tenemos un buffer con capacidad ilimitada. Pero, como sabemos, no existe tal cosa como ilimitado en el mundo real, por lo que realmente nuestro límite va a ser la memoria disponible que tengamos.

Técnicamente:

  • Nadie que llame a send() se suspende ya que, en teoría, el buffer nunca está lleno.
  • Todos los que llaman a receive() se suspenden si el buffer está vacío.

Conflated Channel

Los creamos con la siguiente línea:

La instancia que obtenemos es un ConflatedChannel.

Con este Channel, también tenemos un buffer con capacidad ilimitada, como en los Unlimited.

Entonces, cuál es la diferencia si el buffer es igual al Unlimited? Bueno, este channel tiene una particularidad: siempre ofrece el último valor enviado al buffer, descartando los que no fueron recibidos por nadie. El receptor siempre obtiene el elemento que se envió más recientemente. Podemos decir que, en realidad, el buffer no es realmente ilimitado, ya que solo necesita capacidad para un único elemento.

Pero, tranquilos, esto no significa que el Channel ofrezca todo el tiempo el último valor. Si el mismo fue consumido por alguien, va a ser eliminado del buffer y éste va a quedar vacío.

Técnicamente:

  • Nadie que llame a send() se suspende.
  • Todos los que llaman a receive() se suspenden si el buffer está vacío.

Customized Channel

Los creamos con cualquiera de las siguientes líneas:

La instancia que obtenemos es un ArrayChannel.

La capacidad del buffer de este Channel es la que enviemos a la factory function. En este caso, tenemos buffers con capacidad para 10, 15 y 300 elementos.

Funciona como un Buffered Channel, por lo que no voy a explicar los tecnicismos del mismo. Es solo un Buffered Channel con la capacidad que nosotros queramos darle.

Un excelente artículo donde se puede visualizar (de verdad, visualmente) como funcionan los diferentes Channels es este.

Otra recomendación es que chequeen los ejemplos que están en GitHub Repository, para que puedan ver como los Channels emiten y reciben valores.

Consumiendo valores

Hablemos un poco más sobre el consumo de los elementos del buffer. No voy a explicar cómo poner información en el mismo porque lo podemos hacer utilizando los métodos send() o offer() y ya, pero, para consumir información, hay algunas cosas que necesitan ser aclaradas.

Como vimos, podemos consumir valores del Channel utilizando poll() y receive(). Pero, lo importante acá es que esos métodos retornan un único valor. Entonces, si queremos consumir más de uno, tenemos que wrappear la llamada en un, por ejemplo, while(true) (o similar).

Acá podemos ver una gran diferencia con los Flows, donde podemos recolectar todas las emisiones realizadas por ellos aplicando solo un operador terminal.

Afortunadamente, tenemos una solución para esto: podemos utilizar el poder de los Flows para consumir los valores de un Channel. Existe una extension function, consumeAsFlow(), que, como su nombre sugiere, nos permite consumir las emisiones del Channel como si fuese un Flow.

Qué está pasando detrás de escena acá? Cómo un Channel puede ser un Flow? Déjenme mostrarles al bien conocido… ChannelFlow!

Un poco más sobre consumo de información

Existen otros métodos que podemos utilizar. No los mencioné porque creo que con poll(), receive() y consumeAsFlow() tenemos todo lo que necesitamos.

Esos métodos son:

  • consume(), pero no da un elemento del buffer directamente.
  • consumeEach(), similar a consumeAsFlow(). En teoría, luego de consumir todos los valores cancela al ReceiveChannel, pero intenté reproducir este comportamiento y no tuve éxito.
  • consumes(), consumesAll(), y otros: están deprecados o van a estarlo en algún futuro release.

Implementaciones de Channel

En general, cualquier caso que pueda se representado como un escenario de publisher-subscriber puede satisfacerse con la utilización de Channels.

Luego, podemos notar una de las más grandes diferencias con Flows: los Channels no están vinculados a un ciclo de vida específico. Si queremos cerrar/cancelar un Channel, necesitamos hacerlo explícitamente con los métodos que exponen receptor y emisor. Entonces, si queremos tener un stream que sobreviva a través de pantallas y ciclos de vida, vamos a utilizar Channels.

Entonces, todo ok hasta acá? Podemos terminar el artículo ahora? Bueno, no. Hay una última cosa que todavía no probamos: tener múltiples receptores.

Intentando agregar múltiples receptores

Bueno, si tenemos un Hot Stream, tenemos una transmisión multicast, no? Entonces, vamos a aprovechar las ventajas de esta propiedad:

  1. Creamos un Channel y empezamos a enviar valores con él.
  2. Empezamos a escuchar valores con dos receptores.
  3. Los dos receptores comienzan a recibir los valores del buffer.

Todo ok, no? Bueno, no.

Los Channels tienen un mecanismo de transmisión unicast, por lo que los receptores no van a obtener todos los valores del buffer. En su lugar, ellos se van a turnar y van a recibir información en el orden en que se van suscribiendo al buffer. No tan hot…

Cómo podemos resolver esto? Van a tener que esperar al próximo artículo 😄

Hasta acá llegamos!

Espero que ahora se entienda qué es un Channel y como funciona. Hay muchísimos artículos por ahí, así que pueden seguir investigando para obtener más conocimiento.

En el próximo artículo, vamos a hablar de Broadcast Channels.

Nos leemos! Y, si te gustó, podes compartirlo!

Agradecimientos al equipo Android de MediaMonks por el feedback.

--

--