Apache Kafka — Por onde começar

Michael Da Costa Silva
#LocalizaLabs
Published in
24 min readOct 22, 2020

Há pouco mais de um ano, no intuito de trazer para o #LocalizaLabs o Apache Kafka e iniciar uma jornada das buzzwords event-driven, desacoplamento, CDC e tantas outras, vinha estudando esta plataforma que apesar de ter sido lançada em 2011, ainda é novidade para muitas empresas.

Desta forma, orientei meus estudos em primeiro entender a plataforma e descobrir seus componentes, suas capacidades, limitações e possibilidades.

Acabei montando um material para auxiliar nossos times a embarcar nesta jornada. Esta será a primeira parte do estudo, contemplando os motivadores da escolha pela plataforma e o básico para começar a trabalhar. Disponibilizo também um repositório no meu github com fontes de exemplo em .Net Core, nossa principal stack atualmente no #LocalizaLabs. Posteriormente, disponibilizarei a segunda parte dos estudos, contemplando recursos avançados como Kafka Streams, ksqlDB, Connect e etc.

Então, vamos lá. Boa leitura!

O que é Apache Kafka

Apache Kafka é uma plataforma distribuída de streaming, criada pelo time do Linkedin e transformada em open-source, mantida pela Apache Foundation.

Horizontalmente escalável, tolerante a falhas e com grande velocidade, permite a ingestão, tratamento e publicação de enorme volume de dados em tempo real.

O que é uma plataforma de Streaming?

Por plataforma de streaming, entendemos que a mesma deve ser capaz de:

  • Permitir modelo Publisher-Subscriber de comunicação, similar aos sistemas de enfileiramento de mensagem, como RabbitMQ, Tibco EMS, Azure Message Bus, dentre outros.
  • Armazenar de forma durável e tolerante a falhas os dados trafegados
  • Processar os dados recebidos em tempo real

Sendo assim, podemos dizer que uma plataforma de streaming permite que vários sistemas e/ou aplicações consigam conversar entre si de forma assíncrona e em tempo real, podendo ou não realizar transformações diretamente no broker (gerando novas mensagens) antes de entregar as mensagens ao destino final.

Como o Kafka funciona?

Pela natureza de plataforma distribuída, um cluster Kafka é composto por N brokers, sendo que cada um destes atua como líder (Leader) de uma determinada partição, através de eleição pelo Apache ZooKeeper e os demais como seguidores (Followers). Cabe ao líder ficar responsável pela escrita e leitura de dados da sua partição, e aos seguidores espelhá-las através de replicação dos eventos sinalizados pelo líder.

É possível trabalhar com apenas uma instância do Kafka, contudo perde-se as capacidades de redundância, resistência a falhas e escalabilidade.

Um cluster Kafka fornece basicamente 3 APIs:

  • Producer API: Permite as aplicações publicarem (transmitirem) conteúdo
  • Consumer API: Permite as aplicações assinarem/consumir tópicos e resultados de stream processors
  • Stream Processors: permite que aplicações atuem como stream processors consumindo dados de um ou mais tópicos e tranformando-os em outros tópicos
  • Connector API: permite criar e executar bibliotecas para publicação/assinatura integrada a componentes externos, como bancos de dados.
[Fonte: Apache Kafka]

Componentes básicos do Kafka

  • Record: Denominado como Message até a versão 0.8.2, um record é basicamente o dado gravado no Kafka. Apesar de não haver limite de tamanho, usualmente é menor que 1MB.
  • Topic: Agrupamento de records enviados pelos Producers e que juntos fazem algum sentido e são assinados por Consumers.
  • Partition: Com objetivo de aumentar a escalabilidade através de paralelismo tanto de leitura quando escrita, o Kafka automaticamente divide um topic em partitions. Todos os brokers possuem uma réplica de todas as partições, entretanto, há um mecanismo de eleição de líder para definir quem é o responsável por escrita na partição específica.
  • Cluster: Coleção de brokers kafka.
  • Producer: Apps geradoras de records, a serem publicados em topics.
  • Consumer: Apps assinantes de tópicos.
  • Offset: Posição de leitura em um topic.
  • Consumer Group: Isolamento lógico para leitura, muito utilizado em cenários de leitura com paralelismo, onde o consumer group compartilha o mesmo offset.

Veja na imagem abaixo o funcionamento de gravação em um topic, através de balanceamento de partições. O controle na leitura é feito através de um log no broker.

[Fonte: Apache Kafka]

Cada consumer group possui um offset e apps no mesmo consumer group concorrem entre si.

[Fonte: Apache Kafka]

Dentre as principais características do Kafka, posso destacar que os dados são armazenados de forma imutável e persistente, ou seja, registros uma vez recebidos, nunca sofrerão alteração* e são retidos por tempo configurável (TTL), que por padrão é de 7 dias.

