Paralelizando tarefas no Kafka

Willyan Guimarães
experienceCode
Published in
6 min readMay 27, 2020

Até agora vimos nos posts anteriores situações onde existiam apenas um consumer responsável por mensagens de um tópico e mais tarde o cenário onde vários consumers estavam configurados para o mesmo tópico, recebendo todas as mensagens, cada um deles. Isso é possível pelo que já havia sido afirmado anteriormente:

Um grupo recebe todas as mensagens.

Por este motivo, toda vez que configuramos um novo consumidor no Kafka e atribuímos a ele um ConsumerGroup, ele receberá todas as mensagens de um tópico.

Agora, vamos supor voltando no exemplo anterior que havíamos dado a respeito do LogService. Eis aí um serviço que exige bastante desempenho de processamento, pensando que criamos este para captar todos os eventos de todos os usuários de uma rede social: postagens, likes, novas conexões, etc. Este seria um caso clássico onde faria todo sentido existir vários “executores LogService” para processar os ínumeros eventos da rede.

Então, vamos lá!

Acompanhe este a passo a passo abaixo com o código desta branch.

  • Duplique a configuração de execução de LogService em sua IDE de preferência e execute cada um:
  • Agora execute diversas vezes a classe “ProducerLikeService” para produzir mensagens;

Será possível perceber que apenas um dos executores de “LogService” estão recebendo e processando as mensagens, o que não é o que esperamos pois o que queremos é paralelizar esta tarefa fazendo assim que os ínumeros eventos sejam processados rapidamente pelos dois executores de LogService.

Afinal, como o Kafka paraleliza e permite que vários executores de um mesmo grupo recebam as mensagens e trabalhem em harmonia ?

Resposta: Através das partições

Indo para a configuração do Kafka percebemos a seguinte informação no arquivo config/server.properties

Desta forma, temos que para cada novo tópico existe apenas uma partição. Uma ilustração de um tópico com apenas uma partição seria mais ou menos assim.

As mensagens chegam e são postadas sequencialmente no tópico, sendo que nesse caso existe uma única partição. Quando o primeiro executor sobe e está pronto para processar ele se responsabiliza por toda a partição:

Logo em seguida, sobe o segundo executor LogService:

Neste caso, onde existe apenas uma partição no tópico não há uma “divisão” entre os dois executores e portanto o segundo fica impossibilitado de processar mensagens. É possível ver isso também no log de execução do mesmo que não terá uma partição a processar:

INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator — [Consumer clientId=consumer-LogService-1, groupId=LogService] Adding newly assigned partitions:

Veja que não é informado a partição que este consumidor irá processar.

Portanto precisamos ajustar o número de partições para que seja possível que cada executor do consumidor venha a ficar responsável pelo processamento de uma delas, ficando mais ou menos assim:

Caminhando para a solução

Uma solução é redefinir no arquivo config/server.properties o número de partições sempre considerando um número igual ou maior que a quantidade de executores que existirão nos tópicos. Isso vale para os próximos tópicos que forem criados a partir desta alteração.

Para executar esta mudança nos tópicos já existentes execute o seguinte comando:

./bin/kafka-topics.sh --alter --zookeeper localhost:2181 --topic SOCIAL_NETWORKING_SUBJECT_LIKES_POST --partitions 3

Fazendo também para o outro tópico que temos:

./bin/kafka-topics.sh --alter --zookeeper localhost:2181 --topic SOCIAL_NETWORKING_ACTIONS_USERS --partitions 3

Output do console para a execução:

WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!

Para ver de fato se as alterações surtiram efeitos podemos executar o seguinte comando:

./bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe

Veja o output:

Veja o dado “PartitionCount” informando o número de partições já alterado :)

Neste momento reinicie as duas execuções de LogService e verifique no log que agora cada um deles pega partições diferentes para “cuidar”:

[Consumer clientId=consumer-LogService-1, groupId=LogService] Adding newly assigned partitions: SOCIAL_NETWORKING_ACTIONS_USERS-2, SOCIAL_NETWORKING_SUBJECT_LIKES_POST-2[Consumer clientId=consumer-LogService-2, groupId=LogService] Adding newly assigned partitions: SOCIAL_NETWORKING_SUBJECT_LIKES_POST-0, SOCIAL_NETWORKING_ACTIONS_USERS-0, SOCIAL_NETWORKING_ACTIONS_USERS-1, SOCIAL_NETWORKING_SUBJECT_LIKES_POST-1

