Bir önceki yazımda kafka mimarisinden bahsetmiştim. Bu yazıda producer ve consumer nedir konularına Spring Boot örnekleriyle değineceğim.
Öncelikle Spring Boot projenizde kafka yı kullanmak için spring kafka dependency sisini eklemeniz gerekiyor. Bu library ile kafka ya mesaj produce etmek ve kafkadan mesaj consume etmek için ihtiyaç duyacağınız özellikleri kullanabilirsiniz.
Şimdi bir de kafka cluster a ihtiyacınız var. Bunun için birden fazla seçeneğiniz var.
1- Apache kafka nın sitesinden işletim sisteminize uygun paketi indirip kurabilirsiniz.
2- Localinizde docker image ayağa kaldırabilirsiniz.
3- Confluent Cloud üzerinden bir cluster oluşturabilirsiniz.
Clusterınızı oluşturdunuz, dependency i projenize eklediniz şimdi producer ve consumer detayına geçebiliriz. Buradaki asıl amacımız producer ve consumer ların mantığını kavramak olduğu için bundan önceki kurulum aşamasının detaylarına değinmeyeceğim. Belki başka bir makalede detaylı olarak kurulum konusunu ele alabiliriz.
Producer
Producer event mesajlarını kafka cluster ındaki topic lere yazan client uygulamalarıdır. Mesajlar topic lere ait partition larda key-value şeklinde saklanır. Bir önceki yazıda bahsettiğim gibi aynı key ile produce edilen mesajlar aynı partition a yazılır. Produce sırasında key gönderimi zorunlu değildir.
Uygulamanızdan mesaj produce etmek için öncelikle producer configlerinizi yazmanız gerekiyor. Producer konfigurasyonu için ihtiyaç duyulabilecek bazı config ler şu şekildedir.
- acks: Acks, producer ın bir mesajı kafka cluster a gönderildi kabul etmesi için broker dan gelmesi gereken minimum acknowledgement sayısını ifade eder. “all”, “0” ve “1” değerlerini alabilir. all -> producer, leader partition ın bütün follower ların mesajı commit ettiğine dair onay almasını bekleyecek. 1 -> leader partition ın kendi commit loguna yazması yeterli. 0 -> herhangi bir ack beklenmiyor.
- max.in.flight.requests.per.connection: Client ın engellemeden önce tek bir bağlantıda göndereceği maksimum onaylanmamış istek sayısı. Varsayılan değer 5 tir.
- linger.ms: Batch record request inin gönderilmeye hazır hale gelmesinden önceki gecikme süresini temsil eder. Request iletimleri arasında gelen tüm record lar, producer tarafından tek bir requestte bir araya getirilir. linger.ms, toplu işlem için gecikmenin üst sınırını belirtir. Varsayılan değer 0 dır. Bu, herhangi bir gecikme olmayacağı ve grupların hemen gönderileceği anlamına gelir (toplu grupta yalnızca 1 mesaj olsa bile). Bazı durumlarda client, verimi artırmak amacıyla orta düzeyde yük altında bile istek sayısını azaltmak için linger.ms yi artırabilir. Ancak bu şekilde hafızada daha fazla kayıt saklanacaktır.
- batch.size: Aynı partition a birden fazla record gönderildiğinde, producer kayıtları bir araya getirmeye çalışır. Bu şekilde hem client ın hem de sunucunun performansı artırılabilir. Batch.size tek bir toplu işin maksimum boyutunu (bayt cinsinden) temsil eder. Küçük batch size, toplu işlemi önemsiz hale getirecek ve verimi azaltacaktır; çok büyük bir batch size ise genellikle ekstra kayıtların beklenmesi amacıyla bir arabellek tahsis edildiğinden bellek israfına yol açacaktır.
Tüm configlere ve detaylarına confluent dökümanından ulaşabilirsiniz.
Producer mesajları kafkaya produce ederken serializer lara ihtiyaç duyar. Çeşitli serializer vardır ama genellikle string, json ve avro serializer kullanılır. Schema registry ve avro kavramlarına bir sonraki yazıda detaylı bir şekilde değineceğim. Github hesabımdaki örnek spring boot producer uygulamasında kullanım şekillerini görebilirsiniz.
Consumer
Consumer event mesajlarını topic lerden okuyan client uygulamalardır. Topic inizi oluşturdunuz, producer uygulamanızı yazdınız data lar kafkaya akıyor, o zaman şimdi sıra geldi consumer uygulamayı yazmaya. Producer’da olduğu gibi consumer için de config ler eklemeniz gerekiyor. Producer lar ile ortak kullanılan lar olduğu gibi consumer özelinde de config ler mevcuttur. Consumer konfigurasyonu için ihtiyaç duyulabilecek bazı config ler şu şekildedir.
- auto.offset.reset: Consumer ın ayağa kalktığı durumda datayı nereden itibaren okuyacağını belirlemek için auto.offset.reset parametresi kullanılır. “earliest” olarak ayarlanırsa topic teki data baştan itibaren tekrar okunur. “latest” olarak ayarlanırsa en sondaki ofsetten itibaren okumaya devam eder. “none” olursa başlangıç offset i manuel olarak belirlenecek anlamına gelir.
- enable.auto.commit: Daha önce offset lerden bahsetmiştik. Consumer bir mesajı okuduktan sonra başka bir consumer ın ya da aynı consumer ın bir sonraki mesajdan devam edebilmesi için offset 1 ilerletilir. Offset in 1 ilerletilmesi için de mesajın commit edilmesi lazım. Eğer bu parametre true set edilirse mesaj okuduktan sonra consumer herhangi bir durumda fail etmezse otomatik commit edilir. Eğer false set edilirse commitSync() metoduyla manuel commit gerekmektedir. Manuel commit ihtiyacına yönelik bir örnek vereyim. Bir datayı kafkadan okuduktan sonra iki işleminiz olduğunu düşünelim. İlki veritabanına kayıt atmak ikincisi kullanıcıya push notfication göndermek. Sizin için burada kritik olan işlem db ye kayıt atılması ve bunun 2. kez yapılmaması. Pn atılsa da olur atılmasa da. Bunun için db ye kayıt yazıldıktan sonra manuel olarak commit yapılır. Böylece buradan sonraki herhangi bir adımda hata alınması durumunda offset zaten ilerletilmiş olduğu için duplicate kayıt oluşmayacaktır. Ama oto commit mekanizması kullanılırsa pn atılırken hata oluşması durumunda offset ilerlemeyeceğinden aynı mesaj tekrar işlenecektir.
- fetch.min.bytes: Tek seferde en az kaç byte veri çekileceğini belirtmek için kullanılan config. Varsayılan değeri 1 dir.
- fetch.min.bytes: Tek seferde max kaç byte veri çekileceğini belirtmek için kullanılan config. Varsayılan değeri 52428800(50MB) dir.
- max.poll.records: Tek seferde kaç record çekileceğini belirtmek için kullanılan config tir. varsayılan değeri 500 dür.
Tüm configlere ve detaylarına confluent dökümanından ulaşabilirsiniz.
Producer ların serializer a ihtiyaç duyduğu gibi consumer lar da deserializer lara ihtiyaç duyarlar. Bir çok deserializer bulunur ama en çok kullanılanları string, json ve avro deserializer dır. Burada dikkat edilmesi gereken bir nokta var. Bir producer ın produce ettiği mesajı consume edebilmek için serialize edilen tipte bir deserializer kullanmak gerekir. Eğer producer StringSerializer ile produce ettiyse siz bu mesajı JsonDeserializer ile consume edemezsiniz. StringDeserializer kullanmanız gerekmektedir. Github hesabımdaki örnek spring boot consumer uygulamasında kullanım şekillerini görebilirsiniz.
https://github.com/emreakin/SpringBoot-Kafka-Consumer
Consumer Group
Consumer lar ile ilgili en önemli kavramlardan biri de consumer group tur. Efficient bir consumer uygulama yazmak için consumer group mantığının tam olarak oturması gerektiği için detaylı bir şekilde değinmek istedim.
Bir topic genel olarak birden fazla partition dan oluşur. Bu partition lar kafka consumer lar için paralelizmin birer üyeleridir. Consumer lar birer partition ı consume ederek bir consumer group un parçası olurlar. Bir topic i consume eden birden fazla consumer group olabilir. Her consumer group un unique bir id si vardır. Bu id kullanıcılar tarafından atanır.
“user” isminde bir topic iniz olduğunu varsayalım ve bu topic in de 5 partition ı var. Consumer1 isminde bir uygulama yazdınız ve user topic ini consume edecek consumer group id sini de “userConsumerGroup” verdiniz. Uygulamayı 3 instance olarak deploy ettiğinizde Kafkanın GroupCoordinator ve ConsumerCoordinator ı user topic inin 3 partition ına Consumer1 uygulamasının 3 instance ını atar. Daha sonra Consumer2 isminde yeni bir uygulama yazdınız ve onun da group id sini aynı şekilde “userConsumerGroup” verdiniz. Bu uygulamayı da 3 instance olarak deploy ettiğiniz an coordinator lar consumer group u rebalance a sokar ve toplamdaki 6 instance ın 5 ini user topic inin 5 partition ına 1 er 1 er atar. 1 instance ise boşta kalır ve herhangi bir consume işlemi yapmaz. Eğer aktif instance larda biri down olursa tekrar bi rebalance gerçekleşir ve boşta bekleyen instance oyuna dahil olup bir partition a atanır. Rebalance nedir, nasıl gerçekleşir, nasıl ayarlar yapılır bunlardan ayrı bir makalede bahsedeceğim. Şunu da söylemeliyim bu verdiğim örnek çok gerçekçi bir örnek değil. Normalde iki farklı uygulama aynı consumer group a dahil olmaz. Aynı topic teki mesajlar farklı bir amaç için işlenecekse farklı bir uygulama ve farklı bir consumer group atanır.
Tek uygulama üzerinden devam edelim. “user” topic inin 3 partition ı var ve Consumer1 uygulamasının 3 instance ı var. Her instance bir partition ı consume edip yoluna devam ediyor. Fakat yükünüz arttı ve topic e ayda 1000 kayıt gelirken günde 1M kayıt gelmeye başladı. Partition larınızda mesajlar birikmeye başladı ve uygulamanız eritmekte zorlanıyor. Burada ilk önce yapılması gereken topic in partition sayısını artırmak ve kafkaya gelen mesajları dağıtmak. Partition sayısını 12 ye çıkardınız ve mesajlar partition lara dağılmaya başladı. Fakat uygulamada 3 instance olduğu için consumer group a ait 3 aktif member bulunuyor ve bunlar da t anında maksimum 3 partition ı consume edebilir. Böylece 9 partitionda mesajlar birikmeye devam ediyor. Burada tercih edilebilecek 2 yöntem var. Birincisi uygulamanın instance sayısını 12 ye çıkarmak. Böylece her instance bir partition ile eşleşecek, t anında 3 mesaj consume edilirken 12 consume edilecek ve hız teorik olarak 4 katına çıkacak. Diğer yöntem ise concurrency i artırmak. Default concurency 1 dir. Eğer her bir instance ın kaynağına güveniyorsanız concurrency i artırıp bir instance ı birden fazla partition ı consume etmesini sağlayabilirsiniz Concurrency i thread olarak düşünebilirsiniz. Bu senaryoda 3 instance ın concurrency isini 4 yaparsanız toplamda 12 partition ı paralel olarak consume edebilirsiniz. Eğer tüm partition ların aynı anda consume edilmesini istiyorsanız maksimumda partition = instance*concurrency hesabını yapabilirsiniz. Ek olarak şunu da belirtmek isterim kafka saniyede milyonlarca datanın akışını sağlayabilir. Ama bunu consume edip işleyebilme yeteneği sizin uygulamanız ve kaynaklarınızla doğru orantılıdır. Yani kafka diyor ki ben sana yüksek sayıdaki data ları aktarabilirim ama sen işleyebilecek kapasiteye sahip misin?
Consumer group lar ile alakalı son bir case den daha bahsetmek istiyorum. Genellikle “testteki uygulamayı localimde ayağa kaldırdım mesajların offset i ilerliyor ama ben consume etmiyorum” şeklinde sorularla karşılaşıyorum. Bunun nedeni şu. Testteki uygulamanız 2 instance çalışıyor ve consume ettiği topic in de 2 partition ı bulunuyor. Siz eğer aynı consumer group id si ile localde uygulamayı ayağa kaldırdığınızda boşta partition olmadığı için siz consume edemiyorsunuz. Test ortamındaki uygulama consume ettiği içinse offset ilerliyor. Çözüm olarak localde uygulamayı ayağa kaldırırken consumer group id sini değiştirirseniz topic-instance dağılımı test ortamından bağımsız olarak farklı bir grup içerisinde yapılacaktır. Siz de consume edebileceksiniz. Ama burada dikkat etmeniz gereken bir nokta var. Eğer consume işleminden sonra db ye kayıt atma, mail gönderme, bakiye güncelleme vb. işlemler yapıyorsanız bunlar duplicate olacaktır. Bu gibi durumlar için localde bir kontrol eklemenizi öneririm.
Bir sonraki yazımda Schema Registry den bahsedeceğim.
Part 2 -> https://medium.com/@cobch7/kafka-mimarisi-e786ce9c9af0
Part 4 -> https://medium.com/@cobch7/kafka-i%C3%A7in-schema-registry-ac299acdc776