ELK Stack + Spring Boot

Halit Gürpınar
Finartz
Published in
9 min readJul 16, 2021

Merhabalar, daha önceki Elasticsearch 101 yazımda Elasticsearch’e genel anlamda bir giriş yapmış ve REST API kullanımını görmüştük. Bu yazıda ise Elasticsearch’ün bir spring-boot uygulaması ile birlikte kullanımını göreceğiz.

Elasticsearch’ün spring-boot ile kullanımı için araştırma yaptığımızda var olan örneklerin büyük çoğunluğunda Elasticsearch birincil veritabanı olarak kullanılıp okuma yazma işlemleri doğrudan Elasticsearch üzerinden yapılmaktadır. Bu yazıda ise Elasticsearch’ü birincil veritabanı olarak kullanmadan var olan bir relational database ile nasıl sync tutabiliriz ve nasıl sorgu yapabiliriz bunlar üzerinden ilerleyeceğiz.

Peki Elasticsearch’ü neden birincil veritabanı olarak kullanmıyoruz?

1) Elasticsearch document based bir yapıda olduğu için aslında NoSql bir veritabanı olarak kullanabiliriz. Fakat relational bir database ihtiyacımız varsa Elasticsearch bu noktada bizim işimizi görmeyecektir.

2) NoSql bir yapımız olduğunu varsaydığımızda ise bir başka handikap ortaya çıkmaktadır. Elasticsearh’te transaction mekanizması yok ve haliyle rollback de yapamıyoruz. Bu da yine sistemimizin tasarımına bağlı olmakla birlikte aslında genel olarak ihtiyaç duyacağımız bir şey.

3) Elasticsearch’ün cache refresh mekanizması nedeniyle indexlenen bir veriye 1 sn’lik bir süre boyunca erişemeyeceğiz. Bu da anlık sorgulamalarda veri tutarsızlığına neden olacaktır. Bu konuya da yine önceki yazıda değinmiştik.

Sonuç olarak şu şekilde bir çıkarımda bulunabiliriz: Eğer uygulamamız genel anlamda read odaklıysa ve update işlemleri ara sıra gerçekleşiyorsa Elasticsearch’ü birincil veritabanı olarak kullanabiliriz. Fakat yazma odaklı bir uygulamamız varsa, paralel thread’ler kullanılıyorsa ve transaction management ihtiyacımız varsa bu durumda standart bir veritabanı kullanıp Elasticsearch’ü arama ve analiz odaklı ikincil veritabanı olarak kurgulamamız daha sağlıklı olacaktır.

Bu bilgiler ışığında uygulamamızdan bahsedecek olursak; örnek uygulamamız postgres üzerinde ürün bilgileri tutan bir spring-boot uygulaması olacak. Elasticsearch’ü de ürün arama için kullanacağız. Burada öncelikle karar vermemiz gereken bir nokta var: verilerimizi Elasticsearch’e hangi aşamada yazacağız ve Elasticsearch’ü relational database’imiz ile nasıl sync tutacağız? Bunun için şu seçenekleri değerlendirebiliriz:

1) Postgres’e insert/update yaptığımız esnada Elasticsearch’e de aynı istekleri gönderebiliriz. Fakat bunun bazı dezavantajları vardır:

  • Uygulamamızın response time’ı artacak.
  • Tüm işlemleri duplicate edeceğimiz için code base artacak.
  • Eğer manuel olarak bir şeyleri yönetmeyeceksek Elasticsearch ve postgres arasındaki transaction management spring-boot tarafından yapılmayacak. Yani iki taraftan birinde bir hata olması durumunda diğerinin bundan haberi olmayacaktır. Bu da bizi en baştan beri odağımız olan verileri sync tutma amacının dışına itecektir.