*Caso hajam transformações no kafka Streams, são gerados novos records a serem inseridos em outro topic.

Por trabalhar com o padrão publish-subscribe (pub-sub), é bastante utilizado em cenários de arquiteturas orientadas a eventos (event-driven), permitindo a criação de sistemas desacoplados, que notificam eventos sem precisar saber da existência dos seus consumidores.

Arquitetura básica do Kafka

[Fonte: Gartner]
[Fonte: Gartner]

Cenários de uso

Existem vários cenários onde o Apache Kafka pode ser útil. Por ter sido projetado para processamentos em tempo real, recebe, armazena e entrega dados com baixa latência e alta vazão. Por isso, pode ser usado tanto em cenários básicos de troca de mensagens assíncronas, como em canários mais complexos de sincronia de dados entre bases.

[Fonte: Kafka: The Definitive Guide]

Mensageria

O cenário mais básico de uso do Kafka é utilizá-lo como um substituto de um message broker tradicional, como RabbitMQ, TIBCO EMS, entre outros. Dentre as vantagens, podemos citar que o Kafka possui melhor throughput que os brokers tradicionais e já vem com mecanismos de particionamento, replicação e tolerância a falhas.

Brokers de mensageria tradicionais trabalham com Filas (Queues) e Tópicos (Topics). Já no Kafka temos apenas a estrutura de tópicos, onde vários consumidores podem consumir os dados destes. Além disso, não há o conceito de “tirar” uma mensagem da fila. Cada cliente é responsável pelo posicionamento (offset) de leitura, mantendo as mensagens persistentes por tempo tão distante quanto configurado.

Activity Tracking

Monitoramento de atividade do usuário foi o objetivo motivador para a criação do Kafka, quando desenhado pelo time do LinkedIn. Monitorar o comportamento de uso de uma determinada aplicação pode ser útil para entender comportamento, intenções e dificuldades que um usuário pode ter ao utilizar um produto. Além disso, estas informações podem ter grande valor para realização de Testes A/B. O volume de informações pode ser gigantesco, e precisa ser muitas vezes entregues em pipelines de Machine Learning, geração de relatórios, atualização de resultados de pesquisa ou mesmo atividades para enriquecer a experiência do usuário. Os dados podem ser injetados diretamente em tópicos do Kafka, processados através de Stream Processors e entregues nos devidos backends que cuidarão das análises.

Metrics & Logging

Múltiplas aplicações podem gerar um volume enorme de dados de logs, métricas e dados de tracing. Estes dados podem ser consumidos por sistemas de monitoramento e alertas em tempo real, ou mesmo processados de forma offline por sistemas como o Hadoop e serem enviados para indexação em sistemas como o Elasticsearch. Utilizar uma middleware como o Kafka nestes cenários é util pois se for alterada algumas das partes de origem/destino dos dados, não há impacto no restante da solução.

Event Driven Architectures

Aplicações modernas são desenhadas visando baixo acoplamento e possibilidade de escalar horizontalmente. Sendo assim, uma aplicação pode ser composta de pequenos produtos com responsabilidades bem delimitadas, e para isso acabam necessitando realizar muita comunicação entre si. Diferente de arquiteturas tradicionais, onde sistemas chamam uns aos outros, muitas vezes criando vínculos fortes entre si, uma aplicação moderna vale-se mais de comunicar fatos ocorridos no seu domínio, deixando que as diversas aplicações que de alguma forma tenham algum interesse neste domínio, observem os acontecimentos para tomarem ações baseadas neles. Eventos são imutáveis e não requerem respostas de quem os consome.

De forma prática, um evento ocorrido em um determinado sistema pode desencadear ações em outros sistemas interessados, que assinam seus eventos (pub-sub). O desacoplamento é alcançado pois o sistema gerador não tem conhecimento de quem está consumindo seus eventos.

Uma outra vantagem, é que numa arquitetura event-driven, diferente de uma abordagem REST/RPC, não existe necessidade de que todos os serviços estejam online no momento da execução do caso de uso, tornando assim a solução mais robusta como um todo.

Sincronização de dados (microsserviços autônomos, event sourcing, CQRS e ETL)

Quando utilizamos uma MSA (Micro-Services Architecture), lidamos com serviços desacoplados até o nível do banco de dados, ou seja, tanto schema quanto dados estão segregados e são de responsabilidade única do microsserviço.

Apesar do benefício do isolamento, acabamos com um problema nos consumidores, que para buscarem informações que não são do seu domínio precisam realizar requisições REST/RPC, implicando em latência. Uma saída para esta característica é manter uma réplica dos dados, ou cache, e preocupar-se apenas com as atualizações. Observe que neste caso lidamos com consistência eventual, ou seja, os dados mantidos em réplica podem estar desatualizados em relação à fonte proprietária.

