MassTransit -Azure Service Bus ile ASP.NET Core 5.0 Mikro Servis Macerası.

Adem Olguner
Devops Türkiye☁️ 🐧 🐳 ☸️
12 min readJan 14, 2022

Günümüzde geliştirilen birçok enterprise uygulama, ihtiyaçlar doğrultusunda birbirinden bağımsız platformlarda, dağıtık (distributed) bir şekilde çalışmaktadır. Bu yapıların birbirleriyle olan iletişimleri genellikle mesajlaşma (messaging) yapıları sayesinde gevşek bağlılıkla(loosely coupled) ve asenkron bir şekilde gerçekleştirilmektedir. Böylece ölçeklenebilirlik (scalability) ve esneklik(flexibility) sağlanmaktadır. Günümüzde kullanılan birçok messaging yapılanmasından hangisini kullanırsanız kullanın; hata yönetimi, yeniden deneme, bekletme, transaction vs. gibi durumlarla karşı karşıya kalınmakta ve bu şekilde distributed olan uygulamaları çalıştırmak ve yönetmek zorlaşmaktadır. İşte bu zorlukların üstesinden gelmek ve yönetilebilirliği kolaylaştırmak için bir Enterprise Service Bus (ESB) teknolojisi kullanmamız gerekmektedir. Bu yazımızda, messaging yapısı olan Azure Service Bus kuyruk sisteminin kullanıldığı distributed sistemlerde, ESB teknolojisi olarak MassTransit Framework’ünü ele alacak, nasıl ve ne amaçla kullanıldığını inceliyor olacağız.

Öncelikle Enterprise Service Bus’un(ESB) ve MassTransit’in ne olduğunu açıklayarak başlayalım.

Enterprise Service Bus (ESB) Nedir?

Servisler arası entegrasyon sağlayan komponentlerin bütünüdür diyebiliriz. Bir sistemde bulunan birden çok birbirinden bağımsız servisin, kendi aralarındaki iletişimini ve etkileşimini sağlayan bir sistemdir. Temelde client API için servisler arası ulaşım/transport işlemlerini daha yüksek abstraction seviyesinde çözmek için kullanılmaktadır. Böylece distributed (dağıtık) olan uygulamaları rahatlıkla çalıştırmayı ve yönetmeyi amaçlamaktadır.

Masstransit

.NET için geliştirilmiş olan, distributed uygulamaları rahatlıkla yönetebilme ve çalıştırmayı amaçlayan, ücretsiz, open source bir Enterprise Service Bus Framework’üdür. Messaging tabanlı, gevşek bağlı(loosely coupled) ve asenkron olarak tasarlanmış dağıtık sistemlerde yüksek dereceli kullanılabilirlik, güvenilirlik ve ölçeklenebilirlik sağlayabilmek için hizmetler oluşturmayı oldukça kolaylaştırmaktadır.

MassTransit, tamamen farklı uygulamalar arasında message-based communication yapabilmemizi sağlayan bir transport gateway görevi görür.

Mesajlaşma nedir?

Mesajlaşma, uygulamalar arası iletişimin mesajlar aracılığı ile gerçekleştirildiği bir yöntemdir. Bu yönteme göre mesajı oluşturan bir yayıncı, mesajın tüketicileri ve bu mesajın iletimine aracılık eden bir kuyruk (mesaj broker) bulunur. Mesaj içeriğinde kaynak, hedef ve meta verilerinden oluşan bir veri yapısıdır.

Neden mesajlaşmaya ihtiyaç duyarız?

Uygulamamızda belirli bir akış içerisinde gerçekleştirilmesi gereken birçok görev olabilir. Bu durum kullanıcıların ekranda çok uzun süreler beklemesine neden olur. Oysaki bazı işler arka planda asenkron olarak yapılabilir. Örneğin sipariş ekranında ödeme işleminden sonra arka planda fatura oluşturduğumuzu ve bilgilendirme mailleri attığımızı düşünelim. Bu işlemlerin aynı anda yapılması kullanıcının gereksiz ve uzun süre beklemesine neden olur. Mesajlaşma sistemi ile her mesaj ilgili modüle gönderilerek işlemler modüllerde asenkron olarak tamamlanabilir.