2) Scheduler job kullanmak:

  • Yazacağımız bir scheduler job ile verileri postgres’ten alıp Elasticsearch’e yazabiliriz. Bu sayede normal veritabanı işlemleri esnasında response time’ı etkilemeyiz.
  • Buradaki dezavantaj ise Elasticsearch için implementasyon yapmamız gerekecektir. Bu da code base artışından kaçamayacağımız anlamına gelir.

3) ELK stack’in L ayağı olan Logstash kullanarak verilerimizi sync etmemiz mümkün:

  • Bu yöntemle de scheduler job’da olduğu gibi response time’ı etkilemeyeceğiz.
  • Elasticsearch’e yazma işlemini Logstash configuration’ları üzerinden yapacağımız için code base’i de arttırmayacağız.

Tüm seçeneklerin artı/eksilerini değerlendirip herhangi biri üzerinden ilerleyebilirsiniz. Fakat ben bu yazıda hem tercihen hem de biraz daha Elasticsearch stack’ini kullanma adına Logstash kullanacağım.

Yeri gelmişken ELK stack’e de biraz değinip öyle devam edelim:
ELK üç açık kaynak projesinin kısaltmasıdır: Elasticsearch, Logstash ve Kibana.
Elasticsearch: Verileri tutar, aranabilir ve analiz edilebilir hale getirir
Logstash: Aynı anda birden fazla kaynaktan veri alıp, bunları bir sunucu üzerinde dönüştürür ve Elasticsearch’e gönderir
Kibana: Elasticsearch’teki verileri analiz etme ve görselleştirmeyi sağlar.

ELK Stack’in bir sonraki hali de Elastic Stack’tir. ELK araçlarına ek olarak Beats eklenmiştir.
Beats: Clientlardan topladığı logları logstash’e yollar

Yol haritamızı belirledikten sonra şimdi uygulamamıza geçebiliriz. Öncelikle örnek uygulamamızın kaynak kodlarına github üzerinden erişebilirsiniz. Yazının devamında uygulamanın detaylarından bahsedeceğim.

Uygulamamız postgresql’de ürün bilgilerini tutan basit bir rest api olacak. Logstash ile birlikte verileri postgres’ten okuyup Elasticsearch’e yazacağız. Ardından 3 farklı şekilde ürün arama yapacağız:
1) Spring-data ile postgres üzerinden arama
2) Spring-data ile Elasticsearch üzerinden arama
3) ElasticsearchRestTemplate üzerinden Elasticsearch query’leri ile arama

ELK stack ve postgres’i docker images olarak kullanacağız. Spring-boot uygulamamızı da dockerize edeceğiz. Bunun için gerekli olan Dockerfile ve docker-compose.yml dosyalarımız şu şekilde olacaktır:

Detaylı konfigürasyonlara girmeden image ve port tanımlamalarını yaparak birbirlerine olan dependency durumlarını düzenledik.
Logstash için burada birkaç detay var:

  • volumes kısmında logstash konfigurasyonlarını barındıran logstash.conf ve postgres ile haberleşmeyi sağlamak için ihtiyacımız olan driver jarının bulunduğu path’in tanımlamalarını yapıyoruz.
  • command kısmında da logstash’in logstash.conf ile birlikte çalıştırılması gerektiğini belirtiyoruz

Diğer konfigürasyonlarımız core.env dosyasında bulunmakta:

core.env’da bulunan tanımlamalar sırasıyla:

  • Spring-boot uygulaması için postgres konfigurasyonları
  • postgres db konfigurasyonları
  • Elasticsearch’ü single-node ile basitçe ayağa kaldırabilmek için gerekli konfigurasyonlar
  • Kibana için Elasticsearch erişim URL’i

Logstash için gereken konfigurasyon dosyamız:

logstash.conf yapısı şu şekildedir:
input{} -> veriler Logstash’e nereden ve nasıl gelecek
filter{} -> veriler üzerinde yapacağımız işlemler
output{} -> veriler nereye gönderilecek

logstash.conf’ta bulunan tanımlamalara bakacak olursak:

