Como usar o Kafka JDBC Sink Connector

O Kafka é uma plataforma de Stream de Eventos que conta com Kafka Connects, dentre outras ferramentas, para auxiliar o input e output de dados. Um output comum pode ser consumir as mensagens de um tópico para um banco de dados e é sobre isso que vamos falar.

Kakfa JDBC Sink Connector

O Kafka JDBC Sink Connector permite você exportar dados apartir de um tópico do Kafka diretamente para qualquer banco de dados relacional com um driver JDBC. É possível configurar multiplos JDBC Sink Connector e exportar dados para mais de um banco de dados relacional ao mesmo tempo, com apenas um serviço de Kafka Connect rodando. Outras features como auto-creation e até mesmo auto-evolution de tabelas também são possíveis.

How it works?

Kafka Connect, Neha Narkhede. February 18, 2016.

O JDBC Sink Connector funciona como um Kafka Consumer, consumindo as mensagens de um Tópico existente do Kafka e exportando diretamente para o banco de dados. Para isso, ele utiliza o Kafka Connect, uma ferramenta que permite de forma escalável e confiável o stream de dados entre o Apache Kafka e outras aplicações.

Talk is cheap. Show me the code!

Para estes exemplos vamos utilizar o Kafka mantido pela Confluent, que já possui suporte ao Kafka Sink Connect. Nosso objetivo é consumir as mensagens de tópico em AVRO e exportar para um banco de dados MySQL.

Criando o tópico no Kafka.

Nesta etapa presumo que o Zookeeper-server, Kafka-server e Schema-registry já estejam rodando. Você pode conferir como fazer isso em Confluent Platform Quickstart.

Vamos criar um tópico “balance” onde receberemos o saldo de uma conta.

bin/kafka-topics --create --zookeeper localhost:2181 -topic balance

Subindo o serviço do Connector.

Como comentei, o JDBC Sink Connector funciona com a grande maioria dos bancos de dados relacionais, mas para isso é necessário possuir o JDBC driver do banco que deseja gravar. Vamos usar o MySQL e você pode achar o driver aqui.

Como boa prática costumo mover o driver para uma pasta “drivers” na raiz do Kafka.

confluent-3.1.1/drivers

Em seguida faça o download do Java MySQL Connector e export do driver para usar junto com o JDBC Sink Connector.

$ wget http://central.maven.org/maven2/mysql/mysql-connector-java/5.1.6/mysql-connector-java-5.1.6.jar
$ export CLASSPATH=./confluent-3.1.1/drivers/mysql-connector-java-5.1.6.jar

O Apache Kafka da Confluent já conta com o serviço de connectors. Podemos encontrar o serviço em “confluent-3.1.1/bin/connect-distributed” e as suas configurações do serviço em “confluent-3.1.1/etc/schema-registry/connect-avro-distributed.properties”. Vamos subir o kafka connect-distributed service com suas configurações default.

nohup bin/connect-distributed etc/schema-registry/connect-avro-distributed.properties > jdbc-sink-connect.log &

Configurando o JDBC Sink Connector.

Uma vez criado o tópico e tendo nosso Connector rodando podemos configurar o nosso JDBC Sink Connector para gravar as mensagens do tópico em um banco de dados MySQL normal. Para esta etapa será ter necessário ter o MySQL rodando com ao menos um database criado.

Para criarmos nosso Sink Connector basta fazer um POST na porta que esta rodando o Connect com o JSON de configuração:

curl -XPOST --header "Content-Type: application/json" localhost:8083/connectors -d '{  
"name": "sink",
"config": {
"name": "sink",
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": 1,
"connection.url": "jdbc:mysql://localhost:3306/test",
"connection.user": "root",
"connection.password": "root",
"topics": "balance",
"auto.create": "true"
}
}'

Você pode verificar uma lista completa de todas as configurações disponíveis no site do Confluent. Vamos falar um pouco sobre as confiugrações utilizadas no connector acima:

  • connection.url: URL do banco de dados onde o Sink irá exportar os dados.
  • connection.user: Usuário do banco de dados que será o utilizado pelo Sink.
  • connection.password: Senha do usuário do banco de dados que será o utilizado pelo Sink.
  • topics: Você pode informar um ou mais tópicos para o Sink connector ouvir e exportar os dados o banco de dados. Por default o Sink irá gravar na tabelas com mesmo nomes dos tópicos.
  • auto.create: Quando habilitado permite a criação automática da tabela no banco de dados (O usuário deve ter permissão para criação).
  • tasks.max: Podemos entender o número de tasks ativas como o numero de consumers ativos para o tópico no mesmo consumer group.

Magic happens!

Tudo pronto, hora da mágica! Para testarmos nosso ambiente basta produzir mensagens no tópico que criamos anteriormente, o mesmo que o Sink está configurado. Um ponto importante é que não é possível gravar no Sink sem informar o Schema da sua mensagem. Neste caso vamos usar AVRO para definir o schema das mensagens, que é recomendo pelo próprio Kafka. É possível subir um producer diretamente na sua máquina utilizando o “kafka-avro-console-producer”.

$ bin/kafka-avro-console-producer \ 
--broker-list localhost:9092 --topic balance \
--property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"id","type":"int"},{"name":"account_name","type":"String"}, {"name":"current", "type": "int"}, {"name":"future", "type": "int"}]}'

Logo em seguida, tudo que digitar no terminal será uma mensagem para o tópico. Como já configuramos o connector, toda mensagem que entrar no tópico também será automaticamente gravada na tabela.

{"id": 1, "account": "Joao", "current": 100, "future": 100}
{"id": 2, "account": "Pedro", "current": 50, "future": 250}
{"id": 3, "account": "Jose", "current": 300, "future": 10}

Neste exemplo seguimos o fluxo mais simples, onde todas mensagens serão inserts no banco. É possível configurar o JDBC Sink Connect para realizar updates também, a partir da key da própria mensagem ou alguma informação entro do value da mensagem. Você pode checar mais sobre modos de escrita em writes (insert.mode) ou sobre mapeamento de chaves em data mapping (pk.mode).

Streams for life!

Gostou do Kafka ou quer entender mais sobre o assunto? Junte-se ao grupo no Slack da Confluent (Uma das principais mantenedoras do Apache Kafka) e ajude desenvolver novas soluções! :)

Referências