Bazı dönemlerde sistemler aşırı yük altında çalışabilirler. (Özel günler, kampanya dönemleri vs.) Böyle zamanlarda mesajlaşma sisteminin getirdiği en büyük avantaj yüksek performanstır. Mesajlaşma sistemi ile her mesaj hedef modül tarafından işlenir ve yük anında mesajları işleyen modüllerimiz scale olarak uygulamanın performanslı bir şekilde çalışmasını sağlar.

Azure Service Bus

Azure Service Bus, Azure platformunda çalışan uygulamaların ve hizmetlerin güvenilir bir şekilde birbiriyle haberleşmesi için geliştirilen kurumsal düzeyde bir mesajlaşma aracıdır. Serverless bir yapıdadır. Altyapı maliyetleri, bakımı gibi operasyonel işlerle uğraşmanıza gerek kalmaz. Çeşitli formatlardaki mesajlar farklı uygulamalar arasında iletişim sağlar. Bu mesajlar Json, XML ya da Text olabilirler. Her bir servis ve plan için farklı mesaj büyüklükleri bulunmaktadır. Mesaj servis bus’a gönderildiğinde eğer mesajın alıcısı çevrimiçi değilse mesajımız geçici olarak saklanır. Uygulama çevrimiçi olduğunda mesajı alır ve bu sayede hiçbir mesaj kaybolmaz.

Queues Nedir

Mesajların gönderilip alınması için saklandığı kuyruk yapılarıdır. İlk giren ilk çıkar mantığına göre çalışır. (FIFO)

Bir kuyruk için birden fazla gönderici olabilir ama mesaj sadece bir alıcıya iletilir. Uygulama mesajı alana kadar mesaj kuyruk üzerinde saklanır. Kuyruktan bir mesaj alındığında Service Bus mesaja bir zaman damgası ataması yapar. Kuyruktaki mesajlar sadece uygulama talep ettiğinde teslim edilir. Queue mesajları genellikle noktadan noktaya iletişim için kullanılır.

Queue’nun bir diğer özelliği ise yinelenen kayıtların algılanmasıdır. (duplicate detection) Eğer kuyrukta yenilenen mesajları eklemek istemiyor isek bu özelliği aktifleştirebiliriz. Kısacası kuyrukta benzersiz mesajların oluşması için kullanılan bir özelliktir.

Service Bus için Queue mesajları okunduktan sonra silinebilir fakat alıcı mesajı alırken bir problem oluşursa mesaj kaybolabilir. Eğer Peek Lock seçeneğini seçersek mesaj alıcı tarafından kilitli hale gelir ve diğer alıcılara görünmez. Mesaj başarılı şekilde işlenirse tamamlandı “Completed” olarak işaretlenir ve mesaj kuyruktan silinir. Mesaj işlenirken bir problem olursa bu kez Service Bus’a mesajı işlemediğimizi söyleyerek mesajın yeniden kullanılabilir hale gelmesini sağlayabilirsiniz. Kilitleme Süresi sona erdiğinde ise diğer alıcılar bu mesajı alabilecektir.

Topic Nedir?

Topicler sayesinde birden fazla alıcıya aynı mesajı iletebiliriz. Her alıcı için bir filtre ekleyebiliriz. Bu filtre, yalnızca filtre kuralına uyan mesajların söz konusu aboneliğe erişmesine izin verecektir. Bu şekilde, her abonelik belirli kurallara göre mesajları dinleyebilir.

Projelerde Queue mu Topic mi kullanmalı?

Geliştirdiğiniz uygulamalarda eğer bire bir modelde iletişim kuracaksak ve mesaj tek bir alıcıya ulaşması gerekiyorsa Queue tercih edilebilir.

