RabbitMQ Custom Delayed Message Yöntemi

Mert Haskan
Trendyol Tech
Published in
6 min readMay 11, 2020

--

Trendyol Tech ekibi olarak yoğun bir şekilde RabbitMQ kullanmaktayız. RabbitMQ servislerimiz arasındaki haberleşmede büyük bir öneme sahip. Delivery domaininde de her gün milyonlarca yeni mesaj RabbitMQ’ya publish edilip consume edilebiliyor. Günlük yüke göre RabbitMQ’da bulunan mesaj sayısı 4M+ mesaja ulaşabiliyor. Yüksek mesaj trafiğimizden dolayı RabbitMQ kaynaklarını iyi kullanmamız ve yönetmemiz gerekiyor.

Delivery domaini sorumluluklarından biri de alınan siparişlerin kargo firmalarına geçilmesi ve bu siparişlerin kargodaki durumlarının takip edilmesidir. Bir paketin kargo firması tarafında oluşturulmasından son duruma geçmesi günler sürebilir. Bu süreci RabbitMQ üzerinde paket bilgilerini schedule mesaj ile düzenli consume ederek yönetiyoruz. Bu yazıda schedule mesaj için eskiden kullandığımız delayed message plugininden ve bu plugine alternatif olarak geliştirdiğimiz yöntemimizi anlatacağım.

Delayed Message Plugin Nedir?

RabbitMQ’da mesajların gecikmeli olarak kuyruğa yönlendiren bir plugindir. Plugin gecikmeli olarak gönderilecek mesajları exchange üzerinden yönlendirmektedir. Kullanmak için x-delayed-message tipinde bir exchange tanımlanır. Exchange e gönderilen mesajlara x-delay header verilerek mesajın exchange üzerinde belirtilen süre boyunca kalması sağlanır.

Delayed Message Plugin Avantaj ve Dezavantajları

RabbitMQ üzerinde schedule mesaj oluşturmayı sağlıyor. RabbitMQ tarafında kurulum yapılıyor. Uygulamalarda kolay kullanım sunuyor.

Exchange üzerindeki mesajları ve sayılarını görmeye imkan sağlamıyor. RabbitMQ üzerindeki mesaj yükünü görmeyi zorlaştırıyor. Exchange üzerinde tutulan mesajlar RAM’de tutuluyor. Aktif olarak consume edilmeyecek bu mesajlar RAM kaynaklarını tükettiği için consume edilecek mesajların kaynaklarını kısıtlıyor.

Delivery Delayed Message Yöntemi

Delivery delayed message yöntemini anlatmadan önce kullandığımız RabbitMQ özelliklerinden bahsedelim.

  1. Time-To-Live and Expiration (TTL)
  2. Dead Letter Exchange (DLX)
  3. Lazy Queue

1.Time-To-Live and Expiration

RabbitMQ, TTL özelliği ile kuyruk üzerindeki mesajların belirtilen süre sonrasında silinmesini sağlıyor. Bu özellik iki şekilde tanımlanabiliyor; kuyruk bazlı ve mesaj bazlı. Biz delayed message yapımızda mesaj bazlı olarak bu özelliği kullanacağız. Mesaj bazlı TTL yöntemini daha esnek ve yönetilmesi kolay olduğu için tercih ettik. Kuyruk bazlı kullanılmasında da sorun yok.

2.Dead Letter Exchange (DLX)

Kuyruk üzerine tanımlanan dead letter exchange ve dead letter routing key özellikleri kuyruktan silinen mesajların başka kuyruğa yönlendirilmesini sağlıyor. Bu özellik kuyruk yaratılırken ya da policy ile tanımlanabilir. Bu özelliği de tanımladığımızda artık TTL ile gelen mesaj süresi dolduktan sonra başka kuyruğa bırakılabilecek hale gelmiş oluyor.

3.Lazy Queue

Kuyruğa verilen lazy queue tanımı ile kuyruk üzerindeki mesajların RAM yerine diskte tutulabileceğini RabbitMQ’ya tanımlamış oluyoruz. Lazy queue özelliği aktif consumer bulunmayan ya da hızlı consume edilmeyecek kuyruklar için ideal bir özellik. RabbitMQ’nun RAM kaynağını ihtiyaç duyan kuyruklar için bırakmış oluyoruz.

