Kafka ❤ SQL = ksqlDB

Büşra
8 min readMay 25, 2022

--

Selamlar, bu yazımda bir kafka topic’indeki eventleri ksqlDB ile herhangi streaming uygulamasına ihtiyaç duymadan nasıl sorgulayıp işleyebileceğimizden, sağladığı avantajlardan ve beraberinde getirdiği dezavantajlardan bahsedeceğim. Daha önce hiç Kafka ve Kafka streams deneyimlemediyseniz temel kavramlar açısından bakmak faydalı olacaktır.

ksqlDB’yi hangi durumda kullandık, kullanacağız;

  • Trendyol’da tedarikçi ürünlerinin içerik kontrolünden sorumlu olan Quokka projesinde, ürünlerin trendyol kataloğunda yer alması için gereken tüm validasyonları, analizleri adım adım ürün statusleri ile yapar. Sistemsel sorunlardan, veri yapısından dolayı aynı süreçleri tekrarlayan ürünleri yakalamak adına “Loop Detection”, bir nevi monitoring amaçlı kullandık.
  • Şu anki International Product projemizde aynı topic’e basılan yüzlerce event arasından proje domainine ait eventleri filtrelemek için kullanacağız. Bu sayede consumer API tarafında eventin serialize maliyetini ortadan kaldıracağız.

ksqlDB nedir onu bir açıklayalım;

ksqlDB, kafka ekosisteminde akış işleme (stream processing) uygulamaları oluşturmaya yönelik belli soyutlamalar ile, Table ve Stream olarak adlandırdığımız, kafka topicleri üzerindeki eventleri SQL syntax’ı ile işlemeye, filtrelemeye, birleştirip veriyi zenginleştirmeye olanak sağlayan özelleştirilmiş bir veri tabanıdır.

ksqlDB, dağıtık, ölçeklenebilir gerçek zamanlı bir stream processing ortamı sunar.

ksqlDB, REST API ve web UI sayesinde kullanımı rahat bir erişim sunar. Bunun yanı sıra CLI ile de ksqlDB’ye erişip istenilen sorguları çalıştırma olanağı sağlar.

ksqlDB 4 ana bileşenden oluşur:

  • Engine ile SQL ifadelerini ve sorgularını işler.
  • REST API ile kullanıcılara erişim olanağı sağlar.
  • CLI ile yine erişim olanağı sağlar.
  • Confluent Control Center üzerinde Web UI ile ksqlDB uygulamaları geliştirilmesine olanak verir.

Streams ve Tables

KsqlDB’de birbiriyle ilişkili iki kavram vardır: Streams ve Tables.

Streams; Akan verilerden oluşur, tekrarlayabilen verilerdir. Aynı key’e ait farklı değerler görülebilir.

Tables; Akan verilerden elde edilen(Streams) en sonki durumun değeridir. Aynı key’e ait son değer table üzerinde tutulur.

Streams vs Tables

Materialized Views

Materialized view, veritabanlarında uzun zamandır var olan bir kavramdır ve bir sorgunun sonuçlarını depolamak için kullanılır. (materyalizasyon). Bir sorgunun önceden hesaplanmış sonuçları daha sonra sorgulama için kullanılabilir hale getirilir.

Materialized view’in yararı sorguyu tüm tablo üzerinde yeniden çalıştırmak yerine, yalnızca değişiklikler (delta) üzerinde bir sorguyu çalıştırmasıdır. Yeni bir event’in gelmesi ile mevcut durum farklı bir duruma dönüşür. Oluşan bu değişim farkı yeni bir Materialized view yaratır. Bu sayede yeni oluşan bu view tamamen işlenerek oluşmaz.

ksqlDB’de bir Table materialized view olabilir. Eğer direkt olarak bir kafka topic’i üzeriden yaratılmış ise bu Table materialized view değildir. Materialize edilmemiş tablolar yüksek performans kaybına neden olacağı için sorgulanamaz. Farklı bir koleksiyondan yaratılan tabloların sonuçlarını Materialized View oldukları zaman sorgulanabilirler.

Pull Query

Arama tarzında olan sorgulardır. Sürekli olarak çalışmazlar, anlık olarak istenilen sorgu sonucunu kullanıcıya döner. Materialized view gerçek zamanlı güncellendiği için veri bütünlüğünde uyuşmazlıklar görülebilir.