input: jdbc ile ilgili olan kısımda jdbc input plugin’i için kullanacağımız driver erişim konfigurasyonları bulunmakta.

tracking_column: Bu alan Logstash’in postgres’den okuduğu son kaydı belirleyebilmek için baktığı ve birazdan açıklayacağımız “unix_ts_in_secs” alanını belirtir. Logstash’ın sorgulama döngüsünün bir sonraki yinelemesinde başlangıç ​​değerini belirlemek için kullanılacaktır. İhtiyaç halinde bu alana SELECT cümleciğinde “:sql_last_value” ile erişilebilir.

unix_ts_in_secs: SELECT ile oluşturduğumuz, verimiz üzerindeki “last_modified_date” ı içeren alandır. “tracking_column”, bu alanı referans almaktadır.

sql_last_value: Logstash’in sorgulama döngüsündeki başlangıç ​​noktasını içeren parametredir. Okunan en son “unix_ts_in_secs” değerine ayarlanır ve bir sonraki Logstash döngüsünde başlangıç ​​noktası olarak kullanılır. Bu değişkeni sorguya dahil etmek, daha önce Elasticsearch’e iletilen insert/update’lerin Elasticsearch’e yeniden gönderilmeyeceğini garanti eder.

schedule: Logstash’in değişiklikler için postgres’i ne sıklıkla sorgulayacağına dair vereceğimiz cron expression. “*/10 * * * * *” ile Logstash’a her 10 saniyede bir postgres ile iletişim kurmasını söylüyoruz.

last_modified_date < NOW(): Sorgumuza bu koşulu neden eklediğimizi birazdan açıklayacağım.

filter: Postgres’ten aldığımız kaydın “id” değerini “_id” alanına kopyalıyoruz. Elasticsearch tarafında indexleme yapılırken “id” değerinin “_id” alanında tutulmasını sağlayacağız. Bu sayede id’ler üzerinden verilerimizi sync etmiş olabileceğiz. Sonrasında Elasticsearch’e yazılmasını istemediğimiz için “id”, “@version” ve “unix_ts_in_secs” alanlarını da kaldırıyoruz.

output: Verilerimizin Elasticsearch’e yazılması gerektiğini ve filtre bölümünde oluşturduğumuz id’nin document_id olarak atanması gerektiğini belirtiyoruz. Index name’in de product_index olması gerektiğini belirtiyoruz.

Logstash Select Statement Analizi

Logstash için kullandığımız select statement’ı temelde database’de last_modified_date’i sql_last_value’den büyük olan kayıtları getiriyor

SELECT *, date_part('epoch', last_modified_date) AS unix_ts_in_secs FROM product WHERE (date_part('epoch', last_modified_date) > :sql_last_value AND last_modified_date < NOW()) ORDER BY last_modified_date ASC

Burada neden last_modified_date < NOW() koşulunu eklememiz gerektiği üzerinde biraz duracağız. Bu ifadenin gerekliliğini 2 örnek üzerinden açıklayacağız:

  1. İlk senaryoda last_modified_date < NOW() olmaksızın sql statement’in şu şekilde olduğunu düşünelim:
SELECT *, date_part('epoch', last_modified_date) AS unix_ts_in_secs FROM product WHERE (date_part('epoch', last_modified_date) > :sql_last_value) ORDER BY last_modified_date ASC

Senaryomuzda database’e saniyede(T) 2 kayıt ekleniyor. Logstash de 5 saniyede bir çalışarak kayıtları(R) okuyor. Logstash’in T5 anında çalışarak R1-R11 arası kayıtları okuduğunu düşünelim ve tam da bu esnada database’e R12 eklenmiş olsun.
Logstash en son R11'i okuduğu için ve bunun tarihi de T5 olduğu için sql_last_value T5 olacaktır.

