Kafka no k8s [Strimzi] Zero to Hero round 3

Mateus Oliveira
Engenharia de Dados Academy
5 min readMay 14, 2021

--

Agora que já estamos no começo da caminhada, vamos para próxima etapa, depois de extrair os dados que estavam em banco de dados SQL Server, vamos processar esses dados, mas antes vamos as considerações iniciais:

  • Importante ter aplicado os conhecimentos do Round 1 e 2 antes de iniciar esse, são complementarem e sequenciais.
  • Vamos realizar o deployment de mais uma tecnologia.
  • Não existe o certo, existe o que atenderá a sua necessidade.

Dito isso vamos para a seguinte questão, o que iremos implantar para realizar o processamento? neste caso vamos usar o KSQLDB, para quem não sabe, o KSQLDB é uma engine de processamento que usa a linguagem SQL projetada para trabalhar com o Apache Kafka, neste caso é especifico e não funciona com outras tecnologias de streaming okay.

Esse vai ser o presente dessa serie, se vocês pesquisarem na internet não vai ser algo trivial instalação do KSQLDB, não tem um Helm chart especifico para ele, sendo assim vou compartilhar que foi desenvolvido internamente com vocês, então vamos fazer por parte, primeiro vocês vão criar os arquivos yaml, começando com o de deployment:

apiVersion: apps/v1
kind: Deployment
metadata:
name: ksqldb-server
labels:
app: ksqldb-server
spec:
replicas: 1
selector:
matchLabels:
app: ksqldb-server
template:
metadata:
labels:
app: ksqldb-server
spec:
containers:
- name: ksqldb-server
image: confluentinc/ksqldb-server:0.12.0
ports:
- containerPort: 8088
env:
- name: KSQL_BOOTSTRAP_SERVERS
value: PLAINTEXT://kafka.ingestion.svc.Cluster.local:9092
- name: KSQL_LISTENERS
value: http://0.0.0.0:8088
- name: KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE
value: "true"
- name: KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE
value: "true"
- name: KSQL_KSQL_INTERNAL_TOPIC_REPLICAS
value: "1"
- name: KSQL_KSQL_STREAMS_REPLICATION_FACTOR
value: "1"
- name: KSQL_KSQL_LOGGING_PROCESSING_TOPIC_REPLICATION_FACTOR
value: "1"
resources:
requests:
cpu: 250m
memory: 500Mi
limits:
cpu: 500m
memory: 2000Mi

Agora os services, primeiro o principal:

apiVersion: v1
kind: Service
metadata:
name: ksqldb-server
spec:
selector:
app: ksqldb-server
ports:
- protocol: TCP
port: 8088
targetPort: 8088

Agora o service Headless:

apiVersion: v1
kind: Service
metadata:
name: ksqldb-headless
namespace: processing
spec:
clusterIP: None
selector:
app: ksqldb-server
ports:
- protocol: TCP
port: 8088
targetPort: 8088

Nesse caso para implantar é o comando, lembrando que necessário :

k apply -f repository/yamls/processing/ksqldb/

Depois disso vamos ter o nosso KSQLDB pronto para utilização.

Para conectar ele só fazer da seguinte forma:

KSQLDB=Nome do POD do KSQLDB
k exec $KSQLDB -n processing -i -t -- bash ksql

Em seguida iremos acessar o terminal do KSQLDB, alterando a configuração para ler do offset mais recente:

ksql> SET 'auto.offset.reset' = 'earliest';

Agora vamos ver os tópicos que temos através de comandos SQL:

show topics;

Antes de mais nada vamos criar um stream, stream e um objeto do KSQLDB similar a uma tabela, podemos considerar uma abstração do tópico para manipulação usando o KSQLDB, nesse caso vamos usar o campo payload que possui os dados da minha tabela:

CREATE OR REPLACE STREAM ksql_stream_credit_card_json
(
"payload" STRUCT<"id" BIGINT,
"uid" VARCHAR,
"credit_card_number" VARCHAR,
"credit_card_expiry_date" VARCHAR,
"credit_card_type" VARCHAR,
"user_id" BIGINT,
"dt_current_timestamp" VARCHAR,
"messagetopic" VARCHAR,
"messagesource" VARCHAR>
)
WITH (KAFKA_TOPIC='src-sqlserver-credit-card-json', VALUE_FORMAT='JSON');

Importante detalhe, o tipo de dado que estamos trabalhando e um json, que nesse caso tem mais uma camada ou pode se chamar de nível, dessa foram precisamos utilizar STRUCT para poder "trazer"essas campos para o nosso stream.

Segue um link para melhor entendimento do uso do STRUCT no KSQLDB

Agora que já temos o nosso stream inicial, vamos começar a brincar e gerar novos streams, primeiro vamos extrair os campos que queremos trabalhar:

CREATE OR REPLACE STREAM output_ksqldb_stream_credit_card_json
WITH (KAFKA_TOPIC='output-ksqldb-stream-credit-card-json', PARTITIONS=3, VALUE_FORMAT='JSON')
AS
SELECT
AS_VALUE("payload"->"id") as "business_key",
"payload"->"id" as "ID",
"payload"->"uid",
"payload"->"credit_card_number",
"payload"->"credit_card_expiry_date",
"payload"->"credit_card_type",
"payload"->"user_id",
"payload"->"dt_current_timestamp",
"payload"->"messagetopic" ,
"payload"->"messagesource"
FROM ksql_stream_credit_card_json
EMIT CHANGES;

