.Net Core ile Kafka Üzerinden Mesajlaşma

Faruk Terzioğlu
hepsiburadatech
Published in
8 min readMar 19, 2019

Bu yazıda Kafka üzerinden mesajlaşmak için pratik bir .Net Core uygulamasını anlatacağım. Yazı sonunda .Net Core ile yazılmış ‘producer’ lar ve ‘consumer’ lar Kafka üzerinden haberleşecek. Uygulayacağım senaryoları şu görselle özetleyebilirim.

Yazı içerisinde kullanılan bazı önemli terimleri şu şekilde özetleyebilirim;

Mesaj

Kafka mesaj tabanlıdır, işlem yapılan asıl unsur mesajlardır. Mesajlar basit bir anahtar-değer ikilisinden oluşmaktadır. Hem anahtar hem de mesaj içerikleri serialize edilebilen her şey olabilir. Mesajlara örnek olarak sayfa tıklamaları, yorumlar ve siparişleri verebiliriz.

Topic

Kafka’da mesajlar topic’lere gönderilir. Mantıksal olarak birbiri ile alakalı mesajlar aynı topic içerisine gönderilir. Örneğin tüm siparişler bir topic’e gönderilirken, stok güncellemeleri başka bir topic’e gönderilebilir.

Partition

Kafka, yükün dağıtılması ve hata toleransı için farklı fiziksel/sanal makinelerde birden fazla node olarak çalışmaktadır. Topic’ler de fiziksel olarak bu makineler üzerinde farklı farklı bölümlere (partition) ayrılmaktadır. Her bir partition ayrıca diğer node’lara master/slave olarak kopyalanabilir.

Bir topic’e gelen mesaj farklı dağıtım stratejilerine göre (round-robin gibi) farklı partition’lara gönderilmektedir. Birden fazla partition’a bölünmüş bir topic’de, topic geneli mesaj sırası söz konusu değildir. Mesaj sırası sadece partition içerisinde geçerlidir. Eğer topic geneli bir sıra gerekiyorsa mesaj ile bir de anahtar sunulmalıdır.

https://www.confluent.io/blog/tutorial-getting-started-with-the-new-apache-kafka-0-9-consumer-client/

Producer

Topic’lere mesaj gönderen uygulamalardır. Hangi mesajın hangi topic’e gönderileceği producer’lar tarafından belirlenir.

Consumer

Topic’lerden mesajları okuyan uygulamalardır. Hangi topic’lerden mesaj okuyacaklarına, consumer’lar o topiclere ‘subcribe’ olarak belirlerler. Consumer’lar sadece kendilerine atanmış partitionlardaki mesajları okuyabilirler.
Birden fazla partition’ı olan bir topic’deki mesajlar aynı consumer’ın farklı kopyaları dağıtılması için, consumer’lar aynı gruba dahil olmalıdırlar.

https://www.oreilly.com/library/view/kafka-the-definitive/9781491936153/ch01.html

Offset

Bir topic partition’ı içerisindeki mesajlar ulaşma anına göre sıralı olarak depolanırlar. Partition içerisindeki sıra ‘offset’ değeri ile belirlenir.
Bir consumer’ın hangi mesajda kaldığı da yine bu offset değeri ile belirlenir. Eğer bir consumer hata vb. bir sebepten dolayı tekrar başlaması gerekirse, bu offset değeri ile kaldığı yerden devam edebilir.
Topic içerisindeki bir mesaj herhangi bir consumer tarafından alındığında mesaj kuyruklarının aksine topic içerisinden silinmez. Başka bir consumer daha eski bir offsetten başlayarak aynı mesajları tüketebilir.

https://www.confluent.io/blog/tutorial-getting-started-with-the-new-apache-kafka-0-9-consumer-client/

Uygulanacak senaryolar; bir producer’ın gönderdiği mesajı 3 kopya olarak çalışan bir consumer ve 1 kopya olarak çalışan başka bir consumer tüketecek ve her mesajın iki consumer tipi tarafından da almasını sağlayacağım. Aynı zamanda mesajların iletim sırasının önemli olduğu ve scale olduğu halde ‘mesaj anahtarı’ sayesinde sırasını kaybetmeyen bir consumer da anlatacağım.