Pronto, um dos executores capturou as partições 0 e 1 e outro apenas a partição 2.

Agora o próximo passo é executar diversas vezes a classe “ProducerLikeService”.

Observe bem os logs dos dois executores e ainda perceberá mais uma falha.

E aí, já descobriu ?

Logs gerado por um dos executores

CONSUMER LOG — Records found: 1
Processing likes
Key: key:85498
Value: value:likeId=90899, postId=2928928, userOwnerPost=9142, datePost=01/20/2020T10:10:99 userLikePost=29045, datePost=01/20/2020T12:10:99
Partition :0
Offset: 12
CONSUMER LOG — Records found: 1
Processing likes
Key: key:85498
Value: value:likeId=90899, postId=2928928, userOwnerPost=9142, datePost=01/20/2020T10:10:99 userLikePost=29045, datePost=01/20/2020T12:10:99
Partition :0
Offset: 13
CONSUMER LOG — Records found: 1
Processing likes
Key: key:85498
Value: value:likeId=90899, postId=2928928, userOwnerPost=9142, datePost=01/20/2020T10:10:99 userLikePost=29045, datePost=01/20/2020T12:10:99
Partition :0
Offset: 14
CONSUMER LOG — Records found: 1
Processing likes
Key: key:85498
Value: value:likeId=90899, postId=2928928, userOwnerPost=9142, datePost=01/20/2020T10:10:99 userLikePost=29045, datePost=01/20/2020T12:10:99
Partition :0
Offset: 15

Por este log podemos perceber que as mensagens estão sendo gravadas sempre na mesma partição. Um conceito importante a saber é que o Kafka usa a partir da informação de chave qual é a partição será usada para armazenar. Portanto, precisamos corrigir a classe “ProducerLikeService” para produzir chaves únicas. Podemos usar a classe java.util.UUID para gerar nossas keys.

String keyLike = UUID.randomUUID().toString();
String keyActionUser = UUID.randomUUID().toString();

Pronto, o ideal agora é pegar toda a instrução contida em “ProducerLikeService” e englobar um for com algumas repetições. Vai possibilitar uma geração mais ampla de registros e também para analisar de resolvemos o problema da chave.

for (int i = 0; i < 20; i ++) {
//code ProducerLikeService
}

Agora, executando ProducerLikeService será possível ver que as partições estão sendo utilizadas:

CONSUMER LOG — Records found: 1
Processing likes
Key: 32d40c90–9299–473b-87e3-eefcd11e26c4
Value: value:likeId=90899, postId=2928928, userOwnerPost=9142, datePost=01/20/2020T10:10:99 userLikePost=29045, datePost=01/20/2020T12:10:99
Partition :0
Offset: 26
CONSUMER LOG — Records found: 1
Processing likes
Key: a168b63a-ac52–4e79–9c5d-77dc8f410834
Value: value:likeId=90899, postId=2928928, userOwnerPost=9142, datePost=01/20/2020T10:10:99 userLikePost=29045, datePost=01/20/2020T12:10:99
Partition :1
Offset: 8
CONSUMER LOG — Records found: 1
Processing likes
Key: 71af1ad7-e735–4eeb-af32-bc3350babe33
Value: value:postId=2928928, userId=9142, date=01/20/2020 10:10:20
Partition :0
Offset: 21
CONSUMER LOG — Records found: 1
Processing likes
Key: 44868895–5fa0–43f9–9c69-a1d137607205
Value: value:likeId=90899, postId=2928928, userOwnerPost=9142, datePost=01/20/2020T10:10:99 userLikePost=29045, datePost=01/20/2020T12:10:99
Partition :1
Offset: 9

Pronto!

Alguns pontos-chave

  • Para paralelizar tarefas é importante configurar mais de uma partição: Neste caso temos dois tópicos divididos cada um em 3 partições sendo consumidos por dois executores;
  • Não existe disputa entre mensagens, cada um desses executores fica responsável por determinadas partições quando são iniciados; Vale lembrar que existe re-balanceamento mas não se preocupe ainda com isso;
  • Quando a mensagem chega ela é armazenada em algum partição de acordo com sua chave; Portanto, certifique-se de utilizar bons algortimos para gerar as chaves que identificaram o registro;

Na próxima postagem falaremos sobre auto-commit.

Críticas e sugestões são sempre bem-vindas!

--

--