Mirroring entre datacenters com MirrorMaker — Parte II

iundarigun
Dev Cave
Published in
8 min readMay 6, 2020

Como falei no post anterior, temos um cenário onde vamos precisar duplicar dados de alguns tópicos entre datacenters. Analisamos algumas opções e aprofundamos no MirrorMaker. Vamos relembrar a descrição do MirrorMaker que fizemos no post anterior:

É a solução que oficial do Kafka. A ideia é bem simples: O MirrorMaker usa o consumer padrão do Kafka, pega as mensagens e publica usando o producer padrão do Kafka no mesmo tópico no cluster de destino. Cada “instância” do MirrorMaker só tem um producer, mas pode ter vários consumers.

As principais caraterísticas são:

  • Recebe um parâmetro que é um pattern indicando quais tópicos precisam ter os dados duplicados no outro datacenter.
  • A duplicação acontecer Cluster to Cluster, isso é, consume de um único cluster e produz para um único cluster também. Se precisar duplicar entre mais clusters, vai precisar instâncias do MirrorMaker para cada par de clusters.
  • Como a duplicação acontece no mesmo nome de tópico, numa configuração Active-Active é preciso ter cuidado para evitar a duplicação cíclica, onde o tópico X é replicado do cluster A para o B e do B para o A, pois o consumer não distingue se o evento vem de um datacenter ou de outro.
  • Simples de escalar, pois podemos iniciar vários processos de MirrorMaker para o mesmo ou diferentes tópicos.
  • Se o tópico destino não existe, o mesmo é criado seguindo as configurações padrão do cluster de destino.

Usando MirrorMaker

Temos alguns passos antes de usar o MirrorMaker. Vamos começar pelo básico: Baixar a última versão de Kafka do site oficial (nos exemplos usei a versão 2.5.0)

Após descomprimir, já podemos usar o MirrorMaker, que se encontra na pasta bin do Kafka. Qualquer pesquisa rápida no google, vai te trazer um formato de uso parecido a este:

$ <kafka-folder>/bin/kafka-mirror-maker.sh --consumer.config ./consumer.properties --producer.config ./producer.properties --new.consumer --num.streams=2 --whitelist "*"

Como ponto de partida, vamos começar analisando cada parâmetro

consumer.config

Faz referência ao arquivo de properties onde configuramos o consumer. Temos um arquivo base na pasta config do próprio Kafka.

  • bootstrap.servers: É a lista de brokers de Kafka de onde devemos consumir
  • group.id: É o consumer group que usaremos para o consumir os tópicos. Podemos usar ele para identificar os lags de consumo dos offsets.
  • auto.offset.reset: Configura o comportamento do offset inicial do consumer group caso não existir.

Existem mais propriedades que infelizmente não estão listadas no arquivo de modelo. Iremos descobrir algumas depois.

producer.cofig

Faz referência ao arquivo de properties onde configuramos o producer. Também temos um arquivo base na pasta config.

  • bootstrap.servers: É a lista de brokers de Kafka de onde devemos produzir
  • compression.type: Indica o tipo de compressão. Pode ser interessante comprimir os dados antes de enviar para o cluster de destino se o tamanho da banda de rede for um problema.

O resto de parâmetros são auto explicativos mas, de novo, há certos parâmetros que não estão listado neste arquivo.

new.consumer

Esta propriedade faz referência ao tipo de consumidor usado. Atualmente não se faz mais necessário, pois este parâmetro é legado das primeiras versões.

num.streams

Indica o número de consumers concorrentes que iremos usar. O comportamento é igual que nas aplicações, no máximo podemos ter um consumer por partition. Também precisamos tomar cuidado com o número de consumers pois um número muito elevado pode provocar um gargalho no MirrorMaker, por ter só um producer.

whitelist

Indica o pattern aplicado sobre o nome do tópico. É um regex usado para filtrar aqueles tópicos que precisam duplicar dados entre o datacenter origem e destino.

Rodando o MirrorMaker

Vamos levantar dois clusters na nossa máquina usando o docker-compose que deixei no meu repositório.

$ docker-compose -f docker-compose-cluster1.yml up -d
$ docker-compose -f docker-compose-cluster2.yml up -d

Isso vai levantar um cluster com um único broker no localhost:9094 e outro cluster igual no localhost:9096.