Geliştirilen uygulamalar hem Linux de hem de Windows ta çalıştırılabilir. Kod örnekleri de her iki işletim sistemi için de verilecek. Linux komutları ‘$’ ile başlayıp Windows komutları ‘PS>’ ile başlamakta. Eğer her ikisi de yoksa komut iki ortamda da aynı şekilde çalışıyordur.

Kafka’yı Docker Üzerinde Çalıştırma

.Net kodlarına girmeden önce Kafka üzerinde gelen ‘producer’ ve ‘consumer’ ile mesaj alışverişi yapacağız. İlk olarak Kafka ve Zookeeper’ı Docker üzerinde ayağa kaldırmalıyız. Zookeeper’ı Docker üzerinde çalıştırmak için;

docker run -d --name zookeeper -p 2181:2181 jplock/zookeeper

Kafka’yı çalıştırırken Zookeper’ın ip adresini parametre olarak vermemiz gerekiyor. Linux/Windows ta host makinenin IP adresini terminal parametresi olarak kaydedelim;

// Linux 
$ HOSTIP=$(ip -4 addr show docker0 | grep -Po ‘inet \K[\d.]+’)
// Windows
PS > Set-Variable -Name "HOSTIP" -Value "[ MAKINE-IP ]"

Kafka’yı Docker üzerinde çalıştırmak için;

docker run -d --name kafka -p 7203:7203 -p 9092:9092 -e ZOOKEEPER_IP=$HOSTIP ches/kafka

Docker üzerinde çalışan Kafka içine girerek bazı hazır kodları inceleyelim. Daha sonra çalıştıracağımız kodların arka planına dair fikir vermesi açısından faydalı olabilir.

docker exec -it kafka bash
// Container içerisinde;
ls ./bin
Kullanılacak scriptler

Yeni bir topic oluşturmak için ‘kafka-topics.sh’, dahili consumer için ‘kafka-console-consumer.sh’ ve producer için ‘kafka-console-producer.sh‘ scriptlerini kullanacağız.
1 replika ve 1 partition dan oluşan bir topic yaratmak için;

$ docker run --rm ches/kafka kafka-topics.sh --create --topic commands --replication-factor 1 --partitions 1 --zookeeper $HOSTIP:2181PS > docker run --rm ches/kafka kafka-topics.sh --create --topic commands --replication-factor 1 --partitions 1 --zookeeper ($HOSTIP + ':2181')

Yarattığımız ‘commands’ dahil Kafka içindeki tüm topic’leri listelemek için;

$ docker run --rm ches/kafka kafka-topics.sh --list --zookeeper $HOSTIP:2181PS > docker run --rm ches/kafka kafka-topics.sh --list --zookeeper ($HOSTIP + ':2181')
Topic oluşturma ve listeleme

Producer ve Consumer

Şimdi Kafka içerisinde gelen ‘console-producer’ ile oluşturduğumuz ‘command’ topic’ine mesaj gönderelim. Aşağıdaki kod ile producer çalışmış olacak ve yazdığımız her satır, commands topic’ine gönderilecektir. ‘-topic’ parametresi ile hangi topic’e mesaj göndereceğini belirtiyoruz.

docker run --rm --interactive ches/kafka kafka-console-producer.sh --topic commands --broker-list $HOSTIP:9092
Producer’a mesajların gönderilmesi

Producer dan gönderilen mesajları okumak için yine Kafka içerisinde gelen ‘console-consumer’ı çalıştırıyoruz. Aynı şekilde ‘-topic’ parametresi ile hangi topic’den mesaj alacağımızı belirtiyoruz ve ‘ — from-beginning’ parametresi ile de topic içerisindeki mesajları, en başından itibaren istediğimizi belirtiyoruz. Çalıştırdığımızda daha önce gönderdiğimiz 2 mesajın ve producer’dan göndereceğimiz yeni mesajların consumer’a ulaştığını görebiliriz.

docker run --rm ches/kafka kafka-console-consumer.sh --topic commands --from-beginning --bootstrap-server $HOSTIP:9092
Consumer ve producer mesaj alış verişi

İkinci bir terminalde de consumer çalıştırdığımızda aynı şekilde mesajların her iki consumer’a da ulaştığını görebiliriz. Yalnız bir farklılık olarak ikinci consumer’da ‘ — from-beginning’ parametresini çıkarttığım için sadece yeni mesajlar ikinci consumer’a ulaşmıştır.

İki consumer ve ‘— from-beginning’ farkı