Buraya kadar kullanacağımız özelliklerden bahsettik. Bunları birleştirerek delayed message özelliğini sağlayacak yönteme geçebiliriz. Öncelikle aktif consume edilen kuyruğumuzdan farklı bir yerde delayed message’ların tutulmasını istiyoruz. Yeni bir kuyruk oluşturup bu kuyruğa DLX olarak aktif kuyruğumuzu tanımlıyoruz. Ayrıca kaynaklarımızı daha iyi kullanmak için bu kuyruğu lazy queue olarak tanımlayalım. Böylece kuyrukta uzun süre bekleyecek mesajlar RAM yerine diskte tutularak RabbitMQ üzerinde bize daha az yük olacaktır. Kuyruğumuzu bu şekilde tanımladıktan sonra geriye sadece delayed message’ların bu kuyruğa iletilmesi ve sürelerinin tanımlanması kaldı. Burda consumer tarafından mesaj bazlı TTL ile consumerın karar vermesini sağlayacağız. Bu durumda aşağıdaki şekilde bir RabbitMQ kuyruk yapımız oluşacak.

Delayed Message Queue Structure

Sonuç

Geliştirdiğimiz bu yöntem ile plugine göre %70 oranında RabbitMQ RAM kullanımı azaldı. RabbitMQ üzerinde yük azaldığı için kampanya dönemlerindeki yükselen mesaj sayılarında bile sistem ek kaynağa ihtiyaç duymadan çalışabiliyor. Delay edilen mesaj sayılarını görebildiğimiz için mesajların çoklanması durumlarını kolayca fark edebiliyor ve yönetebiliyoruz.

Aşağıdaki görsellerde yüksek yük altındaki RabbitMQ kaynak kullanımını görmektesiniz.

Message Count
RabbitMQ Ram Usage

Geliştirdiğimiz bu yöntemi bir demo proje üzerinde uygulayalım. Yukarıda anlatılanlara ek olarak demo projemizde MessageRecoverer özelliğini de kullandık. Bu özellikten daha detaylı olarak sonraki yazılarımızda bahsedeceğiz. Demo projede kullandığımızdan dolayı kısaca bahsetmek gerekirse, MessageRecoverer özelliği ile custom exceptionların yönetilmesi kolaylaştı.

Demo Proje

Spring Boot ile örnek bir consumer oluşturacağız. Bu consumer projemizde yukarıda anlatmış olduğum yapıyı ve delayed message’ların publish yapısını oluşturacağız. Consumer projemizde RabbitMQ ile bağlantımızı Spring AMQP ile yapacağız. Projenin application.yaml’ından başlayalım.

Event altında kuyruk adı, exchange adı ve delay mesajları için oluşturulacak kuyrukların adlarını tanımlıyoruz. Recover altında delay mesaj kuyruğunun postfix tanımını ve delay edilecek mesajların milisaniye olarak ertelenme süresini tanımlıyoruz. Retry-Policy altında RabbitMQ kütüphanesinin hatalı mesajı tekrar denememesi için gerekli ayarları tanımlıyoruz. Buradaki değerleri kullanarak RabbitMQ tarafında gerekli kuyruk yaratma ve tanımlama işlemlerini yapacağız. Bu tanımlamaları yaptığımız RabbitConfiguration classını inceleyelim.

RabbitConnectionFactory methodunda application.yaml’da belirtilen RabbitMQ host, kullanıcı adı ve şifre bilgilerini kullanarak bağlantı oluşturuyoruz. Bu bağlantıyı Bean olarak tanımladığımız için gerektiği yerlerde Dependency Injection yardımıyla kullanabileceğiz. Sıradaki method Rabbit bağlantısını kullanarak RabbitAdmin oluşturuyor. RabbitAdmin yönetici operasyonları gerçekleştirmemizi sağlıyor. Kuyruk, exchange ve binding tanımlama gibi işlemleri yapabiliyoruz. CreateConsumeQueue methodunda RabbitAdmin kullanarak application.yaml’da belirttiğimiz isimde bir kuyruk oluşturulmasını sağlıyoruz. Bu kuyruk projemizin consume edeceği kuyruk. Bu metodu benzer bir şekilde DelayedMessageQueue metodunda delay edilen mesajlar için belirlediğimiz kuyruğun yaratılmasını sağlıyoruz. Burada consume kuyruğundan farklı olarak DLX ve lazy queue özelliklerini tanımlayarak daha önce bahsettiğimiz kuyruk yapısını oluşturuyoruz.

Delayed Message Queue Structure

