Confluent ❤️ SQL Server

Luan Moreno M. Maciel
Engenharia de Dados Academy
4 min readJan 6, 2020

Um guia prático e direto ao ponto utilizando a plataforma Confluent para ingestão de dados do SQL Server para o Apache Kafka

O Apache Kafka oferece um rico ecossistema de APIs para se trabalhar, o Kafka Connect é um framework desenvolvido para se conectar com a mais utilizadas fontes de dados como:

  1. Banco de Dados Relacionais [SQL Server, Oracle, PostgreSQL, MySQL]
  2. Banco de Dados NoSQL [MongoDB, Cassandra]
  3. Banco de Dados Grafo [Neo4J]
  4. Sistemas de Arquivos Distribuídos [HDFS, Amazon S3, Google Cloud Storage, Azure Blob Storage]
  5. Armazenamentos em Chave/Valor [Redis Cache]
  6. Índices para Pesquisa [ElasticSearch]
  7. Data Warehouse [Amazon Redshift, Google BigQuery, Azure SQL Dw]

E a lista não termina por aqui, veja todas as opções disponíveis no site da Confluent.

Kafka Connect Source & Sink — https://docs.confluent.io/current/connect/index.html

O Kafka Connect possui duas opções como visualizado na imagem acima:

  • Connect Source API: realiza a ingestão dos dados de fontes externas e traz os dados para dentro de tópicos [topic] no Kafka.
  • Connect Sink API: responsável por enviar dados dos tópicos do Apache Kafka para diversos sistemas

Olhando um pouco atentamente fica claro ver que o Kafka Connect possibilita com que você traga os dados de toda sua organização para um local centralizado para que você possa agrupar, organizar e analisar e após isso, seu dado curado pode ser enviado para diversos sistemas, chamamos isso de ETL em Real-Time.

Kafka Connect JDBC

Kafka Connect JDBC — https://www.confluent.io/hub/confluentinc/kafka-connect-jdbc

Nesse artigo iremos falar sobre o conector JDBC que além de maduro é Confluent Supported o que significa que existe uma ótima documentação e que a Confluent pode oferecer suporte caso você possua algum tipo de suporte da empresa.

Esse conector oferece tanto Source quanto Sink o que significa que você pode tanto conectar em um banco de dados relacional e puxar os dados para dentro de um tópico como também você pode enviar os dados de um tópico para um banco de dados.

JDBC Source Connector

O Source Connector API possui opções de configuração, mas a que mais se destaca é a forma de captura dos dados na fonte, que são:

  • Incrementing Column
  • Timestamp Column
  • Timestamp & Incrementing Columns
  • Custom Query
  • Bulk

Instalando e Configurando Confluent com SQL Server

  • Garanta que a instalação do Confluent Platform está funcionando corretamente.

1. Download do JDBC Driver do SQL Server

após realizar o download, copie o .jar para a pasta /home/luanmoreno/confluent-5.3.1/share/java/kafka-connect-jdbc/ trocando o usuário da sua máquina linux.

/home/luanmoreno

curl -O https://download.microsoft.com/download/6/9/9/699205CA-F1F1-4DE9-9335-18546C5C8CBD/sqljdbc_7.4.1.0_enu.tar.gz

tar xzf sqljdbc_7.4.1.0_enu.tar.gz
rm -rf sqljdbc_7.4.1.0_enu.tar.gz

/home/luanmoreno/sqljdbc_7.4/enu
mssql-jdbc-7.4.1.jre8.jar

cp mssql-jdbc-7.4.1.jre8.jar /home/luanmoreno/confluent-5.3.1/share/java/kafka-connect-jdbc/

/home/luanmoreno/confluent-5.3.1/share/java/kafka-connect-jdbc/
mssql-jdbc-7.4.1.jre8.jar

2. Criar Arquivo de Configuração no Local Correto

criar o arquivo de conexão para o SQL Server vindo da pasta /confluent-5.3.1/etc/kafka-connect-jdbc

nesse caso, estou utilizando o modo = incrementing que irá ter uma chave primária na tabela, e toda vez que houver um insert/update o Kafka Connect irá buscar essa informação em real-time.

um outro ponto a destacar é que todo o schema da tabela estará sendo salvo no schema registry o que irá possibilitar com que você não precise especificar schema além de guardar as versões possibilitando schema evolution. para garantir que não haja erro recomendo reiniciar sua máquina virtual

/home/luanmoreno/confluent-5.3.1/etc/kafka-connect-jdbc


file name = source-mssql-sales-order-detail.properties

name=source-mssql-sales-order-detail
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
connection.url=jdbc:sqlserver://****;database=****;username=****;password=****;
connection.attempts=2
query=SELECT * FROM dbo.SalesOrderDetail
mode=incrementing
topic.prefix=mssql-sales-order-detail
incrementing.column.name=SalesOrderDetailID
tasks.max=2
validate.non.null=false

3. Inicialização do Conector com SQL Server

com o arquivo criado no diretório correto, agora é necessário que você carregue o mesmo para iniciar o processo de captura dos dados, lembrando que aqui será feito um bulk-load inicialmente e após isso todos os próximos dados serão capturados como incrementais.

confluent local start
confluent local status
confluent local load mssql-sales-order-detail -- -d /home/luanmoreno/confluent-5.3.1/etc/kafka-connect-jdbc/source-mssql-sales-order-detail.properties

confluent local status mssql-sales-order-detail
confluent local connect status

4. Verificando Tópico

com o connector configurado, agora você pode listar o tópico.

kafka-topics --list --zookeeper localhost:2181

kafka-avro-console-consumer \
--bootstrap-server localhost:9092 \
--property schema.registry.url=http://localhost:8081 \
--property print.key=true \
--from-beginning \
--topic mssql-sales-order-detail

kafka-topics --describe --zookeeper localhost:2181 --topic mssql-sales-order-detail

Espero que tenha gostado, se tiver alguma dúvida poste aqui, terei o maior prazer em ajudar!

--

--