Rabbit MQ — Parte I

iundarigun
Dev Cave
Published in
8 min readJan 9, 2020

Recentemente, tive a oportunidade de voltar a trabalhar com RabbitMQ. Embora nunca parei de usar algum tipo de mensageria, fazia um tempo que não precisava começar do zero com Rabbit.

Tentei começar um pouco do zero e decidi fazer dois posts sobre Rabbit. Este primeiro será mais básico, com um pouco de teoria e um pouco de prática usando Spring Boot e Kotlin, e o segundo pretendo que seja mais avançado, com algumas configurações que podem ser úteis em determinados cenários.

RabbitMQ

Em poucas palavras, RabbitMQ é um software open source de mensageria. Fornece uma forma comunicação assíncrona de dados entre processos, aplicações ou servidores. É um dos brokers de mensagens mais utilizados e implementa o protocolo AMQPAdvanced Message Queueing Protocol.

Como funciona

Como toda comunicação, precisamos de um produtor, de uma mensagem e de um receptor. No meio do bolo fica o RabbitMQ, que seria o lugar onde fica a mensagem esperando pelo receptor.

Um conceito importante para evitar dores de cabeça lá na frente é que não enviamos a mensagem para uma fila diretamente e sim para um exchange, que será o encarregado de encaminhar para a fila correta. E a mágica para entender o Rabbit está em entender como funciona esse encaminhamento, esse routing.

Antes de explicar melhor o que o exchange faz, precisamos esclarecer três conceitos:

  • Binding: É a ligação entre uma fila e um exchange
  • Binding key: É uma chave específica da ligação entre a fila e o exchange
  • Routing key: É uma chave enviada junto a mensagem que o exchange usa para decidir para que fila (ou filas) vai rotear uma mensagem.

Exchanges

Como já comentei, as mensagens não são publicadas diretamente numa fila. O produtor (producer) envia a mensagem para o exchange, junto com o routing key (não é obrigatório) e o exchange encaminha para as filas em função da configuração.

Temos quatro tipos de exchange:

  • Direct: O Direct Exchange encaminha as mensagens procurando por um binding key igual ao routing key fornecido. Vale salientar que se o exchange não consegue encaminhar a mensagem para uma fila, ela será descartada
As mensagens são encaminhadas em função do routing key.
  • Topic: Topic Exchange funciona de uma forma parecida ao Direct, porém o binding key é um tipo de “expressão regular” aplicada sobre o routing key. Para ser mais fácil de entender, poderíamos fazer uma analogia com as tags do post. O routing key da mensagem seria a lista de tags separadas por pontos (por exemplo “kotlin.rabbitmq.tecnologia”). Os bindings poderiam filtrar por tipo, e cada fila só receberia aqueles posts que acharem interessantes, por exemplo “#.kotlin.#” ou “#.tecnologia.#”.
  • Fanout: O Fanout Exchange encaminha para todas as filas com binding nele, desconsiderando routing key.
Toda mensagem é encaminhada para as filas
  • Headers: O Headers Exchange usa os valores do header da mensagem para fazer o encaminhamento, ignorando o routing key. É parecido ao Topic mas permite um controle mais fino. A mensagem pode dizer se os headers precisam ser iguais à binging key ou se só um header igual já é motivo suficiente para encaminhar. Sendo sincero, nunca vi usando este tipo de exchange.

Rodando RabbitMQ em local

