Entendendo como o Consumidor Kafka funciona
Nesse artigo você vai entender de forma técnica como o Consumidor Kafka trabalha
Apache Kafka é uma plataforma de streaming baseada em sistema distribuído de publish/subscribe, onde há um processo chamado producer, que envia mensagens para um tópico, que é gerenciado e armazenado por um cluster de brokers, e um processo chamado consumer que subscreve ao tópico para realizar a leitura e processamento de tais mensagens.
Um tópico é distribuído entre os brokers e cada broker gerencia um subconjunto de mensagem de cada tópico — tais subconjuntos são chamados de partições. O número de partições é definido quando um tópico é criado e pode ser aumentado a qualquer momento após a criação, mas nunca diminuído.
É importante entendermos que partição é a unidade de paralelismo do Kafka producer e consumer.
Do lado do producer as partições de um tópico permitem escrever mensagens em paralelo. Se uma mensagem é publicada com uma chave, por padrão o producer fará um cálculo para definir qual partição irá determinada chave e isso irá garantir que sempre as mensagens com aquela mesma chave vá para a mesma partição, mantendo a ordenação. Adicionalmente o consumer terá a garantia de ler estas mensagens em ordem por partição.
No Apache Kafka, diferente dos sistemas tradicionais de mensageria (Message Queue), cada evento publicado pelo produtor é persistido por um período de tempo definido pelo parâmetro retention.ms (7 dias por padrão), depois disso o evento é excluído do mais antigo pro mais novo. Isso significa que quando consumidor ler tais eventos do Kafka não irá ‘retirá-lo’ do tópico, mas sim commita-los — vamos entender mais a frente como funciona esse processo.
Aplicações que precisam consumir dados do Kafka usam o KafkaConsumer para subscrever ao tópico e ler os eventos que nele forem produzidos.
Antes de entender como o KafkaConsumer ler eventos do cluster Kafka é preciso entendermos um conceito básico.
Consumer Groups
Suponha que você tem uma aplicação que ler eventos de um tópico do Kafka, roda algumas validações e escreve os resultados em uma base de dados. Isso funciona bem aparentemente. Mas, se o produtor do evento escalar em um nível que uma única aplicação (POD, container ou instância) consumidora não conseguir dar vazão? Obviamente esta aplicação vai ficar atrasada no consumo (com LAG) podendo até falhar em caso de gargalo.
Nesse exemplo acima fica claro que a sua aplicação consumidora deve escalar horizontalmente para, assim como o produtor, atingir um throughput maior.
Quando escalamos a aplicação consumidora, compartilhando o mesmo group-id, todas se tornarão parte de um consumer group. Com isso, cada consumer nesse grupo irá receber mensagens de uma ou mais partições do tópico subscrito.
Vamos ver um exemplo de um tópico com 4 partições:
- suponha que criamos um consumer, C1;
- este pertencente ao grupo consumidor G1 e está subscrito ao tópico T1;
- o único consumer C1 irá subscrever e consumir todas as mensagens das 4 partições do tópico T1.
Figura abaixo:
Se adicionarmos outro consumer, C2, no grupo G1, cada consumer irá subscrever e consumir mensagens de 2 partições. Podemos ver na imagem abaixo o seguinte cenário:
- o consumer C1 está subscrito na partição 0 e 2; e
- o consumer C2 está subscrito na partição 1 e 3.
Esse grupo consumidor G1 pode escalar até no máximo 4 consumidores e cada um irá subscrever em uma partição, conforme imagem:
Se, por algum motivo, a aplicação continuar escalando e adicionar mais um consumidor, para esse tópico que tem apenas 4 partições, este ficará ocioso:
Logo, se você quer atingir um maior throughput de consumo na sua aplicação, o primeiro passo talvez seja escalar horizontalmente sua aplicação no mesmo grupo de consumo até o número máximo de partições.
Em um único tópico podemos ter vários grupos de consumidores, e estes vão consumir em tempos distintos, cada grupo de consumo realiza o processamento isolado e após isso o consumer realiza o commit do offset, que representa o id único de cada mensagem dentro do Kafka.
Consumer Group e Rebalance de Partições
Bem, já sabemos que um consumer dentro de um grupo compartilha as partições de um tópico. Quando adicionado um novo consumer a esse grupo é realizado a redistribuição das partições entre os consumers ativo, da mesma forma acontece quando um consumer é removido.
Essa redistribuição de partições de um tópico entre consumers do mesmo grupo é chamado de rebalance. Rebalances são importantes para o consumer group porque ele provém alta disponibilidade e escalabilidade. No entanto, ele acaba sendo uma faca de dois gumes, pois pode também gerar uma latência indesejável no curso normal do fluxo de eventos quando realizado constantemente.
Essencialmente há dois tipos de rebalances, dependendo da strategy de partition assignment (estratégia de assinatura de partições) que o consumer group usa.
- Eager Rebalances: Durante um eager rebalance todos os consumidores param de consumir, revogando a partição que tinha subscrito anteriormente. Depois realiza o rejoin, ou seja, junta-se novamente ao grupo e solicita uma nova partição para realizar a subscrição. Todo esse processo gera uma janela de indisponibilidade. O tamanho desta janela depende da quantidade de consumidores dentro do grupo, bem como a quantidade de partições do tópico.
- Cooperative Rebalances: Já no cooperative rebalance (também chamado de incremental rebalances) é revogado apenas um conjunto de partições de um, ou mais, consumer realizando a redistribuição destas partições para outros consumers permitindo que os demais consumers, não revogados, continuem processando. Portanto isso evita que todos os consumers parem de consumir gerando uma janela de indisponibilidade generalizada. Esse modelo é extremamente valido para um grupo de consumo com um numero elevado de consumers e partições.
O caminho para os consumers manter os membros de um grupo ativos, consumindo e processando mensagens é enviando heatbeats constantemente para o Kafka broker designado como coordinator do grupo. Por padrão o heartbeat é enviado a cada 3 segundos, podendo ser ajustado pelo parâmetro heartbeat.interval.ms. Esse processo é executado por uma thread separada que roda em backgroud e que tem 10 segundos de timeout (session.timeout.ms). Se dentro desse intervalo não enviar nenhum heartbeat, o consumer é considerado com falha e é removido do grupo, iniciado um novo rebalance.
Porém, esse não é o único motivo de causa de rebalance, mas vamos falar mais profundamente a respeito do rebalance em um outro artigo.
Poll Loop
Podemos dizer que o coração do consumer Kafka é o loop de polling que o Kafka client realiza para o cluster Kafka em busca de dados.
Mas o que é esse polling? O método poll(), do cliente consumer Kafka, retorna uma lista de eventos e cada evento contém o metadado (o tópico, partição, timestamp, etc) e a informação em si (chave e valor da mensagem).
No entanto, o poll() não se limita em apenas obter dados do Kafka. A primeira vez que o Kafka client Consumer chama o método poll() este realiza a busca do GroupCoordinator, do group-id especifico que foi declarado na aplicação, realiza join no grupo de consumidor e por último recebe a partição que será realizado a subscrição. Se um rebalance é iniciado, este é realizado dentro do loop de polling também.
Uma vez instanciado esse método, a API do Kafka consumer realiza de tempos em tempos polling para o cluster Kafka. O intervalo máximo é definido pelo parâmetro max.poll.interval.ms, que por padrão são 5 minutos. Logo, se durante esse intervalo o Backend desse consumer não conseguir processar as mensagens e realizar uma nova chamada, este será removido do grupo de consumidores, causando rebalance.
Portanto, é importante evitar fazer processamentos longos de mensagens dentro do método poll().
Abaixo um exemplo de código usando Java nativo:
- Essa linha é basicamente para a aplicação ficar em loop infinito;
- Instanciando o método poll();
- O poll() retorna uma lista de records, como falado anteriormente. Cada record contém o tópico e partição que este veio, o offset dentro da partição e, é claro, o dado em si, chave e valor;
- Depois de processado cada evento, é escrito um log informando o resultado. Em um cenário real, essa informação poderia ser salva em uma base de dados, por exemplo;
- Importante sempre chamar o método close() para encerrar o consumer.
Commit e Offsets
Sempre que chamado o método poll() é retornado dados que foram gravados no Kafka e que os consumidores no grupo não leu ainda. Isso significa que conseguimos rastrear de cada record que foi consumido pelo consumer. Como foi falado no inicio, no Kafka não é removido o dado depois de consumido. Para isso é usado um identificador único (offsets) para cada record gravado em cada partição e quando o consumer processa a informação ele tem que atualizar a posição do consumo.
Chamamos a ação de atualizar a posição de cada grupo de consumidor em uma partição de commit.
O registro de commit de cada grupo de consumo fica armazenado em um tópico interno do Kafka (__consumer_offsets) onde é alimentado pelo o coordinator, que recebe a informação dos consumidores.
Conforme descrito anteriormente, quando tem um cenário onde o produtor do evento está bem mais rápido que o consumidor, poderá haver diferença de mensagens pronta para serem consumidas e as já consumidas (‘commitadas’), essa diferença é chamada de LAG. A imagem abaixo representa bem esse atraso:
Se você quer saber como monitorar o LAG dos consumers groups e tópicos, nesse artigo aqui mostro como instrumentar essa monitoração utilizando ferramentas Open Source.
Porém, há diversos outros motivos da causa de LAG, como problemas de Backend lento do consumer ou até mesmo rebalance constantes de grupo consumidores. Em um outro artigo vamos entender as principais causas de rebalance, como reduzi-los e tunar o consumer.