Örneğin bir Yemek sipariş uygulaması geliştirdiğimizi düşünelim. Siparişlerin sıra ile işlenmesi gerekmektedir. Siparişleri bir kuyruğa atıp (Queue) sırası ile işleyebilir.

Birden çok sisteme aynı mesajı kullanmak isterseniz Topic kullanmanız daha doğru olacaktır.

Masstransit ile ilgili olarak bazı konulara proje içerisinde kodlarımızı yazarken açıklamaya devam edeceğiz.

We are starting … 🧠

Örnek bir uygulama üzerinden generic bir altyapı kurmaya ve bu altyapı ile geliştirmelerimizi yapacağız. Örnek bir akış olarak gerçek hayata yakın bir örnek üzerinden devam edeceğiz.

Uygulamamızı bir API ile tasarlayacağız, swagger ile crud işlemini yapacağız gönderilen mesajı consume edeceğiz, asenkron olarak işlemlerimizin nasıl yapıldığını göreceğiz. Burada tasarlayacağımız altyapı ile farklı topic,queue ve filter işlemleri ile ihtiyacımıza göre ek iş kurallarını tanımlayabilir olacağız.

Project ve Uygulama Katmanları

* Step 1: Message Provider tanımlama

Burada altyapı olarak mesajlaşma işlemini Azure Service Bus ve MassTransit için 2 farklı provider olarak tasarlayacağız, appSettings üzerinden kontrol edip istediğimiz anda provider değiştirerek uygulamamızın devamlılığını sağlıyor olacağız. Bunun için Message Broker Provider tanımlama ile başlayabiliriz.

* Step 2: IMessage<T> tipinde bir mesaj tipi belirliyoruz

Message Broker Provider Send metodu bizden IMessage<T> tipinde generic bir tip istiyor olacak.

Bu mesaj tipleri Topic yada Queue olacak. IMessage<T> generic type ile IQueueMessage ve ITopicMessage tiplerinde 2 ayrı mesaj tipi oluşturalım. Bu mesaj tipleri IMessage interface’i miras alsın.

Şimdi oluşturduğumuz 2 ayrı mesaj tipini de kullanabileceğimiz bir IGenericMessage tipinde diğer type’ı oluşturalım.

Mesaj türlerimiz Topic ve Queue olarak tanımladık, şimdi bu tipleri bir enum olarak tanımlayalım.

Projenin başında bahsettiğim gibi Service Provider 2 tipte olacak; ServiceBus ve Masstransit. Bu tiplerimizi de enum olarak tanımlayalım.

*Step 3: IMessageBrokerProvider için Send metodu içeriğini yazalım.

Hem Azure için hemde Masstransit için send metodunda farklı kod blokları bulunmaktadır, her teknoloji kendine ait bir altyapı istemektedir, gerekli entegrasyonu oluşturalım.

1- Azure Service Bus Provider:

2- Masstransit Provider :

*Step 4 : Topic ve Queue tanımlama gereksinimleri

Bir topic veya queue oluşturmak için hangi tanımlamalar yapılması gerekiyor? sorusu ile devam edelim.

Queue : Mesaj Tipi, Mesaj İsim
Topic. : Mesaj Tipi, Mesaj İsim, Abonelik Adı,FilterQuery(isteğe bağlı)
tanımlamaları yeterlidir.

Bu çerçevede bakıldığında 2 mesaj tipi içinde ortak bir tanımlama altyapısı oluşturabiliriz.

Bu işlem için mesajlaşma objelerini diğer objelerden ayırt etmek ve kontrol edebilmek için Attribute kullanarak MessageNameAttribute isminde bir attribute tanımlayalım.

MassTransit.Topology => IEntityNameFormatter

MassTransit de Topology, kuyrukları yapılandırmak için mesaj türlerinin nasıl kullanıldığını gösterir. Topology, doğrudan değiş tokuşları ve yönlendirme anahtarları gibi belirli aracı özelliklerine erişmek için de kullanılır.

