Implementando Reactive Commons con Elixir

Conoce nuestro nuevo proyecto Open Source.

Juan Carlos Galvis
Bancolombia Tech
6 min readAug 5, 2021

--

La inclusión de nuevas tecnologías dentro del stack tecnológico de Bancolombia, conlleva a replicar aquellas iniciativas que facilitan y aceleran el desarrollo de software en los equipos; por ejemplo, con la adopción de Elixir como parte del stack para sistemas reactivos, surge la necesidad de habilitar Reactive Commons para este lenguaje de programación.

Reactive Commons proporciona un conjunto de abstracciones e implementaciones de diversos patrones y prácticas que apoyen la base para una arquitectura de microservicios reactivos. Esta definición busca establecer directrices enfocadas al dominio aplicando los patrones más comunes de la comunicación de sistemas a través de eventos.

En el caso de Reactive Commons, se usa RabbitMQ por defecto y nos permite aislar detalles del broker que utilicemos. Esta librería nos evita conocer el detalle que se muestra en la figura 1, enfocándonos en realizar una definición de alto nivel a través de Eventos, Comandos y Consultas.

figura 1: Representación de algunos recursos creados y administrados por Reactive Commons.

Para conocer más sobre qué son Eventos, Comandos y Consultas, Daniel Bustamante Ospina nos cuenta más sobre arquitectura de microservicios reactivos en:

En GitHub, ya tenemos disponible nuestro proyecto para que nuevas ideas y capacidades puedan ser desarrolladas no solo al interior de la organización, sino también con el apoyo de la comunidad open source.

Esta implementación busca extender la interoperabilidad provista por un broker como RabbitMQ en conjunto con la abstracción de Reactive Commons, llevando a que los sistemas reactivos desarrollados en Elixir, puedan concentrarse en la implementación de la lógica del dominio, sin necesidad de llegar a un nivel detallado sobre la comunicación con el broker.

Configuración

Esta librería se encuentra disponible en el gestor de paquetes de Erlang Solutions:

Se debe añadir la dependencia al proyecto:

def deps do
[
{:reactive_commons, "~> 0.1.0"}
]
end

Agregar el siguiente módulo a los childrens de la aplicación:

async_config = AsyncConfig.new("my-app-name")
children = [
{MessageRuntime, async_config},
]

Ingrese a hexdocs ver mas detalles sobre AsyncConfig

Abstracción

Partiendo de la definición de ReactiveCommons se tienen disponibles los siguientes módulos de Elixir:

DomainEvent: Esta estructura permite representar un evento en el sistema. Acepta un nombre, cualquier dato que será la información a transportar para ese evento (debe ser serializable a JSON) y un id de mensaje (opcional).

event = DomainEvent.new("usuario-bloqueado", %{usuario: "usuario1"})

Command: Una estructura básica que permite representar un comando en el sistema. Acepta un nombre, cualquier dato o mapa que será la información a transportar para ese comando (debe ser serializable a JSON) y un id de comando (opcional).

command = Command.new("bloquear-usuario", %{usuario: "usuario1"})

AsyncQuery: Es la última estructura básica que sirve para representar una consulta asíncrona en el sistema. Acepta la información para la consulta, la cual debe ser serializable a JSON y un nombre de recurso asociado a la consulta.

query = AsyncQuery.new("estado-usuario", %{usuario: "usuario1"})

Interacción entre sistemas a través de mensajes

Estos tres módulos permiten la abstracción inicial a nivel de dominio y de datos, sin embargo se requiere de otros módulos que van a permitir la interacción entre diferentes sistemas a través del envío y escucha de mensajes.

Como tratamos hasta este momento, se definen tres estructuras para las interacciones, hay dos interacciones directas y una interacción indirecta. Las dos primeras son el envío de comandos y las consultas asíncronas hacia un destino específico; la última interacción es la emisión de eventos sin un destino específico, sobre los cuales, diversos interesados pueden reaccionar.

Técnicamente se hacen disponibles los siguientes módulos de Elixir que permitirán lograr las interacciones.

Eventos

figura 2: pub sub

La emisión y escucha de eventos también conocida como el patrón pub sub, se hace disponible a través del módulo DomainEventBus cuya función es la siguiente:

DomainEventBus.emit(event) # puede retornar :ok o un error

De otro lado estarán los sistemas interesados en escuchar determinados eventos, para lo cual está disponible el módulo HandlerRegistry que habilita la siguiente función para configurarlo:

HandlerRegistry.listen_event("usuario-bloqueado", &usuario_bloqueado/1)

El primer parámetro de la función es el nombre del evento de dominio que se desea escuchar, el segundo parámetro es la referencia a la función que procesará el evento, la cual recibe un parámetro que es el dato del evento. Más adelante se profundizará sobre el módulo HandlerRegistry y su uso.

Comandos

figura 3: fire and forget

Lanzar un comando y escuchar un comando es otro de los patrones disponibles también conocido como fire and forget, para el cual se hace disponible el módulo DirectAsyncGateway específicamente con la siguiente función:

DirectAsyncGateway.send_command(command, "app-name")
# puede retornar :ok o un error

Aparte de recibir el comando, se espera un segundo parámetro que es el nombre del destino (aplicación de destino, quien ejecutará el comando).

En la aplicación de destino a través del mismo módulo de escucha de eventos, se realiza la configuración de la escucha del comando así:

HandlerRegistry.handle_command("bloquear-usuario", &bloquear_usuario/1)

Así mismo, el primer parámetro es el nombre del comando a escuchar y el segundo es la referencia a la función de un solo argumento que se encargará de procesar el respectivo comando.

Consultas Asíncronas

figura 4: request / reply

El último patrón de consultas asíncronas conocido como request/reply, se logra haciendo uso del módulo DirectAsyncGateway usando la siguiente función:

DirectAsyncGateway.request_reply_wait(query, "app-name")
# puede retornar {:ok, response} o un error

El primer parámetro es la información requerida para ejecutar la consulta, y el segundo es el nombre de la aplicación de destino encargada de resolver la consulta.

Del lado de la aplicación que resuelve la consulta asíncrona, se hace uso del mismo módulo HandlerRegistry usado para la subscripción a eventos y a comandos, a través de la función que se indica a continuación:

HandlerRegistry.serve_query("estado-usuario", &estado_usuario/1)

El primer parámetro es el nombre de la consulta, el segundo es la referencia a la función encargada de resolver la consulta. A diferencia de las funciones que procesan eventos y comandos donde no hay un retorno específico, el valor de retorno de la función de consultas será la información que se entregará a la aplicación que emitió la consulta. Por ejemplo podría ser algo como:

def estado_usuario(%{usuario: usuario_id} = request) do
UsuarioUseCase.get_estado_usuario(usuario_id)
# este valor de retorno se enviará como respuesta
end

HandlerRegistry

Como se puede apreciar, el modulo HandlerRegistry permite la configuración de la escucha de las diferentes representaciones semánticas (Comandos, Eventos, Consultas). Este módulo debe ser inicializado en el arranque de la aplicación preferiblemente en un proceso (ver ejemplo). Este módulo se puede entrelazar a través del operador de composición |> para hacer más fácil su configuración. por ejemplo:

HandlerRegistry.serve_query("estado-usuario", &estado_usuario/1)
|> HandlerRegistry.handle_command("bloquear-usuario", &bloquear_usuario/1)
|> HandlerRegistry.listen_event("usuario-bloqueado", &usuario_bloqueado/1)
|> HandlerRegistry.commit_config()

Al invocar commit_config se persiste la configuración y se inician procesos asociados a la escucha de estos diferentes mensajes.

--

--