Exchange ve binding tanımlama işlemleri de CreateConsumeExchange ve CreateQueueExchangeBinding metotlarında yapılıyor. Bunlardan sonra gelen MQRetryInterceptor metodunda RetryOperationsInterceptor oluşturuyoruz. RetryOperationsInterceptor Spring AMQP kütüphanesinin mesaj consume edilirken bir sorun oluşursa (exception) tekrar deneme ve recover işlemlerini tanımlayan sınıftır. Burada tanımlamayı yaparken application.yaml’da verilen değerleri kullanıyoruz. Benim örneğimde 1 olarak tanımlanmıştı yani hata olursa tekrar denemeyecek. Bizim için önemli olan recoverer tanımı, burada kendi oluşturduğumuz MessageRecoverer classını veriyoruz. Bu sınıfın detayını daha sonra inceleyeceğiz.

Buraya kadar yaptıklarımızı özetlersek application.yaml’da kuyruk, exchange ve delay mesaj kuyruk ismi gibi tanımlamaları yaptık. Bu tanımlamaları kullanarak RabbitConfiguration classında RabbitMQ bağlantısını, kuyruk, exchange ve binding tanımlarını yaptık.

Demo uygulamamızda gelen mesajların id’lerine göre 0 ile 100 arasındakileri delay edeceğimiz bir örnek oluşturalım. Bunun için bir event classı tanımlayalım.

RabbitMQ consume metodumuzu oluşturalım. RabbitListener yardımıyla publish edilen eventi dinleyecek metodumuzu oluşturuyoruz. Consume ettiği mesajları ConsumeService classına iletecek.

Mesajları consume edecek servisimizi oluşturalım. Servisimiz gelen mesajın id’sine göre mesajı delay edecek ya da mesajları console’a yazacak. Delay edilecek mesajlar için DelayException fırlatalım.

DelayException için MessageRecoverer oluşturacağız. Böylece delay edilecek mesajları kolaylıkla business logic’ten ayırabileceğiz ve tek bir yerden yönetilmesini sağlayacağız. Bunun için bir MessageRecoverer oluşturalım, CustomMessageRecover oluşturarak consume sırasında herhangi bir exception olursa burdan yönetilmesini sağlayacağız.

Burada Strategy Pattern uygulayarak genişleyebilir bir yapı oluşturdum. RabbitStrategies listesi RabbitStrategy interface’ini implement eden sınıfları tutuyor. Bu liste üzerinde döngü ile gezerek hataya düşmüş olan mesaj ve exception için uygun bir yöntem bularak işlem yapıyoruz. Process metodu strategy sınıfının uygun olup olmadığını yönetiyor. Uygun olan yöntemin Recover metodunu çağırıyoruz. Mesaj için yapılacak işlem tamamlandığı için break ile döngüyü sonlandırıyoruz.

Delay Message için RabbitStrategy interface i implement eden bir strategy classı oluşturalım:

Process metodunda exception DelayException ise true döneceğiz. Recover metodunda mesajın headerına exception’da verilen mesaj ve hataya düştüğü tarihi ekledim. RabbitMQ management arayüzünden mesaja bakıldığında hata mesajı ve zamanın görülmesini sağlıyoruz. Sonrasında mesajı delay mesajlar için tanımlanan kuyruğa gönderelim. Bunun için mesajın bulunduğu kuyruğun adının üzerine application.yaml’da belirtilen postfix’i ekliyoruz. Bu mesajın application.yaml’da tanımlanan süre kadar bekleyip tekrar diğer kuyruğa dönmesini istiyoruz bu nedenle mesaja expiration süresi tanımlıyoruz. Mesajı AmqpTemplate ile send ederek işlemi tamamlıyoruz.

Buraya kadar yaptıklarımızı özetleyecek olursak:

RabbitMQ üzerinden consume kuyruğuna bırakılan bir eventi consume edip servisimize gönderdik. Servisimiz mesajın id’sine bakarak mesajı delay edecek. Delay etmek için DelayException fırlatarak mesajın hataya düşmesini sağladık. Burada MessageRecoverer olarak oluşturduğumuz classımız araya girerek DelayedMessageRabbitStrategy tarafından mesajın delay kuyruğuna gönderilmesini sağladı. Mesaj delay kuyruğuna TTL ile bırakıldığı için TTL süresi dolunca mesajın tekrar consume kuyruğuna düşmesini sağladık.

Projede consume edilen bir mesajın işlenmesini aşağıdaki şekildedir:

Sample Consumer

--

--