Confluent ❤️ SQL Server
Um guia prático e direto ao ponto utilizando a plataforma Confluent para ingestão de dados do SQL Server para o Apache Kafka
O Apache Kafka oferece um rico ecossistema de APIs para se trabalhar, o Kafka Connect é um framework desenvolvido para se conectar com a mais utilizadas fontes de dados como:
- Banco de Dados Relacionais [SQL Server, Oracle, PostgreSQL, MySQL]
- Banco de Dados NoSQL [MongoDB, Cassandra]
- Banco de Dados Grafo [Neo4J]
- Sistemas de Arquivos Distribuídos [HDFS, Amazon S3, Google Cloud Storage, Azure Blob Storage]
- Armazenamentos em Chave/Valor [Redis Cache]
- Índices para Pesquisa [ElasticSearch]
- Data Warehouse [Amazon Redshift, Google BigQuery, Azure SQL Dw]
E a lista não termina por aqui, veja todas as opções disponíveis no site da Confluent.
O Kafka Connect possui duas opções como visualizado na imagem acima:
- Connect Source API: realiza a ingestão dos dados de fontes externas e traz os dados para dentro de tópicos [topic] no Kafka.
- Connect Sink API: responsável por enviar dados dos tópicos do Apache Kafka para diversos sistemas
Olhando um pouco atentamente fica claro ver que o Kafka Connect possibilita com que você traga os dados de toda sua organização para um local centralizado para que você possa agrupar, organizar e analisar e após isso, seu dado curado pode ser enviado para diversos sistemas, chamamos isso de ETL em Real-Time.
Kafka Connect JDBC
Nesse artigo iremos falar sobre o conector JDBC que além de maduro é Confluent Supported o que significa que existe uma ótima documentação e que a Confluent pode oferecer suporte caso você possua algum tipo de suporte da empresa.
Esse conector oferece tanto Source quanto Sink o que significa que você pode tanto conectar em um banco de dados relacional e puxar os dados para dentro de um tópico como também você pode enviar os dados de um tópico para um banco de dados.
JDBC Source Connector
O Source Connector API possui opções de configuração, mas a que mais se destaca é a forma de captura dos dados na fonte, que são:
- Incrementing Column
- Timestamp Column
- Timestamp & Incrementing Columns
- Custom Query
- Bulk
Instalando e Configurando Confluent com SQL Server
- Garanta que a instalação do Confluent Platform está funcionando corretamente.
1. Download do JDBC Driver do SQL Server
após realizar o download, copie o .jar para a pasta /home/luanmoreno/confluent-5.3.1/share/java/kafka-connect-jdbc/ trocando o usuário da sua máquina linux.
/home/luanmoreno
curl -O https://download.microsoft.com/download/6/9/9/699205CA-F1F1-4DE9-9335-18546C5C8CBD/sqljdbc_7.4.1.0_enu.tar.gz
tar xzf sqljdbc_7.4.1.0_enu.tar.gz
rm -rf sqljdbc_7.4.1.0_enu.tar.gz
/home/luanmoreno/sqljdbc_7.4/enu
mssql-jdbc-7.4.1.jre8.jar
cp mssql-jdbc-7.4.1.jre8.jar /home/luanmoreno/confluent-5.3.1/share/java/kafka-connect-jdbc/
/home/luanmoreno/confluent-5.3.1/share/java/kafka-connect-jdbc/
mssql-jdbc-7.4.1.jre8.jar
2. Criar Arquivo de Configuração no Local Correto
criar o arquivo de conexão para o SQL Server vindo da pasta /confluent-5.3.1/etc/kafka-connect-jdbc
nesse caso, estou utilizando o modo = incrementing que irá ter uma chave primária na tabela, e toda vez que houver um insert/update o Kafka Connect irá buscar essa informação em real-time.
um outro ponto a destacar é que todo o schema da tabela estará sendo salvo no schema registry o que irá possibilitar com que você não precise especificar schema além de guardar as versões possibilitando schema evolution. para garantir que não haja erro recomendo reiniciar sua máquina virtual
/home/luanmoreno/confluent-5.3.1/etc/kafka-connect-jdbc
file name = source-mssql-sales-order-detail.properties
name=source-mssql-sales-order-detail
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
connection.url=jdbc:sqlserver://****;database=****;username=****;password=****;
connection.attempts=2
query=SELECT * FROM dbo.SalesOrderDetail
mode=incrementing
topic.prefix=mssql-sales-order-detail
incrementing.column.name=SalesOrderDetailID
tasks.max=2
validate.non.null=false
3. Inicialização do Conector com SQL Server
com o arquivo criado no diretório correto, agora é necessário que você carregue o mesmo para iniciar o processo de captura dos dados, lembrando que aqui será feito um bulk-load inicialmente e após isso todos os próximos dados serão capturados como incrementais.
confluent local start
confluent local statusconfluent local load mssql-sales-order-detail -- -d /home/luanmoreno/confluent-5.3.1/etc/kafka-connect-jdbc/source-mssql-sales-order-detail.properties
confluent local status mssql-sales-order-detail
confluent local connect status
4. Verificando Tópico
com o connector configurado, agora você pode listar o tópico.
kafka-topics --list --zookeeper localhost:2181
kafka-avro-console-consumer \
--bootstrap-server localhost:9092 \
--property schema.registry.url=http://localhost:8081 \
--property print.key=true \
--from-beginning \
--topic mssql-sales-order-detail
kafka-topics --describe --zookeeper localhost:2181 --topic mssql-sales-order-detail
Espero que tenha gostado, se tiver alguma dúvida poste aqui, terei o maior prazer em ajudar!