Kafka pode ser utilizado para geração de eventos de atualização de dados. Os microsserviços que mantêm réplicas de bases assinam tópicos de eventos de atualização de dados, e assim recebem todas as alterações sem precisar consultar os serviços donos.

Esta abordagem é útil tanto em microsserviços autônomos quanto em abordagens event sourcing e CQRS, onde é preciso observar as atualizações que acontecem nos dados.

Além dos cenários acima, é possível também utilizar o Kafka para cenários de ETL (Extract, Transform, Load), seja para ingestão de dados e entrega em outro tópico ou sink (conector para outros destinos, como um AWS S3, ElasticSearch, ou mesmo um banco de dados), quanto para ingestão e transformação, enriquecendo e/ou compondo os dados antes de serem entregues em outro destino. Para estes casos, pode-se tanto gerar os eventos diretamente no tópico, quanto utilizar conectores para ingestão. Além disso, uma vez no broker, é possível utilizar stream processors (ex. Apacha Spark) para alterar os dados e publicá-los em outro destino.

[Fonte: Gartner]

Trabalhando com Tópicos

A criação de um tópico pode ser tão simples como dar um nome ao mesmo e criá-lo, entretanto várias questões podem impactar diretamente a experiência final em produção. As configurações mais importantes são número de partições e retenção de mensagem. É possível configurar no broker o padrão de criação de todos os tópicos ou definir individualmente.

Particionamento

Particionamento é a forma com que o Kafka consegue escalar. Ao dividir um tópico em N partições, conseguimos realizar leitura em paralelismo. Geralmente o número de partições será igual à quantidade de brokers no cluster.

Por padrão (parâmetro num.partitions), o Kafka cria tópicos com uma partição. É possível aumentar após criado, mas vale notar que uma vez aumentado, o número de partições não pode ser reduzido.

Como escolher o número de partições?

Alguns fatores devem ser considerados ao decidir o número de partições:

Considerar o throughput esperado para o tópico, por exemplo 1GB/seg

Lembre-se da teoria das restrições. No Kafka, a fim de garantir ordem de entrega das mensagens, temos no máximo 1 consumer por partition dentro de um mesmo consumer group, logo, se seu consumer tem um throughput máximo de 100MB/segundo, então o processamento máximo alcançado pelo seu tópico será de 100mb/segundo. É importante fazer essa análise para não superestimar seu particionamento, uma vez que eles aumentam a necessidade de memória e disco no broker Kafka.

Num contexto de mensageria básica, throughput pode não ser algo tão relevante, mas se lembrarmos que o objetivo inicial do Kafka é real time streaming, lidamos com alto throughput.

Vamos fazer um exerício simples: Suponhamos que você tenha um fluxo de entrada esperado de 1GB/seg em um tópico, e sabe que cada consumer tem a capacidade máxima de processar 100MB/seg, então podemos chegar a conclusão de que serão necessários 10 consumers para atingir o throughput esperado. Como dito anteriormente, apenas 1 consumer é utilizado por partição, ou seja, o tópico precisaria ter 10 partições para trabalhar com 10 consumers.

Se você não dispor de informações detalhadas da volumetria esperada, é recomendado que o tamanho da partição em disco não seja maior que 6 gb por dia de retenção.

Além disso, quanto maior o número de consumer groups, maior o impacto em performance no broker dada a necessidade de replicação de mensagens e garantia de ordem para todos os grupos.

Consumer group

Ao definir um consumer group, indicamos ao Kafka que queremos separar o consumo em contexto. Cada consumer group possui um controle de offset a parte, navegando de forma independente no log.

[Fonte: Kafka Documentation]

É importante salientar que caso haja apenas um consumer group com vários consumers, haverá balancemento de carga entre eles, e o broker entregará as mensagens da forma de um barramento de mensageria tradicional (queue based). Caso hajam distintos consumer groups, trabalhamos então em um modelo de subscription (pub-sub based), onde o broker realiza broadcast da mensagem para todos os consumer groups.

Retenção

Existem duas formas de configurar retenção de mensagens no Kafka, por tempo de vida ou por limite de armazenamento.

É possível configurar ambos os triggers de expurgo, sendo este realizado por quem disparar primeiro. Ou seja, se o parâmetro log.retention.hours for definido como 24, e o parâmetro log.retention.bytes for definido como 1000000000 (1 GB), é possível que mensagens sejam expurgadas em menos de 24 horas caso no dia tenham sido recebidos mais de 1GB de mensagens.