Topic Partition ve Consumer Group Kavramları

Sıkça ihtiyaç duyabileceğimiz bir başka senaryo ise mesajların aynı consumer tipinden birden fazla consumer’a dağıtılması.
Mesajların aynı tipten birden fazla consumer’a load-balanced olarak dağıtılabilmesi için kullanılan topic, consumer adedince partition’a bölünmelidir. Yukarıda kullandığımız ‘commands’ topic’i bir partition olarak tanımlamıştık. Onun için ikinci bir 2 partition’lı ‘commands-2’ topic’i oluşturuyoruz.

docker run --rm ches/kafka kafka-topics.sh --create --topic commands-2 --replication-factor 1 --partitions 2 --zookeeper $HOSTIP:2181

Aynı tipteki birden fazla consumer’ın aynı topic’den mesajları sırayla alabilmesi için iki consumer da aynı ‘consumer-group’ a dahil olmalıdır. Onun için daha önce çalıştırdığımız consumer komutlarına ‘ — consumer-property group.id=testApp1’ parametresini ekliyoruz. Bu sayede çalıştıracağımız tüm consumerlar aynı tipte sayılacak ve mesajları sırası ile alacaktır.

docker run --rm ches/kafka kafka-console-consumer.sh --topic commands --from-beginning --bootstrap-server $HOSTIP:9092 --consumer-property group.id=testApp1

Aşağıda görüldüğü gibi 2 consumer çalıştırdığımız halde mesajlar sıralı olarak 2 consumer arasında paylaşılmıştır.

Aynı topic’e farklı bir group.id (testApp2) ile bir consumer bağladığımızda tüm mesajların ulaştığını aşağıda görebiliriz.

Consumer grouplara (testApp1 ve testApp2) ait consumerları listediğimizde, testApp1 için iki farklı CONSUMER-ID verildiğini (her iki topic için), testApp2 için ise tek bir CONSUMER-ID verildiğini görebiliriz. Aşağıdaki görselden okuyabildiğimiz, testApp1 grubundaki consumer-1-afca411c consumer’ı commands-2 topic’inin ikinci partition’ında (1) 3. offsetindedir.

docker run --rm --interactive ches/kafka kafka-consumer-groups.sh --new-consumer --describe --group testApp1 --bootstrap-server $HOSTIP:9092

.Net Core Producer

Kendi yazacağımız producer ve consumer’ı test etmek için 3 partition’lı başka bir topic oluşturalım.

docker run --rm ches/kafka kafka-topics.sh --create --topic commands-3 --replication-factor 1 --partitions 3 --zookeeper $HOSTIP:2181

Producer için yeni bir console uygulaması oluşturalım ve Program.cs i aşağıdaki gibi güncelleyelim. Kafka’ya erişmek için ‘Confluent.Kafka’ Nuget paketini kullanıyoruz. Daha performanslı olduğu ve daha çok özellikli olduğu için beta sürümü kullanılması tavsiye ediliyor. Detaylar için şuraya bakılabilir.
https://github.com/confluentinc/confluent-kafka-dotnet

mkdir KafkaNetCore && cd KafkaNetCore
dotnet new console -n KafkaNetCore.Producer
cd ./KafkaNetCore.Producer/
dotnet add package Confluent.Kafka --version 1.0.0-beta2

Uygulama environment parametrelerinden Kafka url’ini ve bağlanacağı topic adını alıyor. Daha sonrasında kullanıcıdan girdi bekleyerek her bir satırı topic’e gönderiyor. Sonuç olarak gelen ‘DeliveryReport’ üzerinden ‘TopicPartitionOffset’ değeri ile mesajın hangi topic üzerinde hangi partion’a kaçıncı offset’ten girdiğini görebiliriz. Uygulamamızı çalıştırırsak;

$ export TOPIC_NAME=commands-3 KAFKA_URL=$HOSTIP
dotnet run .

Aşağıdaki çıktıda görebileceğimiz üzerine gönderdiğimiz mesaj ‘1', commands-3 topic’inin 2. partition’ına (1) [@1]. offsetten girmiş. Mesaj ‘2’ aynı topic’in 1. partition’ına (0) (@11). offsetten girmiş. Son mesaj ‘5’ ise 3 partion’lı topic’in 3. partion’ına girmiştir.

.Net Core Consumer

