AXON FRAMEWORK KULLANARAK SPRING BOOT İLE CQRS PATTERN İMPLEMENTASYONU PART 1

BilgehanYıldız
7 min readOct 17, 2021

--

Bu konuya ait İlk makalede CQRS ve SAGA Patternlerinin açıklamalarını bulabilirsiniz.Ayrıca axon frameworkle ilgili temel kavramlara ait açıklamalarıda bulunabilir.Son olarak spring boot ve axon frameworkle örnek bir cqrs implementasyonunu yine part 1 makalesinde okuyabilirsiniz.Saga implementasyonu part2 makalesinin içeriğinde yer alacaktır.

CQRS(Command Query Responsibility Segregation) NEDİR?

Command:Veriyi oluşturan(insert) ya da veriyi manipüle eden(update,delete) işlemlerine command denir. Commandlar gelecekte reddebilecek veya kabul edilebilecek bir isteği temsil eder.

Query:Veriyi okumak (select) için yapılan işlemlere query denir.

CQRS ise bu iki işlem tipini birbirinden ayrıştırarak farklı data modelleriyle gerçekleştirme prensibine denir.

Handlers:Uygulama üzerinde yapılacak her command ve query isteklerini işleyecek yapılara denir.

Aggregate:Her zaman tutarlı bir durumda tutulan varlık/varlıklar grubudur.

CQRS tabanlı işlemlerde aggregateler değişimin başladığı yerdir.Kendi başlarına domain driven desingde nesneler bir araya gelerek iş akışını oluşturmamıza olanak sağlar.

Aggregate bir jpaentitysi değil bir business entity veya entity kümesi olarak düşünebiliriz.

Events:Zaten olmuş ve geri alınamayacak (immutable) bir geçmişi temsil eder

Bir eventin birden fazla tüketicisi olabilir.Ancak commandlar sadece bir tanesine yöneliktir.

CQRS çok fazla paydaşın olduğu verilerde geliştirme aşamalarını farklı takımlara bölmemize olanak sağlayarak her paydaşın kendi logici üzeirnde çalışmasına olanak sağlayan bir yaklaşımdır.

Domain Driven Design:Wikipedia daki tanımıYazılım kodunun yapısının ve dilinin iş alanıyla eşleşmesi gerektiği kavramıdır. Örneğin, bir yazılım kredi başvurularını işliyorsa, LoanApplication ve Customer gibi sınıflara ve AcceptOffer ve Withdraw gibi yöntemlere sahip olabilir. DDD, uygulamayı gelişen bir modele bağlar.’

SAGA PATTERN NEDİR?

Microservis mimarinizde her microservis için ayrı bir database kullanıyorsanız bunun sonucu olarak özellikle long running işlemlerde transaction yöntemi olarak saga pattern implementasyonu yapmanız gerekecektir.

Saga pattern dağıtık mimarilerde veriler arası tutarlılık (consistency) sağlamak amacıyla bir hata yönetim patterni olarak karşımıza çıkmaktadır.

Saga Pattern implementasyonunda her local transaction bir event veya message başlatarak bir sonraki transactionın başlatılmasından sorumludur.

Hata durumu oluştuğunda saga çözümü sayesinde yazılmış olan compensation servisleri ile yapılmış transactionların rollback olması sağlanır.

Ancak unutulmaması gereken önemli bir nokta saga çözümleri dirty transaction oluşmasına açık bir konudur.

İki tip Saga Pattern yöntemi vardır.

Choregrapy Based Saga:Her bir transaction kendin sonraki transaction ve compensatiton senaryolarını tetiklemekten sorumludur.

Orchestaration Based Saga:Saga pattern için gerekli transaciton akışını yöneten bir koordinatör bulunmaktadır

Burada orchestration Based Saga patternini sağlamak amacıyla axon framework kullanıyor olacağız.

AXON FRAMEWORK NEDİR?

Axon framework domain driven design tekniklerini kullanarak cqrs,saga gibi çözümleri uygulamaya yarayan java based bir microservis frameworküdür.

Axon frameworkünün bir diğer artısı spring ekosistemi ile uyumlu çalışabilen bir framework olmasıdır.

Öncelikle axon sitesinden

AxonQuickStart dosyasını indirerek kurulumunu gerçekleştiriyoruz

