Iniciando na programação reativa com ReactiveX

Rodrigo Santos
#EmpiricusTech
Published in
7 min readFeb 12, 2021

Primeiro contato

Em 2017 nós recebemos aqui na Empiricus o código fonte de um fornecedor que utilizava a arquitetura MVVM (model-view-view-model). Eu já havia tido experiência com este modelo no Windows Phone, e ele estava se tornando cada vez mais popular no ecossistema Android. Ao me aprofundar na arquitetura e implementação do código, eu percebi que ambos eram fortemente baseados no ReactiveX, tecnologia que para mim era novidade.

Escrevi essa introdução para contextualizar o meu início de aprendizado nas tecnologias de programação reativa, cujo progresso foi inicialmente difícil. Na época não consegui encontrar nenhuma documentação ou tutorial que explicasse os fundamentos de uma forma simplificada. Também foi custoso entender como trocar a maneira imperativa de pensar desenvolvimento de software pela maneira reativa - e esse shift de paradigma será o foco principal deste primeiro artigo.

O que é? Por que usar?

O ReactiveX é uma biblioteca que possui implementações em diversas linguagens, cujo objetivo é adicionar capacidades reativas à linguagem escolhida.

Explicando da forma mais simples possível, ela serve para nos poupar esforço simplificando a árdua tarefa que é desenvolver e manter código assíncrono.

Principais componentes

Os principais componentes que devemos compreender para dominar o ReactiveX são: Observables, Observers, Schedulers e Operators, veremos cada um em detalhes mais abaixo. É importante citar que outras implementações podem variar um pouco nas terminologias, mas os conceitos serão semelhantes.

Observables

Eles são a parte principal do ReactiveX, são basicamente objetos que emitem dados ao longo de uma linha do tempo podendo finalizar em sucesso ou em erro. Normalmente eles são finitos, mas podem ter a mesma duração que a aplicação também.

Abaixo um exemplo simples de um Observable que emite três Strings e encerra o fluxo:

Diagrama Marble de um observable simples
Output do programa com o observable simples

É um pouco difícil de visualizar em um primeiro instante, mas um Observable é similar por exemplo aos ResultSet, InputStream, OutputStream, Scanner, PrintWriter, entre outros, que são objetos utilitários de operações I/O da plataforma Java. Inclusive um uso bem comum do ReactiveX é “embrulhar” operações I/O realizadas em Observables para deixar mais legível para o código cliente e simplificar o paralelismo.

Abaixo um exemplo de código de um Observable que lê e emite cada linha do arquivo.

Diagrama Marble resumido para um observable de leitura de um arquivo
Output do programa do observable de leitura de arquivo

É importante finalizar o fluxo assim que o Observable tenha acabado o seu trabalho, para que os recursos possam ser liberados de forma adequada. Para sinalizar o encerramento do fluxo basta utilizar a instrução emitter.onComplete().

Tão importante quanto sinalizar o encerramento de um fluxo, é sinalizar quando ele encontra um erro, como no exemplo abaixo:

Diagrama Marble de um observable com erro
Output do programa do observable de leitura de arquivo com erro

Após o Observable emitir o sinal de finalização ou de erro ele não pode mais emitir novos dados e é considerado finalizado.

Observers

Eles são os objetos que consomem o fluxo de dados emitido pelos Observables. Quando um Observer está escutando um Observable, dizemos que o Observer “se inscreveu” (subscribe) no Observable.

Um conceito bem similar são os Listeners que adicionamos aos componentes na tela, como por exemplo botões para obter os eventos. Assim como os Listeners de interface, o Observer permanece inoperante até que ocorra a sua inscrição (subscribe) no Observable.

No primeiro exemplo de Observable propositalmente deixei de forma bem camuflada o Observer, mas agora reescrevo o código para deixar de forma mais explícita a sua criação. Neste próximo exemplo é possível entender melhor o que é o Observer, e que é ele o responsável por exibir o texto em tela (e não o Observable!).

Note neste exemplo que existem 4 métodos que podemos implementar no Observer, que são:

  • onNext — recebe os dados emitidos pelo Observable.
  • onComplete — recebe a sinalização de final de fluxo do Observable.
  • onError — recebe a sinalização de erro do Observable.
  • onSubscribe — Essa é uma implementação um pouco mais específica do RXJava, mas ele fornece uma referência ao objeto de subscrição que é o resultado da inscrição do Observer ao Observable, e que pode ser usado para encerrar a inscrição prematuramente caso desejado.

