Producers, Consumers e logs usando Apache Kafka em Go

Joao Paulo Duarte Mota
5 min readJun 12, 2024

--

Logo oficial Go

Nesse artigo venho trazer um conceito que é usado em larga escala no mundo da tecnologia quando se trata de streaming de dados, o Apache Kafka.

Antes de começarmos, vale a pena nos conceituarmos sobre o que é e qual é a utilização do Apache Kafka.

Imagine que você é um artista e precisa disponibilizar seu conteúdo para que qualquer pessoa possa consumir na internet, a melhor forma nos dias de hoje é disponibilizar nas plataformas mais utilizadas pelo público no geral, onde normalmente quem é inscrito no seu perfil irá ter acesso assim que o conteúdo for disponibilizado, certo?

O mecanismo do Apache Kafka trata o streaming de dados de maneira similar. Por definição, segundo o site https://redhat.com

“O Apache Kafka é uma plataforma distribuída de transmissão de dados que é capaz de publicar, subscrever, armazenar e processar fluxos de registro em tempo real. Essa plataforma foi desenvolvida para processar fluxos de dados provenientes de diversas fontes e entregá-los a vários clientes.”

De uma maneira bem simplificada, é isso que o Apache faz pra nós. Dessa forma, conseguimos remover toda a complexidade de lidar com tratativas de dados em tempo real, e apenas escutamos os tópicos (essa é a definição dentro do Kafka) que precisamos, e tratamos os dados a partir disso.

Então hoje decidi trazer um exemplo de como podemos tratar os dados, fazendo uma API que recebe um POST, gera um payload simulando um pedido, envia os dados para o Kafka e em seguida salva em uma pasta de logs um json com o pedido gerado.

A primeira coisa que iremos fazer é iniciar um Produtor e instanciar uma API utilizando o pacote Mux:

 router := http.NewServeMux()
router.HandleFunc("/order", handlers.SendOrder)
fmt.Println("Serving on port 8080...")
http.ListenAndServe(":8080", router)

Na primeira linha, estamos instanciando um produtor, que será o pacote responsável por iniciar uma conexao com o Apache Kafka da seguinte forma:

func InitProducer(server, topic string) error {
producer, err = kafka.NewProducer(&kafka.ConfigMap{
"bootstrap.servers": server,
})
if err != nil {
return err
}

fmt.Println("Producer started")
return nil
}

Dentro da função SendOrder, validamos se a request é um POST e passamos para que um servico continue o trabalho:

func SendOrder(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
w.Header().Set("Allow", "POST")
http.Error(
w,
"That method is not allowed.",
http.StatusMethodNotAllowed,
)
return
}
err := services.ProcessOrderRequest()
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusOK)
}

Logo em seguida, dentro do nosso ProccessOrderRequest montamos um DTO que simula um pedido de order e envia para uma função chamada ProduceMessage:

func ProcessOrderRequest() error {
order := generateOrder()
err := producers.ProduceMessage(order)
if err != nil {
return err
}
return nil
}

Na função ProduceMessage, enviamos os dados para o produtor instanciado:

func ProduceMessage(order dtos.Order) error {
topic := "orders-v1-topic"
value, err := json.Marshal(order)
if err != nil {
return err
}

err = producer.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: 1},
Value: value,
}, nil)

if err != nil {
return err
}

fmt.Println("Message produced:", string(value))
return nil
}

Agora, no nosso arquivo principal, precisaremos escutar esse tópico que produziu a mensagem. Adicionaremos o seguinte código:

func main() {
producers.InitProducer(KafkaServer, KafkaTopic)
go consumers.InitUserConsumer(KafkaServer, KafkaTopic)
go consumers.InitProductConsumer(KafkaServer, KafkaTopic)

router := http.NewServeMux()
router.HandleFunc("/order", handlers.SendOrder)
fmt.Println("Serving on port 8080...")
http.ListenAndServe(":8080", router)

}

Repare que adicionamos dois consumidores, um que seria um usuário e outro que seria um produto (a fim de estudo, instanciamos os consumidores juntamente com o servidor, e para isso precisamos criar uma nova thread dentro da aplicação).

Segue o que foi feito em cada um dos pacotes de consumidor:

func InitUserConsumer(server, topic string) {
c, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": server,
"group.id": "user-service",
"auto.offset.reset": "earliest",
})
if err != nil {
panic(err)
}
err = c.SubscribeTopics([]string{topic}, nil)
if (err) != nil {
panic(err)
}

fmt.Println("User consumer initialized")

for {
msg, err := c.ReadMessage(-1)
if err == nil {
var order dtos.Order
err := json.Unmarshal(msg.Value, &order)
if err != nil {
fmt.Printf("Error decoding message: %v\n", err)
continue
}

services.CreateRecord("user", order)
fmt.Printf("Received Order from User consumer: %+v\n", order)
} else {
fmt.Printf("Error: %v\n", err)
}
}
}
func InitProductConsumer(server, topic string) {
c, err := kafka.NewConsumer(&kafka.ConfigMap{
"bootstrap.servers": server,
"group.id": "product-service",
"auto.offset.reset": "earliest",
})
if err != nil {
panic(err)
}
err = c.SubscribeTopics([]string{topic}, nil)
if err != nil {
panic(err)
}
fmt.Println("Product consumer initialized")

for {
msg, err := c.ReadMessage(-1)
if err == nil {
var order dtos.Order
err := json.Unmarshal(msg.Value, &order)
if err != nil {
fmt.Printf("Error decoding message: %v\n", err)
}

services.CreateRecord("product", order)
fmt.Printf("Received Order from Product consumer: %+v\n", order)
} else {
fmt.Printf("Error: %v\n", err)
}
}
}

Repare que em ambos consumidores chamamos um servico que “cria registros”, vamos explorá-lo:

func CreateRecord(source string, order dtos.Order) error {
dir := "logs"
if _, err := os.Stat(dir); os.IsNotExist(err) {
os.MkdirAll(dir, os.ModePerm)
}
file := fmt.Sprintf("./%v-%v.json", source, order.ID)
filePath, _ := filepath.Abs("./logs/" + file)
f, err := os.Create(filePath)
if err != nil {
panic(err)
}
defer f.Close()
json, err := json.Marshal(order)
if err != nil {
panic(err)
}
_, err = f.Write([]byte(json))

if err != nil {
panic(err)
}

return nil
}

Aqui, recebemos um source, que seria a origem do consumidor e o pedido gerado. Feito isso, verificamos se existe uma pasta de logs gerada e em seguida geramos json que salva o pedido dentro dessa pasta.

Feito isso, vamos testar:

Ao olharmos a pasta de logs…

Temos o seguinte conteúdo:

Dessa forma, poderíamos ter dois serviços separados que escutavam as mensagens e guardariam em bancos separados, com apenas uma mensagem produzida.

Podemos assim ver o quão poderoso é essa ferramenta e o quanto facilita nosso trabalho no dia a dia, tirando da nossa mão o processamento de dados em tempo real.

Artigos de referência:

https://medium.com/@wahyubagus1910/kafka-producers-and-consumers-with-golang-863d920c723c

https://www.redhat.com/pt-br/topics/integration/what-is-apache-kafka

https://go.dev/blog/go-brand/Go-Logo/PNG/Go-Logo_Black.png

--

--