Apache Kafka With Golang: Getting Started

Zayn Korai · Published in The Startup · Jan 12, 2021

Before this, I had worked with Apache Kafka as a DevOps engineer. My experience with queue systems (Kafka/RabbitMQ) was mostly in deploying and maintaining monitoring stacks, where a system like Kafka helped ingest millions of metric points per hour without any drop in performance.

That experience made me curious: how do systems like Kafka keep latency low and response times steady even during traffic spikes or high data volumes? And how can a DevOps engineer use it for their own custom services?

PS: This is just my learning experience, so I will only cover simple producer and consumer programs using Go and Apache Kafka.

Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.

Reasons for using solutions like Apache Kafka

Increased Reliability
Queues make your data persistent. If one part of the system is ever unreachable, the other parts can still continue to interact with the queue.

Better Performance
Message queues enable asynchronous communication: endpoints that produce and consume messages interact with the queue, not with each other.

Scalability
When workloads peak, message queues help you scale precisely where you need to.

You can read more about these reasons here: Message Queues & You — 12 Reasons to Use Message Queuing

Let's jump into the code

It's a simple system: add a message through a REST API, produce it to Apache Kafka, and process it in a consumer/worker. The worker could run any time-consuming process, such as saving the data to a data store and performing aggregation or some other computation.

Running Kafka locally

All of the examples in this post were tested against a Kafka instance running locally on my machine, which you can also set up using Docker and docker-compose.

curl -sSL https://raw.githubusercontent.com/bitnami/bitnami-docker-kafka/master/docker-compose.yml > docker-compose.yml
docker-compose up -d
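
Kafka brokers auto-create topics by default, but if auto-creation is disabled in your setup you can create the comments topic by hand. A quick sketch, assuming the Bitnami compose file above (where the service is named kafka and kafka-topics.sh is on the PATH):

docker-compose exec kafka kafka-topics.sh --create \
  --topic comments --partitions 1 --replication-factor 1 \
  --bootstrap-server localhost:9092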

Writing a Producer along with a REST API

I am using Sarama, a Go client library for Apache Kafka. There are other choices as well, like kafka-go. One reason I chose Sarama is that it is faster than kafka-go; another is that the Go community around Sarama and its codebase helped sway the decision.
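
For reference, the producer and API snippets below assume they all live in a single main package with roughly the following imports (the Shopify/sarama and Fiber v2 import paths were the current ones when this was written; adjust them to your own module setup):

package main

import (
    "encoding/json"
    "fmt"

    "github.com/Shopify/sarama"
    "github.com/gofiber/fiber/v2"
)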

Connect to Apache Kafka as a producer using Sarama

// ConnectProducer creates a synchronous Kafka producer from a list of broker addresses.
func ConnectProducer(brokersUrl []string) (sarama.SyncProducer, error) {
    config := sarama.NewConfig()
    // A SyncProducer requires Return.Successes to be true.
    config.Producer.Return.Successes = true
    // WaitForAll waits for all in-sync replicas to acknowledge the message.
    config.Producer.RequiredAcks = sarama.WaitForAll
    config.Producer.Retry.Max = 5
    // NewSyncProducer creates a new SyncProducer using the given broker addresses and configuration.
    conn, err := sarama.NewSyncProducer(brokersUrl, config)
    if err != nil {
        return nil, err
    }
    return conn, nil
}

Push a comment to a queue (topic) using Sarama
This function is really simple: it takes a topic and a message as parameters, connects to Kafka using the ConnectProducer function above, and pushes the message to the given topic.

// PushCommentToQueue publishes a message to the given Kafka topic.
func PushCommentToQueue(topic string, message []byte) error {
    brokersUrl := []string{"kafkahost1:9092", "kafkahost2:9092"}
    producer, err := ConnectProducer(brokersUrl)
    if err != nil {
        return err
    }
    defer producer.Close()
    msg := &sarama.ProducerMessage{
        Topic: topic,
        Value: sarama.StringEncoder(message),
    }
    partition, offset, err := producer.SendMessage(msg)
    if err != nil {
        return err
    }
    fmt.Printf("Message is stored in topic(%s)/partition(%d)/offset(%d)\n", topic, partition, offset)
    return nil
}

That's all we need to connect and push data to Kafka. Now let's write a simple API server that takes JSON as input and passes it to the PushCommentToQueue function described above.

I am writing the REST server using Fiber; you can use whatever framework you prefer.

type Comment struct {
    Text string `form:"text" json:"text"`
}

// createComment parses the JSON body into a Comment and pushes it to Kafka.
func createComment(c *fiber.Ctx) error {
    // Instantiate a new Comment struct
    cmt := new(Comment)
    if err := c.BodyParser(cmt); err != nil {
        c.Status(400).JSON(&fiber.Map{
            "success": false,
            "message": err.Error(),
        })
        return err
    }
    // Convert the body into bytes and send it to Kafka
    cmtInBytes, err := json.Marshal(cmt)
    if err != nil {
        c.Status(500).JSON(&fiber.Map{
            "success": false,
            "message": "Error marshalling comment",
        })
        return err
    }
    if err := PushCommentToQueue("comments", cmtInBytes); err != nil {
        c.Status(500).JSON(&fiber.Map{
            "success": false,
            "message": "Error pushing comment to queue",
        })
        return err
    }
    // Return the comment in JSON format
    return c.JSON(&fiber.Map{
        "success": true,
        "message": "Comment pushed successfully",
        "comment": cmt,
    })
}

func main() {
    app := fiber.New()
    api := app.Group("/api/v1")
    api.Post("/comments", createComment)
    app.Listen(":3000")
}

Start the server (go run ., assuming everything above is in one main package) and let's push some data to Kafka:

curl --location --request POST '0.0.0.0:3000/api/v1/comments' \
--header 'Content-Type: application/json' \
--data-raw '{ "text":"nice boy" }'
curl --location --request POST '0.0.0.0:3000/api/v1/comments' \
--header 'Content-Type: application/json' \
--data-raw '{ "text":"keep up the good work" }'

Writing a Consumer/Worker using Sarama

Connect to Apache Kafka as a consumer using Sarama

// connectConsumer creates a Kafka consumer from a list of broker addresses.
func connectConsumer(brokersUrl []string) (sarama.Consumer, error) {
    config := sarama.NewConfig()
    config.Consumer.Return.Errors = true
    // NewConsumer creates a new consumer using the given broker addresses and configuration.
    conn, err := sarama.NewConsumer(brokersUrl, config)
    if err != nil {
        return nil, err
    }
    return conn, nil
}

Then, using connectConsumer, we connect to Kafka as a consumer. After that we call ConsumePartition, which creates a PartitionConsumer on the given topic/partition starting at the given offset.
Then we open a signal channel and read messages in a loop: the consumer's Messages method returns the read channel for the messages that come back from the broker.

topic := "comments"
worker, err := connectConsumer([]string{"kafkahost1:9092", "kafkahost2:9092"})
if err != nil {
    panic(err)
}
// Calling ConsumePartition opens one connection per broker
// and shares it for all partitions that live on it.
consumer, err := worker.ConsumePartition(topic, 0, sarama.OffsetOldest)
if err != nil {
    panic(err)
}
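
The snippet stops at ConsumePartition, so here is a minimal sketch of how the rest of the worker loop might look, following the signal-channel approach described above. It assumes os, os/signal, and syscall are imported, and the doneCh channel and msgCount counter are my own names, not from the original code:

// Stop the worker on Ctrl+C or SIGTERM.
sigchan := make(chan os.Signal, 1)
signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)

msgCount := 0
doneCh := make(chan struct{})
go func() {
    for {
        select {
        case err := <-consumer.Errors():
            fmt.Println(err)
        case msg := <-consumer.Messages():
            msgCount++
            fmt.Printf("Received message Count %d: | Topic(%s) | Message(%s)\n", msgCount, msg.Topic, string(msg.Value))
        case <-sigchan:
            fmt.Println("Interrupt detected")
            doneCh <- struct{}{}
        }
    }
}()
<-doneCh
fmt.Println("Processed", msgCount, "messages")

if err := worker.Close(); err != nil {
    panic(err)
}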

Output of the worker consuming messages from the queue
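
With the sketch above, consuming the two comments pushed earlier would print something like the following (counts and ordering depend on your run):

Received message Count 1: | Topic(comments) | Message({"text":"nice boy"})
Received message Count 2: | Topic(comments) | Message({"text":"keep up the good work"})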

For the complete code, please refer to the GitHub link.