Topoloji, MassTransit içindeki ara katman yazılımlarına daha fazla odaklanan gönderme, yayınlama ve tüketme işlem hatlarından ayrıdır.

Topoloji, mesajlar yayınlanır ve gönderilirken çalışma zamanında mesaja özel topoloji yapılandırması oluşturabilen kural setlerinin oluşturulmasına izin verir.

Subscription

Subscription abonelik anlamına gelmektedir. Nedir abonelik? Bir mesaj yayınladınız, bu mesajı dinleyen veya dinleyenler birer abonedir yani subscription oluyor. Subscription sadece Topic türündeki eventleri dinlemek için kullanılabilir. Queue tipindeki eventler 1'den fazla aboneliği olmadığı için sadece Topic türündeki eventler için tanımlayabiliriz.

Filtreler

Service Bus üç filtre koşulu destekler:

  • SQL filtreler :SQL benzeri koşullu ifade barındırır. Koşullu ifadede tüm sistem özelliklerinin ön eki olmalıdır sys. . filtre koşullarına yönelik SQL dili alt kümesi , özelliklerin ( EXISTS ), null-değerlerin ( IS NULL ), mantıksal olmayan/ve/veya, ilişkisel işleçlerin, basit sayısal aritmetiğinin ve ile eşleşen basit metin deseninin varlığını sınar LIKE .
  • Boole filtreleri : true filter ve false filter tüm gelen iletilerin (true) ya da abonelik için seçili olmayan iletilerin (false) hiçbirinin seçilmemesine neden olur. Bu iki filtre SQL filtresinden türetilir.
  • Bağıntı filtreleri : Bir correlation filter , gelen bir veya daha fazla iletinin Kullanıcı ve sistem özelliklerine göre eşleşen bir koşullar kümesi tutar. Yaygın olarak kullanılan bir kullanım, CorrelationId özelliğiyle eşleşmedir, ancak uygulama aşağıdaki özelliklerle eşleşmeyi de seçebilir:

ContentType
Label
MessageId
ReplyTo
ReplyToSessionId
SessionId
To
Kullanıcı tanımlı tüm özellikler.