Axon Framework ek olarak AxonServer birleşenide bulunmaktadır.AxonServer axon framework entegre kullanabildiğimiz gibi ayrı olarak da kullanabiliyoruz.

Axon Server sayesinde event,command mesajların birbiri arasında yönlendirilmesi,monitoring security gibi işlemleri daha hızlı yapabiliyoruz.

https://docs.axoniq.io/reference-guide/axon-server-introduction

Axon Server container üzerinden kullanılabileceği gibi executive jar olarak da indirip start edebilirsiniz

https://docs.axoniq.io/reference-guide/axon-server/installation/local-installation/axon-server-se

java -jar C:\AxonServer-4.5.7\axonserver.jar

Default olarak aşağıdaki porttan axon server management console erişilebilir

http://localhost:8024/

Axon Terimleri:

· AggreagateLifeCycle:Bir aggreagete üzerinden olay(event)yayınlamak için.Apply kullanılır.Bir command execyte edilerken uygulamanın gerikalanına yeni bir event oluşturulduğunu bildirir.

· CommandGateway:Axonun komut işleme birleşenlerine yönelik arayüzüdür.Command gateway ile gönderilen komutların sonuçları beklenebilir.

· QueryGateway:

CQRS için axonframework çalışma mantığı:

İlk hatamız

Caused by: org.h2.jdbc.JdbcSQLSyntaxErrorException: Table “TOKEN_ENTRY” not found; SQL statement:

select tokenentry0_.processor_name as processo1_3_0_, tokenentry0_.segment as segment2_3_0_, tokenentry0_.owner as owner3_3_0_, tokenentry0_.timestamp as timestam4_3_0_, tokenentry0_.token as token5_3_0_, tokenentry0_.token_type as token_ty6_3_0_ from token_entry tokenentry0_ where tokenentry0_.processor_name=? and tokenentry0_.segment=? [42102–200]

at org.h2.message.DbException.getJdbcSQLException(DbException.java:453) ~[h2–1.4.200.jar:1.4.200]

Axonframework event yapılarını takip edebilmek için database üzerinde tablolara ihtiyaç duymaktadır.Bu tablolar bulunmadığı takdirde yukarıdaki gibi hata olacaktır

Sample için bu hatayı alırsanız hızlı çözüm adına application.properties içerisine aşağıdaki ayarı ekleyerek ilerleyebilirsiniz.

spring.jpa.hibernate.ddl-auto=update duruma göre create de diyebiliriz veya tabloları manuel oluşturmamız gereklidir

AXON tarafından oluşturulan tablolar

SAGA_ENTRY :serialize edilmiş saga kayıtlarının tutulduğu tablo

ASSOCATION_VALUE_ENTRY:

TOKEN_ENTRY:

TrackingEventProcessor TrackingTokenı işlenen eventlerin durumunu izleme için kullanılır

TrackingToken eventstream üzerindeki eventin pozisyonunu temsil eder.

Tracking Tokenlar tokenstoreda saklanır.

TokenStore implementasyonu için jpa,jdbc vs gibi farklı implementasyonlarla saklanabilir

TokenEntry tablosu:Axon Framework her processing group için bir token tutar.

Axon Terimleri:

· AggregateIdentifier: string,UUID ve primitive olmayan numericler kullanılabilir, örnek int seklinde identifier sequence hatasına yol açmaktadır.

OUT_OF_RANGE: [AXONIQ-2000] Invalid sequence number: 0

https://discuss.axoniq.io/t/re-axonframework-re-differences-between-eventhandler-and-eventsourcinghandler/2433/2

· EventSourcingHandler: Gönderilen eventin işleyicisini belirtmek için axon anotasyonu

· CommandHandler:Gönderilen komutun işleyicisini belirtmek için axon anotasyonu

· Projector sınıfları:Axon frameworkünde her bir event geldiği zaman db işlemlerini yapacak sınıfları tanımlamak için oluşturulan keywordün(mantıksal bir terminoloji)

· EventHandler:Yaratılmış event geldikten sonra projector sınıfı içerisinde eventleri dinlemek için oluşturulan methodların tepesine konulan axon anotasyonu

Öncelikle command ve event sınıflarımızı oluşturalım

Command classımızdaki @TargetAggregateIdentifier Aggregateimizin idsi için gereklidir.

import org.axonframework.modelling.command.TargetAggregateIdentifier;public class CreateTestEntityCommand {    @TargetAggregateIdentifier    public final String id;    public final String name;    public final String info;    public final int testid;    public CreateTestEntityCommand(String id,String name,int testid)    {        this.id=id;        this.name=name;        this.testid=testid;        this.info="Test Entity creation will be requested";    }}

Event classımız

public class TestEntityCreatedEvent {
public final String id;
public final String name;
public final String info;
public final int testid;

public TestEntityCreatedEvent(String id,String name,int testid)
{
this.id=id;
this.name=name;
this.info=”Test Entity created Event”;
this.testid=testid;
}

}

Genel pattern olarak command isimleri yapılacak aksiyon(create,update vs)+Command son ekiyle biter

Eventler ise geçmiş zaman aksiyonu(Created,Updated) +Event son ekiyle biter

Aggregate classımız

Not: AggregateIdentifier olarak seçilecek alan command.gatewayden gönderilerken kullanılacak event id olmalı aksi takdirde aggregate not found hatası alınır.Genel örneklere bakıldığında aggregateidentifier ile jpa identifier aynı olduğunu görülmektedir.Ancak aşağıdaki örneklerde ikisi farklı kurulacağını göstermek için jpadaki id ile aggreagateidentifierlar ayrı tutulmuştur.AggreagaeIdentifier String,UUID veya Integer gibi (primitve olmayan) sayısal değerler kullanılabilir.

@Aggregate

public class TestAggregate {

@AggregateIdentifier

private String id;

private String name;

private String info;

private int status;

private int testid;



public TestAggregate(){}



@CommandHandler

public TestAggregate(CreateTestEntityCommand command)

{

AggregateLifecycle.apply(new TestEntityCreatedEvent(command.id,command.name,command.testid));

}



@EventSourcingHandler

protected void on(TestEntityCreatedEvent testEntityCreatedEvent){



this.id=testEntityCreatedEvent.id;

this.name=testEntityCreatedEvent.name;

this.info=testEntityCreatedEvent.info;

}



@CommandHandler

public void updateTestAggregate(UpdateTestEntityCommand command)

{

AggregateLifecycle.apply(new TestEntityUpdatedEvent(command.id,command.status,command.testid));

}



@EventSourcingHandler

protected void on(TestEntityUpdatedEvent testEntityUpdatedEvent){



this.id=testEntityUpdatedEvent.id;

this.status=testEntityUpdatedEvent.status;

this.info=testEntityUpdatedEvent.info;

this.testid=testEntityUpdatedEvent.testid;

}





}

Bir servis katmanızda artık commandgateway sayesinde komutlarımızı gönderebiliyoruz.

@Service

public class TestServiceImpl {

private final CommandGateway commandGateway;



private final EventStore eventStore;





public TestServiceImpl(CommandGateway commandGateway,EventStore eventStore) {

this.commandGateway = commandGateway;

this.eventStore=eventStore;

}





public String createTest(int id)

{

try{

String idtoEvent= UUID.randomUUID().toString();

CompletableFuture<Object> result= commandGateway.send(new CreateTestEntityCommand(idtoEvent,"Bilgehan",id));

return idtoEvent;

}

catch (Exception ex)

{

System.out.println("Command error");

}

return "";

}
….}

Projection classımızdaki handler sayesinde herevent geldiğinde(commandgatewayden send edilen eventler) ona göre aksiyon alabiliyoruz

@Component
public class TestProjection {

@Autowired
private TestRepository testRepository;

@EventHandler
public void on(TestEntityCreatedEvent event) {
System.out.println("Test entity yaratıldı"+event.id);
TestEntity testEntity=new TestEntity();
testEntity.setId(event.testid);
testEntity.setEventid(event.id);
testEntity.setName(event.name);
testEntity.setInfo(event.info);

testRepository.save(testEntity);
}
…..}

Dipnot: TrackingEventProcessorConfiguration ve TokenStore ayarları yapılmazsa default olarak inmemory mantığında eventhandler çalışıyor yani uygulama her yeniden başladığında aynı eventlerin tekrar okunmasına neden oluyor o yüzden bir axon config oluşturarak bu ayarları yapıyor olmakta fayda bulunmaktadır.

