RabbitMq ile Event Ordering (Consistent Hash Exchange Kullanımı)

emre tiryaki
Jun 7 · 6 min read

Event driven mimarisi ile geliştirme yapıyorsanız karşınıza çıkacak en önemli problemlerden birisi de projections /materialized views ( microsoft tabiri ile) uygulamalarınızı eventlerle doğru inşaa etmek olacaktır. Martin Fowler’ın makelesindeki gibi projection’ları farklı şekilde built edebilirsiniz. Bunlar Event Notification , Event-Carried State Transfer’dır. Bu iki yaklaşımında kendi içinde artıları ve eksileri bulunmaktadır. Bizim tercih ettiğimiz yöntem “Event-Carried State Transfer”dir.

Örnek: Müşteri 2 adet telefon sipariş vermiş olsun. Mağaza sahibinin ekranına bu sipariş bilgisi düşer. Mağaza yetkilisi portal üzerinde bu sipariş kalemlerine paket oluşturur. Daha yazılımsal bir ifadeyle CreatePackageCommand çalıştırılır. CommandBus (async) aracılığıyla CommandHandler’a iletirilir. CommandHandler ilgili domain servisi çağırır , uygulamanız message brokera PackageCreatedEvent’i ve LineMarkedAsInTransit’i eventbus aracılığı ile fırlatır. PackageCreatedEvent ve LineMarkedAsInTransit eventlerini dinleyen projection consumer’lar ise kendi içerisinde paket bilgisini oluşturmakta, sipariş kalemlerinin durumlarını güncellemektedir. Böylece sistemlerinizde hangi paket hangi kalemle ilişkilendirilmiş bilgisini görebilirsiniz.

Diğer durumda ise Mağaza yetkilisi yanlışlıkla paket oluştuduğunu farketti ve portal üzerinde paketi silmeye çalıştı. Böyle bir durumda ise DeletePackageCommand çalıştırılır. CommandBus (async) aracılığıyla CommandHandler’a iletilir. CommandHandler ilgili domain servisi çağırır , uygulamanız message brokera PackageDeletedEvent’i ve LineMarkedAsPlaced’i eventbus aracılığı ile fırlatır. PackageDeletedEvent ve LineMarkedAsPlaced eventlerini dinleyen projection consumer’lar ise kendi içerisinde paket bilgisini (soft-delelete) siler, sipariş kalemlerinin durumlarını güncellemektedir. Böylece sistemlerinizde kalemler tekrar “placed” konumunu çekilebilir.

Not:Placed durumunda müşteri iptal edebilir , Intransit durumunda müşteri iptal edemez.

Paket Oluşturma
Paket Silme

Sisteminiz bu eventleri aşağıdaki sırada fırlatırken header kısmında “event number” eklediniz. Buraya kadar herşey güzel. :)

  1. PackageCreatedEvent (1. event)
  2. LineMarkedAsPlaced (2. event)
  3. PackageDeletedEvent (3. event)
  4. LineMarkedAsInTransit (4. event)

Peki consumer’larınızı 10'a scale ettiniz. Projection Consumer’larınız doğru sıra ile okuyacak mı ? Ne yazık kı %100 garanti veremezsiniz. Aşağıdaki şekilde okunabilir. Yani aşağıdaki gibi okunabilir. :(

  1. LineMarkedAsInTransit (3. event)
  2. PackageCreatedEvent (4. event)
  3. PackageCreatedEvent (1. event)
  4. LineMarkedAsPlaced (2. event)

Peki bu durumda ne olur ? Müşteri kalemi iptal edebilir, fakat bunun farkında olmayan merchant kalemi kargoya verebilir.