Bir sonraki iteration’da WHERE (date_part(‘epoch’, last_modified_date) > :sql_last_value koşulu nedeniyle Logstash T5'ten sonra eklenenleri okuyacağı için R12 burada es geçilecektir. Bu da R12'nin Elasticsearch’e yazılamayacağı anlamına gelir.

2. İkinci senaryoda, ilk senaryodaki sorunu çözmek için >= kullanıp sorguyu şöyle yapabiliriz (date_part(‘epoch’, last_modified_date) > :sql_last_value :

SELECT *, date_part('epoch', last_modified_date) AS unix_ts_in_secs FROM product WHERE (date_part('epoch', last_modified_date) >= :sql_last_value) ORDER BY last_modified_date ASC

İlk iterasyonda R1–R11 kayıtlar okunacak ve sql_last_value T5 olacak:

İkinci iterasyonda sql_last_value T5 olduğu için T5'ten itibaren tekrar okunacak:

Bu koşul işimizi çözebiliyor fakat bu durumda da ilk iteration’da okuyup Elasticsearch’e yazılan R11, ikinci iteration’da tekrar işlenecek ve Elasticsearch’e tekrar yazılacak.

Sonuç olarak her iki senaryoda da istemediğimiz durumlar var. Birinde data kaybı yaşanırken diğerinde gereksiz işlem yapılıyor.

Çözüm:

(date_part(‘epoch’, last_modified_date) > :sql_last_value koşuluna ek olarak last_modified_date < NOW() ekleyerek her kaydı Elasticsearch’e bir kez gönderebiliriz.

SELECT *, date_part('epoch', last_modified_date) AS unix_ts_in_secs FROM product WHERE (date_part('epoch', last_modified_date) > :sql_last_value AND last_modified_date < NOW()) ORDER BY last_modified_date ASC

Bu senaryoda Logstash T5 anında çalışıyor ve last_modified_date< NOW() koşulu nedeniyle T5 anında eklenen kayıtları okumuyor. R1-R10 arası kayıtları okuyarak Elasticsearch’e yazıyor. Bu durumda sql_last_value T4 oluyor.

T10 anında ikinci iterasyon başlıyor. sql_last_value T4 olduğu için Logstash okumaya T5'ten başlıyor ve T9'u da dahil ederek T10'a kadar okuyor. Bu şekilde bahsettiğimiz riskler ortadan kalkmış oluyor.

ELK ve dockerize ile ilgili yapmamız gerekenler bu şekilde. Şimdi uygulama tarafında neler yapacağımıza değinelim.

Uygulamamız klasik bir spring-boot rest api uygulaması olacağı için model, controller, repository katmanlarına girmeyeceğiz. Bunun yerine ElasticsearchRestTemplate üzerinden search işlemlerini gerçekleştireceğimiz servise ve config sınıfına odaklanacağız.

ElasticsearchRestTemplate’i kullanabilmemiz için bir configuration class’ı oluşturacağız. Elasticsearch’e erişim için şu an güncel versiyon ile kullanılması tavsiye edilen RestHighLevelClient üzerinden haberleşmeyi sağlıyoruz.
Kompleks sorguları yapabilmek için ElasticsearchRestTemplate bean’ı tanımlıyoruz.

Servis class’ımız:

Burada birkaç farklı örnek üzerinden ilerleyeceğiz.
1) ProductDocumentRepository üzerinden search:
Spring Data repository üzerinden, interface’in bize sunduğu metodlar ile search yapabiliriz.
2) ElasticsearchRestTemplate üzerinden search:
Spring Data repository bizim ihtiyaçlarımızı karşılayamayabilir. Bu durumda daha esnek ve kompleks sorgular yazabilmek için ElasticsearchRestTemplate kullanabiliriz.

ElasticsearchRestTemplate ile search yapabilmek için bir Query objesine ihtiyacımız var. Burada Query için 3 farklı implementasyon seçeneği mevcut:

→ StringQuery: Elasticsearch sorgu syntax’ı ile JSON formatında sorgular oluşturmayı sağlar. Bunu kullanabilmek için Elasticsearch sorgu syntax’ının bilinmesi gerekiyor. Öte yandan hali hazırda var olan Elasticsearch query’leriniz varsa StringQuery kullanmak daha uygun olabilir.

→ CriteriaQuery: Elasticsearch sorgu syntax’ını veya temellerini bilmeden sorguların oluşturulmasına izin verir.

→ NativeSearchQuery: Karmaşık sorgularda veya CriteriaQuery kullanılarak ifade edilemeyen bir sorgu olduğunda kullanabiliriz.

Servisimizdeki metodlarda yapılanları kısaca açıklayacak olursak:

  • getProductsByName: Burada doğrudan Spring Data repository üzerinden bir search işlemi gerçekleştiriyoruz
  • getProductsByNameWithStringQuery: StringQuery kullanarak name field’ı verdiğimiz query ile eşleşen kayıtları bulmak için match operatörünü kullanıyoruz.
  • getProductsByNameOrDescriptionWithCriteriaQuery: CriteriaQuery kullanarak name veya description field’ı verdiğimiz query ile eşleşen kayıtları bulmak için is ve or operatörlerini kullanıyoruz.
  • getProductsByPriceWithCriteriaQuery: Yine CriteriaQuery ile birlikte sayısal değerleri büyüklük küçüklüğe göre filtrelemek için greaterThan ve lessThan operatörlerini kullanıyoruz.

Diğer metodlarımızda NativeSearchQuery kullanacağız:

  • getProductsByNameWithNativeSearchQuery: name field’ı verdiğimiz query ile eşleşen kayıtları bulmak için matchQuery operatörünü kullanıyoruz.
  • getProductsByNameOrDescriptionWithFuzzy: verdiğimiz query name veya description field’larının herhangi biri ile eşleşen kayıtları bulmak için multiMatchQuery operatörünü kullanıyoruz. Burada ayrıca yazım yanlışlarını tolere edebilme yeteneği eklemek için de fuzziness operatörünü ekliyoruz.
    fuzziness operatörünün ihtiyaç duyduğu Fuzziness objesi kaç harfe kadar yazım yanlışlarının tolere edileceğini belirtiyor. Bu değeri 0, 1, 2 olarak verebiliriz. AUTO olarak verdiğimizde davranış şu şekilde olacaktır:
    0-2 uzunluğundaki kelimeler için tam eşleşme
    3-5 uzunluğundaki kelimeler için 1 harf tolerans
    >5 uzunluğundaki kelimeler için 2 harf tolerans
    Fuzziness’ın arka planında Levenshtein Edit Distance algoritması yatmaktadır.
  • getProductSuggestions: Arama motorlarında kelimenin ilk birkaç harfini yazdıktan sonra öneri sunma şeklinde sıkça gördüğümüz kelime tamamlama işlemini de wildcardQuery operatörü ile yapabiliriz. Hangi field için search yapılacağını belirterek query’inin neresinden tamamlama yapılması gerektiğini query’imizi (*) ile concat’leyerek veriyoruz.
  • getProductSuggestionsByAllFields: name veya description field’larının herhangi biri üzerinde wildcardQuery çalıştırmak istiyorsak boolQuery ile birlikte “veya” kavramını karşılayan should operatörünü kullanabiliriz.
  • getProductByParameters: verdiğimiz birden fazla query parametresini “ve” ile tam filtreleme yapmak için de yine boolQuery ile birlikte “ve” kavramını karşılayan must operatörünü kullanabiliriz.

Bu yazıda Elasticsearch’ü ikincil veritabanı olarak kurgulayıp var olan birincil veritabanı ile nasıl sync tutabileceğimizden bahsettik. Elasticsearch’ün yetenekleri doğrultusunda birkaç search örneği üzerinde durduk. Eksik veya hatalı gördüğünüz kısımları yorum kısmında belirtirseniz sevinirim :) Kaynak kodlara github üzerinden erişebilirsiniz.

Kaynaklar

--

--