Atenção ao fato de que o Kafka não é um mecanismo de enfileiramento de mensagens, mas um mecanismo pub-sub. É importante destacar que ele trabalha com um conceito de log, ou seja, todas as mensagens recebidas ficam disponíveis para consumo por um tempo determinado (por padrão 7 dias). Caso um consumer fique fora do ar por período maior que o TTL (time to live) da mensagem, ele perderá a mesma. O mesmo vale para a política de expurgo por limite de armazenamento.

Garantia de ordenação

O Kafka somente garantirá ordenação se você tiver um producer enviando mensagens para uma partição e um consumer lendo mensagens de uma partição. As mensagens são preservadas em ordem no log da partição conforme elas chegam, então não é possível garantir ordenação se múltiplos producers enviam simultaneamente numa mesma partição.

Melhores Práticas

  • Nome do tópico: ao nomear um tópico, não é recomendado iniciar o seu nome com __ (dois underscores), pois tópicos internos do cluster utilizam este padrão. Além disso, não é recomendado utilizar juntos “.” e “_” nos nomes, pois para gerar métricas o Kafka transforma . para _.
  • Número de Partições: aumentamos o paralelismo adicionando mais consumers, então pode ser necessário aumentar o número de partições em um tópico existente. Contudo, caso na criação do tópico tenha sido utilizada alguma estratégia de particionamento por chave (keyed messages), não é recomendado aumentar o número de partições posteriormente, pois causará resizing no topic. Importante: Caso seja necessário reduzir a quantidade de partições em um tópico, é necessário excluir o tópico e recriá-lo.
  • Chaves: quando produzimos uma mensagem, caso não seja informada a Key do registro e exista mais de uma partição, o Kafka utilizará um algoritmo round-robin para balancear entre as partições existentes. Se for informada a Key na produção da mensagem, esta cairá sempre na mesma partição, a ser determinada pela hash da key. Pode existir cenários onde a frequência de uma key seja maior que as demais, isso resultará em uma partição muito grande, já que além de receber todas mensagens dessa key, receberá também mensagens de outras keys através do balanceamento. Para evitar esse cenário, é possível construir um custom partitioner com alguma regra de segregação específica.
  • Garantia de entrega ordenada: existem casos de uso onde a ordem de recebimento das mensagens é crítica. Imagine um cenário onde o cliente deposita R$ 100,00 e em seguida saca R$ 100,00. Se a ordem de recebimento das mensagens não for mantida pode-se gerar um grande problema. Nestes cenários, pode ser interessante configurar o parâmetro max.in.flights.requests.per.session = 1. Por padrão ele vem configurado com o valor 5, significando que o broker receberá até 5 mensagens em lote para processar, e neste caso havendo problema ao realizar commit da primeira mensagem no broker e sucesso na segunda, ao tentar novamente a primeira, as mensagens invertem a ordem. Note que realizar essa configuração para aceitar requests de apenas uma mensagem por vez significa diminuir o throughput do producer, logo deve ser usada apenas nestes cenários.

Producers Avançado

A API para produção de registros é bem simples, entretanto é interessante conhecê-la mais profundamente de forma a tirar o melhor do produto.

Veja no desenho mais abaixo o funcionamento geral da Producer API.

Fonte: Kafka: The Definitive Guide

O processo inicia-se no cliente, criando um ProducerRecord, que obrigatoriamente deve ter configurado qual é o tópico e deverá também ter um valor. Pode-se opcionalmente especificar qual a chave e/ou partição da mensagem.

Os dados são então serializados em ByteArrays para serem trafegados através da rede e enviados ao partitioner. Caso tenhamos especificado a partição, o partitioner apenas a devolve, caso contrário o partitioner definirá a partição destino, geralmente baseada na chave do Registro.

Definida a partição, o registro irá para uma fila (buffer) junto a outros registros que estão prontos para serem gravados no mesmo tópico. Uma thread irá cuidar de enviar os registros dessa fila para os brokers Kafka.

Quando o broker recebe o registro, ele envia ao remetente uma resposta (RecordMetadata) com as informações do tópico, partição e offset do registro em caso de sucesso. Em caso de erro, o producer receberá o erro e poderá tentar o reenvio por algumas vezes antes de desistir e entregar o erro ao cliente.

Formas de envio

Apesar de a biblioteca da Confluent para .Net permitir apenas envio assíncrono, existem 3 formas de se enviar mensagens no Kafka:

  • Assíncrono: dispara-se uma mensagem para o Kafka e aguarda o callback.
  • Síncrono: dispara-se uma mensagem para o Kafka e interrompe o processamento da thread até que o retorno seja recebido.
  • Fire-and-forget: a mensagem é enviada ao broker e não nos importamos se chegará com sucesso ou não. Pela alta disponibilidade do Kafka e mecanismos de retry, é bem possível que a mensagem chegará com sucesso, entretanto, é possível sim ocorrer perda de mensagens. Mais abaixo veremos os tipos de erros mais comuns. Uma chamada fire and forget é realizada por um producer configurado para não receber relatório de entrega (ProducerConfig.Acks = Acks.None).