Örneğin bir araca ait sadece o anki konum bilgilerini pull query ile aşağıdaki gibi sorgularız.

SELECT vehicleId, latitude, longitude FROM currentCarLocations WHERE vehicleId = '6fd0fcdb';

Push Query

Pull Query aksine, çalıştırılan sorgu her durum değişikliği ile yeni çıktılar üretir ve bu çıktılar push query ile kalıcı olarak, terminate edilene kadar, arka planda çalışır.

Örneğin bir araca ait konum değişimleri sürekli olarak izlemek istersek push query ile aşağıdaki gibi sorgularız.

SELECT vehicleId, latitude, longitude FROM currentCarLocations WHERE vehicleId = '6fd0fcdb' EMIT CHANGES;

Windowing

ksqlDB’de zamansal sorgular yazıp, verileri işlemek geleneksel veri tabanlarına göre oldukça kolaydır.

Stream ve Table üzerinde SUM, AVG, COUNT, MAX, MIN gibi aggregation fonksiyonlarını window olarak adlandırılan belli zaman kısıtlamaları ile sorgulama olanağı sunar.

Her window’un başlangıç ve bitiş zamanı bulunur. Örneğin; 10 dk’lık zaman dilimlerinde aynı kullanıcıya ait kaç tane başarısız giriş işlemi yapıldığını window kullanarak bulabiliriz.

Window kullanımında GROUP BY ile eventleri gruplamamız gerekmektedir. Gruplanan alan ile, örneğin user_id, o window içersinde bir key değeri üretmiş olur ve o key’e ait sorgularımızı çalıştırır.

Window Türleri

Tumbling Window

Sabit bir zaman değeri ile ardı ardına pencereler açar. Bir window biterken diğer window başlar, bu sayede windowlar üst üste gelmez ve her bir kayıt sadece o an başlayan ve devam eden window’a ait olur.

Bir banka hesabına 5 saniye içersinde, 3'ten fazla hatalı kullanıcı giriş isteği örnek olarak verilebilir.

SELECT user_id, count(*) FROM authorization_attempts
WINDOW TUMBLING (SIZE 5 SECONDS)
GROUP BY user_id HAVING COUNT(*) >= 3
EMIT CHANGES;
tumbling window ve event dağılımı

Hopping Window

Sabit zamanlı bir window türüdür. Hopping window kullanırken 2 tane değeri tanımlamalıyız.

  • window_size: Açılan window’un zamansal boyutunu belirler. 5 saniyelik, 3 dakikalık gibi.
  • advance_by: Bir sonraki window’un sekme zamanını belirler. Örneğin; 3 sn’de bir yeni bir window başlatır. Başlatılan window’un boyutu işe window size ile belirtilir.

Bu iki değeri eşit olarak tanımlarsak tumbling window elde etmiş oluruz.

Hopping window

Sekme(hop) boyutu, window boyutundan küçük olduğu durumlarda aynı eventler birden çok window içersinde görünmesine sebep olur (Overlapping) .

Session Window

Diğer window türlerinden farklı olarak session window dinamik boyutlu bir türdür. Session window tanımlaması yaparken inactivity değeri tanımlanır. Aynı key’e sahip son event’in yayımlama tarihinden itibaren verilen inactivity değeri kadar bir zaman damgası oluşturulur. Eğer bu süre inactivity süresinden uzun ise yeni bir window yaratılır, küçük ve eşit ise önceki window ile birleştirilir.

Session window

Örneğin, session window sorgusu ile, kullanıcıların 60 saniyelik inactivity değeri ile bir web sayfasının görüntülenme sayısını kullanıcı bazlı gruplayarak session window sorgusu ile elde edebiliriz.

SELECT userId, COUNT(*) FROM pageviews
WINDOW SESSION (60 SECONDS)
GROUP BY userId EMIT CHANGES;

Şimdi örnek bir durum ile sorgu yapısını inceleyelim

Uygulamamızda ürün ve o ürünün hangi aşamada olduğunu gösteren bit status alanı ile belirli bir zaman çerçevesinde analiz edip herhangi bir döngü anomalisi olduğuda başka bir kafka topic’ine bu kurala uygun ürünleri belirttiğimiz yeni contrat ile event bırakan bir ksqlDB senaryosu oluşturalım.