Vamos criar um tópico em cada cluster. Podemos usar a CLI ou podemos usar alguma ferramenta visual, e pessoalmente gosto de usar conduktor, recomendação do Hivison Moura.

$ <kafka folder>/bin/kafka-topics.sh --bootstrap-server localhost:9094 --topic DEVCAVE.EXAMPLE.MIRRORMAKER --create --partitions 12 --replication-factor 1
$ <kafka folder>/bin/kafka-topics.sh --bootstrap-server localhost:9096 --topic DEVCAVE.EXAMPLE.MIRRORMAKER --create --partitions 12 --replication-factor 1

Criamos um tópico chamado DEVCAVE.EXAMPLE.MIRRORMAKER em cada cluster, com 12 partitions.

Agora, vamos alterar o consumer.properties e o producer.properties para apontar para esses clusters. E vamos iniciar o MirrorMaker:

Produzimos uma mensagem no cluster 1:

$ <kafka folder>/bin/kafka-console-producer.sh --broker-list localhost:9094 --topic DEVCAVE.EXAMPLE.MIRRORMAKER
> message 1

Se verificamos no conduktor, veremos que temos uma mensagem no tópico do primeiro cluster e também no segundo cluster.

Já temos o MirrorMaker funcionando de uma forma mais simples do que pareceu no começo!

Arquitetura Active-Active

Quando precisamos duplicar os dados entre datacenters de maneira bidirecional, isso é, do tópico DEVCAVE.EXAMPLE.MIRRORMAKER no datacenter 1 para o datacenter 2 e vice-versa, temos uma referência cíclica e os dados são duplicados eternamente entre os clusters. Uma solução simples é renomear os tópicos para DEVCAVE.EXAMPLE.MIRRORMAKER.DC1 e DEVCAVE.EXAMPLE.MIRRORMAKER.DC2, então teremos 2 tópicos por datacenter. As aplicações do datacenter 1 vão produzir para os tópicos *.DC1 e consumir dos dois, e as aplicações do datacenter 2 vão produzir para os tópicos *.DC2 e consumir nos dois. É uma solução simples, mas com certeza aumenta a complexidade de configuração de nossas aplicações. Mas para frente veremos um jeito de melhorar isso.

Configurações (um pouco mais) avançadas

A primeira configuração que podemos ajustar é até simples. Quando rodamos a primeira vez, já apareceu uma sugestão para configurar o a estrategia de partition assignment. Então vamos adicionar a property sugerida no consumer.properties. Também vamos aproveitar para deixar explicito a politica de reset offset e o nome do consumer group:

Como falei anteriormente, podemos alterar o producer.properties para comprimir o dado antes de enviá-lo, evitando assim uso desnecessário de rede. Por contra, esse processo vai consumir mais tempo e CPU, então avalie o que faz mais sentido.

Também vamos adicionar uma propriedade “perigosa”: o max.in.flight.requests.per.connection, que se refere ao número máximo de requests que podemos realizar sem receber confirmação de recebimento. O perigo dessa configuração é referente à ordem das mensagens. Se for importante manter a ordem, precisamos deixar esse valor como default, 1, pois qualquer valor superior não vai garantir a ordem caso precisar retentar a entrega de alguma mensagem.

Que outras propriedades podemos configurar? No geral, qualquer uma que os consumers/producers do Kafka aceitem. Podem ver em:

Não todas as propriedades podem ser usadas. Por exemplo, qualquer uma relacionada com ack no consumer será desconsiderada. Recomendo dar uma olhada no código do Kafka (lembrando que é opensource), começando pela classe do MirrorMaker (Sim, é Scala, mas não é difícil de entender).

Message Handler

Uma coisa recomendável, e que ainda não fizemos, é olhar o help do MirrorMaker:

Tem duas que chamam a atenção:

  • offset.commit.interval.ms: Indica o tempo de commit do offset (pois não é mais automático). Por padrão é de 60 segundos e me fez perder os cabelos até que reparei nele porque o lag do consumer group ficava relativamente grande e estável.
  • message.handler e message.handler.args: Significa que podemos fazer algum tipo de manipulação nas mensagens antes de enviá-las.

O Message Handler pode nos auxiliar para resolver de outra forma o problema do nome dos tópicos na arquitetura Active-Active. Aliás, podemos simplesmente renomear o tópico de destino como é feito neste repositório: https://github.com/slavirok/mirrormaker-rename-topic-handler.