Tipos de erros

O Kafka tem um mecanismo automático de retry, que funciona basicamente nos dois casos listados mais abaixo. Erros como “message size too large” nunca terão re-tentativas.

Os dois tipos de erros passíveis de re-tentativa no Kafka são os seguintes:

  • Retriable errors: erros onde pode-se resolver tentando enviar novamente a mensagem, como perda de conexão com o broker.
  • No leader error: erros onde há perda de consistência em uma partição e o cluster precisa eleger um novo líder. O Kafka por padrão (parâmetro MessageSendMaxRetries) tenta novamente outras vezes até que um novo líder seja eleito. Caso o número de tentativas seja excedido, aí sim será gerada uma exceção no producer.

Parâmetros do Producer

Na documentação do Apache Kafka é possível consultar a lista completa de parâmetros do produto, entretanto, existem contextos onde alguns ajustes podem influenciar aspectos como latência, consumo de memória, dentre outros. Veja abaixo alguns que podem ser úteis.

acks

Um registro é considerado consistente quando gravado em todas as réplicas de partições. O parâmetro Acks serve para configurar quantas confirmações serão enviadas ao producer, podendo ser uma das seguintes:

  • Acks = 0. O producer não aguardará confirmação do broker, aumentando bastante sua capacidade de throughput, todavia, caso o broker não tenha recebido sua mensagem a mesma será perdida.
  • Acks = 1. O producer receberá a confirmação de sucesso no momento que a réplica líder receber a mensagem. Caso a réplica líder falhe, um erro será recebido pelo Producer, que tentará enviar novamente. Neste cenário, caso o líder trave e seja necessário escolher um novo líder, pode haver perda de mensagem caso o líder eleito não a tenha (o que é chamado de unclean leader election). Ainda para esta configuração, o throughput pode ser afetado caso esteja usando chamadas síncronas ou se o parâmetro de máximo de mensagens in-flight (MaxInFlight) seja igual a 1. Lembrando que este parâmetro ajuda a manter a ordem das mensagens em casos de retry.
  • Acks = all. O retorno de sucesso será recebido apenas quando todas as réplicas receberem a mensagem. Esta é a confuguração de menor throughput, entretanto é a que garante a maior segurança.

buffer.memory

Caso você tenha algum cenário onde o throughput gerado pela aplicação é maior que a capacidade de envio para o broker, pode acabar a memória disponível para uso no broker. Como visto antes, um producer enche um buffer de records a serem enviados ao broker. Esgotando este espaço e estando bloqueada/travada a thread de envio, pode haver exceção. O producer bloqueará a produção de mensagens por tempo determinado no parâmetro max.block.ms.

compression.type

Por padrão o Kafka vem com a compactação desabilitada. Comprimir mensagens resulta em menor uso de rede e armazenamento em disco, geralmente gargalos.

O Kafka oferece 4 mecanismos de compressão, que são:

  • gzip: algoritmo que entrega ótima capacidade de compressão, contudo utiliza mais CPU e tempo.
  • snappy: algoritmo criado pelo Google, que garante boa taxa de compressão e baixo impacto em CPU. É recomendado em cenários onde largura de banda e desempenho são críticos.
  • lz4: taxa de compactação próxima a do snappy, entretanto possui maior velocidade para descompactação.
  • zstd: algoritmo de compressão criado pelo Facebook, similar ao snappy. Alguns testes mostram considerável vantagem neste.

A recomendação de uso pode variar conforme combinação de hardware, codecs e casos de uso. Cada empresa deve realizar seu próprio benchmark.

Vale observar que quanto maior o buffer para batching, maior será o aproveitamento da compressão.

retries

Como dito anteriormente, o producer já lida com novas tentativas de envio em caso de erros transientes, como eleição de novo líder. Sendo assim, não é preciso tratar lógicas de retry na aplicação. O esforço deve ser empregado em tratar erros não transientes.

O ponto a se analisar nesta configuração é encontrar qual o melhor parâmetro de tempo de espera entre as tentativas, por padrão o parâmetro retry.backoff.ms vem configurado em 100 ms. A recomendação é analisar quanto tempo seu cluster leva para eleger um novo líder, voltar de uma situação de crash, dentre outros, e definir a quantidade de retentativas e o intervalo entre elas, para evitar estourar os limites e receber uma exceção do producer.

batch.size

O producer acumula mensagens antes do envio. Se definido como 0 bytes, nenhum batch será criado, e as mensagens são enviadas de imediato, ocasionando em queda de throughput. Se definido o valor máximo, muita memória será alocada antecipadamente para acumular o buffer.