Kurulum aşamaları için burdan bakabilirsiniz.

“productStatusChanged” isimli bir topic’imiz olsun. Ürün statusu her değişiminde bu kafka topic’iğimize event olarak bırakılsın. Contract’ımızı ise aşağıdaki gibi olsun.

{
"id" : Long,
"status": String
}

Status alanı için de birden fazla değer tanımlayalım;

VALIDATION_CONTROL, MEDIA_DOWNLOAD, MEDIA_ANALYSIS, APPROVED, REJECTED

Bunlar gibi daha fazla adım, statü eklenebilir. Örneklendirmek ve case’i anlamak için şimdilik yeterli.

Stream Yaratma

Bir stream oluşturarak var olan eventleri artık kendi stream’imize akıtıp işlemeye başlayabiliriz. Burda stream yaratıldıktan sonra event akışı olunca stream çalışacaktır eski eventleri de işlemek için ksqlDB CLI’ından aşağıdaki komutu çalıştırarak topic’i baştan okumasını sağlarız. Bu config production için çok doğru olmayabilir.

ksql> SET 'auto.offset.reset' = 'earliest';

CREATE STREAM PRODUCT_STATUS_STREAM
(id BIGINT, status VARCHAR)
WITH (
kafka_topic='productStatusChanged',
value_format='json'
);

Burda kafka_topic tanımı ile productStatusChanged topic’ine erişim sağlamış oluyoruz. Topic’e bırakılan eventlerin yapısı JSON olduğu için de value_format tanımını json olarak veriyoruz. ksqlDB’nin desteklediği AVRO, DELIMETED gibi farklı formatlar da bulunuyor.

Şimdi bu Stream üzerinden kendi senaryomuzu yapmaya çalışalım;

Amacımız her bir ürünün aynı status değerine bir zaman diliminde kaç defa geçiş yaptığını bulmak. Zaman dilimi sorgularımız içi tumbling window yapısını kullanmamız gerekecektir. Çünkü belirli bir zaman sınırı koymadan tekrar sayısının hesaplanması uzun vadede doğru bir sonuç vermeyecektir.

Basit haliyle akışımız bu şekilde olacak

Sorguyu yazmadan önce akışımızı çıkaralım;

  • Stream üzerinde ürün id ve status alanları ile bir GROUP BY sorgusu yazarak kaç defa tekrarladığını bulalım.
  • Tekrar sayısını bulmak için zaman kısıtı olarak 5 dk’lık window açalım.
  • Her 5 dk’lık windowlar ile gelen eventleri id ve status alanları ile gruplarsak çıkan tekrar sayısına ise 3 sınırı ekleyelim. Bu koşul ile 5 dk’lık zaman dilimleri içinde aynı ürün id ve status değerine sahip 3 ve daha fazla event tekrar ederse bir anomali olduğunu belirtir.
  • Stream üzerindeki değerleri işlerken çıktısını bir Table oluşturarak elde edelim.
  • Bu sorgu ile de elde ettiğimiz sonuçları farklı bir kafka topic’ine yönlendirelim.
ksqlDB üzerindeki mimarisi
CREATE TABLE PRODUCT_STATUS_LOOP_DETECTION_TABLE WITH (
KAFKA_TOPIC = 'productStatusLoopDetected',
VALUE_FORMAT = 'json',
PARTITIONS = 2,
REPLICAS = 1
) AS
SELECT id AS `id`,
status AS `status`,
COUNT(*) AS `repetitionCount`
FROM PRODUCT_STATUS_STREAM
WINDOW TUMBLING (SIZE 5 MINUTE, RETENTION 1 DAYS)
GROUP BY id, status
HAVING COUNT(*) >= 3
EMIT CHANGES;
  • productStatusLoopDetected topic’i eğer önceden yaratılmadı ise ve ksqlDB ayarlarıda otomatik topic yaratma özelliği açık ise verilen PARTITIONS ve REPLICAS değerleri ile topic’i oluşturur. Daha önce yaratılan topic’i kullanmak istersek PARTITIONS ve REPLICAS değerlerini varsayılan topic ayarlarıyla aynı olmalıdır. Aksi durumda hata alırız.
  • RETENTION değeri ile açtığımız her window için işlenen verilerin kafka üzerinde saklanacağı süreyi veririz.
  • EMIT CHANGES ifadesi ile Table’dan bir Materialized View oluşturup sorgumuzun arka planda sürekli olarak, yani işlem çıktısında değişime sebep olacak gelen her yeni event için sorgunun çalışmasını sağlar. Bu sorgular PUSH QUERY olarak adlandırılır.
