Paralelizando tarefas no Kafka
Até agora vimos nos posts anteriores situações onde existiam apenas um consumer responsável por mensagens de um tópico e mais tarde o cenário onde vários consumers estavam configurados para o mesmo tópico, recebendo todas as mensagens, cada um deles. Isso é possível pelo que já havia sido afirmado anteriormente:
Um grupo recebe todas as mensagens.
Por este motivo, toda vez que configuramos um novo consumidor no Kafka e atribuímos a ele um ConsumerGroup, ele receberá todas as mensagens de um tópico.
Agora, vamos supor voltando no exemplo anterior que havíamos dado a respeito do LogService. Eis aí um serviço que exige bastante desempenho de processamento, pensando que criamos este para captar todos os eventos de todos os usuários de uma rede social: postagens, likes, novas conexões, etc. Este seria um caso clássico onde faria todo sentido existir vários “executores LogService” para processar os ínumeros eventos da rede.
Então, vamos lá!
Acompanhe este a passo a passo abaixo com o código desta branch.
- Duplique a configuração de execução de LogService em sua IDE de preferência e execute cada um:
- Agora execute diversas vezes a classe “ProducerLikeService” para produzir mensagens;
Será possível perceber que apenas um dos executores de “LogService” estão recebendo e processando as mensagens, o que não é o que esperamos pois o que queremos é paralelizar esta tarefa fazendo assim que os ínumeros eventos sejam processados rapidamente pelos dois executores de LogService.
Afinal, como o Kafka paraleliza e permite que vários executores de um mesmo grupo recebam as mensagens e trabalhem em harmonia ?
Resposta: Através das partições
Indo para a configuração do Kafka percebemos a seguinte informação no arquivo config/server.properties
Desta forma, temos que para cada novo tópico existe apenas uma partição. Uma ilustração de um tópico com apenas uma partição seria mais ou menos assim.
As mensagens chegam e são postadas sequencialmente no tópico, sendo que nesse caso existe uma única partição. Quando o primeiro executor sobe e está pronto para processar ele se responsabiliza por toda a partição:
Logo em seguida, sobe o segundo executor LogService:
Neste caso, onde existe apenas uma partição no tópico não há uma “divisão” entre os dois executores e portanto o segundo fica impossibilitado de processar mensagens. É possível ver isso também no log de execução do mesmo que não terá uma partição a processar:
INFO org.apache.kafka.clients.consumer.internals.ConsumerCoordinator — [Consumer clientId=consumer-LogService-1, groupId=LogService] Adding newly assigned partitions:
Veja que não é informado a partição que este consumidor irá processar.
Portanto precisamos ajustar o número de partições para que seja possível que cada executor do consumidor venha a ficar responsável pelo processamento de uma delas, ficando mais ou menos assim:
Caminhando para a solução
Uma solução é redefinir no arquivo config/server.properties o número de partições sempre considerando um número igual ou maior que a quantidade de executores que existirão nos tópicos. Isso vale para os próximos tópicos que forem criados a partir desta alteração.
Para executar esta mudança nos tópicos já existentes execute o seguinte comando:
./bin/kafka-topics.sh --alter --zookeeper localhost:2181 --topic SOCIAL_NETWORKING_SUBJECT_LIKES_POST --partitions 3
Fazendo também para o outro tópico que temos:
./bin/kafka-topics.sh --alter --zookeeper localhost:2181 --topic SOCIAL_NETWORKING_ACTIONS_USERS --partitions 3
Output do console para a execução:
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
Para ver de fato se as alterações surtiram efeitos podemos executar o seguinte comando:
./bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe
Veja o output:
Veja o dado “PartitionCount” informando o número de partições já alterado :)
Neste momento reinicie as duas execuções de LogService e verifique no log que agora cada um deles pega partições diferentes para “cuidar”:
[Consumer clientId=consumer-LogService-1, groupId=LogService] Adding newly assigned partitions: SOCIAL_NETWORKING_ACTIONS_USERS-2, SOCIAL_NETWORKING_SUBJECT_LIKES_POST-2[Consumer clientId=consumer-LogService-2, groupId=LogService] Adding newly assigned partitions: SOCIAL_NETWORKING_SUBJECT_LIKES_POST-0, SOCIAL_NETWORKING_ACTIONS_USERS-0, SOCIAL_NETWORKING_ACTIONS_USERS-1, SOCIAL_NETWORKING_SUBJECT_LIKES_POST-1
Pronto, um dos executores capturou as partições 0 e 1 e outro apenas a partição 2.
Agora o próximo passo é executar diversas vezes a classe “ProducerLikeService”.
Observe bem os logs dos dois executores e ainda perceberá mais uma falha.
Logs gerado por um dos executores
CONSUMER LOG — Records found: 1
Processing likes
Key: key:85498
Value: value:likeId=90899, postId=2928928, userOwnerPost=9142, datePost=01/20/2020T10:10:99 userLikePost=29045, datePost=01/20/2020T12:10:99
Partition :0
Offset: 12
CONSUMER LOG — Records found: 1
Processing likes
Key: key:85498
Value: value:likeId=90899, postId=2928928, userOwnerPost=9142, datePost=01/20/2020T10:10:99 userLikePost=29045, datePost=01/20/2020T12:10:99
Partition :0
Offset: 13
CONSUMER LOG — Records found: 1
Processing likes
Key: key:85498
Value: value:likeId=90899, postId=2928928, userOwnerPost=9142, datePost=01/20/2020T10:10:99 userLikePost=29045, datePost=01/20/2020T12:10:99
Partition :0
Offset: 14
CONSUMER LOG — Records found: 1
Processing likes
Key: key:85498
Value: value:likeId=90899, postId=2928928, userOwnerPost=9142, datePost=01/20/2020T10:10:99 userLikePost=29045, datePost=01/20/2020T12:10:99
Partition :0
Offset: 15
Por este log podemos perceber que as mensagens estão sendo gravadas sempre na mesma partição. Um conceito importante a saber é que o Kafka usa a partir da informação de chave qual é a partição será usada para armazenar. Portanto, precisamos corrigir a classe “ProducerLikeService” para produzir chaves únicas. Podemos usar a classe java.util.UUID para gerar nossas keys.
String keyLike = UUID.randomUUID().toString();
String keyActionUser = UUID.randomUUID().toString();
Pronto, o ideal agora é pegar toda a instrução contida em “ProducerLikeService” e englobar um for com algumas repetições. Vai possibilitar uma geração mais ampla de registros e também para analisar de resolvemos o problema da chave.
for (int i = 0; i < 20; i ++) {
//code ProducerLikeService}
Agora, executando ProducerLikeService será possível ver que as partições estão sendo utilizadas:
CONSUMER LOG — Records found: 1
Processing likes
Key: 32d40c90–9299–473b-87e3-eefcd11e26c4
Value: value:likeId=90899, postId=2928928, userOwnerPost=9142, datePost=01/20/2020T10:10:99 userLikePost=29045, datePost=01/20/2020T12:10:99
Partition :0
Offset: 26
CONSUMER LOG — Records found: 1
Processing likes
Key: a168b63a-ac52–4e79–9c5d-77dc8f410834
Value: value:likeId=90899, postId=2928928, userOwnerPost=9142, datePost=01/20/2020T10:10:99 userLikePost=29045, datePost=01/20/2020T12:10:99
Partition :1
Offset: 8
CONSUMER LOG — Records found: 1
Processing likes
Key: 71af1ad7-e735–4eeb-af32-bc3350babe33
Value: value:postId=2928928, userId=9142, date=01/20/2020 10:10:20
Partition :0
Offset: 21
CONSUMER LOG — Records found: 1
Processing likes
Key: 44868895–5fa0–43f9–9c69-a1d137607205
Value: value:likeId=90899, postId=2928928, userOwnerPost=9142, datePost=01/20/2020T10:10:99 userLikePost=29045, datePost=01/20/2020T12:10:99
Partition :1
Offset: 9
Pronto!
Alguns pontos-chave
- Para paralelizar tarefas é importante configurar mais de uma partição: Neste caso temos dois tópicos divididos cada um em 3 partições sendo consumidos por dois executores;
- Não existe disputa entre mensagens, cada um desses executores fica responsável por determinadas partições quando são iniciados; Vale lembrar que existe re-balanceamento mas não se preocupe ainda com isso;
- Quando a mensagem chega ela é armazenada em algum partição de acordo com sua chave; Portanto, certifique-se de utilizar bons algortimos para gerar as chaves que identificaram o registro;
Na próxima postagem falaremos sobre auto-commit.
Críticas e sugestões são sempre bem-vindas!