Çözüm aslında basit. Eğer eventleri doğru şekilde okursanız bu problemi bertaraf edersiniz. “Ordering of Events in a Distributed System” aslında akademik anlamda NP-Complete (NP) problem. Problem çözümü kısaca şu, sistemize gelen bir event aynı “routing key” ile geliyorsa onu daha önce işlediğin yere at. Rabbitmq’da bu problem için çözüm ; Actor Model’ de , Kafka’da da bulunan algoritma ile aynı “Consistent Hashing Algoritması”. Böylece yukardaki eventleri istediğin kadar consumer ile slace etsen bile doğru şekilde consume edersin. Consistent hashing algoritması ise bize bu routing key’lerin hangi queue’lara atılacağını key value şeklinde bir ring ile tutmaktadır. Algoritma ile daha fazla bilgi vermek isterdim fakat yazı oldukça uzayacak. Bununla ilgili olarak bir ve iki nolu linkler yeterli olacaktır. Algoritmada aynı key’e gelen queue değiştiğinde tüm key value değişmiyor bu sayede cpu ve ram optimum seviyede seyrediyor.

Kısacası “parallelism” ‘in gücünü kullan. “parallelism” ‘i kullanırken ise aynı anahtar değeri ile gelen iş parçacıklarını aynı yere gönder.


Publish implemantasyon:

class Program
{

private static string PackageCreatedEvent = "PackageCreatedEvent";
private static string LineMarkedAsPlaced = "LineMarkedAsPlaced";
private static string PackageDeletedEvent = "PackageDeletedEvent";
private static string LineMarkedAsInTransit = "LineMarkedAsInTransit";

private static String CONSISTENT_HASH_EXCHANGE_TYPE = "x-consistent-hash";

static void Main(string[] args)
{

ConnectionFactory cf = new ConnectionFactory();
cf.UserName = "guest";
cf.Password = "guest";

var conn = cf.CreateConnection();
IModel ch = conn.CreateModel();

ch.ExchangeDeclare(PackageCreatedEvent, CONSISTENT_HASH_EXCHANGE_TYPE, true, false, null);
ch.ExchangeDeclare(LineMarkedAsPlaced, CONSISTENT_HASH_EXCHANGE_TYPE, true, false, null);
ch.ExchangeDeclare(PackageDeletedEvent, CONSISTENT_HASH_EXCHANGE_TYPE, true, false, null);
ch.ExchangeDeclare(LineMarkedAsInTransit, CONSISTENT_HASH_EXCHANGE_TYPE, true, false, null);

ch.ConfirmSelect();

for (int i = 0; (i < 10000); i++)
{
var body = Encoding.UTF8.GetBytes(i.ToString());
ch.BasicPublish(PackageCreatedEvent, i.ToString(), ch.CreateBasicProperties(),body);
ch.BasicPublish(LineMarkedAsPlaced, i.ToString(), ch.CreateBasicProperties(),body);
ch.BasicPublish(PackageDeletedEvent, i.ToString(), ch.CreateBasicProperties(),body);
ch.BasicPublish(LineMarkedAsInTransit, i.ToString(), ch.CreateBasicProperties(),body);
}

}
}

Consumer implemantasyonu:

package main

import (
"encoding/json"
"fmt"
rabbit "github.com/emretiryaki/rabbitmq"
"sync"
"time"
)


