Project Reactor: Um pouco sobre Mono e Flux

Felipe Formentin
Devspoint
Published in
8 min readSep 28, 2021

Entrando na parte técnica e conhecendo um pouco mais sobre os métodos mais comuns utilizados nas classes Mono e Flux.

Primeiro, quero explicar um pouco do que se tratam. Um paralelo que podemos fazer é que o Mono possui uma abstração similar à classe Optional <T> que temos no Java, e o Flux se aproxima mais de uma List<T>. Ambos são implementações criadas no Project Reactor da interface Publisher da especificação Reactive Streams.

Com Mono, somos capazes de emitir apenas 1 item no máximo (podemos não emitir nada, também). Com Flux, podemos emitir 0..N itens (ou seja, um fluxo de itens). As duas implementações compartilham de quase todos os métodos com apenas algumas diferenças na implementação já que com Mono estamos trabalhando sempre com uma única entidade no máximo e com Flux temos um formato parecido com uma lista.

Mono | Flux

Criando Mono e Flux

Existem algumas formas de criarmos um Publisher que contenham itens para ele emitir. Vamos começar vendo como fazer com Mono:

Criando Mono

No código acima, demonstro 4 formas de criar um Mono. Podemos utilizar o método just, que emitirá “apenas” o conteúdo passado por parâmetro. Temos também o empty, que cria um Publisher que não emite eventos, por não ter nenhum, o justOrEmpty, que acaba sendo uma junção dos 2 primeiros métodos, onde ele pode ter um conteúdo passado por parâmetro ou não. E por fim temos o defer que possui uma sintaxe diferente, onde precisamos fornecer um Mono para ele.

É importante entendermos a diferença entre o just e o defer, por isso mostro o resultado da execução do código:

Log do código de criação de Mono

Na linha 17 e 19, eu estou me inscrevendo no mesmo Publisher, e pedindo para que a cada evento emitido eu faça um System.out.println do conteúdo. Note nas mensagens que as duas subscrições para o Mono criado com o just possuem o mesmo valor, e para o Mono criado com o defer temos horários diferentes. A razão disso é porque no just, interpretamos o valor passado por parâmetro no momento da sua criação, assumindo um comportamento conhecido como Eager.

Com o defer, temos um comportamento diferente, onde para cada subscrição temos um horário diferente, pois o método produtor passado para o defer irá apenas ser executado quando for realizado uma subscrição, e a cada nova subscrição o produtor será executado novamente. Esse comportamento é conhecido como Lazy.

Sobre a última mensagem na log, como o Publisher não tinha nenhum item para emitir, não chegamos no código responsável por mostrar na log o item emitido. Com Flux:

Criando Flux

Aqui vou listar as diferenças que temos em relação ao Mono. O método just, pode receber 1..N eventos, uma vez que não possui a limitação de emitir um único item como o Mono, e o Flux não possui o método justOrEmpty. De resto, o comportamento é o mesmo do Mono.

Log do código de criação de Flux

Utilizando doOnNext e doOnComplete

Caso precisemos realizar alguma ação após recebermos o item, podemos utilizar o doOnNext, que executará uma ação passada como callback. Emitimos apenas o item Olá no exemplo abaixo, mas podemos concatenar várias ações a serem executadas a partir da emissão do mesmo item.

doOnNext com Mono
Log doOnNext com Mono

Apenas no Flux temos o método doOnComplete, que executará uma ação após todos os itens terem sido emitidos:

doOnNext com Flux

Neste exemplo temos 3 itens a serem emitidos. Quantas vezes o doOnNext será executado ao todo? 6 vezes, pois para cada item emitido passamos 2 vezes no doOnNext. E no doOnComplete, passamos 3 vezes? Não, ele irá ser executado apenas uma vez quando o fluxo de dados for emitido por completo.

Um detalhe, é que não temos acesso ao item no doOnComplete, diferente do doOnNext onde temos acesso ao it (item emitido).

Log doOnNext com Flux

Como transformar o Item

map com Mono

Quando precisamos que nosso Item vire um outro objeto podemos utilizar o map, que tem a função de mapear um objeto para outro objeto. No exemplo, criei uma instância da classe Pessoa, criei um Mono que emite essa pessoa e após emitir ele transforma a pessoa em uma instância da classe Funcionário através do map. A partir desse ponto, nosso código subsequente no Mono como o doOnNext vai conhecer o Item como uma instância da classe Funcionário e não mais da classe Pessoa. Na log conseguimos ver o conteúdo do nosso item:

Log do map com Mono
map com Flux

O map do Flux funciona como o do Mono, a única diferença sendo que ele irá passar no map para cada item emitido, enquanto no Mono o map só poderá ser executado uma vez.

Como transformar o Item em um novo Mono ou Flux

Vão existir casos onde a partir de um item vamos executar algum método que nos retorne um novo Mono/Flux. Quando isso acontecer, devemos utilizar o flatMap no lugar do map. O map serve para os casos onde conseguimos realizar a transformação do item com os dados disponíveis no momento, já com o flatMap caso exista alguma operação de I/O por exemplo que deve buscar algum dado fora da nossa aplicação, nós queremos que essa operação seja não bloqueante, portanto é criado um novo Publisher.

