Reprocessamento com RabbitMQ

Rafael Galle
5 min readSep 29, 2020

--

Escrevi este post para ajudar quem precise implementar reprocessamento e reenfileiramento, recentemente tive a necessidade de pensar em algo para tratar um possível problema durante o processamento, mas como fazer isso?

Poderíamos gravar registros no banco de dados, cache, disco ou qualquer outro lugar, utilizar serviços próprios, schedules, crons para executá-los de tempos em tempos, ou poderíamos fazer algo totalmente diferente, são muitas as possibilidades.

A forma escolhida para solucionar esse problema foi utilizar o RabbitMQ, um software de código aberto que aplica o protocolo AMQP (Advanced Message Queuing Protocol) ou Protocolo avançado de enfileiramento de mensagens.

RabbitMQ é uma solução de mensagens que implementa diversos padrões e práticas para melhorar a escalabilidade e interoperabilidade, entre diferentes componentes em um sistema ou entre empresas variadas.(Martin Toshev)

Seu conceito básico é básico:
*Produtor envia mensagem para fila.
*Consumidor consome e processa.

Mas e se esse consumidor tiver um problema ao processar essa mensagem?
O que acontecerá com ela? Será descartada? Voltará pra fila? Como faço para reenfileirar ?😰

Ai que as coisas começam a ficar um pouco complicadas, além das queues precisamos conhecer os conceitos de exchanges, deadLetterExchanges, bindings, confirms, ttls…

Vamos falar um pouco sobre cada um desses itens citados acima, mas para maior entendimento recomendo a leitura desses links aqui: Conceitos, Exchange, DeadLetterExchanges, Bindings, Confirms(ack/nack/reject) e TTL.
O RabbitMQ tem uma documentação fantástica.👌

Exchanges (Trocas)

Exchanges são entidades para onde as mensagens são enviadas. As Exchanges pegam uma mensagem e a encaminham para zero ou mais filas. O algoritmo de roteamento usado depende do tipo de exchange e das regras chamadas bindings, temos quatro tipos de exchanges: Direct, Fanout, Topic e Headers.
Vamos utilizar o tipo “Topic”.

Dead Letter Exchanges — DLX (Troca de cartas mortas)

As mensagens de uma queue podem ser “devolvidas”; ou seja, republicadas em uma exchange quando qualquer um dos seguintes eventos ocorrer:

• A mensagem é confirmada negativamente por um consumidor usando reject ou nack com o parâmetro requeue definido como false.
• A mensagem expira devido ao TTL por mensagem ; ou
• A mensagem é descartada porque a queue excedeu o limite de comprimento

Vamos utilizar a DLX para redirecionar uma mensagem que tiver seu TTL expirado.

Bindings

Bindings são regras que as exchanges usam para rotear mensagens para queues. Para instruir uma exchange ‘A’ a rotear mensagens para uma fila ‘B’, ‘B’ deve ser vinculado a ‘A’. O objetivo da chave de roteamento é selecionar certas mensagens publicadas em uma exchange para serem roteadas para a fila vinculada. Em outras palavras, a chave de roteamento atua como um filtro

Confirm

Dependendo do modo de confirmação usado, o RabbitMQ pode considerar uma mensagem entregue com sucesso imediatamente após ser enviada (gravada em um soquete TCP) ou quando uma confirmação explícita (“manual”) do cliente é recebida. As confirmações enviadas manualmente podem ser positivas ou negativas e usar um dos seguintes métodos de protocolo:

  • ack é usado para reconhecimentos positivos
  • nack é usado para confirmações negativas
  • reject é usado para confirmações negativas, mas tem uma limitação em comparação com nack

TTL (Tempo de vida e expiração)

Uma mensagem que está na fila há mais tempo do que o TTL configurado é considerada morta. O servidor garante que as mensagens mortas não serão entregues para um consumidor da queue, serão descartadas ou redirecionadas para a dead letter queue configurada.

Agora que conhecemos os conceitos básicos podemos desenhar o fluxo de nossas mensagens.

Vamos criar uma Exchange ‘e.work’, uma queue ‘q.work’ e um bind entre ambos.

A Exchange criada é do tipo topic e tem a propriedade durable setada como true. Para mais informações sobre Exchanges consulte esse link.

A queue é criada e tem seu consumidor iniciado com a propriedade noack true, essa propriedade indica se nosso consumidor vai autoconfirmar a mensagem, o ato de confirmar a mensagem indica que ele a recebeu corretamente, e o broker poderá esquecê-la.

Se noack estiver definido como false, o consumidor deve executar ack para confirmar ou nack/reject para indicar que houve um erro com esse processamento, passando como parâmetro a propriedade requeue true/false, indicando se a mensagem deve voltar para a queue ou não.

Opa, com isso já resolvemos nosso problema de reenfileiramento? Nããão.

Ao dar um nack com a propriedade requeue a mensagem volta para a fila e como o processamento é muito rápido, a mensagem é executada quase que instantaneamente, então se ela foi rejeitada inicialmente por algum motivo como por exemplo um serviço com instabilidades ou fora do ar, a mensagem vai ficar reprocessando diversas vezes quase que instantaneamente, gerando um consumo muito alto de recursos do seu servidor.

Para evitar isso, queremos que a mensagem aguarde um determinado tempo para ser executada novamente, então vamos criar uma exchange e uma queue para a espera, crie uma exchange ‘e.wait’, uma fila ‘q.wait’ e um bind entre ambos.

A diferença entre a exchange/queue de wait para a de work, é que a queue wait terá a propriedade deadLetterExchange(DLX) com o valor ‘e.work’ setada. Outra diferença desse fluxo de wait para o fluxo de work, é que a fila ‘q.wait’ não terá um consumidor ativo, a mensagem será publicada na fila ‘q.wait’ e não será consumida até que seu TTL expire, forçando que a mensagem seja redirecionada para a DLX definida.

O que isso significa? Já vimos o conceito de DLX, então se uma mensagem é publicada na queue ‘q.wait’ e não é consumida até que a TTL da mensagem seja expirada, ela será redirecionada para a DLX (troca de cartas mortas) definida, ou seja, a exchange work ‘e.work’ voltando para o processamento inicial.

Belezinha, já temos um fluxo de reprocessamento funcionando, agora só temos que nos preocupar com o número de repetições de reprocessamento dessa mensagem para evitar que fique processando eternamente.

Vamos utilizar a opção headers da mensagem publicada, nela vamos adicionar um contador personalizado, e a cada vez que o consumer da queue ‘q.work’ processar a mensagem irá incrementar esse contador.

Agora temos nosso processo de reenfileiramento com tempo de espera e contador de processamento, pronto para implementação.

Podemos testar publicando uma mensagem na exchange ‘e.work’ e verificar nosso consumidor recebendo a mensagem, reenfileirando, aguardando o ttl expirar e repetir esse processo até que exceda o limite de tentativas definido.

O código completo desse exemplo se encontra no github

Gostaria de avançar um pouco nesse reprocessamento? Redirecionar a mensagem para filas específicas de acordo com o erro no processamento? Leia a parte 2 desse post clicando aqui e conheça outros tipos de Exchanges.

Se você tiver outras táticas para resolver esse problema e puder compartilhar, adoraríamos ouvir de você nos comentários.

Obrigado pela leitura.

--

--

Rafael Galle

Apaixonado por tecnologia e inovação, adoro estudar e aprender novas coisas todos os dias. @gruposervices