Bom, por hoje já houve bastante teoria, então vamos pôr as mãos na massa. O primeiro que precisamos é um Rabbit. Temos algumas opções no cloud (por exemplo https://www.cloudamqp.com) mas para brincar podemos iniciar um rabbit em local. E quem acompanha o blog sabe que gosto de usar o docker, então:

$ docker run -d --hostname localhost --name local-rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3-management

No caso usamos uma imagem que já tem o plugin de UI ativo, que fornece uma interface gráfica para gerenciar o broker. Desta forma, uma vez o docker terminar de iniciar, podemos acessar a url http://localhost:15672 e logar com o usuário guest e senha guest.

Podem brincar criando Exchanges e filas e fazendo os bindings entre filas e exchanges. Desde dentro um exchange conseguimos “enviar” uma mensagem junto com o seu routing key. Se quiser brincar de um jeito mais visual, pode usar o site http://tryrabbitmq.com/, que usei para gravar os Gifs.

Spring Boot e RabbitMQ

Criei dois projetos no lugar de sempre https://start.spring.io, selecionando Spring for RabbitMQ. Para ver o código (é Kotlin, mas a tradução para Java é bem simples), pode fazer checkout no seguinte repositório:

> git clone https://github.com/iundarigun/learning-rabbitmq

Para este post, vamos focar nas pastas rabbit-producer-basic e rabbit-consumer-basic.

Producer

Se abrimos o projeto, separei os arquivos de configurações de filas em três:

No QueueConfig há duas filas declaradas, usando o QueueBuilder, mas podemos usar o construtor do Queue e teria o mesmo resultado:

Nos DirectConfig e FanoutConfig temos os exchanges e os bindings declarados:

Repare que nos bindings do fanout não há routing-key definida

Fora isso, no application.yml definimos as propriedades de acesso ao Rabbit. Na verdade, os valores são os default, mas preferi especificá-los:

spring:
application:
name: rabbitmq-producer-basic
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest

No método postOnExchange do ExchangeController, basicamente recebemos um exchange e um routing key, além de uma String com a mensagem e usamos o rabbitTemplate para enviá-lo para ̶a̶ ̶f̶i̶l̶a̶ o exchange. Os parâmetros necessários para mandar a mensagem são precisamente o exhange, o routing key e a mensagem:

Antes de explicar mais nada, vamos subir a aplicação e vamos executar os seguintes comandos:

$ curl -X POST "http://localhost:8001/exchanges/DIRECT-EXCHANGE-BASIC/TO-FIRST-QUEUE" -H "accept: */*" -H "Content-Type: application/json" -d "12345"
$ curl -X POST "http://localhost:8001/exchanges/DIRECT-EXCHANGE-BASIC/TO-SECOND-QUEUE" -H "accept: */*" -H "Content-Type: application/json" -d "12345"
$ curl -X POST "http://localhost:8001/exchanges/FANOUT-EXCHANGE-BASIC/ANY" -H "accept: */*" -H "Content-Type: application/json" -d "12345"

Enviamos duas mensagens para o direct exchange e uma para o fanout. O resultado é que deveríamos ter duas mensagens em cada fila, pois o fanout vai distribuir a mensagem para as duas filas. Vamos verificar no Rabbit:

Entendendo a mágica

O que aconteceu? Vamos dar um passo atrás e analisar com calma.

  1. A aplicação subiu e magicamente as filas e os exchanges foram criados
  2. Usamos o rabbitTemplate e não configuramos nada
  3. Aparentemente não precisamos dos objetos Queue, Exchange ou Binding para mandar elementos para a fila.

Explicando o primeiro ponto, quando a aplicação inicia, o Spring explora os beans de Queue, Exchange e Binding declarados e cria eles no Rabbit.
O RabbitTemplate é criado e injetado no contexto, então só precisamos usá-lo.
Sobre o terceiro ponto, realmente é um desperdício de recursos manter os beans se só são usados para criar as filas, exchanges e bindings. Inclusive, se eles já estão criados no Rabbit (seja via API, outra aplicação, via UI) nem precisamos declarar esses beans. Faça o teste, depois de rodar a aplicação a primeira vez, comenta a annotation Configuration dos arquivos de configuração, inicia de novo a aplicação e tenta de novo. Funcionou, não é?

Dois últimos pontos sobre o producer:

  • Se tentamos enviar para um exchange inexistente, não vamos obter uma exception, embora teremos um log de error. Isso é porque o envio é assíncrono, não acontece na thread principal.
  • Se especificamos um routing key inexistente, a mensagem vai chegar no Rabbit e será simplesmente descartada.

Consumers

O projeto de consumer é até mais simples. Coloquei dois exemplos sobre como configurar o listener. O primeiro exemplo é declarando uma fila e um listener. Isso da um controle mais manual. O segundo é usando uma annotation:

Começando pela annotation, pois é mais simples, especificamos qual é o nome da fila e simplesmente recebe um message (podemos receber o conteúdo diretamente se preferir, trocando o tipo de parâmetro para String neste caso).

Já quando criamos o listener na mão, criamos um listenerContainer e especificamos qual é a classe que vai receber a mensagem. Há uma classe no pacote service que herda de MessageListener e implementa o método onMessage:

Outro detalhe importante é o uso do SimpleRabbitListenerContainerFactory para repassar os Advice. O motivo é que isso é necessário para o sleuth conseguir recuperar o traceId do header do message para usar no log. Se esta última frase te parece outro idioma, pode ler neste link sobre o sleuth

Mensagens de verdade

Até agora, nos exemplos trafegamos só texto, mas num caso mais real teremos um objeto mais complexo. Então, vamos ver um caso diferente. Começamos com o producer.

Precisamos primeiro um objeto complexo, então no pacote domain criei um objeto mais complexo, com campos nullables, datas, texto e numéricos.

Agora precisamos dizer ao Rabbit da nossa aplicação que tipo de mensagem queremos enviar. Uma opção bem comum é enviar um json, pois facilita a leitura caso precisar rastrear a mensagem no Rabbit. Vamos criar um bean novo especificando um MessageConverter:

Nota: O objectMapper é necessário para trabalhar com data class de Kotlin e com datas de java8

O que fazemos com ele? NADA. O Spring vai achar o bean e passar usar este converter no momento de enviar a mensagem para o Rabbit. Se olhar no código, há uma terceira fila e um Binding novo para o Direct Exchange.

Tem um endpoint para postar. Inicie o projeto e execute o seguinte código:

$ curl -X POST "http://localhost:8001/exchanges/json/DIRECT-EXCHANGE-BASIC/TO-JSON-QUEUE" -H "accept: */*" -H "Content-Type: application/json" -d "{ \"name\": \"Iundarigun\", \"collageCompletedYear\": 2005, \"bornAt\": \"1980-08-07\", \"active\": true}"

Se olhamos no RabbitMQ, encontraremos uma mensagem na fila no formato Json:

Vamos agora fazer as alterações no consumer.

Primeiramente, precisamos criar o mesmo bean do MessageConverter que criamos no producer. Também criamos a classe Person no pacote domain, igualzinha que no producer. E por último, criamos o listener. Criei por annotation mas poderia ser criado com o MessageListenerContainer, é indiferente.

Agora, se rodar a aplicação, a mensagem que postamos com o producer é consumida:

Bom, até aqui chegamos neste primeiro post. O segundo será um pouco mais avançado.

O que fica para a parte II?

O que fica para o próximo então?

  • Configuração (um pouco) avançada de Rabbit
  • Dead Letter Queue ou onde vão as mensagens que não são processadas
  • Como criar as filas, exchanges e bindings sem precisar declarar um bean.
  • Confirmação manual de mensagem processada.
  • Políticas de rentativas de processamento
  • E algumas coisas mais!

Algumas referências

--

--

iundarigun
Dev Cave

Java and Kotlin software engineer at Clearpay