RabbitMQ — Parte II

iundarigun
Dev Cave
Published in
10 min readJan 15, 2020

Vamos continuar falando um pouco sobre Rabbit, desta vez (um pouco mais) avançado. Se você não leu a primeira parte, acessa ela clicando aqui.

Na primeira parte, não nos preocupamos muito sobre os parâmetros de criação tanto das filas como dos exchanges. Ainda comentamos que não é necessário declarar um bean para criar filas e exchanges, pois podemos usar a ferramenta administrativa. Vamos começar por esses dois pontos.

RabbitAdmin

Como exposto no post anterior, quando a aplicação inicia, o starter do Rabbit busca aqueles beans de filas, exchanges e bindings para fazer a criação mas se temos muitas filas, parece que estaremos mantendo uma grande quantidade de beans em memória que só serão usados para criar filas no inicio da aplicação. Por se esse motivo não for suficiente para procurar uma alternativa, recentemente trabalhei numa funcionalidade onde o listener precisava ser criado dinamicamente em função de um job, mas declarar o bean de uma fila dinamicamente não a criava no Rabbit.

Usar a ferramenta administrativa como vimos na primeira parte do post, pode ser uma opção, mas as vezes não temos acesso a ela. Uma alternativa é usar o RabbitAdmin, uma classe da lib do spring-rabbit. Ele será o encarregado de comunicar com o Rabbit declarando filas, exchanges e bindings. Pode ver o projeto rabbitmq-producer-advanced no mesmo repositório da primeira parte:

Usamos o connectionFactory para criar o RabbitAdmin, a partir de ai declaramos os diferentes objetos

Uma vez executado o método createRabbitElements() as filas, exchange e bindings são criados mas não há nenhum bean desnecessário em memória.

Customizando as filas

As filas e os exchanges podem ter caraterísticas bem variadas em função dos parâmetros fornecidos. Podemos ter uma ideia dessas opções se for criar a fila a través da ferramenta administrativa:

Clicando nos ? aparece um pop-up com a explicação do parâmetro.

As opções mais significativas na criação de filas são:

  • Durable: Se a fila é persistida ou não. Caso afirmativo, garante que o RabbitMQ nunca vai perder a fila.
  • Lazy mode: Mantém o máximo de mensagens possível no disco para reduzir o uso da RAM. Isso reduz a throughput mas a performance geral do Rabbit não é afetada por uma fila grande de consumo lento que deixa mensagens demais na memória.
  • Message TTL: O tempo que uma mensagem publicada na fila pode permanecer ai antes de ser descartada.
  • Auto-delete: Caso afirmativo, a fila será excluída após todos os consumers serem desconectados.
  • Auto-expire: O tempo que uma fila pode estar sem ser usada antes de ser excluída automaticamente. Pode ter elementos, mas se não tiver nenhum consumer, será deletada.
  • Max length: Quantas mensagens não consumidas uma fila pode manter. Em função do parâmetro queue overflow behaviour, uma vez atingido o máximo, o RabbitMQ vai começar a descartar mensagens iniciando pelas mais antigas, ou vai rejeitar novas mensagens.
  • Dead letter exchange: Exchange usado para envio de mensagens rejeitadas ou expiradas.
  • Dead letter routing key: Routing key usada para o envio de mensagens rejeitadas ou expiradas.

No QueueBuilder podemos especificar (quase) todos esses parâmetros adicionais:

Dead Letter Queue

Uma das opções que configuramos na fila foi um exchange e um routing key para a DLQ. A DLQ, acrônimo de Dead Letter Queue, é uma fila usada como um “cemitério” de mensagens que por um motivo ou outro não foram processadas com sucesso. Essas mensagens podem ser avaliadas manualmente e caso o processamento não tenha acontecido por algum bug no código, podemos reenfilerar essas mensagens uma vez corrigido o problema e não perder nenhuma informação.

Para o Rabbit encaminhar as mensagens para esse exhange, precisamos retornar uma exception especifica no consumer. Vamos abrir o projeto rabbitmq-consumer-advanced.

O código é o seguinte:

Decidi usar o MessageListenerContainer, mas funcionaria igual com a annotation. Basicamente, se certa informação está null, o consumidor vai lançar uma exception do tipo AmqpRejectAndDontRequeueException. Isso vai fazer o Rabbit verificar se existe algum tipo de mecanismo de DLQ. Como a fila foi configurada para ter DLQ, a mensagem é postada no exchange:

Retentativas

