Setting up a Kafka broker using docker and a consumer group with Golang

Ronan Souza
8 min readAug 18, 2019

After spending a bunch of time trying to set up simple things in Kafka, or just trying to understand core concepts with practical examples, I decided to write this to help everyone who is starting working with Kafka or is just curious about it and still. Here we will create a broker using docker, and a consumer and a producer using golang to demonstrate the Kafka workflow.

Understanding Kafka

Before the hands-on, let’s understand how Kafka works and why it’s so different from other solutions.

Kafka is a publish-subscribe messaging platform that could (normally) have many producers feeding data in it, while consumers are receiving that data. Ok, but there’s a lot of similar applications doing the same thing, why use Kafka?

That’s simple. Kafka can handle trillions of messages a day, and in the big data world, that everything is generating data, a platform that handles data of this magnitude is vital.

Now we will see a few concepts that will be important for the understanding of this material.

Note: If those concepts are not clear to you in the first moment, take it easy. When you see it working in the example, it’s a way easier to understand.

Producer

Is every application that publishes data to a Kafka topic

Consumer

Is every application that subscribes to a Kafka topic to receive data from this.

Broker

A Kafka broker is a node in a cluster that hosts partitions.

Topic

A topic is a category or feed name to which records are published. Topics in Kafka are always multi-subscriber; that is, a topic can have zero, one, or many consumer groups that subscribe to the data written to it.

Partitions

Kafka topics are divided into partitions. Each message in a partition has an offset that is used to order them into the partition. Partitions are really important because this enables to distribute a topic in different servers and creates replicas.

topic with 3 partitions

So when a partition receives a new message, it will create a new offset to index it. On the other side, when a consumer confirms that received a message from a given partition, the Kafka broker updates the last offset consumed in this partition by this consumer, thereat Kafka knows the last message consumed in each partition for a given consumer group avoiding duplicity.

Replicas

Replicas are partitions copies for fault tolerance, it means that if one broker is down and will have multiple replicas, no data will be lost, because Kafka will use another replica for each partition, that has the same data of the first one.

ConsumerGroup

Each consumer has a consumer group ID, that means that each message from a topic will be delivered to only one consumer in a certain consumer group. To assure this, Kafka only allows that one consumer for each group can consume from a certain partition. This means that if a consumer group has more consumers than the topic has partitions, will have consumers idle in this consumer group.

consumer group representation

Setting up your Kafka environment.

Before we set up our environment, there are few things that are important to know. First thing is that Kafka needs another service to work, the Apache Zookeeper. Ok, but what is a Zookeeper? According to its official page “ZooKeeper is a centralized service for maintaining configuration information, naming, providing distributed synchronization, and providing group services.” Long story short: Kafka needs a Zookeeper server and we’re gonna set it up :D

There are many ways to get Kafka running, such as Confluent Platform, Apache code running locally or even Amazon MSK service. But the simplest way to have a Kafka running, in my opinion, is Docker.

For this, we’ll be using a docker-compose file which uses docker images from Wurstmeister repository. To start this, you need to have docker and docker-compose installed on your machine.

Note: If you don't have docker and docker-compose installed, please read this to install docker this to install docker-compose

docker-compose.yml

This docker-compose.yml file will create two containers, one for Zookeeper that will only be used by Kafka and another for Kafka, that will be available in kafka:9092 if your consumer/producer is also in a docker container, or localhost:9093if your application is running locally. In the Kafka container, a topic called ‘sarama_topic’ was created with 2 partitions and 1 replica by an environment variable. This topic was created to be used in further examples.

To start the containers, go to the docker-compose directory and run

docker-compose up -d

After this, if you execute docker ps you will see two containers running, one called zookeeper and another called Kafka. Now you have your Kafka environment up and running, ready to connect with producers and consumers.

Let’s Code!

To write our producer and consumer in golang, we’ll use a client library developed by Shopify called Sarama. To use sarama, we need to have it installed, so to do that you need to run:

go get "github.com/Shopify/sarama"

Producing messages

Once you have Sarama installed, we can start building our producer. Samara provides AsyncProducer and SyncProducer for producing messages. The AsyncProducer accepts messages on a channel and produces them asynchronously in the background as efficiently as possible, while SyncProducer is not so efficient as AsyncProducer because of its block while Kafka acknowledges the message as produced. For our application, it’s indifferent, so we’ll use an AsyncProducer. Our final code will look like this:

sarama async producer

This producer will send a message to ‘sarama_topic’ topic with the value of ‘testing 123’ every second indefinitely.

Note: In this example the partition wasn’t chosen, it’s been automatically decided by the producer partitioner.

Consuming messages

Now we have a producer sending messages each second to a topic, it’s time to get those messages back.

Kafka has a concept of consumer groups, that Kafka assures that only one consumer for each consumer group will receive a certain message. Ok, but what difference does it make?

When you have an environment where there is coming more messages than a single consumer can handle, it’s necessary to scale it. Knowing that Kafka will assure that each message of a specific topic will be delivered to only one consumer for each consumer group, it’s possible to split the job between many consumers in the same consumer group, all of them doing the same job.

