Java ile CQRS Design Pattern | Docker, Elasticsearch, RabbitMQ, Spring, MySQL
Bu yazı S-Teknoloji’de yazılmış olan CQRS Design Pattern Nedir, Neden Kullanılır? Bir CQRS Design Pattern İncelemesi adlı yazının Java ile uygulanmış halini içermektedir. CQRS tasarım deseninin Java ile uygulanması sırasında; Elasticsearch, RabbitMQ, Docker, MySQL, Spring API ve Spring Data gibi teknolojilerden faydalanacağız.
Başlamadan önce projenin tamamına GitHub hesabımdan direkt erişebilirsiniz, yazı içerisindeki kodlar Gist olarak eklenecektir.
Geliştirdiğim open-source projeler ve yaptığım diğer çalışmaları incelemek isterseniz GitHub hesabıma göz atabilir ve destek olabilirsiniz.
Senaryo
Uygulama içerisindeki senaryoda ilanlarımız mevcut. İlan verilerine ait okuma ve yazma işlemlerini ayıracağız. Yazma işlemlerinde birincil veri kaynağımız olan MySQL’e yazacağız, okuma isteklerini ise Elasticsearch üzerinden gerçekleştireceğiz. Aradaki veri tutarlılığını sağlamak için de veri tabanına yazma işlemlerinden sonra RabbitMQ kullanacağız ve event göndereceğiz, gönderilen event’i de yakalayarak Elasticsearch’te bulunan index üzerinde ilgili güncellemeleri yapacağız. Yukarıda da bu senaryonun görselleştirilmiş hali yer alıyor.
Ayrık Veri Tabanları Seçimi
Yukarıda bahsedilen senaryoda CQRS’in temel olarak uygulanmasında okuma ve yazma verileri aynı veri kanyağı içerisinde bulunabilir ve ayrı veri tabanı kullanmak zorunlu değildir fakat avantajları yer almaktadır.
KISS is an acronym for Keep It Simple, Stupid. This principle says about to make your code simple. You should avoid unnecessary complexity. A simple code it’s easier to maintain and easier to understand.
KISS prensibi, bizlere uygulamayı daha basit tutmamızı söyler fakat artan trafik ve uygulamaya ait kaynakları verimli kullanmak amacıyla bu tür senaryolara da gidilebilir. Sonuç itibarıyla aynı veri kaynağında bulunduğu için aynı kaynağa hem okuma hem de yazma istekleri gelecektir.
Okuma ve yazma işlemleri asimetrik işlemlerdir, birbirinden farklı performans ve ölçekleme gereksinimleri barındırır.
Okuma ve yazma işlemlerini ayırmayla birlikte;
- Independent scaling: okuma ve yazma işlemleri birbirinden ayrıldığı için bu yapılar birbirinden bağımsız ölçeklenebilir bir yapı haline gelmektedir.
- Optimized data schemas: Okuma tarafında bulunan veri kaynağı okumaya özgü, yazma tarafına ait veri kaynağı da yazmaya özgü şema kullandığından dolayı iki tarafta da kendi sorgularına göre optimize edilmiş bir yapı kullanıyor.
- Separation of concerns: Okuma ve yazma kısımlarını ayırmak, daha sürdürülebilir ve esnek modellerle sonuçlanabilir. Karmaşık iş/business mantığının çoğu yazma modeline girer. Okuma modeli nispeten basit olabilir.
Gerekli Ortamın Kurulumu ve Kontrolü
Docker Servislerinin Ayağa Kaldırılması
Aşağıda docker-compose dosyası yer almaktadır. Uygulama içerisinde kullanacağımız MySQL, Elasticsearch ve RabbitMQ gibi teknolojilerin; image, port, environment ve diğer bilgilerini içermektedir.
Gerekli container’ların ayağa kaldırılması için docker-compose dosyasının bulunduğu dizinde cli yardımı ile docker-compose up -d diyerek bunu gerçekleştirebiliriz. Burada -d parametresi ile terminale attach olmamasını istedik, dilerseniz bu parametreyi silerek gerçekleşecek istekleri ve diğer süreçleri direkt terminal üzerinden de görebilirsiniz.
MySQL Veri Tabanı Tasarımı
Docker container’ları ayağa kaldırdıktan sonra yapmamız gereken bir sonraki işlem ise MySQL içerisindeki veri tabanında işlemleri gerçekleştireceğimiz ilanlara ait tablo tasarımını yapmak.
docker-compose dosyasında görülen MySQL Environment bilgilerinden yola çıkarak ilgili bağlantıyı sağladıktan sonra orada tanımlanan veri tabanı içerisinde aşağıdaki SQL kodunu çalıştırmak gerekmektedir.
Yukarıdaki tablo tasarımından da görüldüğü gibi bir ilana ait; başlık, detay, fiyat gibi temel bilgiler yer almaktadır.
Elasticsearch’te Index Oluşturulması ve Mapping Ayarlanması
Uygulama içerisinde okuma işlemlerini Elasticsearch üzerinden yapacağımızdan MySQL’deki tablonun Elasticsearch üzerinde de modellenmesini gerekmektedir.
Elasticsearch’ün başarılı bir şekilde çalıştığından emin olmak için aşağıdaki cURL isteği çalıştırılabilir ya da direkt cURL içerisindeki end-point’e de gidilebilir.
curl -XGET "http://localhost:9200/_cat/health?format=json&pretty"
Aşağıdaki cURL kodu çalıştırıldığında Elasticsearch üzerinde Index’in ilgili mapping ile birlikte oluşturulması beklenmektedir.
Yukarıdaki cURL isteği çalıştırıldıktan sonra işlemin başarıyla gerçekleştiğine dair bir response gelecektir. Tekrardan kontrol etmek isterseniz aşağıdaki cURL isteğini çalıştırabilir ya da direkt istek içerisindeki adrese gidebilirsiniz.
curl -XGET "http://localhost:9200/classifieds/_mapping?pretty&format=json"
RabbitMQ Dashboard Kontrolü
Son olarak RabbitMQ kontrolünü de gerçekleştirdikten sonra uygulamanın kodlanması kısmına geçilebilir.
Docker-compose dosyasında da görüldüğü gibi RabbitMQ için açılan portlardan birisi de 15672. Bu port arayüz için verilmektedir, arayüze erişmek için http://localhost:15672/ adresine gidilmesi gerekmektedir. Her ne kadar docker-compose dosyasında belirtilmese de RabbitMQ için default gelen kullanıcı adı ve parola mevcut, hem kullanıcı adı hem de parola olarak guest kullanılmaktadır. Giriş yaptıktan sonra karşımıza aşağıdaki gibi bir ekranın gelmesi gerekmektedir.
Java Uygulamasının Oluşturulması ve Bağımlılıkların Eklenmesi
Bu kısımda hızlı bir kurulum için Spring Initializr’den faydalanacağız. İlgili projeyi oluştururken; GroupId=com.example ve ArtifactId=cqrs-design-pattern-java kullanacağız dilerseniz sizler başka bir isimlendirmede de bulunabilirsiniz.
Bağımlılık olarak Spring Initializr’den sadece Spring Web ve Spring Data JPA’yı alacağız diğerlerini de manuel olarak pom.xml dosyasına ekleyeceğiz. Bu kısım tamamen bireysel tercih nedeni olmakla birlikte RabbitMQ gibi kütüphaneler hali hazırda soyutlanmış yapı ve güzel bir dökümantasyon sağlarken uygulama içerisinde her şeyi Spring Framework ile yapmak ve onları bilmek için de ayrı efor sarf etmek istemememden dolayıdır.
Aşağıda Spring Initializr’ın ekran görüntüsü yer almaktadır.
İlgili projeyi oluşturduktan ve gerekli düzenlemeleri yaptıktan sonra pom.xml dosyası aşağıdaki gibi olacaktır.
CQRS Tasarım Deseninin Java ile Uygulanması
Bir önceki başlıklarda da görüldüğü gibi; servislerin ayağa kaldırılması, kontrolleri ve temel bir Maven projesinin oluşturulması gerçekleştirildi. Bu kısımda da ortamın hazır olmasıyla birlikte CQRS tasarım deseninin Java ile uygulanmasına geçilecektir.
Entity ve Repository Sınıflarının Oluşturulması
Veri tabanı tablosundan da yola çıkarak tablo modelini sınıf üzerinde de gerçekleştireceğiz. Aşağıda Classified sınıfı, Classified sınıfına ait repository ve MySQL bilgilerinin bulunduğu application.properties dosyası bulunmaktadır. İlgili repository de CrudRepository<,>’den türemektedir.
RabbitMQ Server Implementasyonu
Okuma ve yazma işlemlerini ayırdığımız için okuma işlemlerini gerçekleştirdiğimiz veri kaynağındaki veriler eski kalabilir. Yazma işlemlerinden sonra yapılan değişikliklerin okuma kısmına ait veri kaynağına da yansıması gerekmektedir. Bu tür durumlarda hangi verinin eski kaldığının tespiti ya da kullanıcının istekte bulunduğu verinin eski olup olmadığının tespiti zor olmaktadır. Uygulama içerisinde read ve write modelleri arasındaki haberleşmeyi sağlamak için RabbitMQ kullanacağız.
Aşağıda RabbitMQ ile bağlantıyı kuracak ve istediğimiz vakit bizlere ilgili Connection ve diğer nesneleri verecek bir RabbitMqServer sınıfı yazılmıştır. Server ile ilgili bağlantıyı kurmak için gerekli bilgiler oluşturulan sınıfta yer almaktadır.
RabbitMQ Publisher Implementasyonu
Birincil veri tabanımız olan MySQL’e ilgili kaydı attıktan sonra aradaki veri tutarlılığını sağlamak için RabbitMQ ile haberleşmesini sağlayacağımızdan bahsettik ve bunun için de RabbitMQ ile bir event gönderme gereksinimimiz doğmaktadır.
Aşağıda, RabbitMQ ile bağlantı kurmamızı sağlayan sınıfı tanımladıktan sonra event/istek göndermemizi sağlayan ve RabbitMqServer sınıfını kullanan RabbitMqPublisherImpl sınıfı ve buna ait interface görülmektedir.
Temel olarak isteklerin hangi kuyruğa gideceğini ve gönderilecek olan json datayı almaktadır, sonrasında ise ilgili bağlantı ile o kuyruğa veriyi göndermektedir.
RabbitMQ Receiver Implementasyonu
RabbitMQ’ya ait server ve publisher işlemleri tamamlandıkta sonra gönderilen event’in alınmasını sağlayan bir receiver tanımlanması gerekmektedir.
Aşağıdaki sınıfa bakıldığında ise tanımlanan kuyruk üzerindeki veriyi dinlememize yardımcı olan metot yer almaktadır. Parametre olarak bizlerden bir DeliverCallback istemektedir, bu sayede metot referansını parametre olarak geçerek ilgili verinin kuyruktan okunduğu andaki durumunu handle edebiliriz/ele alabiliriz.
Elasticsearch Client Implementasyonu
Elasticsearch tarafında bizlere; Elasticsearch ile haberleşecek Client’ın ve ilgili Index üzerinde arama ve döküman ekleme işlemlerini yapacak sınıflar gerekmektedir. Aşağıda bizlere RestHighLevelClient örneği veren metota sahip sınıf görülmektedir. Bu sınıf ile ilgili bağlantı üzerinden Index’e ait işlemlerimizi gerçekleştireceğiz.
Elasticsearch Search Service Implementasyonu
Elasticsearch ile ilgili bağlantı işlemleri bir üstteki konuda gerçekleştirildi. Uygulama içerisinde birincil veri kaynağımız olan MySQL’e veri aktardıktan sonra gönderilen event ile alınan verinin Elasticsearch’e eklenmesini ve ilgili Index üzerinde arama yapmamızı sağlayan metotların yazılması da gerekmektedir. Aşağıda bu iki işlemi gerçekleştiren arayüz ve sınıf yer almaktadır.
ElasticsearchServiceImpl sınıfına bakıldığında döküman oluştururken ilgili kaydı belirlenen Index üzerine ekleme işlemi gerçekleştirmektedir. Arama işleminde ise direkt matchAllQuery kullanmaktadır yani hiçbir sorgu yapmadan direkt var olan verilerin getirilmesi sağlanmaktadır. Dilerseniz bu kısımları kendinize göre de genişletir Term ya da Boolean Query’ler de yazabilirsiniz.
CQRS — Command Implementasyonu
Command, temel olarak veri kaynağındaki verinin değiştirilmesini ifade eder. Bu değişimler; Insert, Update, Delete gibi işlemler sayesinde olur. Genel olarak geriye bir veri döndürmesi beklenmez, geriye değer döndürmek bir query/okuma tarafından beklenen bir davranıştır.
Yukarıda Command olarak tanımlanan interface, uygulama içerisinde bizlere request ile gelecek sınıfların imzasını oluşturmaktadır. Başka da bir özelliği bulunmamaktadır.
CommandResult sınıfı ise işlemlerin başarılı bir şekilde gerçekleşip gerçekleşmediğini belirtiyor.
CommandHandler arayüzü ilgili veri tabanında değişikliği yapacak herbir işlemi temsil etmektedir, handle metotu da yapılacak işlemi temsil eder ve parametre olarak da Command arayüzünü uygulayan bir sınıf beklemektedir. Aşağıda temel bir ilan oluşturma senaryosu yer almaktadır.
CreateClassifiedCommand sınıfı bizlere ilan oluşturma adımında gerekli olan parametreleri taşıyan sınıftır ve sınıf imzası olarak da Command arayüzünü uygulamaktadır ve bu işlemi-gerçekleştirecek/handle-edecek sınıf olan CreateClassifiedCommandHandler sınıfı da aşağıda görülmektedir.
CreateClassifiedCommandHandler sınıfına bakıldığı zaman CommandHandler<CreateClassifiedCommand> arayüzünü uygulamaktadır ve handle metotu içerisinde de CreateClassifiedCommand örneği beklemektedir.
handle metotunun detayına bakıldığı zaman;
- İlgili ilanın hazırlanması (Classified)
- İlanın MySQL’e atılması
- RabbitMQ ile CLASSIFIED_INSERTED_QUEUE adlı kuyruğa asenkron olarak ilan verisinin gönderilmesi
gerçekleştirilmiştir.
Bu gönderilen event’in bir yerlerden yakalanıp ilgili Elasticsearch Index’ini güncellemesi gerekmektedir. Yazının ilerleyen kısımlarında buna da değinilecektir. Şu anlık Command olarak ilgili işlemin sağlıklı bir şekilde yapıldığı görülmektedir; veri tabanına ilgili kaydı atıp Read tarafındaki veri kaynağını güncellemesi için de bir event fırlatmaktadır.
CQRS — Query Implementasyonu
Query, temel olarak veri kaynağındaki verilerin alınmasından sorumludur veriye ait herhangi bir değeri değiştirmemesi gerekmektedir.
Yukarıda Query olarak tanımlanan interface, uygulama içerisinde bizlere request ile gelecek sınıfların imzasını oluşturmaktadır. Başka da bir özelliği bulunmamaktadır.
QueryHandler arayüzü ilgili veri kaynağından okuma yapacak herbir işlemi temsil etmektedir, handle metotu da yapılacak işlemi temsil eder ve parametre olarak da Query arayüzünü uygulayan bir sınıf beklemektedir. Geriye de generic olarak aldığı tipi göndermektedir. Aşağıda temel bir ilan listesi alma senaryosu yer almaktadır.
GetClassifiedsQuery sınıfı bizlere ilan listesini alma adımında gerekli olan parametreleri taşıyan sınıftır ve sınıf imzası olarak da Query arayüzünü uygulamaktadır ve bu işlemi-gerçekleştirecek/handle-edecek sınıf olan GetClassifiedsQueryHandler sınıfı da aşağıda görülmektedir.
GetClassifiedsQueryHandler sınıfına bakıldığı zaman QueryHandler<GetClassifiedsQuery, List<Classified>> arayüzünü uygulamaktadır ve handle metotu içerisinde de GetClassifiedsQuery örneği beklemektedir.
Read ve Write işlemleri ayrıldığından dolayı okuma işlemlerini Elasticsearch üzerindeki classifieds Index’inden gerçekleştirmektedir.
Eventual Consistency
Eventual Consistency is a guarantee that when an update is made in a distributed database, that update will eventually be reflected in all nodes that store the data, resulting in the same response every time the data is queried.
Yukarıdaki adımlarda birincil veri kaynağı olan MySQL’e ilgili ilanın yüklenmesi ve ardından da RabbitMQ tarafında verinin bir event olarak gönderilmesi sağlandı. Okuma tarafında da ilgili kaydın Elasticsearch üzerinden okunması sağlandı fakat yapılan işlemlere bakıldığı zaman event olarak gönderilen datanın okunması ve ilgili Elasticsearch Index’ini güncellemesi yapılmadı. Bu kısımda ise bu durum ele alınacak ve Index üzerindeki datanın güncel olması sağlanacaktır.
Elasticsearch Updater Implementasyonu
Aşağıda yapılan güncelleme işlemi aynı proje içerisinde gerçekleşmektedir. Dilerseniz bu kısmı farklı bir micro-service/service/proje olarak da ayarlayabilir ve performans durumuna göre farklı programlama dilleri de seçebilirsiniz.
ElasticsearchUpdater sınıfına bakıldığı zaman, RabbitMQ ile CLASSIFIED_INSERTED_QUEUE kuyruğunu dinlemekte ve ilgili veri kuyruğa atıldığı zaman alınıp Elasticsearch üzerindeki Index’i güncelleme işlemini gerçekleştirmektedir. Bu sayede 2 veri kaynağı arasındaki tutarsızlığın da önüne geçilmiş olunacaktır.
API Controller Implementasyonu
Yapılan işlemlerden sonra API end-point’lerinin yazılması işlemi de yukarıda görülmektedir. Okuma işlemleri Query üzerinden, yazma işlemleri de Command üzerinden gerçekleştirilmiştir.
Uygulamanın Test Edilmesi
Aşağıdaki cURL komutları önce bir adet ilan verisini insert etmeye çalışıyor daha sonra ise okuma işlemini gerçekleştiriyor. Bu sayede verinin önce MySQL’e daha sonra da gönderilen event’in yakalanıp Elasticsearch üzerine gönderilmesinin kontrolünü gerçekeştiriyor.
İlgili ilan bilgisini gönderilmesi;
curl --location --request POST 'http://localhost:8080/classifieds' \
--header 'Content-Type: application/json' \
--data-raw '{
"title": "Macbook Pro 2019",
"detail": "Sahibinden çok temiz Macbook Pro 2019.",
"price": 27894,
"categoryId": 47
}'
İlan verilerinin okunması;
curl --location --request GET 'http://localhost:8080/classifieds'
Yukarıda ilgili komutların çalıştırıldıktan sonraki hali yer almaktadır.
Beni; Github, Twitter ve diğer sosyal ağlardan takip edebilirsiniz.