Schedulers

Os exemplos anteriores conseguem exemplificar bem o uso dos Observers e Observables, mas eles não demonstram uma das principais motivações do uso da programação reativa, que é a simplificação de código assíncrono. Considere o exemplo a seguir:

Output do programa com 2 observables sem scheduler

Não importa quantas vezes ele seja executado, o resultado sempre será o mesmo com os números sendo exibidos de 1 a 10. Isto ocorre porque não especificamos qual Scheduler será utilizado para as operações, o que basicamente fará com o que o código seja executado na Thread atual(main) na ordem que ele foi escrito.

Para que o código seja executado de forma paralela vamos especificar os Schedulers nessa nova implementação:

Output do programa com 2 observables com schedulers diferentes

Nesse novo trecho, configuramos os Observables com:

  • subscribeOn — O Scheduler utilizado aqui será usado para executar o código do Observable.
  • observeOn — O Scheduler utilizado aqui será usado para executar o código do Observer.

Nessa alteração utilizamos o Scheduler mais básico, que é o que cria uma nova Thread para cada bloco de execução. Como podemos conferir agora os resultados variam em cada execução como é comum em código multithread.

O Scheduler newThread, embora seja suficiente para situações mais simples, acaba se mostrando ineficiente por criar muitas Threads, podendo exaurir os recursos do sistema, por isso é importante selecionar o Scheduler adequado (ou mesmo criar o seu próprio) para as necessidades da aplicação.

O trecho Thread.sleep(1000) é necessário para dar tempo das outras Threads finalizarem o serviço antes da main encerrar o programa.

Operators

Eles são a parte funcional do ReactiveX, oferecendo capacidades de alteração de dados de forma fluida, similar por exemplo as capacidades que existem no Kotlin e em outras linguagens.

Para quem não possui muita familiaridade com os conceitos, vou explicar alguns dos operadores mais utilizados com exemplos.

Concat

O operador concat consiste em somar os fluxos de dados de dois ou mais Observables. Uma consideração a se fazer é que não deve ser usado em Observables indefinidos, pois o próximo Observable só irá emitir dados após o anterior emitir a notificação onComplete.

Diagrama Marble de dois observers sendo agrupados com o operator concat
Output do programa do operator concat

Map

O operador Map consiste em modificar o fluxo de dados de um observable, transformando um por um os elementos. Abaixo um exemplo de map que transforma o fluxo de Integers em Strings.

Diagrama Marble do operator map em um observable
Output do programa do operator map

Filter

O operador filter como o nome sugere filtra elementos de um fluxo de dados, funcionalmente falando, é gerado um novo Observable que contém apenas os dados que satisfaçam a condição informada. Abaixo um exemplo de filter que elimina números ímpares.

Diagrama Marble se um observable que teve os números ímpares removidos atráves do operator filter
Output do programa do operator filter

Zip

O operator zip junta vários Observables para formar um único assim como o operator concat. As diferenças entre os operadores, é que o observable “zipado” emite dados apenas quando todos os observables que compõem ele também emitiram, adicionalmente diferentemente do concat em que todos os observables devem ter o mesmo tipo, o zip não possui esse requerimento, pois deve ser fornecido uma função transformadora que irá mesclar todos os dados.

Tecnicamente falando, ele é similar a mecanismos que juntam o resultado de um grupo de Threads após o término de todas desse mesmo grupo. O zip é bastante usado para operações de I/O que envolvem muitas chamadas que podem ser feitas de maneira simultânea e que estão relacionadas a um mesmo assunto, como por exemplo pegar dados de um usuário e do endereço dele ao mesmo tempo, como o exemplo abaixo:

Output do programa do operator zip

Por enquanto é isso…

Espero que neste artigo eu tenha conseguido passar uma visão geral de como funciona a tecnologia do ReactiveX, assim como sua aplicabilidade. Não se preocupe se não ficou tão claro ainda os benefícios frente às abordagens mais clássicas, ou se acharem complexa a linha de pensamento. Com o tempo todos esses conceitos novos vão se tornar bem naturais, o que até deixará difícil voltar para as abordagens anteriores.

Num futuro próximo irei lançar mais artigos focados em aplicações reais do ReactiveX, o que espero que ajude ainda mais no entendimento de quem busca mergulhar nesse novo mundo.

--

--

Rodrigo Santos
#EmpiricusTech

Tech Manager Mobile @ Empiricus | Amante da simplicidade nas soluções.