Note: It’s important to remember that each partition in a topic can only be consumed by a single consumer for each consumer group. So if you have 2 partitions in a topic and a consumer group with 3 consumers, one consumer will be idle all the time.

Before creating the consumer group, we need to define a consumer group handler. This handler will have the code of what each consumer will do once it is started.

consumer group handler

When a new consumer group handler is created it necessarily needs to have the following methods: Setup, Cleanup, and ConsumeClaim, as they are described in the example.

  • Setup is a called just before the consumer starts consuming messages, it's normally used to set up things like database connections, etc;
  • Cleanup is called when the consumer stop consuming, it’s normally used to close connections, notify something that the consumer was stopped, etc;
  • ConsumeClaim is the consuming loop, so what needs to be done with each message received need to be inside this loop. In this example, we are just logging each message received with partition, offset and the message itself.

Once the consumer group handler is defined, it’s time to start the consuming. The following code shows us how to set up a Kafka consumer group and really start consuming using a handler.

Testing the application

To test our application it's necessary to call StartProducer and StartConsumer. For this, I created a GitHub repository that has a main file to help us out.

To start producing messages you make sure you have a Kafka broker available. Then go to the root of the repository and run

go run main.go producer

once you call it, you see some log with [sarama_logger] prefix while the producer is being set up and after you see the following log that means your consumer is running.

2019/08/18 11:30:59 New Message produced
2019/08/18 11:31:00 New Message produced
2019/08/18 11:31:01 New Message produced
2019/08/18 11:31:02 New Message produced
2019/08/18 11:31:03 New Message produced
2019/08/18 11:31:04 New Message produced
2019/08/18 11:31:05 New Message produced

Now, it’s necessary to start a consumer and see if all those messages are being received. To do this run the following command in a different terminal:

go run main.go consumer

Again there will be some log from sarama when the consumer is being set up and it will start to log all messages being consumed like this:

Message topic:"sarama_topic" partition:1 offset:80 message: testing 123
Message topic:"sarama_topic" partition:1 offset:81 message: testing 123
Message topic:"sarama_topic" partition:1 offset:82 message: testing 123
Message topic:"sarama_topic" partition:1 offset:83 message: testing 123
Message topic:"sarama_topic" partition:0 offset:382 message: testing 123
Message topic:"sarama_topic" partition:0 offset:383 message: testing 123
Message topic:"sarama_topic" partition:0 offset:384 message: testing 123

Note that because our topic has two partitions, the producer is sending messages for both of them, and our consumer group (that has only one consumer) is consuming from both partitions.

To start another consumer in the same consumer group we just need to open a new terminal (is important to keep the first consumer running) and run go run main.go consumer. Note that when the Kafka identifies that there’s a new consumer in this group, a coordinator is called to balance the load between all consumers in this consumer group. It’s possible to see that your first consumer will log something like the following block:

[sarama_logger]2019/08/18 11:43:49 consumer/broker/1001 closed dead subscription to sarama_topic/1
[sarama_logger]2019/08/18 11:43:49 consumer/broker/1001 closed dead subscription to sarama_topic/0
[sarama_logger]2019/08/18 11:43:49 client/metadata fetching metadata for [sarama_topic] from broker localhost:9093
[sarama_logger]2019/08/18 11:43:49 client/coordinator requesting coordinator for consumergroup sarama_consumer from localhost:9093
[sarama_logger]2019/08/18 11:43:49 client/coordinator coordinator for consumergroup sarama_consumer is #1001 (localhost:9093)
[sarama_logger]2019/08/18 11:43:49 consumer/broker/1001 added subscription to sarama_topic/1

After that, one consumer will receive all messages from partition 0:

Message topic:"sarama_topic" partition:0 offset:402 message: testing 123
Message topic:"sarama_topic" partition:0 offset:403 message: testing 123
Message topic:"sarama_topic" partition:0 offset:404 message: testing 123
Message topic:"sarama_topic" partition:0 offset:405 message: testing 123
Message topic:"sarama_topic" partition:0 offset:406 message: testing 123

While another will consume all messages from partition 1:

Message topic:"sarama_topic" partition:1 offset:91 message: testing 123
Message topic:"sarama_topic" partition:1 offset:92 message: testing 123
Message topic:"sarama_topic" partition:1 offset:93 message: testing 123
Message topic:"sarama_topic" partition:1 offset:94 message: testing 123
Message topic:"sarama_topic" partition:1 offset:95 message: testing 123

Another test we can do is start another consumer meanwhile the first two still running. And as expected, the two first still consuming each one from a partition while the third one is idle, because our topic has only two partitions.

Conclusion

In this tutorial, we applied some important Kafka concepts developing a complete architecture producer and consumers. Though it could be really helpful in the first moment, if you’re gonna use something in production with Kafka, you’ll have to go deeper looking things like error handling or configs like consumer timeout that will be specific for your application.

Special thanks for Marcos Fábio Pereira, without his help I wouldn't have written this.

If you have any doubts about this or suggestions you can find me on twitter or comments bellow.

--

--