As vezes, o consumo da mensagem da um problema por conta de uma instabilidade de outro sistema. Justamente o uso das fila favorece esse isolamento. Imagina então que precisamos mandar via http uma requisição POST para uma URL externa. Caso a requisição retorne algum erro (pode ser timeout, serviço indisponível, etc) estaremos mandando uma mensagem para a fila DLQ que com um pouco mais de “insistência” teria sido processada com sucesso. E aqui entram as políticas de retentativa.

Vamos criar uma política de retentativa com as seguintes caraterísticas:

  • 5 tentativas no máximo
  • Após a primeira tentativa, esperar 1 segundo antes de tentar de novo
  • A cada tentativa nova, dobrar o tempo de espera
  • No máximo, uma espera de 6 segundos

Basicamente, criamos uma política de retry e adicionamos ela na AdviceChain do listener.

Se rodar o consumer e o producer, teremos o seguinte resultado:

Há um problema nesta abordagem. Uma vez termina, a mensagem não é postada na DLQ. Para fazer isso, precisamos acrescentar um Recoverer na política de retry.

No caso, usamos um recoverer do Spring que basicamente propaga a exception RejectAndDontRequeueException que usávamos no listener. Isso significa que qualquer exception será enviada para a DLQ, o que pode ser muito aconselhável!

Existe outro recoverer fornecido pelo Spring que re-posta a mensagem numa fila, RepublishMessageRecoverer, que recebe no construtor um rabbitTemplate, o exchange e o routing key. O problema é que podemos entrar num loop infinito.

Também podemos implementar nosso próprio recoverer criando uma classe que implemente a interface MessageRecoverer para, por exemplo, descartar as mensagens caso aconteça alguma exception específica ou tomar uma decisão em função das vezes que já foi retentado.

Só um aviso sobre o retry: Se o volume de mensagens for alto, e número de erros também, seu consumidor pode virar um gargalo, porque enquanto rola os retries, aquele consumidor fica travado, então pense bem antes de implementar um número alto de retry.

Confirmação manual

Até agora não nos preocupamos de como é feita a confirmação de leitura de uma mensagem, mas dependendo do cenário isso pode ser crítico, pois perder uma mensagem pode ter impacto financeiro. Nesses casos, pode ser aconselhável fazer uma confirmação manual. Para isso, criei um listener, desta vez com annotation (mas, de novo, daria para usar o MessageListenerContainer), e marquei ele como confirmação manual. Vejam o código abaixo.

Tem vários pontos de alerta.

  1. Na linha 10 temos a confirmação negativa. Reparar no último parâmetro. Se refere à re-enfileirar a mensagem. As vezes pode ser interessante pedir para re-enfileirar, mas cuidado com criar um loop infinito.
  2. Caso ser false, o Rabbit tentará mandar para a DLQ se estiver configurada.
  3. Na linha 13 temos a confirmação positiva. Caso não acontecer nenhuma confirmação, nem positiva ou negativa, a mensagem fica pressa na aplicação, o que deve ser evitado.

Concorrência

Só passando rápido por este ponto, podemos configurar o número de consumidores concorrentes para cada listener. Isso é importante quando temos um input alto nas filas e queremos que o consumo consiga dar vazão ao volume.

Para configurar esse valor podemos usar o parâmetro concurrency tanto quando estiver usando annotation como MessageListenerContainer. Nos dois casos aceita o formato “x-y”, onde o primeiro parâmetro é o número de consumidores iniciais e o segundo é número máximo de consumidores. Caso for um número diferente, o segundo deve ser maior que o primeiro e quando o volume for muito grande, a aplicação irá aumentando os consumidores plugados no Rabbit. Se são iguais, da para só especificar um número.

Virtual Host

Em todos os exemplos usados até agora usamos as configurações default do Rabbit (usuário, senha, etc), mas até que ponto isso está certo? Para modificar esse cenário e deixá-lo mais production ready vamos customizar essas informações.

Mas o que é um Virtual Host? Também chamado de vhost, é uma forma de segregar aplicativos que estão usando a mesma instância ou cluster de RabbitMQ. Usuários diferentes podem ter privilégios de acesso diferentes para diferentes vhosts e filas, possibilitando o isolamento entre aplicações. Cada fila, exchange, binding, cada mensagem publicada, será única, criada e publicada especificamente para aquele vhost. Também pode ser usado para isolamento de ambientes, produção, homologação. Em qualquer caso, o isolamento é sempre lógico e os recursos físicos são compartilhados, então precisamos levar em consideração se o volume dos diferentes vhosts não é notável ao ponto de ter impacto na performance. Fazendo uma analogia, é como ter um banco de dados com vários schemas dentro. Tem isolamento lógico, mas compartilhamento de recursos físicos.