count ≥ 1 değeri için oluşan tablo

Kafka Streams vs ksqlDB

ksqlDB aslında bir Kafka Streams uygulamasıdır, yani ksqlDB farklı yeteneklere sahip tamamen farklı bir üründür, ancak dahili olarak Kafka Streams kullanır. Dolayısıyla hem benzerlikler hem de farklılıklar vardır.

Kafka Stream bir kütüphane ile geliştirme ortamı sunarken ksqlDB CLI, REST API ve UI ile erişim ve geliştirme ortamı sunar bu da kullanıcılara platform ve kullanım rahatlığı sağlar.

Kaynak kullanımı ve bu kaynakların dağıtımı ile ilgili farklılıklar vardır. Bir kafka stream uygulaması paralel olarak işleme yapar bu da CPU kullanımını artmasına sebep olur. Bir stream uygulaması yazmak ve onu deploy etmek, ksqlDB kullamaktan tamamen farklı bir deneyim sunar.

Farklı proglamlama dillerinde işlenen veriyi kullamak çok daha kolay olur.

Kafka streams kodlama ve yönetim açısından geliştiricilere daha fazla sorumluluk yüklese de ksqlDB kullanımına göre fazla esneklik sağlarken ksqlDB kullanımı daha kolay bir şekilde geliştirme yapmamızı sağlar.

Kullanım Alanları

  • Monitoring.
  • Anomaly Detection.
  • Filter.
  • Farklı veri kaynaklarına, örneğin Elasticsearch, Kafka Connect ile veri aktarımı.
  • Streaming ELT (e.g: Click Stream).
  • IoT, nesnelerin interneti, ile sensor verilerinin analizi.

Avantajları

  • ksqlDB kullanmanın en güzel avantajlardan biri de iş yükünün azalması yani stream processing uygulaması yazıp, yönetme sorumluluğunu ortadan kaldırıyor.
  • ksqlDB sayesinde verileri işlerken kullanılan dil bağımlılığını ortadan kaldırıyor. İşlenen verilerin sadece consume edilmesi ya da Kafka Connect ile farklı kaynaklara aktarılması süreci oldukça kolaylaştırıyor.
  • Aggregation by Windowing, ilişkisel veri tabanlarında olmayan bu özellik ile zamansal bir çerçevede verileri işler. Veriyi zamansal olarak gruplamayı sağlar.

Dezavantajları

  • SQL yazarken hardcode değerlerin kullanılması örneklersek, monitoring amaçlı bir SQL’de level=”ERROR” ya da rate limit koyduğumuz bir uygulamada count> 3 şeklinde bir tanımlama.
  • Esnekliğinin kendi yönettiğimiz uygulamalara göre daha az olması.
  • Domain kavramlarının infra tarafına sızması ve teknoloji bağımlılığının artması.

Özetlemek gerekirse;

Basit bir arayüze sahip olması, farklı koleksiyonlardaki verileri birleştirme, verileri toplama, anahtarlı aramalar kullanılarak sorgulanabilen Materialized View oluşturma ve daha fazlası dahil olmak üzere birçok orta ve gelişmiş stream processing kullanım senaryolarını destekler. Ayrıca, ksqlDB bize çok çeşitli veri işleme ve zenginleştirme kullanım durumlarını ele almak için kullanabileceğimiz kapsamlı fonksiyonlar sağlar. Matematiksel işlevler (AVG, COUNT_DISTINCT, MAX, MIN, SUM, vb.), String/text fonksiyonları (LPAD, REGEXP_EXTRACT, REPLACE, TRIM, UCASE, vb.) veya hatta coğrafi işlevler (GEO_DISTANCE) gibi fonksiyonların tamamını kapsar. Üstelik kendi fonksiyonlarımızı(UDF) yazıp kullanmamıza bile olanak verir.

Zaman ayırdığınız için teşekkür ederim ❤

Bir sonraki yazıda görüşmek üzere :)

--

--