linger.ms

Este parâmetro indica quanto tempo o producer aguardará para receber uma nova mensagem a ser colocada no buffer antes de enviar. Pode ser útil caso necessário diminuir a quantidade de requests ao broker. Aumentar a quantidade de mensagens enviadas por vez significa aumentar o throughput, no entanto, aumenta-se também a latência. Por padrão vem definido como 0, ou seja, assim que o producer tenha uma thread de envio disponível, ele envia a mesma ao broker.

max.in.flight.requests.per.connection

Limita a quantidade de requisições de envio sem confirmação por conexão, ou seja, se uma mensagem não receber ack, o producer será bloqueado até que a retentativa obtenha sucesso. Isso garante ordenação das mensagens (FIFO), entretanto diminui o throughput. Por padrão vem configurado como 5.

client.id

Informação livre para identificar no servidor a origem das mensagens, além de apenas o IP do mesmo. Importante para cenários de tracing.

idempotence

Habilitando este parâmetro temos a garantia de que exatamente uma cópia da mensagem foi escrita no tópico, evitando a duplicação em casos de retry. Garante-se também ordenação. Quando habilitado, automaticamente trabalhamos com os parâmetros max.in.flight.requests.per.connection = 5, retries=INT32_MAX e acks=all.

Consumer Avançado

O Apache Kafka é um sistema que compartilha uma característica comum a muitos sistemas de mensageria que é ser um sistema puxado e não empurrado, ou seja, quem dita o ritmo da vazão é o Consumer. Consumer é um client que lê mensagens em um tópico. Consumer Group é uma forma de escalar o throughput do Kafka, paralelizando consumidores de forma agrupada, na leitura de um tópico.

Aumentar o throughput significa aumentar a quantidade de partições e de consumers. Se o número de partições for igual ao número de consumers no grupo, cada um ficará responsável por tirar mensagens de uma partição.

[Fonte: Kafka: The Definitive Guide]

Se o número de partições for menor que o número de consumers no grupo, o broker escalonará o envio de mensagens de forma igual para cada consumer.

[Fonte: Kafka: The Definitive Guide]

Já se um número de partições for menor que o número de consumers no grupo, os consumers excedentes ficarão ociosos.

[Fonte: Kafka: The Definitive Guide]

O broker possui um coordenador que sempre rebalanceará o mapeamento quando houver entrada ou saída de consumers de um determinado consumer group.

Caso um consumer fique ocioso por muito tempo, ele é marcado como dead e o rebalanceamento ocorre.

Semânticas de entrega de mensagem

Um dos problemas mais conhecidos e de difícil solução no mundo de sistemas distribuídos está relacionado à semântica de entrega de mensagens. Pela natureza distribuída, podem ocorrer falhas em vários pontos da solução, e começamos a enfrentar cenários como perda de mensagem, processamentos em duplicidade ou em ordem errada. Imagine um cenário onde é iniciada uma transação de saque em uma conta corrente. Se a mensagem não chegar, corre-se o risco de entregar o dinheiro sem debitar o valor do saque; se a mensagem chegar duas vezes, corre-se o risco de debitar em dobro; se a mensagem chegar em ordem inversa, pode não haver saldo na solicitação de saque, impedindo-o.

As semânticas são as seguintes:

At least once (Pelo menos uma vez)

No cenário perfeito o producer envia uma mensagem ao broker, que recebe com sucesso (acks=all). A mensagem foi entregue apenas uma vez. Na prática, pode ocorrer um erro no broker ou o producer tomar timeout. O producer reenviará a mensagem, tudo certo caso ela não tenha sido gravado no log, todavia se o broker tiver previamente gravado mas teve problema no envio do ack, teremos então uma mensagem duplicada.

Do ponto de vista do consumer, ele retira a mensagem do tópico, processa a mensagem e só então realiza o commit no offset. Caso o consumer trave durante o processamento da mensagem, ao reiniciar ele retirará novamente a mesma mensagem do tópico, e somente ao concluir com sucesso realizará o commit no offset. Não há perda de mensagem, mas pode haver duplicação.

Configurações do cliente para garantir esta semântica: acks=all, max.in.flight.requests.per.connection=1, retries > 0.

At most once (No máximo uma vez)

Ainda no cenário de exemplo do saque, se o Producer tiver enviado uma mensagem, e não recebeu o ack, para evitar duplicação não haverá retentativa de envio. Contudo, pode-se haver perda de mensagem.

Do ponto de vista do consumer, ele retira a mensagem do tópico, realiza commit do offset e então processa a mensagem. Caso o consumer trave entre o commit do offset e o processamento da mensagem, ao reiniciar começará do próximo registro sem ter obtido sucesso na mensagem anterior, ou seja, houve perda de mensagem.