Entramos na ferramenta administrativa onde tem uma aba chamada “Admin”. É uma boa prática trocar a senha do usuário guest ou até criar um usuário administator novo e criar vhosts para nossas aplicações:

Vamos criar um vhost novo, um usuário novo e dar permissão de acesso para esse usuário a través dos seguintes passos:

  1. Criamos o usuário devcave com a tag management para poder logar na UI com esse usuário

2. Criamos o vhost rabbit-advanced

3. Nos detalhes do vhost criado, adicionamos as permissões para o usuário devcave.

Para configurar o uso desse vhost, basta alterar o application.yml para refletir as informações desejadas:

spring:
rabbitmq:
host: localhost
port: 5672
username: devcave
password: devcave
virtual-host: rabbit-advanced

O resto fica igual, o Spring vai entender a configuração e vai usá-la para criar o ConnectionFactory.

Lidando com duas conexões

As vezes, precisamos conectar uma aplicação a dois Rabbits ou dois vhosts diferentes e operar neles ao mesmo tempo. Mas, como gerenciar isso com Spring? Até agora, ele foi criando as coisas para nós. Agora precisaremos alguma coisa mais manual. Pode abrir o projeto rabbit-producer-multiple-vhost do mesmo repositório.

Primeiro, acrescentamos no application.yml a configuração do segundo Rabbit:

spring:
rabbitmq:
host: localhost
port: 5672
username: devcave
password: devcave
virtual-host: rabbit-advanced
alternative-host: localhost
alternative-port: 5672
alternative-username: guest
alternative-password: guest
alternative-virtual-host: /

A continuação precisamos recuperar essas propriedades para criar dois connectionFactory e dois RabbitTemplate.

Duas perguntas e respostas. A primeira é: Porque precisamos criar um connectionFactory com os parâmetros “padrão” se o Spring já fazia isso? A resposta é simples: O Spring cria o connectionFactory usando as propriedades padrão (as que usamos nos outros projetos) sempre que não achar outro bean do tipo connectionFactory criado. Podemos ver isso na classe RabbitAutoConfiguration dentro da lib spring-boot-autoconfigure:

A classe RabbitAutoConfiguration só é configurada se existe a classe RabbitTemplate e Channel no classpath. E o classe RabbitConnectionFactoryCreator só é configurada se não existir o bean ConnectionFactory. Desde o momento que estamos criando um bean desse tipo, a classe não é “configurada” e por isso precisamos criar os dois connectionFactory.

A segunda pergunta é: Porque precisamos marcar um bean como primary? Precisamos isso porque esses beans são usados para criar outros beans. Quando não marcamos um como primary, o Spring não sabe qual deve injetar, pois encontra dois “candidatos” do mesmo tipo.

O grande Júlio Falbo criou uma lib na empresa dele que auxilia no uso de vários vhosts, entre outras coisas. Estou deixando aqui o link do medium com a explicação do uso.

Spring Cloud Streams

Gostaria de poder falar um pouco sobre o uso do Spring Cloud Streams e Rabbit, porém meu conhecimento sobre o assunto é limitado e o post está ficando enorme. Criei um projeto no mesmo repositório, pasta rabbit-cloud-stream que cria dois producers usando o mesmo exchange com bindings para filas diferentes, e um consumer para uma das duas filas.

Clustering e Sharding

Por último, é bastante comum manter o Rabbit em cluster para manter alta disponibilidade e replicação de dados. Acho que em produção deveria ser mandatório. Mas existe a necessidade de ir além para cenários onde o volume é muito absurdo. E aqui é onde entra o Sharding.

  • Sharding is a method for distributing data across multiple machines.

Basicamente, os dados e as filas ficam distribuídas e não (necessariamente) replicadas. Nunca trabalhei neste modelo com Rabbit, então não consigo listar problemas ou alertas, mas existem dois plugins que auxiliam nisso. Deixo os links aqui para se tiver interesse:

Conclusão

Bom, isso é o que eu tinha para compartilhar com vocês sobre Rabbit, e como usá-lo com Kotlin (ou Java) e Spring Boot.

Se chegou até aqui, parabéns pois ficou maior do que desejava, mas espero que possa ajudar alguém no seu dia a dia ou no seus estudos. E se detectou alguma bobeira ou erro no post, peço desculpas, pois não foi proposital. Comenta aqui para eu aprender também!

Algumas referências

--

--

iundarigun
Dev Cave

Java and Kotlin software engineer at Clearpay