func main() {

packageCreatedEvent:= "PackageCreatedEvent"
lineMarkedAsPlaced:= "LineMarkedAsPlaced"
packageDeletedEvent:= "PackageDeletedEvent"
lineMarkedAsInTransit:= "LineMarkedAsInTransit"


var rabbitClient= rabbit.
NewRabbitMqClient([]string{"127.0.0.1"},"guest","guest","",
rabbit.RetryCount(2, time.Duration(0)),
rabbit.PrefetchCount(3))

onConsumed := func(message rabbit.Message) error {

var consumeMessage int
var err= json.Unmarshal(message.Payload, &consumeMessage)
fmt.Println(time.Now().Format("Mon, 02 Jan 2006 15:04:05 "), " Message:", consumeMessage)
return err
}


rabbitClient.AddConsumer("Queue1").
SubscriberExchange("1", rabbit.ConsistentHashing,packageCreatedEvent).
SubscriberExchange("1", rabbit.ConsistentHashing,lineMarkedAsPlaced).
SubscriberExchange("1", rabbit.ConsistentHashing,packageDeletedEvent).
SubscriberExchange("1", rabbit.ConsistentHashing,lineMarkedAsInTransit).
WithSingleGoroutine(true).
HandleConsumer(onConsumed)

rabbitClient.AddConsumer("Queue2").
SubscriberExchange("1", rabbit.ConsistentHashing,packageCreatedEvent).
SubscriberExchange("1", rabbit.ConsistentHashing,lineMarkedAsPlaced).
SubscriberExchange("1", rabbit.ConsistentHashing,packageDeletedEvent).
SubscriberExchange("1", rabbit.ConsistentHashing,lineMarkedAsInTransit).
WithSingleGoroutine(true).
HandleConsumer(onConsumed)

rabbitClient.AddConsumer("Queue3").
SubscriberExchange("1", rabbit.ConsistentHashing,packageCreatedEvent).
SubscriberExchange("1", rabbit.ConsistentHashing,lineMarkedAsPlaced).
SubscriberExchange("1", rabbit.ConsistentHashing,packageDeletedEvent).
SubscriberExchange("1", rabbit.ConsistentHashing,lineMarkedAsInTransit).
WithSingleGoroutine(true).
HandleConsumer(onConsumed)

rabbitClient.AddConsumer("Queue4").
SubscriberExchange("1", rabbit.ConsistentHashing,packageCreatedEvent).
SubscriberExchange("1", rabbit.ConsistentHashing,lineMarkedAsPlaced).
SubscriberExchange("1", rabbit.ConsistentHashing,packageDeletedEvent).
SubscriberExchange("1", rabbit.ConsistentHashing,lineMarkedAsInTransit).
WithSingleGoroutine(true).
HandleConsumer(onConsumed)


rabbitClient.AddConsumer("Queue5").
SubscriberExchange("1", rabbit.ConsistentHashing,packageCreatedEvent).
SubscriberExchange("1", rabbit.ConsistentHashing,lineMarkedAsPlaced).
SubscriberExchange("1", rabbit.ConsistentHashing,packageDeletedEvent).
SubscriberExchange("1", rabbit.ConsistentHashing,lineMarkedAsInTransit).
WithSingleGoroutine(true).
HandleConsumer(onConsumed)


rabbitClient.AddConsumer("Queue6").
SubscriberExchange("1", rabbit.ConsistentHashing,packageCreatedEvent).
SubscriberExchange("1", rabbit.ConsistentHashing,lineMarkedAsPlaced).
SubscriberExchange("1", rabbit.ConsistentHashing,packageDeletedEvent).
SubscriberExchange("1", rabbit.ConsistentHashing,lineMarkedAsInTransit).
WithSingleGoroutine(true).
HandleConsumer(onConsumed)

rabbitClient.AddConsumer("Queue7").
SubscriberExchange("1", rabbit.ConsistentHashing,packageCreatedEvent).
SubscriberExchange("1", rabbit.ConsistentHashing,lineMarkedAsPlaced).
SubscriberExchange("1", rabbit.ConsistentHashing,packageDeletedEvent).
SubscriberExchange("1", rabbit.ConsistentHashing,lineMarkedAsInTransit).
WithSingleGoroutine(true).
HandleConsumer(onConsumed)


rabbitClient.AddConsumer("Queue8").
SubscriberExchange("1", rabbit.ConsistentHashing,packageCreatedEvent).
SubscriberExchange("1", rabbit.ConsistentHashing,lineMarkedAsPlaced).
SubscriberExchange("1", rabbit.ConsistentHashing,packageDeletedEvent).
SubscriberExchange("1", rabbit.ConsistentHashing,lineMarkedAsInTransit).
WithSingleGoroutine(true).
HandleConsumer(onConsumed)
rabbitClient.RunConsumers()


}

Bu algoritma aynı routing key ile fırlatılan mesajların aynı queue’ya düştüğünün garantisini veriyor.