sys.label LIKE ‘%bus%’`
sys.messageid = ‘xxxx’
sys.correlationid like ‘abc-%’

Kaynak ve daha fazlası için buradan erişebilirsiniz.

Bu filtre parametreleri ile gönderilen Event için filtrelemeler yapılabilir. Ancak biz burada gönderdiğimiz model datasının içinde bir veya birden fazla parametrenin değerine göre bir filtreleme için altyapısı oluşturacağız.

Filtreleme için bir attribute oluşturalım.

Bu attribute ile filtrelenmesini istediğimiz özellikleri (property) işaretleyebiliyor olacağız.

Dinamik olarak model propertilerinden filter propertylerini seçmiş olacağız. Model içindeki propertylerden işaretlediğimiz attribute (MessageConsumerFilterable) ile işaretli olan property ve değerini alıyoruz.

Filter ve MessageConsumerFilterable attribute kullanımı ile ilgili detayları modelleri ve alt modelleri oluştururken tekrar değineceğiz.

Şimdi kurgulamış ve tasarlamaya çalıştığımız altyapıyı register işlemlerini yapalım.

*Step 5: Service Bus Provider seçimine göre Injection işlemlerimizi tanımlayalım.

Burada register ettiğimiz Mesaj tipine göre gerekli konfigürasyon işlemlerini yapıyoruz.
Queue: Receiver işlemi
Topic: Subscription işlemi tanımlanıyor.
Farklı olarak Topic tipinde filter işlemleri yapılmaktadır.

MassTransit.Topology

IMessageEntityNameFormatter<T> uygulayarak ve bunu yapılandırma sırasında belirterek mesaja özel bir varlık adı biçimlendiricisi oluşturmak da mümkündür.

Exception — Retry

Immediate” policy ile herhangi bir exception anında ilgili message error queue‘ya taşımadan önce, X kere daha denemesi gerektiği gibi birden fazla retry policyleri bulunmaktadır. Bu policy ile ne kadar retry işleminin tekrar yapılacağını, her denemenin ardından ne kadar süre bekleneceği ve her bekleme süresinin de incremental olarak ne kadar artması gerektiği gibi bilgileri de set edebilmekteyiz.

Kaynak

Bir diğer yandan retry işlemleri sırasında meydana gelen hataları (exception) retry filter‘larını kullanarak handle ve ignore edebilmek de mümkündür.

Azure Service Bus Configuration

Kullanılacak olan Topic, Queue, Subscription gibi tanımlamaları Azure Portal kullanarak yapabileceğimiz gibi, bu tanımlamaları kod üzerinden de oluşturabiliriz.

Queue Receiver Configuration

Circuit Breaker : Bir arıza durumunda kaynakları (uzak, yerel veya başka türlü) aşırı yüklenmekten korumak için bir devre kesici kullanılır. Örneğin, uzak bir web sitesi kullanılamayabilir ve bu web sitesini bir mesajla aramanın zaman aşımına uğraması 30–60 saniye sürer. Başarısız olan hizmeti aramaya devam ederek, hizmeti kurtaramayabiliriz. Bir devre kesici, tekrarlanan arızaları ve açmaları algılayarak servise daha fazla çağrı yapılmasını önler ve servise toparlanması için zaman tanır. Sıfırlama aralığı sona erdiğinde, aramaların yavaş yavaş hizmete geri dönmesine izin verilir. Hala başarısız oluyor ise, kesici açık kalır ve zaman aşımı aralığı sıfırlanır. Servis sağlıklı duruma döndüğünde, kesici kapanırken çağrılar normal şekilde akar.

UseCircuitBreaker

TripThreshold: Bu bir yüzdedir ve başarılı denemelerin başarısız denemelere oranını temel alır. 15'e ayarlandığında, oran %15'i aşarsa devre kesici açılır ve ResetInterval süresi dolana kadar açık kalır.

ActiveThreshold: Bu, devre kesicinin devreye girmesinden önce bir izleme periyodunda devre kesiciye ulaşması gereken mesaj sayısıdır. 10'a ayarlanırsa, açma eşiği en az 10 mesaj alınana kadar değerlendirilmez.

ResetInterval : Devre kesicinin devreye girmesi ile devre kesiciyi ilk kapatma girişimi arasındaki süre. Açık periyot sırasında devre kesiciye ulaşan mesajlar, devre kesiciyi açan aynı istisna dışında hemen başarısız olacaktır.

UseRateLimiter

Bir süre içinde işlenen iletilerin sayısını sınırlamak için bir hız sınırlayıcı kullanılır. Bunun nedeni, bir API veya hizmetin dakika başına yalnızca belirli sayıda çağrıyı kabul etmesi ve sonraki denemeleri hız sınırlama süresi dolana kadar geciktirmesi olabilir.

Startup Konfigürasyonu

Startup.cs içerisinde ConfigureServices metoduna ekliyoruz.

Gelelim Projeye

Bir adet makale post edeceğiz. 2 adet consumer oluşturacağız. Bu 2 consumer da gönderilen mesaj modelini dinliyor olacak. Ancak consumer nesnelerinden bir tanesi gelen her mesajı consume ederken, filtre eklediğimiz diğer consumer gelen message dinleyip filtre koşuluna uyan mesajları consume edecek.

1- Model Tanımları

AcreaetArticleEventValue isminde modelimizi tanımlıyoruz. [MessageConsumerFilterable] attribute ile hangi property filtre parametresi olacağını belirtmiş oluyoruz. Burada örnek olması açısından 1 tane IsIndex property için ekledim 1 veya birden çok property için de ekleme yapabiliriz.

Article Controller içerisinde Post işlemi gerçekleştireceğiz. Oluşturulan datayı MediatR ile send işlemi yapacağız.

CreateArticleCommandInputHandler içerisinde request ile gönderilen datayı mapping işlemi ile veritabanı nesnesi modeline çeviriyoruz. Generic Repository Pattern ile oluşturduğumuz, AddItem() metodunu çağırarak veritabanına kayıt işlemini yapıyoruz.

Ardından service bus tarafına gönderilecek olam CreateArticleEventValue modelini oluşturuyoruz. Inject edilmiş provider ile mesajı generic tipte gönderiyoruz.

Consumers

Gönderilen mesajları dinlemek için gerekli consumer nesnelerini oluşturmaya başlayalım. IConsumer<CreateArticleEventValue> ile crud işlemi sırasında gönderilen obje tipinde bir message datasını beklediğimizi belirtiyoruz.

X : Tasarım gereği context.message.values ile (CreateArticleEventValue) collection tipinde belirledik. Data 1 item da olsa, 1 den çok item da olsa bunu bir Collection olarak ayarlama yapıldı, her türlü veri setini handle edebilmek için bu şekilde tasarladık.

Kurgulamaya çalıştığımız altyapı ile günlük döviz bilgilerinin çekilip, her döviz tipinde bir item oluşturarak birden çok datayı da mesaj olarak gönderebiliriz. Bu şekilde alt yapıyı bozmadan ihtiyaçlarımıza göre esneklik sağlamış olucaktır.

Consume edilen datayı, mail gönderim işlemi yapmadan önce gerekli konfigürasyonları yapmak için farklı bir handler (ArticleMailSendBeforeDeliveryInput) çağırıyoruz.

ArticleMailSendBeforeDeliveryHandler ile mail modeli oluşturup gerekli olan property alanlarına dataları setliyoruz.

Neden mediator.Publish kullanıyoruz? neden mediator.Send ile yapmıyoruz?

Bir handler içerisinden farklı bir handler direkt çağırmak best practice olarak tavsiye edilen bir durum değil, mediator NotificationHandler altyapısını kullanarak gerekli işlemi yapalım.

Bu şekilde domain modellerini NotificationHandler ile consume etmiş oluyoruz.

Event Modeller ve NotificationHandler<T>

Notify edilecek event modeli (SendingMailCreatedEvent), INotification olarak imzalıyoruz.

Mail gönderim işlemini yapacak handler (SendingMailCreatedEventHandler) oluşturalım burada gerekli Mail gönderim kod bloklarını yazabiliriz.

Filter Consumer

Daha önce oluşturduğumuz dinamik MessageConsumer altyapısı sayesinde burada consume edilecek message içerisinde filtre ile gelen message içerisinden koşulları karşılayan mesajı dinleyebiliyoruz artık.

Yapılan ilk Crud işlemi ile gelen CreateArticleEventValue event modelindeki context içerisinde IsIndex == evet şeklinde bir filtre oluşturup gelen her CreateArticleEventValue event modelini consume etmeyeceğiz, sadece bizim yazdığımız filtre koşuluna uyan modeli consume edeceğiz.

Neden subscription2 ?

Aynı uygulama (endpoint) içerisinde aynı mesajı consume eden 2 adet consumer için izin verilmiyor.Yani 1 proje içerisinde 1 mesaj (topic) 1 adet subscription olmalı. Farklı bir uygulama katmanı açmadan 2.ci bir subscription tanımlayarak devam ediyoruz.

Bu satırları okuyorsanız, göstermiş olduğun sabır için teşekkür ederim :)

Farklı bir zamanda , farklı bir konu ile tekrar görüşmek üzere.

Uygulama kodlarına github adresimden ulaşabilirsiniz. Sorularınız için bana linkedin adresim üzerinden veya buraya comment olarak yazabilirsiniz.

kaynak

--

--