Neste caso, o message.handler.args é um mapa entre o tópico origem e o tópico destino. A principal vantagem é que todas as aplicações produzem no mesmo nome de tópico, seguindo o exemplo, no DEVCAVE.EXAMPLE.MIRRORMAKER, estejam no datacenter1 ou no datacenter2, e são duplicados para o tópico DEVCAVE.EXAMPLE.MIRRORMAKER.ALTERNATIVE. O whitelist usado no MirrorMaker é o mesmo também, alternando só o consumer.properties e o producer.properties para inverter a origem e o destino. As únicas aplicações que precisam ser mudadas são as aplicações que consomem, pois ainda precisam consumir de dois tópicos.

Mas, voltando ao Message Handler, é difícil de fazer? Como funciona? Vamos ver um snippet do código do repositório mencionado:

Só precisamos implementar a interface MirrorMakerMessageHandler e transformar o objeto consumido num ProducerRecord, alterando o necessário.

Olhando com calma, percebemos que podemos usar os headers para identificar a origem da mensagem. Quando formos duplicar a mensagem, podemos adicionar um header de datacenter de origem. Caso o header exista, descartamos a mensagem. Desta forma, podemos usar um único nome de tópico em todos os datacenters, tanto para produzir como para consumir:

Desta vez coloquei o código em Kotlin ❤️

Só um ponto de alerta sobre esta abordagem: A mesma mensagem será consumida pelo MirrorMaker que duplica do datacenter 1 para o datacenter 2 e consumida de novo pelo MirrorMaker que duplica no caminho contrário, que é onde será descartada. O consumo de mensagens vai aumentar, então avalie bem se compensa aumentar o volume de dados trafegados para reduzir a complexidade da sua aplicação.

Docker ao resgate

Um dos “problemas” que tem o MirrorMaker é que tem só um producer para N consumers. Isso pode virar um gargalho. Podemos, porém, executar vários MirrorMakers com o mesmo consumer group e o balanceamento dos consumers do Kafka vai se virar para distribuir a carga.

Uma estratégia bastante usada é dockerizar sua aplicação. Segue um exemplo que podemos encontrar no meu repositório:

Basicamente, baixamos o Kafka, mandamos para dentro da imagem os arquivos properties, configuramos variáveis que podemos sobrescrever quando fazer o run e copiamos também um exec que faz o export da lib do message-handler e inicia o MirrorMaker. Neste caso estamos usando o message-handler do slavirok apresentado acima. Segue o conteúdo do exec:

#!/bin/bashexport CLASSPATH=/opt/message-handler.jar/opt/kafka/bin/kafka-mirror-maker.sh --consumer.config /opt/mirror-maker/consumer.properties --producer.config /opt/mirror-maker/producer.properties --new.consumer --num.streams=$NUM_STREAMS --whitelist "$WHITELIST" --offset.commit.interval.ms=$COMMIT_INTERVAL --message.handler com.slavirok.RenameTopicHandler --message.handler.args "$HANDLER_ARGS"

Para rodar, precisamos primeiro buildar a imagem e rodar o contêiner.

$ docker build -t iundarigun/mirror-maker .
$ docker run --network host --name=mirror-maker -d iundarigun/mirror-maker

Dois detalhes rápidos:

  • Primeiro é que rodamos o contêiner com network host porque precisamos enxergar o localhost. Provavelmente não precise disso em produção.
  • Segundo é que deixamos a configuração dos brokers nos arquivos e colocamos ele no build da imagem. Podemos compartilhar volumes na hora de fazer o run da imagem para sobrescrever ou melhorar a imagem para poder receber por variáveis de ambiente.

Agora podemos rodar tantos contêiners como precisar, em função da necessidade.

Como último recado, é recomendável rodar o MirrorMaker no datacenter do cluster de destino, consumindo dados cross-datacenter, Um consumer que no consegue conectar no cluster é mais seguro que não um producer que não consegue conectar, desde o ponto de vista de perder eventos.

Conclusão

O MirrorMaker é uma ferramenta simples e relativamente parametrizável que provavelmente atende a maioria dos casos de uso. E vocês, tiveram alguma experiência boa com MirrorMaker? Tiveram que partir para outras soluções? Conta pra nós!

Referências

--

--

iundarigun
Dev Cave

Java and Kotlin software engineer at Clearpay