Kafka + (Producer & Consumer) In Local Docker with Go

Wiraizkandar
5 min read · Feb 1, 2024


Here we will create a Kafka producer and consumer in Go, running against a Kafka Docker container.

The general idea of how Kafka works: a producer sends messages for a topic to Kafka, and a consumer pulls and consumes messages from Kafka according to the topic it subscribes to.

Producer — A Kafka Producer is responsible for sending records (messages) to Kafka topics. It is a client application or system that generates data and publishes it to a Kafka cluster.

Consumer — A Kafka Consumer is a client application or system that subscribes to and processes records from Kafka topics. It retrieves data from topics and consumes it for further processing.

Here we will simulate a system that produces social media reactions to a post, with user actions such as “love”, “like”, “hate”, “smile”, and “cry”.
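Each event will be a small JSON document. For example, a single record might look like this (the field names match the Message struct we define later; the values are just illustrative):

{"user_id": 100002, "post_id": "POST00003", "user_action": "like"}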

We are going to create a local Kafka environment using Docker. Install Docker and Docker Compose before creating the containers.

Create a docker-compose.yml file to configure our Kafka and ZooKeeper containers:

# docker-compose.yml
version: "3.7"
services:
  zookeeper:
    restart: always
    container_name: kafka-like-zookeeper
    image: docker.io/bitnami/zookeeper:3.8
    ports:
      - "2181:2181"
    volumes:
      - "zookeeper-volume:/bitnami"
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    restart: always
    container_name: kafka-like
    image: docker.io/bitnami/kafka:3.3
    ports:
      - "9093:9093"
    volumes:
      - "kafka-volume:/bitnami"
    environment:
      - KAFKA_BROKER_ID=1
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
      - KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092,EXTERNAL://localhost:9093
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=CLIENT
    depends_on:
      - zookeeper
volumes:
  kafka-volume:
  zookeeper-volume:

In our docker-compose.yml, ZooKeeper manages the Kafka broker, and port 9093 is exposed for our Kafka container. Note the two listeners: EXTERNAL (advertised as localhost:9093) is for clients running on the host, which is why the Go programs below connect to localhost:9093, while CLIENT (kafka:9092) is what other containers on the same Docker network would use. For more Kafka container configuration options, refer to the Bitnami Kafka page on Docker Hub.

Once docker-compose.yml is ready, bring the stack up:

docker compose up -d
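Then check that both containers are running:

docker ps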

You should see the kafka-like and kafka-like-zookeeper containers up and ready to use.

Once the Kafka container is ready, we need to create a topic. Open a shell inside the Kafka container:

docker exec -it kafka-like /bin/bash

Create Topic

kafka-topics.sh --create --topic post-likes --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
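A quick note on the flags: with a single broker, --replication-factor cannot be higher than 1, and --partitions 1 keeps things simple; you would raise the partition count if you wanted several consumers in the same group to share the load.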

Show Topics

kafka-topics.sh --list --bootstrap-server localhost:9092

You should see that the “post-likes” topic has been created; we are now ready to push messages to it.
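Optionally, before writing any Go, you can sanity-check the topic with the standard console tools that ship in the Kafka image. Run the producer in one shell and the consumer in another (both inside the container); lines typed into the first should appear in the second:

kafka-console-producer.sh --topic post-likes --bootstrap-server localhost:9092
kafka-console-consumer.sh --topic post-likes --from-beginning --bootstrap-server localhost:9092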

For this simulation we will use Go as our application programming language. We will create producer.go as our “Producer” and consumer.go as our “Consumer”.

In this program we will use the sarama package. Install sarama in our Go application:

go get github.com/IBM/sarama
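If your project does not have a Go module yet, initialize one first (the module name kafka-like here is just an example):

go mod init kafka-like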

In this example we will use:

Topic: post-likes
Consumer group ID: consumer-group

Create producer.go as our dummy producer. It will send JSON messages with a random user ID, post ID, and action to Kafka.

package main

import (
	"encoding/json"
	"log"
	"math/rand"

	"github.com/IBM/sarama"
)

// Message is the JSON payload we publish for every reaction event.
type Message struct {
	UserId     int    `json:"user_id"`
	PostId     string `json:"post_id"`
	UserAction string `json:"user_action"`
}

func main() {
	brokers := []string{"localhost:9093"} // the EXTERNAL listener from docker-compose.yml
	producer, err := sarama.NewSyncProducer(brokers, nil)
	if err != nil {
		log.Fatalln("Failed to start Sarama producer:", err)
	}
	defer producer.Close()

	// Dummy data
	userId := [5]int{100001, 100002, 100003, 100004, 100005}
	postId := [5]string{"POST00001", "POST00002", "POST00003", "POST00004", "POST00005"}
	userAction := [5]string{"love", "like", "hate", "smile", "cry"}

	for {
		// Pick a random combination from the dummy data.
		message := Message{
			UserId:     userId[rand.Intn(len(userId))],
			PostId:     postId[rand.Intn(len(postId))],
			UserAction: userAction[rand.Intn(len(userAction))],
		}

		jsonMessage, err := json.Marshal(message)
		if err != nil {
			log.Fatalln("Failed to marshal message:", err)
		}

		msg := &sarama.ProducerMessage{
			Topic: "post-likes",
			Value: sarama.StringEncoder(jsonMessage),
		}

		// SendMessage blocks until the broker acknowledges the record.
		if _, _, err = producer.SendMessage(msg); err != nil {
			log.Fatalln("Failed to send message:", err)
		}
		log.Println("Message sent!")
	}
}
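Note that passing nil as the config makes sarama use its defaults (NewSyncProducer enables Producer.Return.Successes for you, which a sync producer requires). If you want more control, a minimal sketch of an explicit config could look like this; the values shown are common choices, not requirements:

config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll // wait for all in-sync replicas to ack
config.Producer.Retry.Max = 5                    // retry transient broker errors
config.Producer.Return.Successes = true          // required by SyncProducer
producer, err := sarama.NewSyncProducer(brokers, config)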

Create consumer.go as our consumer. Here we enable offset auto-commit, so that offsets marked by the handler are committed periodically and the consumer does not reprocess the same messages after a restart.

package main

import (
	"context"
	"log"
	"time"

	"github.com/IBM/sarama"
)

type exampleConsumerGroupHandler struct{}

func (exampleConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error   { return nil }
func (exampleConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }

// ConsumeClaim is called once per claimed partition and loops over its messages.
func (h exampleConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		log.Printf("Received message: %s\n", string(msg.Value))
		// Process the message as per your requirement here.
		// MarkMessage records the offset; the auto-commit loop persists it.
		sess.MarkMessage(msg, "")
	}
	return nil
}

func main() {
	brokers := []string{"localhost:9093"}
	groupID := "consumer-group"

	config := sarama.NewConfig()
	config.Version = sarama.V2_0_0_0 // specify the appropriate Kafka version
	config.Consumer.Offsets.AutoCommit.Enable = true
	config.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second

	consumerGroup, err := sarama.NewConsumerGroup(brokers, groupID, config)
	if err != nil {
		log.Panicf("Error creating consumer group client: %v", err)
	}
	defer consumerGroup.Close()

	ctx := context.Background()

	// Consume returns whenever a rebalance happens, so it must be called in a loop.
	for {
		err := consumerGroup.Consume(ctx, []string{"post-likes"}, exampleConsumerGroupHandler{})
		if err != nil {
			log.Panicf("Error from consumer: %v", err)
		}
	}
}
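One thing this loop does not handle is shutdown: context.Background() never cancels, so the program only stops when killed. A minimal sketch of a signal-aware variant (it needs the extra os, os/signal, and syscall imports) would replace the ctx line and the for loop with:

// Cancel the context on Ctrl+C or SIGTERM so Consume returns.
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer cancel()

for {
	err := consumerGroup.Consume(ctx, []string{"post-likes"}, exampleConsumerGroupHandler{})
	if ctx.Err() != nil {
		return // context cancelled: stop consuming, deferred Close runs
	}
	if err != nil {
		log.Panicf("Error from consumer: %v", err)
	}
}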

Once both producer.go and consumer.go are ready, run the producer to start producing messages to Kafka.

go run producer.go

You should see the producer start blasting dummy messages to Kafka (the loop has no delay, so expect a rapid stream; stop it with Ctrl+C).

Next, run consumer.go to start consuming messages from Kafka.

go run consumer.go

You should see the consumer start consuming messages from Kafka.

That’s all! You are now ready to build your own producer and consumer for your own use case.
