Apache Kafka
Herkese merhaba, sizlere ilk makalem olarak ele aldığım Apache Kafka’nın temel kavramları hakkında öğrendiklerimi anlatmaya çalışacağım. Apache kafka nedir?, nerelerde kullanılır?, kafka yapısı ve CLI ortamında apache kafka komutlarının kullanımı hakkında sizlere bildiklerimi aktarmaya çalışacağım.
Apache Kafka Nedir?
Apache Kafka, verilerin bir sistemden hızlı bir şekilde toplanıp diğer sistemlere hatasız bir şekilde 10 ms’den az bir gecikme ile neredeyse gerçek zamanlı olarak veri transferini sağlamak için geliştirilen dağıtık, kolay ölçeklenebilir, hataya dayanıklı ve sürdürülebilirliği yüksek bir veri akış mekanizmasıdır. Verilerin disk üzerine immutable log ile yazılması veri kaybı olmamasını sağlamaktadır.
2011’de Linkedin tarafından Scala ve Java ile geliştirilen Kafka daha sonra Apache çatısı altında açık kaynak bir projeye dönüştürülmüştür. Günümüzde Linkedin, Netflix, Uber, Twitter gibi birçok firma tarafından kullanılmaktadır.
Nerelerde Kullanılır?
Mesajlaşma sistemlerinde,
Web sitesi etkinlik takibinde,
İzleme sistemlerindeki metriklerde,
Log toplama sistemlerinde,
Stream Processing işlemlerinde,
Big Data entegrasyonlarında kullanılabilir.
Zookeeper
Kafka, ayağa kalkmak için Zookeeper’a ihtiyaç duyar.
Zookeeper, Kafka Broker’larını yönetir.
Topic partitionları için broker lider seçiminine yardımcı olur.
Yeni bir broker ayağa kalktığında veya düştüğünde, topic oluşturulduğunda veya silindiğinde; Zookeeper, Kafka Broker’lara bildirim gönderir.
Bu nedenle önce Zookeeper, daha sonra Kafka Broker ayağa kaldırılır.
Zookeeper lider seçim stratejisinde Zab, Raft ve Viewstamped Replication dahil olmak üzere çok çeşitli algoritmalar kullanmaktadır. Lider seçiminde karar verme aşamasını daha iyi anlayabilmemiz için bu algoritmalardan raft algoritmasını inceleyelim.
Raft Algoritması
Raft, her düğümün lider, aday ve takipçi olmak üzere üç durumdan herhangi birinde kalabileceğini belirtir. Yalnızca bir lider kullanıcı ile iletişime geçebilir. Bir aday lider olmak için oy isteyebilir. Bir takipçi yalnızca adaylara veya lidere yanıt verir.
Raft algoritması, sunucu durumlarını korumak için zamanı rastgele uzunlukta küçük dönemlere böler. Her dönem, dönem numarası adı verilen monoton artan bir sayı ile tanımlanır. Bu dönem numarası her düğüm tarafından tutulur ve düğümler arasındaki iletişim sırasında iletilir. Adaylar, çoğunluğu elde etmek için diğer sunucu düğümlerinden oy isterler. Çoğunluk sağlanırsa, aday mevcut dönem için lider olur. Aday veya lider, dönem numaraları güncel değilse takipçi durumuna düşer. Dönem numaraları istek sırasında iletilir ve eski bir dönem numarası ile bir istek yapılırsa, söz konusu istek reddedilir.
Raft algoritması, işlevleri gerçekleştirmek için iki tür uzaktan yordam çağrısı (RPC) kullanır:
RequestVotes RPC, bir seçim sırasında oy toplamak için aday düğümleri tarafından gönderilir.
AppendEntries, lider düğüm tarafından günlük girişlerini çoğaltmak için ve ayrıca bir sunucunun hala çalışır durumda olup olmadığını kontrol etmek için bir sinyal mekanizması olarak kullanılır. Kalp atışı yanıtlanırsa, sunucu çalışır, aksi takdirde sunucu kapalıdır.
Lider düğüm, kümenin lideri olarak otoriteyi sürdürmek için follower düğümlerine kalp atışı gönderir. Lider düğüm kalp atışı göndermediği zaman, zaman aşımına uğradığında takipçi düğümlerden biri durumunu aday olarak değiştirir, kendisi için oy verir ve çoğunluğu oluşturmak ve lider olmaya çalışmak için RequestVotes RPC’yi yayınlar. Seçim üç şekilde gerçekleşebillir;
Aday düğüm, diğer düğümlerden oyların çoğunluğunu alarak lider olur. Durumunu lider olarak günceller ve diğer düğümleri bilgilendirmek için kalp atışı göndermeye başlar.
Aday düğüm, oyların çoğunluğunu alamaz ve lider olamaz. Aday düğüm, takipçi durumuna geri döner.
Oyları talep eden aday düğümün dönem numarası kümedeki diğer aday düğümlerden daha azsa, RPC reddedilir ve diğer düğümler aday durumlarını korur. Dönem numarası daha büyükse aday düğümü yeni lider olarak seçilir.
Apache Kafka Yapısı
Topic, Partition Ve Offset
Topic verilerin gönderilip alındığı veri kategorisinin adıdır. Topicleri veritabanındaki tablolarımıza benzetebiliriz. Bir Kafka cluster’ında binlerce topic olabilir.
Topic’ler partitionlara ayrılırlar. Partitionların 0’dan başlayarak artan sayıda giden ID’leri vardır. Bunlara offset denir. Topic’de 1 partition oluşturulabileceği gibi senaryoya göre bin partition da oluşturulabilir. Topic yaratırken verdiğimiz partition sayısını sonradan değiştirebiliriz.
Kafka veriyi belli bir süre (varsayılan 1 hafta) muhafaza eder, daha sonra veri silinir. Ancak eski veri silinse de partitiona yeni veriler gelince offset değeri kaldığı yerden devam eder, tekrar sıfıra dönmez. Veri bir kez bir partitiona yazıldıktan sonra bir daha değiştirilemez.
Brokers
Brokerlar topic ve partitionları tutan sunuculardır. Bir kafka broker grubuna ise kafka cluster denir. Bir brokera bağlandığınızda, buna bootstrap broker denir ve tüm clustera bağlanmış olursunuz.
Her broker içerisinde bir partition lider seçilir. Lider partitionları followerlar(replica) takip ederler ve içerisinde verilerin kopyasını tutarlar. Lider ve followerlara karar veren zookeeperdır. Kopya alınacak follower(replica) sayısını ise kullanıcı belirler. Kopya sayısı performansa olumlu-olumsuz etki edebilir.
Broker ve partition sayısı eşit ise her bir broker bünyesinde bir lider partition barındırır. Partition sayısı broker sayısını geçtiğinde ise lider partitionlar eşit olarak brokerlar arasında dağıtılır.
Herhangi bir sorun durumunda; partition 1’in lider olarak bulunduğu broker 1’in down olma durumunda, broker 2 veya broker 3’de bulunan follewer(replica) durumundaki partition 1’lerden biri zookeper tarafından lider olarak atanır ve bu sayede veri akışı kesintisiz olarak devam eder.
Producer
Producer belli bir topice mesaj gönderen yayımcı pozisyonunda kafka bileşenidir. Veriyi topice key ile ve key’siz olmak üzere iki farklı yolla gönderebilirler.
Key ile gönderme durumunda ilk gönderilen mesaj hangi partitiona gittiyse partition sayısı değiştirilmediği sürece kafka, aynı key ile gönderilen her mesajın aynı partitiona düşeceğini garanti eder. Partition sayısı değiştirilirse bu garanti ortadan kalkar. Key ile veri göndermenin avantajı o veriye daha sonra aynı key ile sıralı bir şekilde erişmektir.
Keysiz gönderme yönteminde kafka iş yükünü dağıtmak için (load balancing) sıralı bir şekilde(round robin) gönderecektir.
Producer veriyi kafkaya gönderirken bir onay mekanizması (acknowledgement) bulundurmaktadır. Bunun için üç opsiyon vardır:
acks=0: producer brokera veriyi gönderir ve brokerdan verinin ulaştığına dair onay beklemez. Çeşitli sebeplerle veri kaybı olursa producerın bundan haberi olmaz ve veriyi tekrar göndermez.
acks=1: varsayılan değerdir. Gönderilen veriler için lider konumdaki brokerdan onay istenir ama follower konumundaki brokerdan onay istenmez. Followera veri kopyalaması bitmeden lider konumundaki brokerdan onay verildiği sırada lider brokerın bulunduğu sunucu devre dışı kalırsa veri kaybı olabilir.
acks=all: Gönderilen veriler için liderden ve followerlardan onay istenir. Bir miktar gecikme karşılığında daha fazla veri güvenliği sunar.
Consumer
Consumer, belli bir topicden mesaj okuyan abone pozisyonunda kafka bileşenidir.
Consumerlar partitionlardan verileri sırayla okur. Kafkaya mesajlar yazılırken birden fazla partitiona yazıldığından dolayı consumerlar aynı sırayla partitionlara erişmediklerinden dolayı kafkaya yazılma sırasından farklı olarak okunabilir. Mesajlar sadece partitionlar içerisinde sıralıdır, partitionlar birbirinden farklı sırada okunabilir.
Consumer Group
Aynı topici okuyan birden fazla consumerın olması durumunda aynı mesajın tekrar tekrar okunmasının önüne geçmek için consumer group kullanılır. Consumer group içerisinde bulunan tüm consumerlar en az bir partitiona atılır ve aynı grup içerisindeki diğer consumer, aynı partitiona atanmaz. Böylelikle tekrarlı okumaların önüne geçilir.
Gruptaki her consumer belli partitionlardan okuma yapar. Mesela topicimizde 3 partition varsa:
3 consumerımız varsa her consumer birer partition tüketmeye başlar.
2 consumerımız varsa biri tek partitionı, diğeri ise kalan iki partitionı tüketmeye başlar.
4 consumerımız varsa biri inaktif olur diğer üçü birer partition tüketir. Gruptaki bir consumerın çökmesi gibi durumlarda fazlalık olan consumer çöken consumerın yerini alır.
Kafka’nın Linux’a Kurulumu
https://kafka.apache.org/downloads
adresinden binary formatında .tgz uzantılı dosyayı bilgisayarımıza indirelim.
İndirdiğimiz dosyaları tar ile açalım;
tar -xzf kafka_2.13–3.1.0.tgz
KAFKA_HOME ve bin dizinlerini PATH’e eklemek için;
nano .bashrc
ile açtığımız dosyanın sonuna
export KAFKA_HOME=/home/irfan/Desktop/kafka_2.13–3.1.0export PATH=$PATH:$KAFKA_HOME/bin
Komutlarını ekledikten sonra kayıt edip çıkıyoruz.
“Kafkanın kurulu olduğu klasörümüzün dosya yolunu “pwd” terminal kodu ile öğrenebiliriz.”
source .bashrc
komutunu çalıştırıyoruz.
echo $KAFKA_HOME
komutu ile dizinimizin PATH’e eklenip eklenmediğini kontrol edebiliriz.
Zookeeper config ayarları için;
mkdir data/zookeepernano config/zookeeper.propertiesdataDir= /home/irfan/Desktop/kafka_2.13–3.1.0/data/zookeeper
Zookeeper’ı başlatmak için;
zookeper-server-start.sh config/zookeeper.properties
Kafka config ayarları için;
mkdir data/kafkanano config/server.propertieslog.dirs=//home/irfan/Desktop/kafka_2.13–3.1.0/data/kafka/kafka-logs
Kafka-Server başlatmak için;
kafka-server-start.sh config/server.properties
Topic oluşturmak için;
kafka-topics.sh --bootstrap-server localhost:9092 --create --topic first_topic --partitions 3 --replication-factor 1
“Eğer bir brokera sahipsek replication-factor değerimizi 1’den büyük veremeyiz. Replication-factor değerimizi 1’den büyük verebilmemiz için broker sayımızı artırmamız gerekmektedir.”
Yeni broker oluşturmak için;
Yeni broker oluşturmak için kafka’nın kurulu olduğu dizinin altında bulunan config dizininin altına gitmemiz gerekmektedir. Bu dizin altında server.properties dosyamızı kopyalayalım.
cp server.properties server-1.properties
-Yeni oluşturduğumuz server-1.properties dosyamızı nano server.properties komutu ile açarak;
broker.id=1
olarak değiştirelim,
listeners=PLAINTEXT://localhost:9093auto.create.topics.enable=false
değerlerini ekleyelim,
log.dirs=/home/irfan/kafka_2.13–3.0.0/data/kafka/kafka-logs1
yeni log dizinimizi gösterelim.
Yukarıdaki adımları izleyerek bir broker daha oluşturalım ve oluşturduğumuz kafka brokerları başlatalım.
kafka-server-start.sh config/server-1.propertieskafka-server-start.sh config/server-2.properties
Artık 3 kafka brokerımız çalışmış oldu ve 1 kafka clustera sahip olmuş olduk. Bu durumda replication-factor değerimizi 3 olarak belirtebiliriz.
Topic ile ilgili detayları görüntülemek için;
kafka-topics.sh --bootstrap-server localhost:9092 --topic first_topic --describe
Partition sayısını artırmak için;
Partition sayısını artırmak istediğimiz durumlarda alter komutunu kullanabiliriz.
Kafka-topics.sh --bootstrap-server localhost:9092 --alter --topic first_topic partitions 5
Mevcut topicleri listelemek için;
Kafka-topics.sh --bootstrap-server localhost:9092 --list
Topic silmek için;
Kafka-topics.sh --bootstrap-server localhost:9092 –topic first_topic --delete
Producer başlatmak için;
kafka-console-producer.sh --bootstrap-server localhost:9092 --topic first_topic
Consumer başlatmak için;
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic first_topic --from-beginning
“-from-beginning ifadesini kullanmadığımız zaman consumer topici abone olduğu andan itibaren tüketmeye başlar. from-beginning ifadesi ile consumera topici en baştan tüketmeye başlayacağını belirtiyoruz.”
Consumer Groups başlatmak için;
kafka-console-consumer.sh --bootstrap-server localhost:9092 --group my-first-application
Consumer Group’ları listelemek için;
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
Kaynaklar: