Apache Nifi ile Apache Kafkaya Veri Alışverişi ve MongoDb ye Kayıt

Yusuf Gözübüyük
6 min readSep 23, 2022

--

Herkese merhaba ben Yusuf Gözübüyük bu yazımda sizlere Google Cloud Platfomda(GCP) Apache kafka kurup Nifi ile veri alıp vermenin nasıl işlediğini ve Nifi ile veriyi ETL sürecine sokup elde ettiğimiz veriyi de MongoDb ye nasıl kayıt edeceğimizi göstermeye çalışacağım, keyifli okumalar

Öncellikle Nifi’yi nasıl kuracağınızı ve gerekli ayarlamaların nasıl olacağını bilmiyorsanız mediumda ki bu linkten gidip bakabilirsiniz
Yada Nifi ile uygulamalı bir örnek için mediumdaki bu linkten gidip bakabilirsiniz

Bu yazı için bir mongoDb hesabınız olması gerekiyor bu yazıda bu aşamalardan bahsedilmeyecek sadece birkaç önemli adımı anlatacağım hesap oluşturmak için https://www.mongodb.com/ adresinden ücretsiz bir hesap oluşturmanız yeterli ben mongoDb 5.0.12 versiyonuyla oluşturdum(diğer versiyonlarda işlemlerin değişeceğini sanmıyorum)
https://www.youtube.com/watch?v=VKRIz9s9V70 bu videoda anlatılan adımları yapın eğer ip adresinizi herkese açamadıysanız

Sol ortada SECURITY bölümünden Network Access’e tıklayın ve orada gördüğünüz ip adres kısmında edit diyip var olan ip adresini silip 0.0.0.0/0 yapın

Videoda kopyalanan URI de
mongodb+srv://kullanıcıAdı:sifre@cluster0.ymcuu5r.mongodb.net/veritabanıAdı=true&w=majority
şeklinde yapın (kopyaladığınız yerdeki değerleri değiştirmeyin sadece kullanıcı adı, şifre ve veritabanı adını gösterilen yerlere yazmanız yeterli )
Bu bilgileri en son kullanacağız.

Şimdi GCP ye kafka kurulumuna geçelim

Google Consol Platfom hesabına girelim ve arama kısmına
Apache Kafka Server on Ubuntu Server 20.04 Tıklayıp aşağıdaki görseldeki bölüme tıklayalım

LAUNCH a tıklayalım

En aşağıdaki 3 kısmı işaretleyip DEPLOY diyelim

Evet kafka kurulu bilgisayarımız yükleniyor
Şimdi sağ üsteki bölüme tıklayıp Compute Engine diyelim

SSH a tıklayalım, toplamda 3 tane kafka için ssh ekranı açacağız

1.SSH ekranında aşağıdaki kodları sırasıyla yazıp enter diyoruz

cd /opt/kafka/
sudo bin/zookeeper-server-start.sh config/zookeeper.properties

2.SSH ekranında aşağıdaki kodları sırasıyla yazıp enter diyoruz

cd /opt/kafka/
sudo bin/kafka-server-start.sh config/server.properties

3.SSH ekranında aşağıdaki kodları sırasıyla yazıp enter diyoruz

cd /opt/kafka/

Kafka için bir topic oluşturalım.

sudo bin/kafka-topics.sh --create --topic ornek --bootstrap-server localhost:9092
sudo bin/kafka-topics.sh --describe --topic ornek --bootstrap-server localhost:9092

Şimdi Nifi den gelen mesajları görmek için aşağıdaki kodu yazalım

sudo bin/kafka-console-consumer.sh --topic ornek --from-beginning --bootstrap-server localhost:9092

Artık kafkada ki kurulumumuzu tamamladık şimdi GCP de Compute Engine geçelim

Nifi yazılarımı okuduğunuzu varsayarak kurulumları yaptığınızı varsayıyorum

34.122.249.22 yazan yere tıklıyoruz ve açılan sekmede ip ye ek olarak “:8080/nifi “ ibaresini ekleyelim 34.122.249.22:8080/nifi şeklinde

not: bende ip nin önüne https koydu bundan dolayı çalışmadı https’i silmeyi unutmayın

Bu yazıda herhangi bir api ye kayıt olup veri alma zahmetine girmek yerine bir api’nin yapısını kullanacağız.

Haydi Başlayalım

Nifi Ekranında Process Group u ekrana çekip projemize isim verelim ben app yazdım şimdi çift tıklayıp içine girelim

Kafkaya mesajı gönderme