Trabalhando com struct para conseguimos movimentar dentro dos níveis desse json precisamos utilizar o operador "->" conforme exemplo assim, dessa forma criamos um stream com as colunas do nível que precisamos, mas podemos fazer muito mais que isso, por exemplo eu tenho um tópicos de bancos, e quero relacionar com o meu stream de cartão de crédito, vamos ver a estrutura do meu stream de bancos:

CREATE OR REPLACE STREAM output_ksqldb_stream_bank_json
WITH (KAFKA_TOPIC='output-ksqldb-stream-banks-json', PARTITIONS=3, VALUE_FORMAT='JSON')
AS
SELECT
AS_VALUE("payload"->"id") as "business_key",
"payload"->"id" as "ID",
"payload"->"uid",
"payload"->"account_number",
"payload"->"iban",
"payload"->"bank_name",
"payload"->"routing_number",
"payload"->"swift_bic",
"payload"->"user_id",
"payload"->"dt_current_timestamp"
FROM ksql_stream_bank_json
EMIT CHANGES;

Para esse exemplo a coluna id de ambos os streams é a minha chave de relacionamento entre eles, podemos considerar como a chave da minha transação, então vamos lá aqui eu to relacionando esses eventos, agora vamos fazer um JOIN entre os streams:

CREATE OR REPLACE STREAM enriched_ksqldb_stream_bank_credit_card_json
WITH (KAFKA_TOPIC='enriched-ksqldb-stream-bank-credit-card-json', PARTITIONS=9, VALUE_FORMAT='JSON')
AS
SELECT
card."business_key" as "card_business_key",
card."uid" as "card_uid",
MASK_KEEP_LEFT(card."credit_card_number",5) as "credit_card_number",
MASK_RIGHT(card."credit_card_expiry_date",2) as "credit_card_expiry_date",
card."credit_card_type",
card."user_id" as "card_user_id",
bank."account_number",
bank."iban",
bank."bank_name",
bank."routing_number",
bank."swift_bic",
bank."user_id" as "bank_user_id"
FROM output_ksqldb_stream_credit_card_json as card
INNER JOIN output_ksqldb_stream_bank_json as bank WITHIN 1 HOUR
ON card."business_key" = bank."business_key"
EMIT CHANGES;

Não só estamos executando um JOIN mas também criando agora um terceiro stream com o JOIN, assim quando um novo evento surgir o mesmo será gravado no stream enriched_ksqldb_stream_bank_credit_card_json no novo tópico enriched-ksqldb-stream-bank-credit-card-json, dessa forma as aplicações agora poderão consumir esses dados enriquecidos.

Nos exemplos estamos pegando tudo, porém eu poderia colocar um tipo de chave dentro do meu stream passando dessa forma:

SELECT
card."business_key" as "card_business_key",
card."uid" as "card_uid",
MASK_KEEP_LEFT(card."credit_card_number",5) as "credit_card_number",
MASK_RIGHT(card."credit_card_expiry_date",2) as "credit_card_expiry_date",
card."credit_card_type",
card."user_id" as "card_user_id",
bank."account_number",
bank."iban",
bank."bank_name",
bank."routing_number",
bank."swift_bic",
bank."user_id" as "bank_user_id"
FROM output_ksqldb_stream_credit_card_json as card
INNER JOIN output_ksqldb_stream_bank_json as bank WITHIN 1 HOUR
ON card."business_key" = bank."business_key"
PARTITION BY card."business_key"
EMIT CHANGES;

Nesse caso estou particionando pela chave do cartão de crédito mas poderia ser do stream de banco caso seja essa a minha regra de negócio.

Além de streams podemos criar tabelas, isso mesmo tabelas, funciona exatamente como outros SQLs, e aqui vamos criar uma agregação que irá buscar registro dentro de uma janela de 1 hora com uma tolerância de 2 horas após o termino da janela, assim caso eu tenha alguma evento fora de ordem por conta de alguma falha sistêmica eu não perco esses eventos.

CREATE OR REPLACE TABLE enriched_ksqldb_tb_bank_json
WITH (KAFKA_TOPIC='enriched-ksqldb-tb-bank-credit-card-json', PARTITIONS=9, VALUE_FORMAT='JSON')
AS
SELECT
"card_business_key",
count("bank_name") AS "number_of_banks"
FROM enriched_ksqldb_stream_bank_credit_card_json
WINDOW TUMBLING (SIZE 1 HOUR, GRACE PERIOD 2 HOURS)
GROUP BY "card_business_key"
EMIT CHANGES;

Lembrando eu da mesma forma que temos o stream gravando em um tópico , temos também esta nossa tabela gravando em um novo tópico.

Para mais informações sobre KSQLDB sugiro uma leitura na documentação oficial:

No próximo round iremos ver como realizar o output dos dados para outras fontes de dados como DWs.

--

--