Kafka no k8s [Strimzi] Zero to Hero round 2

Mateus Oliveira
Engenharia de Dados Academy
5 min readApr 27, 2021

--

No artigo anterior, era o início da caminhada ao pé da montanha chamada Kafka no Kubernetes, preparamos o nosso ambiente e agora vamos começar a escalada, agora é o momento de trabalharmos com ELT e ETL.

Alguns pontos que iremos ver nessa edição:

  • Build de Image do Kafka Connect
  • Configuração e implantação do arquivo yaml do Kafka Connect
  • Configuração de connector JDBC
  • Conexão com um banco de dados Postgresql e SQL Server

Podemos ver que temos muitas coisas nesse round então mão a obra, primeiro vamos falar sobre a imagem, o build de image e feito da seguinte forma, na documentação do Strimzi temos um template, lembrando que imagens em containers devemos sempre levar em consideração em não ter imagens grandes então nunca faça um build com todos os conectores existentes, somente os que vai usar.

Vamos criar para o conector JDBC, importante baixar o jars para Postgresql e SQL Server, segue os links abaixo:

PostgreSQL

MSSQL

Abaixo está o Dockerfile, dockerfile é o arquivo base para criação da imagem:

# kafka version = 2.7 [latest]# https://www.confluent.io/hub/confluentinc/kafka-connect-jdbc# get image from strimzi repository
# https://quay.io/repository/strimzi/kafka
FROM quay.io/strimzi/kafka:latest-kafka-2.7.0
# using root user
USER root:root
# create dirs
RUN mkdir -p /opt/kafka/plugins/kafka-connect-jdbc

# copy jar files
COPY ./jars/kafka-connect-jdbc/ /opt/kafka/plugins/kafka-connect-jdbc/
# user
USER 1001

Depois de criado nosso dockerfile, vamos agora buildar a imagem dentro do nosso docker local, lembrando que precisamos ter uma conta no docker hub para hospedar a nossa imagem do kafka connect:

Agora vamos aos comandos para construir a nossa imagem, antes de começarmos alguns pontos:

  • Só será necessário fazer mais builds de imagem em caso de adição de novos conectores, exemplo CDC Debezium SQL Server.
  • É possível substituir o nome da imagem para qualquer nome
  • YOURdockerRepository = o seu repositório no docker hub, então antes de realizar os comandos abaixo rodar o comando: docker login e logar no seu docker hub.
# pull latest image version
docker pull quay.io/strimzi/kafka:latest-kafka-2.7.0
# verify local images
docker images
# build image
docker build . -t our-image-kafka-connect-strimzi:2.7
# build latest image
docker build -f Dockerfile.latest --tag our-image-kafka-connect-strimzi:2.7 .
# access image
docker run -i -t our-image-kafka-connect-strimzi:2.7 /bin/bash
/opt/kafka/plugins/
# docker hub [repositories]
https://hub.docker.com/repositories
# tag image
docker tag our-image-kafka-connect-strimzi:2.7 YOURdockerRepository/our-image-kafka-connect-strimzi:2.7
# push image to registry
docker push YOURdockerRepository/our-image-kafka-connect-strimzi:2.7

Com isso vamos publicar a nossa imagem está publicada no docker hub e pronta para ser utilizada, agora vamos ao arquivo yaml do kafka connect, lembrando que temos um template dentro do github strimzi:

examples/connect/kafka-connect.yaml

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
name: my-connect-cluster
# annotations:
# # use-connector-resources configures this KafkaConnect
# # to use KafkaConnector resources to avoid
# # needing to call the Connect REST API directly
# strimzi.io/use-connector-resources: "true"
spec:
image: YOURdockerRepository/our-image-kafka-connect-strimzi:2.7
imagePullPolicy: Always
version: 2.7.0
replicas: 1
bootstrapServers: my-cluster-kafka-bootstrap:9093
tls:
trustedCertificates:
- secretName: my-cluster-cluster-ca-cert
certificate: ca.crt
config:
group.id: connect-cluster
offset.storage.topic: connect-cluster-offsets
config.storage.topic: connect-cluster-configs
status.storage.topic: connect-cluster-status

Única alteração que vamos fazer neste caso é incluir a nossa nova imagem no arquivo yaml, dessa forma podemos criar nossos conectores JDBC para os bancos PostgreSQL e SQL Server

Agora vamos criar o nosso conector JDBC Source, em se tratando de kubernetes, vamos criar um arquivo yaml, como melhores práticas vamos nomear ele como ingest-src-nome-da-tabela-avro.yaml.

apiVersion: "kafka.strimzi.io/v1alpha1"
kind: "KafkaConnector"
metadata:
# connector name
name: "ingest-src-sqlserver-credit-card-json-d91e5106"
labels:
# kafka connect [cluster] name
strimzi.io/cluster: edh
spec:
class: io.confluent.connect.jdbc.JdbcSourceConnector
tasksMax: 2
config:
key.converter: org.apache.kafka.connect.json.JsonConverter
value.converter: org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable: true
value.converter.schemas.enable: true
connection.url: "jdbc:sqlserver://127.0.0.1:1433;databaseName=DB"
connection.user: "kafka"
connection.password: "kafka"
connection.attempts: "2"
query: "SELECT * FROM dbo.credit_card"
mode: "incrementing"
topic.prefix: "src-sqlserver-credit-card-json"
incrementing.column.name: "id"
validate.non.null: "false"
transforms: "createKey,extractInt,InsertTopic,InsertSourceDetails"
transforms.createKey.type: "org.apache.kafka.connect.transforms.ValueToKey"
transforms.createKey.fields: "id"
transforms.extractInt.type: "org.apache.kafka.connect.transforms.ExtractField$Key"
transforms.extractInt.field: "id"
transforms.InsertTopic.type: "org.apache.kafka.connect.transforms.InsertField$Value"
transforms.InsertTopic.topic.field: "messagetopic"
transforms.InsertSourceDetails.type: "org.apache.kafka.connect.transforms.InsertField$Value"
transforms.InsertSourceDetails.static.field: "messagesource"
transforms.InsertSourceDetails.static.value: "mssql"

Agora vamos rapidamente dissecar esse arquivo:

apiVersion: "kafka.strimzi.io/v1alpha1"
kind: "KafkaConnector"
metadata:
# connector name
name: "ingest-src-sqlserver-credit-card-json-d91e5106"
labels:
# kafka connect [cluster] name
strimzi.io/cluster: edh
spec:
class: io.confluent.connect.jdbc.JdbcSourceConnector
tasksMax: 2

Nessa primeira parte temos o tipo de recurso dentro do kubernetes kind:"kafkaConnector" aqui que o Strimzi mostra as características de um operador, ele monitora o cluster e administra os kinds do kafka, temos o nome do conector, lembrando que o ideia e criar um UUID no final do nome para caso seja necessários redeployment, a quantidade de tasks para um JDBC depende de quantas partições iremos usar e qual será a velocidade de gravação, lembrando que isso também irá influenciar no overhead em cima do banco de dados, e o mais importante é a classe, essa a configuração de qual é o tipo de conector, lembram-se eu tenho Source = extração de dados, e o Sink = output dos dados do kafka para uma fonte externa, e mesmo sendo do conectores JDBC são classes diferentes, isso vai ficar mais claro ao decorrer da serie.

config:
key.converter: org.apache.kafka.connect.json.JsonConverter
value.converter: org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable: true
value.converter.schemas.enable: true
connection.url:"jdbc:sqlserver://127.0.0.1:1433;databaseName=DB"
connection.user: "kafka"
connection.password: "kafka"
connection.attempts: "2"
query: "SELECT * FROM dbo.credit_card"
mode: "incrementing"
topic.prefix: "src-sqlserver-credit-card-json"
incrementing.column.name: "id"
validate.non.null: "false"

Dados de configuração, aqui estamos gravando os dados como Json, lembrando que o Kafka trabalha com chave + valor, a url de conexão com o banco de dados, neste caso estamos usando um SQL Server, usuário e senha, e é possível encapsular esses dados em secrets, aqui como é um ambiente de teste está exposto, o modo desse conector é incremental ou seja ela vai fazer um load completo na primeira vez, depois será incremental, a coluna tem que ser inteira e número sequencial e por último nome do tópico no kafka.

transforms: "createKey,extractInt,InsertTopic,InsertSourceDetails"   transforms.createKey.type:"org.apache.kafka.connect.transforms.ValueToKey"
transforms.createKey.fields: "id"
transforms.extractInt.type:"org.apache.kafka.connect.transforms.ExtractField$Key"
transforms.extractInt.field: "id"
transforms.InsertTopic.type:"org.apache.kafka.connect.transforms.InsertField$Value"
transforms.InsertTopic.topic.field: "messagetopic"
transforms.InsertSourceDetails.type:"org.apache.kafka.connect.transforms.InsertField$Value"
transforms.InsertSourceDetails.static.field: "messagesource"
transforms.InsertSourceDetails.static.value: "mssql"

Por último mas não menos importante, temos um SMT [Simple Message Transform], aqui podemos fazer algumas transformações simples para preparar os dados durante a ingestão, aqui estamos fazendo 2 importantes transformações, transformando a coluna id em key da mensagem, e inseri um nova coluna chamada messagesource e inserindo o dado mssql, já que nossos dados estão vindo de um banco de dados SQL Server, reforçando que isso acontece durante a ingestão de dados por isso deve ser feito pensando em transformações simples nada complexo pois isso vai ser executado dentro do Kafka Connect e quanto mais complexo maior a necessidade de recursos computacionais.

Segue um link de documentação SMT:

Agora os dados estão entrando dentro do nosso Kafka, nosso primeiro tópico recebendo dados do SQL Server, com um arquivo de configuração.

No próximo round iremos processar os dados dentro do Kafka.

--

--