Implement Kafka Topics Joiner With Golang 🔥

Muhammed İKİNCİ · Published in CodeX · 5 min read · Sep 12, 2022

Hello there! Today, we’ll build a Kafka producer and consumer in Go. Generics, a feature introduced in Go 1.18, will come in handy: generics are extremely helpful when dealing with multiple types in the same context.

Tools

Let’s review what we need: ✨

  • Docker
  • Golang 1.18 or newer

We can set up our folder hierarchy as shown below:

producer/
    producer.go
    main.go
    types.go
consumer/
    main.go
    types.go
    connection.go
main.go
.gitignore
config.yaml
docker-compose.yaml

The system we will set up is as follows: 🏋️

[Diagram: Golang Kafka producer and consumer with joined topics]

Dockerize

The docker-compose file has to look like the below. For this project, we add Kowl (a web UI for Kafka), Zookeeper, and Kafka to it.
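The original compose file was embedded in the post and isn’t reproduced here, so the following is a minimal sketch of what it might contain. The image versions, the listener setup, and the Kowl entrypoint flag are assumptions, not the article’s exact file:

version: "3.8"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.0.1
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:7.0.1
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Two listeners: one for other containers (kafka:29092),
      # one for the host machine (localhost:9092).
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:29092,PLAINTEXT_HOST://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  kowl:
    image: quay.io/cloudhut/kowl:master
    depends_on:
      - kafka
    ports:
      - "8080:8080"
    volumes:
      # Mount the Kowl configuration described below.
      - ./config.yaml:/etc/kowl/config.yaml
    entrypoint: ["./kowl", "--config.filepath=/etc/kowl/config.yaml"]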

We now need to fill the config.yaml file with some Kowl configuration:

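A minimal sketch of the Kowl config, assuming Kafka is reachable from the Kowl container via the internal kafka:29092 listener defined in the compose sketch above:

kafka:
  # Kowl connects to Kafka over the internal Docker network listener.
  brokers:
    - "kafka:29092"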

Go to your project folder in whichever terminal you like and start the containers:

docker compose up -d

Kowl may restart several times while Kafka boots; once Kafka is ready, the dashboard will be available at localhost:8080.

Coding

OK, we can start coding. First of all, there are two Golang Kafka libraries: segmentio/kafka-go and confluent-kafka-go.

You must have a C compiler installed to use confluent-kafka-go, because it relies on CGO. segmentio/kafka-go does not use CGO; it is pure Go. I’ll demonstrate how to use both libraries in this article.

You can find the Confluent producer in the main branch of the repository, and I added the SegmentIO producer to another branch.

Producer

Navigate to your producer folder and enter the commands below in your terminal:

cd producer
go mod init producer
go get github.com/segmentio/kafka-go

We will establish links between Product, Category, and Image. The types.go file looks like the below.
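The exact types.go was embedded in the post, so this is a sketch reconstructed from the fields used later in the article; the JSON tags and the CategoryID field are assumptions:

package main

// Product, Image, and Category mirror our three "tables".
type Product struct {
    ID            int    `json:"id"`
    Name          string `json:"name"`
    Price         int    `json:"price"`
    OriginalPrice int    `json:"original_price"`
    CategoryID    int    `json:"category_id"` // assumed link to Category
}

type Image struct {
    ProductID int    `json:"product_id"`
    URL       string `json:"url"`
}

type Category struct {
    ID   int    `json:"id"`
    Name string `json:"name"`
}

// CollectedProduct is the joined view the consumer builds.
type CollectedProduct struct {
    ID            int
    Name          string
    Price         int
    OriginalPrice int
    Category      string
    Images        []string
}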

Let’s create our topics and data. We connect to Kafka and create the topics that we need.
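A minimal sketch of that step with segmentio/kafka-go; the error handling and the single-partition settings are assumptions. Topic creation has to go through the cluster controller:

// Assumes imports: "net", "strconv", kafka "github.com/segmentio/kafka-go".
conn, err := kafka.Dial("tcp", "localhost:9092")
if err != nil {
    panic(err)
}
defer conn.Close()

// Topics must be created on the controller broker.
controller, err := conn.Controller()
if err != nil {
    panic(err)
}
controllerConn, err := kafka.Dial("tcp",
    net.JoinHostPort(controller.Host, strconv.Itoa(controller.Port)))
if err != nil {
    panic(err)
}
defer controllerConn.Close()

topics := []string{
    "producer-product-table-testing",
    "producer-image-table-testing",
    "producer-category-table-testing",
}
for _, topic := range topics {
    if err := controllerConn.CreateTopics(kafka.TopicConfig{
        Topic:             topic,
        NumPartitions:     1,
        ReplicationFactor: 1,
    }); err != nil {
        panic(err)
    }
}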

Now we can create the producer object and use it in the main.go file. The producer pushes messages to our topics.
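A sketch of such a producer using segmentio/kafka-go; the sample product and the JSON payload are assumptions:

// Assumes imports: "context", "encoding/json", kafka "github.com/segmentio/kafka-go".
writer := &kafka.Writer{
    Addr:     kafka.TCP("localhost:9092"),
    Balancer: &kafka.LeastBytes{},
}
defer writer.Close()

product := Product{ID: 1, Name: "product1", Price: 23, OriginalPrice: 34, CategoryID: 1}
payload, err := json.Marshal(product)
if err != nil {
    panic(err)
}

// Leaving Writer.Topic empty lets us target a topic per message.
err = writer.WriteMessages(context.Background(), kafka.Message{
    Topic: "producer-product-table-testing",
    Value: payload,
})
if err != nil {
    panic(err)
}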

When we run the producer:

go run .

You can verify the result in Kafka by browsing the three topics and their messages in the Kowl dashboard.

We’re going to start coding the consumer. We will receive messages from the topics we have created and then we will try to join the messages by associating them with their keys. I also want to show visually what we are doing.

[Diagram: example database structure for the Go Kafka join tutorial]

We have Product, Image, and Category tables. In our database structure, a product can have more than one photo, but only one category.

Consumer

Let’s start creating the consumer!

cd consumer
go mod init consumer
go get github.com/segmentio/kafka-go

In the consumer, we have the same types.go file; you can copy and paste it from the producer.

I create a Consumer struct in the connection.go file with generics, like below:

type Consumer[T comparable] struct {
    reader *kafka.Reader
    dialer *kafka.Dialer
    topic  string
}

I add a receiver function named CreateConnection to Consumer. This function connects to Kafka and prepares the reader object.

func (c *Consumer[T]) CreateConnection() {
    c.reader = kafka.NewReader(kafka.ReaderConfig{
        Brokers:   []string{"localhost:9092"},
        Topic:     c.topic,
        Partition: 0,
        MinBytes:  10e3, // 10KB
        MaxBytes:  10e6, // 10MB
        MaxWait:   time.Millisecond * 10,
        Dialer:    c.dialer,
    })
    c.reader.SetOffset(0)
}

I create another receiver function named Read. This function takes two parameters:

  • model: an instance of the table type that messages should be decoded into
  • callback: the function we call for each message (or error); this is where the table-merging logic will live

func (c *Consumer[T]) Read(model T, callback func(T, error)) {
    for {
        // Give Kafka 80ms to deliver a message; when the topic is
        // exhausted, ReadMessage returns a deadline error and we stop.
        ctx, _ := context.WithTimeout(context.Background(), time.Millisecond*80)
        message, err := c.reader.ReadMessage(ctx)
        if err != nil {
            callback(model, err)
            return
        }

        err = json.Unmarshal(message.Value, &model)
        if err != nil {
            callback(model, err)
            continue
        }

        callback(model, nil)
    }
}

We keep reading messages from Kafka until the context expires. The context is cancelled after 80 milliseconds if no message is received; that is how we detect that we have reached the end of the topic.

Next, we open the main.go file and create the consumer connections:

dialer := &kafka.Dialer{
    Timeout:   10 * time.Second,
    DualStack: true,
}

productConsumer := Consumer[Product]{
    dialer: dialer,
    topic:  "producer-product-table-testing",
}
productConsumer.CreateConnection()

imageConsumer := Consumer[Image]{
    dialer: dialer,
    topic:  "producer-image-table-testing",
}
imageConsumer.CreateConnection()

categoryConsumer := Consumer[Category]{
    dialer: dialer,
    topic:  "producer-category-table-testing",
}
categoryConsumer.CreateConnection()

Now we start merging our tables. First, I read products from Kafka:

productConsumer.Read(Product{}, func(product Product, err error) {
    collectedProduct := CollectedProduct{
        ID:            product.ID,
        Name:          product.Name,
        Price:         product.Price,
        OriginalPrice: product.OriginalPrice,
        Images:        []string{},
    }

In the first parameter, I pass the model I want the messages decoded into; through the callback parameter, the Read function hands us each product, along with an error if one occurred.

Inside the callback, I start reading images, and this is where the join begins: images coming from Kafka are matched against the product and collected on it.

    imageConsumer.Read(Image{}, func(image Image, err error) {
        if product.ID == image.ProductID {
            collectedProduct.Images = append(
                collectedProduct.Images, image.URL,
            )
        }
    })

In the end, our main file looks like the below.
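The complete file was embedded in the original post; here is a sketch of how the pieces above fit together. The category join, the error checks, and the final Println are assumptions based on the output shown below (CategoryID on Product is an assumed field):

package main

import (
    "fmt"
    "time"

    kafka "github.com/segmentio/kafka-go"
)

func main() {
    dialer := &kafka.Dialer{
        Timeout:   10 * time.Second,
        DualStack: true,
    }

    productConsumer := Consumer[Product]{dialer: dialer, topic: "producer-product-table-testing"}
    productConsumer.CreateConnection()

    imageConsumer := Consumer[Image]{dialer: dialer, topic: "producer-image-table-testing"}
    imageConsumer.CreateConnection()

    categoryConsumer := Consumer[Category]{dialer: dialer, topic: "producer-category-table-testing"}
    categoryConsumer.CreateConnection()

    productConsumer.Read(Product{}, func(product Product, err error) {
        if err != nil {
            return // context deadline: no more products
        }

        collectedProduct := CollectedProduct{
            ID:            product.ID,
            Name:          product.Name,
            Price:         product.Price,
            OriginalPrice: product.OriginalPrice,
            Images:        []string{},
        }

        // Join: collect every image that belongs to this product.
        imageConsumer.Read(Image{}, func(image Image, err error) {
            if err != nil {
                return
            }
            if product.ID == image.ProductID {
                collectedProduct.Images = append(collectedProduct.Images, image.URL)
            }
        })

        // Join: a product has exactly one category.
        categoryConsumer.Read(Category{}, func(category Category, err error) {
            if err != nil {
                return
            }
            if product.CategoryID == category.ID {
                collectedProduct.Category = category.Name
            }
        })

        fmt.Println(collectedProduct)

        // Rewind the side topics so the next product reads them from the start.
        imageConsumer.reader.SetOffset(0)
        categoryConsumer.reader.SetOffset(0)
    })
}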

Result

If we run the consumer, the result will be like this:

{0 product1 23 34  [http://google.com/image1.jpg http://google.com/image2.jpg]}
{2 product2 23 34 Category1 [http://google.com/image4.jpg]}
{3 product3 23 34 Category3 [http://google.com/image5.jpg http://google.com/image6.jpg http://google.com/image6.jpg]}
{4 product4 23 34 Category1 []}
{5 product5 23 34 Category2 []}
{5 product5 23 34 Category2 []}

As you can see, we set the offsets of the image and category topics back to 0 in every loop, which means that for every product, those topics are read from the beginning. If you don’t want that, you can create a cache mechanism for already-consumed messages.

imageConsumer.reader.SetOffset(0)
categoryConsumer.reader.SetOffset(0)
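If you do want such a cache, a minimal sketch of the idea (my own addition, not from the original post) could read each side topic once into maps and then join from memory:

imagesByProduct := make(map[int][]string)
imageConsumer.Read(Image{}, func(image Image, err error) {
    if err != nil {
        return
    }
    imagesByProduct[image.ProductID] = append(imagesByProduct[image.ProductID], image.URL)
})

categoryNames := make(map[int]string)
categoryConsumer.Read(Category{}, func(category Category, err error) {
    if err != nil {
        return
    }
    categoryNames[category.ID] = category.Name
})

// Each product is now joined with map lookups instead of re-reading
// the image and category topics from offset 0 every time.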

I don’t know how efficient this approach is; in this article I just focused on creating a topics joiner system. If you want more tooling and efficiency, the best choice is Kafka with Java (Kafka Streams, for example, has joins built in).

Thanks for reading! đź‘‹