Tam anlaşılmadığının hala farkında gibiyim:) Olaya şöyle bakabiliriz. Consisten Hashing Exchange olmasaydı eventleri doğru sıra ile consume etmek istiyoruz. Nasıl yapmalıyız?

  1. Ya tüm eventleri aynı queue’ya bind eder ve kuyruğu dinleyen uygulamayı consume_scale=1 olarak işaretlemek zorunda kalırız. Ciddi bir yük geldi sisteminize. Eventlerin consume edilmesini bekle:)
  2. 1. çözümün problemini aşmak için elimizdeki routing key desenini biliyorsak “topic” exchange’i ile fiziksel şekilde istediğimiz queue’ya bölmek.
    Örneğin: RoutingKey int değer olsun. Mod(100) 0–20 arası Kuyruk1'e , 21–50 arası Kuyruk2'e 51–99 arası Kuyruk3'e bağladık . Peki ya Kuyruk sayınızı artırmak isterseniz? Routing key ile mesaj bölme algoritmanızı baştan hesaplayacaksınız. Ciddi bir yük geldi ve kuyruk sayınızı 20'ye çıkarmak istediniz. Tekrar hesaplama yapmanız gerekecek production’da. Matematiksel olarak bölün bakalım prodda :)

İşte bu zor çözüm yöntemleri yerine RabbitMq diyorki “kardeşim sen bunlarla uğraşma ben senin için bir exchange yaptım. Gel sen bu exchange’i ve bu exchange’i bağlayacağın kuyrukları da ver. Sonra arkana yaslan ben senin yerine bunları mantıksal olarak böleceğim. İstediğin kadar da scale edeceğim.”


Deneyimlerim sonucu dikkat edilecek hususlar

consistent hashing exchange’i integer çalışıyor. Siz routing key’i string bir ifade olarak atarsanız , Erlang phash2 ile bunu int’e çeviriyor. Test sonuçlarında aynı string ifadeleri aynı integer’a dönüştürdüğünü gözlemledim.

Routing key değerlerini küçük aralıklarda tutmayınız. Eğer routing key aralığını küçük tutup ,buna bağlı kuyruk sayınızı fazla yaparsanız, muhtemelen mesajlar doğru bir şekilde dağıtılmayacaktır. Örnek: routing key aralığınız 0 ile 100 arası ve kuruk sayınız ise 8 . Mesajlar doğru bir şekilde dağıtılmıyor. Aşağıdaki ekran görüntüsü bunu özetliyor.

Consistent Hashing Exchange için, routing key’leri düşük ağırlıklarla bind et. Aşağıdaki resimde görülen “1” ifadesi, topic ve direct exchange’ler gibi sadece “1” olan routing key’leri ilgili kuyruğa gönder demek değil . Bu exchange’deki anlamı ağırlık sayısı demektir. Yani “LineMarketAsInTransit” exchange’ine bağlanan kuyruklara eşit oranda bırak demektir. Eğer herhangi biri 2 olsaydı 2 ile bağlanan kuyruğa diğerlerine oranla 2 kat daha fazla mesaj gidecekti.

SubscriberExchange("1",rabbit.ConsistentHashing,lineMarkedAsPlaced).

Exchange’lere yüksek ağırlık verilmesiyle beraber kuyrukların sık bir şekilde bound/unbound verdiğiniz zaman kuyrukların işlem hacmini düşürebilirsiniz. Örneğin ağırlık numarası 8 kuyruğunuz var ve ağırlık numarasını 1000 verdiniz. Yeni bir kuyruk eklediniz. Yüksek ağırlık verdiğiniz için yeni eklenen kuyruğa işlem gelebilmesi için diğerlerini bekleyeceksiniz demektir.


Özetle Oms takımının cqrs ve event sourcing yolculuğunda rabbitmq message broker’ın bize sunmuş olduğu consistent hashing exchange’ini kullanarak aslında zor olan “hem projection’ları doğru event sırasıyla build etmek , hem de bunu doğru orantıyla scale etmek” problemini nasıl çözdüğümüzü anlatmaya çalıştım.

Okuduğunuz için teşekkür ederim.

hepsiburadatech

hepsiburada technology