Apache Nifi ile Apache Kafkaya Veri Alışverişi ve MongoDb ye Kayıt
Herkese merhaba ben Yusuf Gözübüyük bu yazımda sizlere Google Cloud Platfomda(GCP) Apache kafka kurup Nifi ile veri alıp vermenin nasıl işlediğini ve Nifi ile veriyi ETL sürecine sokup elde ettiğimiz veriyi de MongoDb ye nasıl kayıt edeceğimizi göstermeye çalışacağım, keyifli okumalar
Öncellikle Nifi’yi nasıl kuracağınızı ve gerekli ayarlamaların nasıl olacağını bilmiyorsanız mediumda ki bu linkten gidip bakabilirsiniz
Yada Nifi ile uygulamalı bir örnek için mediumdaki bu linkten gidip bakabilirsiniz
Bu yazı için bir mongoDb hesabınız olması gerekiyor bu yazıda bu aşamalardan bahsedilmeyecek sadece birkaç önemli adımı anlatacağım hesap oluşturmak için https://www.mongodb.com/ adresinden ücretsiz bir hesap oluşturmanız yeterli ben mongoDb 5.0.12 versiyonuyla oluşturdum(diğer versiyonlarda işlemlerin değişeceğini sanmıyorum)
https://www.youtube.com/watch?v=VKRIz9s9V70 bu videoda anlatılan adımları yapın eğer ip adresinizi herkese açamadıysanız
Sol ortada SECURITY bölümünden Network Access’e tıklayın ve orada gördüğünüz ip adres kısmında edit diyip var olan ip adresini silip 0.0.0.0/0 yapın
Videoda kopyalanan URI de
mongodb+srv://kullanıcıAdı:sifre@cluster0.ymcuu5r.mongodb.net/veritabanıAdı=true&w=majority
şeklinde yapın (kopyaladığınız yerdeki değerleri değiştirmeyin sadece kullanıcı adı, şifre ve veritabanı adını gösterilen yerlere yazmanız yeterli )
Bu bilgileri en son kullanacağız.
Şimdi GCP ye kafka kurulumuna geçelim
Google Consol Platfom hesabına girelim ve arama kısmına
Apache Kafka Server on Ubuntu Server 20.04 Tıklayıp aşağıdaki görseldeki bölüme tıklayalım
LAUNCH a tıklayalım
En aşağıdaki 3 kısmı işaretleyip DEPLOY diyelim
Evet kafka kurulu bilgisayarımız yükleniyor
Şimdi sağ üsteki bölüme tıklayıp Compute Engine diyelim
SSH a tıklayalım, toplamda 3 tane kafka için ssh ekranı açacağız
1.SSH ekranında aşağıdaki kodları sırasıyla yazıp enter diyoruz
cd /opt/kafka/
sudo bin/zookeeper-server-start.sh config/zookeeper.properties
2.SSH ekranında aşağıdaki kodları sırasıyla yazıp enter diyoruz
cd /opt/kafka/
sudo bin/kafka-server-start.sh config/server.properties
3.SSH ekranında aşağıdaki kodları sırasıyla yazıp enter diyoruz
cd /opt/kafka/
Kafka için bir topic oluşturalım.
sudo bin/kafka-topics.sh --create --topic ornek --bootstrap-server localhost:9092
sudo bin/kafka-topics.sh --describe --topic ornek --bootstrap-server localhost:9092
Şimdi Nifi den gelen mesajları görmek için aşağıdaki kodu yazalım
sudo bin/kafka-console-consumer.sh --topic ornek --from-beginning --bootstrap-server localhost:9092
Artık kafkada ki kurulumumuzu tamamladık şimdi GCP de Compute Engine geçelim
Nifi yazılarımı okuduğunuzu varsayarak kurulumları yaptığınızı varsayıyorum
34.122.249.22 yazan yere tıklıyoruz ve açılan sekmede ip ye ek olarak “:8080/nifi “ ibaresini ekleyelim 34.122.249.22:8080/nifi şeklinde
not: bende ip nin önüne https koydu bundan dolayı çalışmadı https’i silmeyi unutmayın
Bu yazıda herhangi bir api ye kayıt olup veri alma zahmetine girmek yerine bir api’nin yapısını kullanacağız.
Haydi Başlayalım
Nifi Ekranında Process Group u ekrana çekip projemize isim verelim ben app yazdım şimdi çift tıklayıp içine girelim
Kafkaya mesajı gönderme
Processor’ü ekrana sürükleyip bırakalım ve arama kısmına GenerateFlowFile yazıp seçelim.
Çift tıklayıp içine girelim Scheduling sekmesinde Run Schedule’ü 3 yapalım yoksa çok hızlı mesaj geleceği için net bir şekilde veri akışını görmemiz olmaz
Properties sekmesine gelip Custom Text e aşağıdaki yazıyı yapıştıralım
{
"success": true,
"result": {
"base": "USD",
"lastupdate": "2019-12-25 11:22 UTC",
"data": [
{
"code": "USD",
"name": "US Dollar",
"rate": 1,
"calculatedstr": "1000.00",
"calculated": 1000
},
{
"code": "EUR",
"name": "Euro",
"rate": 0.9015829342,
"calculatedstr": "901.58",
"calculated": 901.58
},
"..."
]
}
}
Evet aşağıdaki gibi Apply diyelim
Processor’ü ekrana sürükleyip bırakalım ve arama kısmına PublishKafka_2_0 yazıp seçelim.
GenerateFlowFile dan PublishKafka_2_0 ok çekelim(Processor’ün ortasındaki ok) ve success işaretleyelim
Çift tıklayıp içine girelim
Properties sekmesine gelelim
Kafka brokers ta kafka’nın kurulu olduğu makinenin External IP adresini kopyalayalım benim ip adresim “35.239.78.172” ve yanına “:9092” yazalım
35.239.78.172:9092 şeklinde olacak
Topik Name e kafka topiğine verdiğimiz “ornek” yazalım
Aşağıdaki resimlerdeki gibi yapıp Apply diyelim
Kafkadan Veri Alma
Processor’ü ekrana sürükleyip bırakalım ve arama kısmına ConsumeKafka_2_0 yazıp seçelim ve çift tıklayıp içine girelim
kafka brokers a yukarıdaki gibi ip adresini yazalım ve aşağıdaki resimler deki gibi ayarlamayı yapalım
Processor’ü ekrana sürükleyip bırakalım ve arama kısmına JoltTransformJSON yazıp seçelim.
Şimdi ise gelen veride ihtiyacımız olmayan değerleri elememiz gerekiyor,
JoltTransformJSON çift tıklayıp içine girelim
Settings kısmında failure işaretleyelim
Properties kımına gelip Jolt Specification karşısına gelip tıklayalım ve aşağıdaki kodu yapıştıralım ve ok diyelim, daha sonra apply diyip çıkalım
[{
"operation": "shift",
"spec": {
"result": {
"data": {
"*": {
"code": "data-&1-code",
"calculated": "data-&1-calculated"
}
}
}
}
}]
ConsumeKafka_2_0 dan JoltTransformJSON’a ok çekelim success i seçelim
Evet artık verimizi aldık ve düzenledik verinin son hali aşağıdaki gibi oldu
{
"data-0-code" : "USD",
"data-0-calculated" : 1000,
"data-1-code" : "EUR",
"data-1-calculated" : 901.58
}
Şimdi ise bu veriyi mongoDb ye kayıt edeceğiz
Processor’ü ekrana sürükleyip bırakalım ve arama kısmına PutMongo yazıp seçelim.
PutMongo çift tıklayıp içine girelim
Settings kısmında success ve failure işaretleyelim(son adım olduğu için)
Properties kısmında Mongo URI karşısına yukarda aldığımız mongodb+srv ile başlayan bilgiyi girelim
Ben Database Name ve Collection Name lere deneme ismini verdim
Son hali aşağıdaki resimdeki gibi
Yapını son hali aşağıdaki gibi görünüyor artık
Sağdaki Operate bölümünden play tuşuna basalım ve Kafka da ki 3.SSH ekranına dönelim
Evet gördüğünüz gibi nifi veriyi kafka ya gönderdi ve aynı şekilde geri çekti
Şimdi mongoDb de veritabanına bakalım
Harika bir şekilde verimiz kayıt olmuş.
Evet arkadaşlar bu yazımda sizlere GCP ye Kafka kurulumu ,Nifi ile Kafka’ya mesaj gönderme ve almanın yöntemini , Nifi ile ETL sürecini ve elde ettiğimiz veriyi mongoDb ye nasıl kayıt edeceğimizi anlatmaya çalıştım umarım sizler için faydalı bir yazı olmuştur.
Düşünce ve yorumlarınızı merak ediyorum yorumlar kısmında belirtmenizi bekliyorum..
Benimle linkedin den bağlantı kurmayı unutmayın iyi günler