Configurações do cliente para garantir esta semântica: acks=1, max.in.flight.requests.per.connection=5.

Exactly once (Exatamente uma vez)

Ainda no cenário hipotético acima, mesmo havendo reenvio a mensagem será recebida apenas uma vez. Este é o cenário mais desejado, e também o mais caro, pois envolve uma cooperação entre o broker e a aplicação cliente. Se o cliente não reconhecer uma mensagem como duplicada e processá-la, haverá duplicação. Lembre-se que pela natureza do Kafka é possível voltar o offset do tópico realizando replay das mensagens. Se não houver uma mecânica para garantir Exactly once no cliente, perde-se a garantia de unicidade.

Do ponto de vista do consumer, deve haver um processo transacional tanto no consumer quanto na aplicação que processa a mensagem, para em caso de erro não realizar o commit no offset.

A partir da versão 0.11 do Kafka foi disponibilizado o conceito de idempotência, ou seja, a garantia de que uma mensagem seja entregue uma e apenas uma vez em uma partição específica de um tópico durante o tempo de vida de um producer. Além disso foi entregue também controle transacional (até o fechamento deste artigo, não está disponível para o client .net), permitindo atomicidade na entrega de um grupo de mensagens a mais de uma partição, ou seja, todas as mensagens são entregues ou nenhuma delas será.

Juntos, estes dois recursos permitem alcançarmos a semântica Exactly Once. De acordo com o time da Confluent, pouco overhead é adicionado ao se habilitar estes recursos (3% de degradação comparado a configuração que garanta “at least once” e 20% comparado a configuração que garanta “at most once”).

Funciona da seguinte forma: o producer recebe um Id e envia junto às mensagens um sequencial que permite ao broker deduplicar as mensagens recebidas.

Configurações do cliente para garantir esta semântica: definir o parâmetro enable.idempotence = true na configuração de cliente do Producer.

Esta configuração basicamente limita os requests in-flight para o máximo de 5, retries > 0 e acks=all. Se algum destes parâmetros for configurado fora dessa regra, ocorrerá exceção no producer.

Vale lembrar o ponto citado anteriormente, se a aplicação consumidora não tiver os devidos cuidados para não processar mais de uma vez, sozinho o broker não entregará Exactly Once.

Formas de consumo de tópicos

  • Subscribe: quanto utilizamos subscribe, o broker dinamicamente decidirá o balanceamento através do consumer group, indicando qual consumer irá ler qual ou quais partições do tópico.
  • Assignment: caso haja algum cenário onde o consumer precise receber especificamente mensagens apenas de uma ou várias partições, é possível assinar o tópico especificando-as. Neste caso não há consumer group e a estratégia fica toda por conta do consumer.
  • Seek: existem casos de uso onde precisa-se movimentar pelo log. Nestes casos é possível utilizar o método Seek() para posicionar o offset no local desejado. Um exemplo seria no caso de um sistema que se importe apenas com as informações mais recentes, então busca apenas os registros mais novos. Outro exemplo seria na tratativa de um bug, com necessidade de replay do log (event sourcing), posicionando o offset no local desejado e refazendo os passos perdidos.

Caso você tenha um cenário onde você mesmo controla o offset, é preciso definir o parâmetro enable.auto.commit=false.

Boas práticas

  • Unsubscribe e Close: trabalhando com consumer groups, caso um consumer finalize seu trabalho e for deixar o grupo, otimizamos o uso do broker ao sinalizar a saída através dos métodos Unsubscribe e Close. O broker gasta recursos ao manter estado de cada um dos consumers. Ao utilizar o método Close(), acontece o commit do offset do consumer, a saída deste do consumer group, rebalanceamento das partições entre os consumers restantes e a liberação de recursos do broker. Além disso, o consumer se desconecta do broker. O método unsubscribe também força o rebalanceamento no coordenador do grupo, entretanto mantém a conexão ativa com o broker.

Como utilizar

Este não é um manual de como instalar o Kafka, para tal, seguir o quickstart na documentação da Apache. O objetivo é demonstrar como realizar pub-sub nos tópicos existentes.

Para montar um broker Kafka de exemplo, sugiro usar a distribuição da Confluent, e seguir este tutorial para criar uma instancia no docker.

Para ambos os cenários a serem exemplificados, criar a estrutura básica do .net core e instalar o pacote “Confluent.Kafka” no NuGet.

mkdir kafka
cd kafka
dotnet new console
dotnet add package Confluent.Kafka --version 1.3.0

Producer

Abaixo podemos ver um Publisher escrito em .Net Core. O objetivo é demonstrar como realizar gravação em um tópico em específico.

Note que são necessárias basicamente 2 informações para que o snippet funcione, que são elas:

BootstrapServers: IP e Porta do broker Kafka Topic: Nome do tópico a ser utilizado.

using System;
using System.Threading.Tasks;
using Confluent.Kafka;
class Program
{
public static async Task Main(string[] args)
{
var config = new ProducerConfig { BootstrapServers = "{{URL BROKER}}:9092"};
string Topic = "meu_topico";
using (var p = new ProducerBuilder<Null, string>(config).Build())
{
try
{
var dr = await p.ProduceAsync(Topic, new Message<Null, string> { Value="teste" });
Console.WriteLine($"Entregou '{dr.Value}' em '{dr.TopicPartitionOffset}'");
}
catch (ProduceException<Null, string> e)
{
Console.WriteLine($"Falha na entrega: {e.Error.Reason}");
}
}
}
}

Consumer

Abaixo podemos ver um Client escrito em .Net Core. O objetivo é demonstrar como realizar leitura de um tópico em específico.

Note que são necessárias basicamente 3 informações para que o snippet funcione, que são elas:

GroupId: Nome do Consumer Group BootstrapServers: IP e Porta do broker Kafka Topic: Nome do tópico a ser utilizado.

using System;
using System.Threading;
using Confluent.Kafka;

class Program
{
public static void Main(string[] args)
{
var conf = new ConsumerConfig
{
GroupId = "test-consumer-group",
BootstrapServers = "{{URL BROKER}}:9092",
AutoOffsetReset = AutoOffsetReset.Earliest
};

string Topic = "meu_topico";

using (var c = new ConsumerBuilder<Ignore, string>(conf).Build())
{
c.Subscribe(Topic);

CancellationTokenSource cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) => {
e.Cancel = true;
cts.Cancel();
};

try
{
while (true)
{
try
{
var cr = c.Consume(cts.Token);
Console.WriteLine($"Consumiu a mensagem '{cr.Value}' em: '{cr.TopicPartitionOffset}'.");
}
catch (ConsumeException e)
{
Console.WriteLine($"Erro ocorrido: {e.Error.Reason}");
}
}
}
catch (OperationCanceledException)
{
c.Close();
}
}
}
}

Mais informações: Client .Net da Confluent

Um Kafka para chamar de meu

Para finalizar, compartilho um resumo da matriz de decisão que realizei e que nos levou a termos escolhido a plataforma Confluent Cloud para o #LocalizaLabs.

O estudo começou com a análise tanto do Apache Kafka em uma instalação On-Premises quanto produtos gerenciados por um provedor Cloud como o Confluent Cloud, Amazon MSK e também produtos similares, que compartilham os conceitos do Kafka.

Confluent Cloud foi o vencedor, por ser um Kafka com poderes extras (Confluent Console, Schema Registry e Kafka Connect com base mais ampla de conectores), totalmente gerenciado, auto-escalável e por permitir multi-cloud/hybrid cloud.

Demais alternativas:

  • Amazon Kinesis: funciona analogamente ao Kafka, pois suporta processamento de fluxo de eventos, mas integra-se apenas aos produtos da AWS. Além disso, não é possível consumir os streams diretamente de consumers do Kafka.
  • Azure Event Hubs: similar ao Kinesis, oferecendo boa integração com demais produtos da Azure. Tem como ponto favorável exportar a API do Kafka, permitindo que Kafka Consumers funcionem normalmente.
  • Amazon MSK: Kafka Vanilla, provisionado pelo console da AWS e possui cluster gerenciado pela AWS. Entretanto todo o resto é de responsabilidade do usuário. Não é auto-escalável e atualizações são de responsabilidade do cliente.
  • Azure HDInsight: plataforma que oferece embutido Hadoop, Spark e Kafka. Pouco estudo foi realizado por ter um escopo maior.
  • Confluent Kafka On-Premises: plataforma Kafka + recursos da Confluent (Connect, KSQL, Streams, etc). Open-source com subscription de suporte cobrada por quantidade de nodes. Um sizing com 3 brokers é suficiente para processar de 4 a 5 trilhões de registros por dia.

Bom, espero que tenha gostado da leitura. Como dito anteriormente, em breve compartilharei a segunda parte do estudo e também os slides do treinamento que oferecemos aos nossos times.

Até mais!

Referências

Usos do Kafka
Kafka Streams Documentation
Event Driven
Best Practices for Event-Driven Microservice Architecture
CQRS
Event Sourcing
Apache Kafka Documentation
Kafka: The Definitive Guide
Confluent Blog — Semantics
DZone — Kafka Semantics
Client .Net da Confluent
Gartner — Enabling Streaming Architectures for Continuous Data and Events With Kafka (sem link, material restrito)

--

--

Michael Da Costa Silva
#LocalizaLabs

Solutions Architect | Software Engineer | DevOps Practitioner | ML Enthusiast