AXON FRAMEWORK KULLANARAK SPRING BOOT İLE CQRS PATTERN İMPLEMENTASYONU PART 1
Bu konuya ait İlk makalede CQRS ve SAGA Patternlerinin açıklamalarını bulabilirsiniz.Ayrıca axon frameworkle ilgili temel kavramlara ait açıklamalarıda bulunabilir.Son olarak spring boot ve axon frameworkle örnek bir cqrs implementasyonunu yine part 1 makalesinde okuyabilirsiniz.Saga implementasyonu part2 makalesinin içeriğinde yer alacaktır.
CQRS(Command Query Responsibility Segregation) NEDİR?
Command:Veriyi oluşturan(insert) ya da veriyi manipüle eden(update,delete) işlemlerine command denir. Commandlar gelecekte reddebilecek veya kabul edilebilecek bir isteği temsil eder.
Query:Veriyi okumak (select) için yapılan işlemlere query denir.
CQRS ise bu iki işlem tipini birbirinden ayrıştırarak farklı data modelleriyle gerçekleştirme prensibine denir.
Handlers:Uygulama üzerinde yapılacak her command ve query isteklerini işleyecek yapılara denir.
Aggregate:Her zaman tutarlı bir durumda tutulan varlık/varlıklar grubudur.
CQRS tabanlı işlemlerde aggregateler değişimin başladığı yerdir.Kendi başlarına domain driven desingde nesneler bir araya gelerek iş akışını oluşturmamıza olanak sağlar.
Aggregate bir jpaentitysi değil bir business entity veya entity kümesi olarak düşünebiliriz.
Events:Zaten olmuş ve geri alınamayacak (immutable) bir geçmişi temsil eder
Bir eventin birden fazla tüketicisi olabilir.Ancak commandlar sadece bir tanesine yöneliktir.
CQRS çok fazla paydaşın olduğu verilerde geliştirme aşamalarını farklı takımlara bölmemize olanak sağlayarak her paydaşın kendi logici üzeirnde çalışmasına olanak sağlayan bir yaklaşımdır.
Domain Driven Design:Wikipedia daki tanımı ‘Yazılım kodunun yapısının ve dilinin iş alanıyla eşleşmesi gerektiği kavramıdır. Örneğin, bir yazılım kredi başvurularını işliyorsa, LoanApplication ve Customer gibi sınıflara ve AcceptOffer ve Withdraw gibi yöntemlere sahip olabilir. DDD, uygulamayı gelişen bir modele bağlar.’
SAGA PATTERN NEDİR?
Microservis mimarinizde her microservis için ayrı bir database kullanıyorsanız bunun sonucu olarak özellikle long running işlemlerde transaction yöntemi olarak saga pattern implementasyonu yapmanız gerekecektir.
Saga pattern dağıtık mimarilerde veriler arası tutarlılık (consistency) sağlamak amacıyla bir hata yönetim patterni olarak karşımıza çıkmaktadır.
Saga Pattern implementasyonunda her local transaction bir event veya message başlatarak bir sonraki transactionın başlatılmasından sorumludur.
Hata durumu oluştuğunda saga çözümü sayesinde yazılmış olan compensation servisleri ile yapılmış transactionların rollback olması sağlanır.
Ancak unutulmaması gereken önemli bir nokta saga çözümleri dirty transaction oluşmasına açık bir konudur.
İki tip Saga Pattern yöntemi vardır.
Choregrapy Based Saga:Her bir transaction kendin sonraki transaction ve compensatiton senaryolarını tetiklemekten sorumludur.
Orchestaration Based Saga:Saga pattern için gerekli transaciton akışını yöneten bir koordinatör bulunmaktadır
Burada orchestration Based Saga patternini sağlamak amacıyla axon framework kullanıyor olacağız.
AXON FRAMEWORK NEDİR?
Axon framework domain driven design tekniklerini kullanarak cqrs,saga gibi çözümleri uygulamaya yarayan java based bir microservis frameworküdür.
Axon frameworkünün bir diğer artısı spring ekosistemi ile uyumlu çalışabilen bir framework olmasıdır.
Öncelikle axon sitesinden
AxonQuickStart dosyasını indirerek kurulumunu gerçekleştiriyoruz
Axon Framework ek olarak AxonServer birleşenide bulunmaktadır.AxonServer axon framework entegre kullanabildiğimiz gibi ayrı olarak da kullanabiliyoruz.
Axon Server sayesinde event,command mesajların birbiri arasında yönlendirilmesi,monitoring security gibi işlemleri daha hızlı yapabiliyoruz.
https://docs.axoniq.io/reference-guide/axon-server-introduction
Axon Server container üzerinden kullanılabileceği gibi executive jar olarak da indirip start edebilirsiniz
https://docs.axoniq.io/reference-guide/axon-server/installation/local-installation/axon-server-se
java -jar C:\AxonServer-4.5.7\axonserver.jar
Default olarak aşağıdaki porttan axon server management console erişilebilir
Axon Terimleri:
· AggreagateLifeCycle:Bir aggreagete üzerinden olay(event)yayınlamak için.Apply kullanılır.Bir command execyte edilerken uygulamanın gerikalanına yeni bir event oluşturulduğunu bildirir.
· CommandGateway:Axonun komut işleme birleşenlerine yönelik arayüzüdür.Command gateway ile gönderilen komutların sonuçları beklenebilir.
· QueryGateway:
CQRS için axonframework çalışma mantığı:
İlk hatamız
Caused by: org.h2.jdbc.JdbcSQLSyntaxErrorException: Table “TOKEN_ENTRY” not found; SQL statement:
select tokenentry0_.processor_name as processo1_3_0_, tokenentry0_.segment as segment2_3_0_, tokenentry0_.owner as owner3_3_0_, tokenentry0_.timestamp as timestam4_3_0_, tokenentry0_.token as token5_3_0_, tokenentry0_.token_type as token_ty6_3_0_ from token_entry tokenentry0_ where tokenentry0_.processor_name=? and tokenentry0_.segment=? [42102–200]
at org.h2.message.DbException.getJdbcSQLException(DbException.java:453) ~[h2–1.4.200.jar:1.4.200]
Axonframework event yapılarını takip edebilmek için database üzerinde tablolara ihtiyaç duymaktadır.Bu tablolar bulunmadığı takdirde yukarıdaki gibi hata olacaktır
Sample için bu hatayı alırsanız hızlı çözüm adına application.properties içerisine aşağıdaki ayarı ekleyerek ilerleyebilirsiniz.
spring.jpa.hibernate.ddl-auto=update duruma göre create de diyebiliriz veya tabloları manuel oluşturmamız gereklidir
AXON tarafından oluşturulan tablolar
SAGA_ENTRY :serialize edilmiş saga kayıtlarının tutulduğu tablo
ASSOCATION_VALUE_ENTRY:
TOKEN_ENTRY:
TrackingEventProcessor TrackingTokenı işlenen eventlerin durumunu izleme için kullanılır
TrackingToken eventstream üzerindeki eventin pozisyonunu temsil eder.
Tracking Tokenlar tokenstoreda saklanır.
TokenStore implementasyonu için jpa,jdbc vs gibi farklı implementasyonlarla saklanabilir
TokenEntry tablosu:Axon Framework her processing group için bir token tutar.
Axon Terimleri:
· AggregateIdentifier: string,UUID ve primitive olmayan numericler kullanılabilir, örnek int seklinde identifier sequence hatasına yol açmaktadır.
OUT_OF_RANGE: [AXONIQ-2000] Invalid sequence number: 0
· EventSourcingHandler: Gönderilen eventin işleyicisini belirtmek için axon anotasyonu
· CommandHandler:Gönderilen komutun işleyicisini belirtmek için axon anotasyonu
· Projector sınıfları:Axon frameworkünde her bir event geldiği zaman db işlemlerini yapacak sınıfları tanımlamak için oluşturulan keywordün(mantıksal bir terminoloji)
· EventHandler:Yaratılmış event geldikten sonra projector sınıfı içerisinde eventleri dinlemek için oluşturulan methodların tepesine konulan axon anotasyonu
Öncelikle command ve event sınıflarımızı oluşturalım
Command classımızdaki @TargetAggregateIdentifier Aggregateimizin idsi için gereklidir.
import org.axonframework.modelling.command.TargetAggregateIdentifier;public class CreateTestEntityCommand { @TargetAggregateIdentifier public final String id; public final String name; public final String info; public final int testid; public CreateTestEntityCommand(String id,String name,int testid) { this.id=id; this.name=name; this.testid=testid; this.info="Test Entity creation will be requested"; }}
Event classımız
public class TestEntityCreatedEvent {
public final String id;
public final String name;
public final String info;
public final int testid;
public TestEntityCreatedEvent(String id,String name,int testid)
{
this.id=id;
this.name=name;
this.info=”Test Entity created Event”;
this.testid=testid;
}
}
Genel pattern olarak command isimleri yapılacak aksiyon(create,update vs)+Command son ekiyle biter
Eventler ise geçmiş zaman aksiyonu(Created,Updated) +Event son ekiyle biter
Aggregate classımız
Not: AggregateIdentifier olarak seçilecek alan command.gatewayden gönderilerken kullanılacak event id olmalı aksi takdirde aggregate not found hatası alınır.Genel örneklere bakıldığında aggregateidentifier ile jpa identifier aynı olduğunu görülmektedir.Ancak aşağıdaki örneklerde ikisi farklı kurulacağını göstermek için jpadaki id ile aggreagateidentifierlar ayrı tutulmuştur.AggreagaeIdentifier String,UUID veya Integer gibi (primitve olmayan) sayısal değerler kullanılabilir.
@Aggregate
public class TestAggregate {
@AggregateIdentifier
private String id;
private String name;
private String info;
private int status;
private int testid;
public TestAggregate(){}
@CommandHandler
public TestAggregate(CreateTestEntityCommand command)
{
AggregateLifecycle.apply(new TestEntityCreatedEvent(command.id,command.name,command.testid));
}
@EventSourcingHandler
protected void on(TestEntityCreatedEvent testEntityCreatedEvent){
this.id=testEntityCreatedEvent.id;
this.name=testEntityCreatedEvent.name;
this.info=testEntityCreatedEvent.info;
}
@CommandHandler
public void updateTestAggregate(UpdateTestEntityCommand command)
{
AggregateLifecycle.apply(new TestEntityUpdatedEvent(command.id,command.status,command.testid));
}
@EventSourcingHandler
protected void on(TestEntityUpdatedEvent testEntityUpdatedEvent){
this.id=testEntityUpdatedEvent.id;
this.status=testEntityUpdatedEvent.status;
this.info=testEntityUpdatedEvent.info;
this.testid=testEntityUpdatedEvent.testid;
}
}
Bir servis katmanızda artık commandgateway sayesinde komutlarımızı gönderebiliyoruz.
@Service
public class TestServiceImpl {
private final CommandGateway commandGateway;
private final EventStore eventStore;
public TestServiceImpl(CommandGateway commandGateway,EventStore eventStore) {
this.commandGateway = commandGateway;
this.eventStore=eventStore;
}
public String createTest(int id)
{
try{
String idtoEvent= UUID.randomUUID().toString();
CompletableFuture<Object> result= commandGateway.send(new CreateTestEntityCommand(idtoEvent,"Bilgehan",id));
return idtoEvent;
}
catch (Exception ex)
{
System.out.println("Command error");
}
return "";
}….}
Projection classımızdaki handler sayesinde herevent geldiğinde(commandgatewayden send edilen eventler) ona göre aksiyon alabiliyoruz
@Component
public class TestProjection {
@Autowired
private TestRepository testRepository;
@EventHandler
public void on(TestEntityCreatedEvent event) {
System.out.println("Test entity yaratıldı"+event.id);
TestEntity testEntity=new TestEntity();
testEntity.setId(event.testid);
testEntity.setEventid(event.id);
testEntity.setName(event.name);
testEntity.setInfo(event.info);
testRepository.save(testEntity);
}…..}
Dipnot: TrackingEventProcessorConfiguration ve TokenStore ayarları yapılmazsa default olarak inmemory mantığında eventhandler çalışıyor yani uygulama her yeniden başladığında aynı eventlerin tekrar okunmasına neden oluyor o yüzden bir axon config oluşturarak bu ayarları yapıyor olmakta fayda bulunmaktadır.
@Configuration
public class AxonConfig {
@Bean
EventSourcingRepository<TestAggregate> testAggregateEventSourcingRepository(EventStore eventStore)
{
EventSourcingRepository<TestAggregate> repository= EventSourcingRepository.builder(TestAggregate.class).eventStore(eventStore).build();
return repository;
}
@Bean
public TokenStore tokenStore(Serializer serializer, EntityManagerProvider entityManagerProvider) {
return JpaTokenStore.builder()
.entityManagerProvider(entityManagerProvider)
.serializer(serializer)
.build();
}
@Autowired
public void configureProcessors(EventProcessingConfigurer eventProcessingConfigurer) {
TrackingEventProcessorConfiguration tepConfig = TrackingEventProcessorConfiguration.forSingleThreadedProcessing().andInitialTrackingToken(StreamableMessageSource::createHeadToken);
eventProcessingConfigurer.registerTrackingEventProcessorConfiguration(config -> tepConfig);
}
}
Yazma kısmı CORS sin CRS kısmını axonla yukarıdaki şekilde gerçekleştirebildik
CRS kısmı için akış
CommandGateway(Send) →Aggregate →CommandHandler(Command) →EventSourcingHandler(Event) →EventHandler →Db
Simdi QRS kısmının axonla nasıl olacağına bakalım.
Sorgulamada parametre geçişi için bir adet query sınıfı oluşturuyoruz
public class FindTestEntityQuery {
private String eventid;
public FindTestEntityQuery(String eventid)
{
this.setEventid(eventid);
}
public String getEventid() {
return eventid;
}
public void setEventid(String eventid) {
this.eventid = eventid;
}
}
QueryHandler ise artık sorgulamalara düşen eventler için kullandığımız anotasyon oluyor
@Component
public class TestProjection {
@Autowired
private TestRepository testRepository;
@QueryHandler
public TestEntity handle(FindTestEntityQuery query) {
System.out.println(“query: {}”+query);
return this.testRepository.findByEventID(query.getEventid());
}
}
Son olarak QueryGateway ile sorgularımızı yönlendirebiliyoruz
@Service
public class TestService {
private final QueryGateway queryGateway;
private final EventStore eventStore;
public TestService(QueryGateway queryGateway,EventStore eventStore)
{
this.queryGateway=queryGateway;
this.eventStore=eventStore;
}
public CompletableFuture<TestEntity> findById(String eventid) {
return this.queryGateway.query(
new FindTestEntityQuery(eventid),
ResponseTypes.instanceOf(TestEntity.class)
);
}
public List<Object> listEventsForTestEntity(String eventid) {
return this.eventStore
.readEvents(eventid)
.asStream()
.map(Message::getPayload)
.collect(Collectors.toList());
}
}
CQRS deki QRS akışı
QueryGateway(Query) →QueryHandler →Db
EventStore ReadEvents Reader yaparken aldığımız hata
com.fasterxml.jackson.databind.exc.InvalidDefinitionException: No serializer found for class org.axonframework.serialization.UnknownSerializedType and no properties discovered to create BeanSerializer (to avoid exception, disable SerializationFeature.FAIL_ON_EMPTY_BEANS) (through reference chain: java.util.ArrayList[0])
Çözüm properties dosyamıza ekleyip serializer ayarlarını yapmamız gerekiyor
axon.serializer.general=jackson
axon.serializer.events=jackson
axon.serializer.messages=jackson
https://docs.axoniq.io/reference-guide/v/master/axon-framework/events/event-serialization
CQRS Axon Örnek Kodlara github üzerinden erişip zenginleştirebilirsiniz.
https://github.com/BilgehanYildiz/AxonCQRSSample
Not:Reader projesi aynı repositorynin içinde zipli olarak bulunmaktadır
Not:Örnek mantıksal bir işleyişten cok temel olarak axon birleşenlerinin denenmesi amacıyla oluşturulmuştur.
Not:Kaynakça olarak faydalanabilirsiniz
https://blog.nebrass.fr/playing-with-cqrs-and-event-sourcing-in-spring-boot-and-axon/
https://progressivecoder.com/implementing-event-sourcing-using-axon-and-spring-boot-part-1/
https://www.gencayyildiz.com/blog/microservice-mimarilerde-saga-pattern-ile-transaction-yonetimi/
https://sefikcankanber.medium.com/saga-pattern-nedir-e4a447bef361