@Configuration
public class AxonConfig {

@Bean
EventSourcingRepository<TestAggregate> testAggregateEventSourcingRepository(EventStore eventStore)
{
EventSourcingRepository<TestAggregate> repository= EventSourcingRepository.builder(TestAggregate.class).eventStore(eventStore).build();
return repository;
}

@Bean
public TokenStore tokenStore(Serializer serializer, EntityManagerProvider entityManagerProvider) {
return JpaTokenStore.builder()
.entityManagerProvider(entityManagerProvider)
.serializer(serializer)
.build();
}


@Autowired
public void configureProcessors(EventProcessingConfigurer eventProcessingConfigurer) {
TrackingEventProcessorConfiguration tepConfig = TrackingEventProcessorConfiguration.forSingleThreadedProcessing().andInitialTrackingToken(StreamableMessageSource::createHeadToken);
eventProcessingConfigurer.registerTrackingEventProcessorConfiguration(config -> tepConfig);
}

}

Yazma kısmı CORS sin CRS kısmını axonla yukarıdaki şekilde gerçekleştirebildik

CRS kısmı için akış

CommandGateway(Send) →Aggregate →CommandHandler(Command) →EventSourcingHandler(Event) →EventHandler →Db

Simdi QRS kısmının axonla nasıl olacağına bakalım.

Sorgulamada parametre geçişi için bir adet query sınıfı oluşturuyoruz

public class FindTestEntityQuery {
private String eventid;

public FindTestEntityQuery(String eventid)
{
this.setEventid(eventid);
}

public String getEventid() {
return eventid;
}

public void setEventid(String eventid) {
this.eventid = eventid;
}
}

QueryHandler ise artık sorgulamalara düşen eventler için kullandığımız anotasyon oluyor

@Component
public class TestProjection {

@Autowired
private TestRepository testRepository;

@QueryHandler
public TestEntity handle(FindTestEntityQuery query) {
System.out.println(“query: {}”+query);
return this.testRepository.findByEventID(query.getEventid());
}
}

Son olarak QueryGateway ile sorgularımızı yönlendirebiliyoruz

@Service
public class TestService {
private final QueryGateway queryGateway;
private final EventStore eventStore;

public TestService(QueryGateway queryGateway,EventStore eventStore)
{
this.queryGateway=queryGateway;
this.eventStore=eventStore;
}

public CompletableFuture<TestEntity> findById(String eventid) {
return this.queryGateway.query(
new FindTestEntityQuery(eventid),
ResponseTypes.instanceOf(TestEntity.class)
);
}

public List<Object> listEventsForTestEntity(String eventid) {
return this.eventStore
.readEvents(eventid)
.asStream()
.map(Message::getPayload)
.collect(Collectors.toList());
}
}

CQRS deki QRS akışı

QueryGateway(Query) →QueryHandler →Db

EventStore ReadEvents Reader yaparken aldığımız hata

com.fasterxml.jackson.databind.exc.InvalidDefinitionException: No serializer found for class org.axonframework.serialization.UnknownSerializedType and no properties discovered to create BeanSerializer (to avoid exception, disable SerializationFeature.FAIL_ON_EMPTY_BEANS) (through reference chain: java.util.ArrayList[0])

Çözüm properties dosyamıza ekleyip serializer ayarlarını yapmamız gerekiyor

axon.serializer.general=jackson
axon.serializer.events=jackson
axon.serializer.messages=jackson

https://docs.axoniq.io/reference-guide/v/master/axon-framework/events/event-serialization

CQRS Axon Örnek Kodlara github üzerinden erişip zenginleştirebilirsiniz.

https://github.com/BilgehanYildiz/AxonCQRSSample

Not:Reader projesi aynı repositorynin içinde zipli olarak bulunmaktadır

Not:Örnek mantıksal bir işleyişten cok temel olarak axon birleşenlerinin denenmesi amacıyla oluşturulmuştur.

Not:Kaynakça olarak faydalanabilirsiniz

https://blog.nebrass.fr/playing-with-cqrs-and-event-sourcing-in-spring-boot-and-axon/

https://www.kindsonthegenius.com/microservices/cqrs-tutorial-with-axon-framework-step-by-step-project-for-beginners/

https://progressivecoder.com/implementing-event-sourcing-using-axon-and-spring-boot-part-1/

https://www.gencayyildiz.com/blog/microservice-mimarilerde-saga-pattern-ile-transaction-yonetimi/

https://sefikcankanber.medium.com/saga-pattern-nedir-e4a447bef361

--

--