Producer’dan gönderdiğimiz mesajları alması için yine .Net Core ile bir console uygulaması geliştireceğiz. Bu uygulama load balanced olarak çalışabilmesi için aynı instance’dan üç adet çalıştıracağız ve mesajların bu üç uygulamaya dağılımını inceleyeceğiz. Uygulamalardan biri veya daha fazlası çalışmadığında nasıl hareket ettiklerini inceleyecek ve mesaj sırasının önemli olduğu senaryoda mesaj key’lerini nasıl kullanabileceğimizi göreceğiz.

Yeni bir console uygulaması oluşturalım ve program.cs’i aşağıdaki gibi değiştirelim. Daha performanslı ve daha çok özellik barındırdığı için ’Confluent.Kafka’ Nuget paketinin tavsiye edilen beta versiyonunu kullanıyoruz.

dotnet new console -n KafkaNetCore.Consumer
cd KafkaNetCore.Consumer/
dotnet add package Confluent.Kafka --version 1.0.0-beta3

Environment parametrelerinden producer’a ek olarak bir de CONSUMER_GROUP parametresini alıyoruz. Bu parametre sayesinde birden fazla consumer aynı grup içerisinde olacak ve mesajları paylaşacaklar. Yeni bir consumer tanımlarken kullanılan ‘ConsumerBuilder’ üzerinden ‘SetRebalanceHandler’ metodu ile, partiton’lar consumer’lar arasında tekrar paylaşıldığında çağrılacak olan event i tanımlıyoruz. ‘consumer.Subscribe’ ile hangi topic’lerden mesaj alacağımızı belirliyor ve her ‘consumer.Consume()’ çağırışımızda yeni bir mesaj alıyoruz. ‘consumer.Commit()’ ile de mesajı alıp işlediğimizi Kafka’ya bildiriyoruz.

export TOPIC_NAME=commands-3 KAFKA_URL=$HOSTIP CONSUMER_GROUP=group1
dotnet run .

Consumer’ı çalıştırdığımızda yukarıdaki gibi bir çıktı alırız. İlk satırdan görüldüğü üzere consumer başlar başlamaz ‘commands-3’ topic’inin 3 partition’ı da çalıştırdığımız consumer’a atandı. Örneğin son iki mesaj, 2. partition’ın 1. ve 2. mesajlarıdır.

Aynı consumer’dan aynı parametreler ile ikinci bir tane çalıştıralım;

Yeni çalıştırdığımız consumer’da aynı consumer grubuna (group1) dahil olduğu için, partition’lar iki consumer arasında paylaşılır. Yukarıda görüldüğü üzere yeni başlattığımız consumer 0 ve 1 partion’larını aldı, eski consumerda ise sadece 2. partition kalmıştır.

Producer’dan gönderdiğimiz sıralı mesajların farklı consumer’lara nasıl ulaştığını aşağıdaki görselde görebiliriz.

Aynı tip consumer’ı farklı grup isimleri ile çalıştırırsak, gönderilen mesajlar her iki consumer tarafından da alınacaktır.

Sıralı Mesajlar

Tek bir topic’e gönderilen mesajlar, aynı consumer group içerisindeki consumer’lara dağıtık olarak gönderildiğinden bahsetmiştik. Eğer mesajlar arasında bir ilişki varsa ve daha önce gönderilen mesaj işlenmeden bir sonraki mesajın işlenmemesi gerekiyor ise mesajlar arasında korelasyonu sağlayacak bir key tanımlamamız gerekiyor. Producer uygulamasını aşağıdaki gibi güncelleyelim. Burada yaptığımız, eğer mesaj içerisinde bir key gönderilmişse, tanımladığımız ‘Message<string, string>’ değişkeninin ‘Key’ değerini güncelliyoruz (satır 9).

Key ile gönderdiğimiz (key1) tüm mesajların aynı partition’a [1] ulaştığını aşağıda görebiliriz (mavi ile işaretlenmiş). Consumer’lar da partition’larla eşleştiği için ‘key1’ ile gönderdiğimiz tüm mesajlar aynı consumer’a ulaşacak ve sıra ile işlenmesini sağlanacaktır.

Yazı içerisinde anlatılan uygulamaların kodlarına ve Docker&Kubernetes üzerinde deploy etmek için gerekli dosyalara projenin Github sayfasında bulabilirsiniz.

--

--