flatMap com Mono

O método findById da classe PessoaRepository retorna um Mono<Pessoa>, o que significa que ao utilizar o método map, estamos mapeando um Long (idPessoa) para um Mono<Pessoa>, fazendo com que o retorno vire um Mono<Mono<Pessoa>>, conforme a imagem acima. Para evitar esse Mono encapsulando Mono nós “achatamos” (flat) ele utilizando o flatMap. Podemos entender que caso nosso retorno seja um Publisher, devemos utilizar o flatMap.

flatMap com Flux

Mesma coisa com o Flux. No exemplo chamo o método findAll que retorna um Flux<Pessoa>, por isso se não achatarmos ele com o flatMap, teremos um Flux<Flux<Pessoa>>.

Trabalhando com Empty

E quando nosso fonte de dados não emite nenhum item? Por exemplo, quando executamos um find no Banco de Dados mas não existe nenhum registro com aquele Id? Nessas situações podemos trabalhar com o Empty:

empty com Mono

No primeiro cenário (Linha 65–71), temos um Mono que emite um item contendo um ID de uma Pessoa, e como próximo passo no flatMap pedimos para o nosso Cache recuperar a Pessoa, porém, nem sempre a informação vai estar no Cache, então temos que tratar esse cenário. Uma das formas é utilizando o switchIfEmpty, onde a ideia é “trocar” (switch) a origem do dado caso o processo anterior não tenha emitido nenhum item. Na linha 69, coloco a chamada para o nosso Banco de Dados (sem cache) em um defer, pois o switchIfEmpty é Eager, o que significa que caso nós não tivéssemos utilizado o defer, a busca no Banco de Dados teria sido realizada mesmo se um item tiver sido emitido da busca no Cache, por isso fiquem bem atentos a esse ponto.

O defaultIfEmpty segue a mesma ideia, porém em vez de trocar a origem do dado, a sua responsabilidade é retornar um valor por “padrão” (default). No caso, retorno uma instância default da classe Pessoa.

empty com Flux

Com o Flux a diferença é que tanto o switchIfEmpty como o defaultIfEmpty ocorre por item emitido, ou seja, podemos ter alguns itens emitidos pelo cache e nesse caso não executaríamos os códigos que tratam o Empty, mas também podemos ter alguns itens que não tiveram retorno do Cache, e nesse caso seriam executados os “fallbacks” que cuidam dos cenários de Empty.

Convertendo Mono -> Flux e Flux -> Mono

Se o item emitido dentro de um Mono for uma lista, pode ser mais vantajoso em alguns cenários que realizemos a conversão de um Mono<List<T>> para um Flux<T>. Abaixo mostro 2 formas de como fazer isso:

Convertendo Mono

Na primeira maneira, temos um Mono<List<Pessoa>> e através do método flatMapMany retornamos um novo Publisher (Flux) a partir de um item iterável através do método fromIterable. Na segunda forma, utilizando o flatMapIterable, podemos abstrair do código o método fromIterable e apenas retornar um item iterável que ele será transformado em um Flux.

Convertendo Flux

O Flux possui algumas opções diferentes para quando quisermos convertê-lo para Mono. O método collectList tem a responsabilidade de pegar o fluxo de itens e transformá-lo em uma lista de item para o Mono, retornando um Mono<List<Pessoa>> no exemplo. Usando o next, capturamos o próximo item do fluxo na forma de um Mono, e o last nos retorna o último item do fluxo. O reduce nos permite manipular o retorno ao combinar itens do fluxo em um único resultado dentro de um Mono.

Criando e Tratando Exceções

Ao invés de lançarmos uma exceção no código utlizando o throw, podemos encapsular nossas exceções dentro de um Mono.error, por exemplo. Com o Mono ou Flux contendo uma exceção dentro de um item, conseguimos executar alguns métodos específicos para controlarmos o fluxo de exceção.

Tratando exceções com Mono

Criamos um Mono contendo um funcionário e após isso fizemos um flatMap que retornou um novo item contendo uma exceção. Quando isso ocorre podemos utilizar alguns métodos como o doOnError que é similar ao doOnNext, porém ele é executado quando recebe uma exceção e também acesso à exceção lançada. O onErrorResume espera um retorno contendo um novo Publisher para poder resumir o fluxo. Os dois métodos doOnError e onErrorResume estão também disponíveis para o Flux.

Existem alguns outros métodos também responsáveis por tratar cenários de exceção, como o onErrorContinue, onErrorMap, onErrorReturn, onErrorStop, mas não irei me aprofundar tanto pois acredito que eles merecem um tópico próprio.

Conclusão

Vimos aqui apenas um pouco do poder disponível dentro do Mono e do Flux, existem diversos métodos pelo qual acabei não passando pois o conteúdo ficaria muito extenso. A ideia dos exemplos e das explicações é ter um lugar para consultar quando precisarmos realizar alguma implementação utilizando o Project Reactor e nos ajudar a entender melhor. Ah, segue link do Repositório: https://github.com/devspoint/projectreactor-examples.

Mais pra frente, pretendemos trazer uma aplicação utilizando o WebFlux e mostrar um pouco do seu funcionamento na prática. Espero que tenham gostado :)

--

--