Processor’ü ekrana sürükleyip bırakalım ve arama kısmına GenerateFlowFile yazıp seçelim.
Çift tıklayıp içine girelim Scheduling sekmesinde Run Schedule’ü 3 yapalım yoksa çok hızlı mesaj geleceği için net bir şekilde veri akışını görmemiz olmaz
Properties sekmesine gelip Custom Text e aşağıdaki yazıyı yapıştıralım

{
"success": true,
"result": {
"base": "USD",
"lastupdate": "2019-12-25 11:22 UTC",
"data": [
{
"code": "USD",
"name": "US Dollar",
"rate": 1,
"calculatedstr": "1000.00",
"calculated": 1000
},
{
"code": "EUR",
"name": "Euro",
"rate": 0.9015829342,
"calculatedstr": "901.58",
"calculated": 901.58
},
"..."
]
}
}

Evet aşağıdaki gibi Apply diyelim

Processor’ü ekrana sürükleyip bırakalım ve arama kısmına PublishKafka_2_0 yazıp seçelim.

GenerateFlowFile dan PublishKafka_2_0 ok çekelim(Processor’ün ortasındaki ok) ve success işaretleyelim
Çift tıklayıp içine girelim
Properties sekmesine gelelim

Kafka brokers ta kafka’nın kurulu olduğu makinenin External IP adresini kopyalayalım benim ip adresim “35.239.78.172” ve yanına “:9092” yazalım
35.239.78.172:9092 şeklinde olacak
Topik Name e kafka topiğine verdiğimiz “ornek” yazalım
Aşağıdaki resimlerdeki gibi yapıp Apply diyelim

Kafkadan Veri Alma

Processor’ü ekrana sürükleyip bırakalım ve arama kısmına ConsumeKafka_2_0 yazıp seçelim ve çift tıklayıp içine girelim
kafka brokers a yukarıdaki gibi ip adresini yazalım ve aşağıdaki resimler deki gibi ayarlamayı yapalım

Processor’ü ekrana sürükleyip bırakalım ve arama kısmına JoltTransformJSON yazıp seçelim.
Şimdi ise gelen veride ihtiyacımız olmayan değerleri elememiz gerekiyor,
JoltTransformJSON çift tıklayıp içine girelim
Settings kısmında failure işaretleyelim

Properties kımına gelip Jolt Specification karşısına gelip tıklayalım ve aşağıdaki kodu yapıştıralım ve ok diyelim, daha sonra apply diyip çıkalım

[{
"operation": "shift",
"spec": {
"result": {
"data": {
"*": {
"code": "data-&1-code",
"calculated": "data-&1-calculated"
}
}
}
}
}]

ConsumeKafka_2_0 dan JoltTransformJSON’a ok çekelim success i seçelim

Evet artık verimizi aldık ve düzenledik verinin son hali aşağıdaki gibi oldu

{
"data-0-code" : "USD",
"data-0-calculated" : 1000,
"data-1-code" : "EUR",
"data-1-calculated" : 901.58
}

Şimdi ise bu veriyi mongoDb ye kayıt edeceğiz

Processor’ü ekrana sürükleyip bırakalım ve arama kısmına PutMongo yazıp seçelim.
PutMongo çift tıklayıp içine girelim
Settings kısmında success ve failure işaretleyelim(son adım olduğu için)

Properties kısmında Mongo URI karşısına yukarda aldığımız mongodb+srv ile başlayan bilgiyi girelim

Ben Database Name ve Collection Name lere deneme ismini verdim
Son hali aşağıdaki resimdeki gibi

Yapını son hali aşağıdaki gibi görünüyor artık

Sağdaki Operate bölümünden play tuşuna basalım ve Kafka da ki 3.SSH ekranına dönelim

Evet gördüğünüz gibi nifi veriyi kafka ya gönderdi ve aynı şekilde geri çekti

Şimdi mongoDb de veritabanına bakalım

Harika bir şekilde verimiz kayıt olmuş.

Evet arkadaşlar bu yazımda sizlere GCP ye Kafka kurulumu ,Nifi ile Kafka’ya mesaj gönderme ve almanın yöntemini , Nifi ile ETL sürecini ve elde ettiğimiz veriyi mongoDb ye nasıl kayıt edeceğimizi anlatmaya çalıştım umarım sizler için faydalı bir yazı olmuştur.

Düşünce ve yorumlarınızı merak ediyorum yorumlar kısmında belirtmenizi bekliyorum..

Benimle linkedin den bağlantı kurmayı unutmayın iyi günler

--

--