RabbitMQ Teknolojisi

Ahmet Selçuk Özdemir
14 min readAug 4, 2023

Bu yazımda detaylıca RabbitMQ teknolojisini örnekleriyle beraber ele alacağım.Öncelikle RabbitMQ teknolojisini teorik olarak ifade edelim daha sonra kavramları ve mimariyi kod örnekleriyle beraber açıklamaya çalışalım.

RabbitMQ, açık kaynaklı bir mesaj sırası (message queue) yazılımıdır.Birbirinden bağımsız sistemler arasında veri alışverişi yapmak için kullanılır. Mesaj sırası, yazılım sistemleri ve uygulamalar arasında asenkron mesajlaşma sağlayan bir araçtır. Mesaj sırası, mesajları gönderen uygulamaların ve mesajları alan uygulamaların birbirine bağlı olmasını gerektirmez, böylece uygulamalar arasında gevşek bir bağlantı kurulmasına olanak tanır.

Özetlemek gerekirse RabbitMQ, dağıtık ve ölçeklenebilir sistemlerde mesajlaşmayı kolaylaştırmak için kullanılır. Uygulamalar arasında verilerin iletilmesi ve bilginin paylaşılması amacıyla kullanılan mesajlar, RabbitMQ kuyruklarında depolanır. Mesajları gönderen uygulamalar, mesajları ilgili kuyruklara gönderir, ve mesajları alan uygulamalar bu kuyruklardan mesajları alır ve işler.

Şimdi biraz bu teknolojideki kavramlardan ve amaçlarından bahsedelim.

1)Publisher; Kuyruğa mesajları gönderene ve yerleştirene denir. Producer(yayıncı) olarak da isimlendirilebilir.

2)Consumer; Kuyruktaki mesajları alan ve işleyen uygulamayı ifade eder. Yani, mesajları tüketen ve kuyruktan çeken tarafı temsil eder.

3)Message Queue ; Publisher ve Consumer arasındaki iletişim için kullanılan veri birimidir. Yani publisher’in consumer tarafından işlenmesini istediği verinin kendisidir.Bir e-ticaret sisteminden örnek verirsek eğer siparişe ait mesaj olarak; siparişin numarası, müşteri bilgileri, ürün bilgisi veya ödeme bilgileri örnek verilebilir.

Message queue içerisindeki mesajlar Consumer tarafından sırasıyla işlenir.

Peki Message Queue’in Amacı Nedir ?

Bazı senaryolarda birbirinden farklı sistemler arasında işlevsel açıdan senkron haberleşmek kullanıcı deneyimi açısından pek uygun olmayabiliyor. Mesela bir e-ticaret uygulamasında ödeme işlemi neticesinde fatura oluşturabilmek için ilgili servisin işlemini senkron bir şekilde beklemek ve bunu son kullanıcıya yansıtmak hiçte mantıklı bir davranış değildir. Bu tarz senaryolarda sistemler arasında senkrondan ziyade asekron bir iletişim modeli kullanılmalıdır. Ödeme neticesinde kullanıcıya siparişin başarıyla gerçekleştirildiğine dair sonuç(e-mail veya sms vb.) döndülürken bir yandan da message queue’ya fatura ile ilgili bir mesaj gönderilmelidir.

Bu mesaj fatura servisi tarafından ilk fırsatta alınarak işlenir/tüketilir ve son kullanıcı fatura oluşturma sürecini beklemeksizin ilgili siparişe dair fatura asekron bir şekilde üretilir. Böylece süreçte sistemler arasındaki iletişim daha verimli hale gelmiş olur ve işlemler arasında bekleme/gecikme durumu ortadan kalkmış olur.

4)Message Broker; RabbitMQ’nun merkezi bileşenidir. Mesajların yönlendirilmesi, kuyruk yönetimi, yayınlama/abonelik işlemleri ve diğer tüm işlevler broker tarafından yönetilir.

5)Exchange ; Publisher tarafından gönderilen mesajların nasıl yönetileceğini ve hangi route’lara yönledirileceğini belirlememiz konusunda kontrol sağlayan/karar veren yapıdır.

Route;Mesajların exchange üzerinden kuyruklara nasıl gönderileceğini tanımlayan mekanizmadır.Genel olarak mesajların yolunu ifade eder.

Exchange üzerinde birden fazla queue’ye bind olabilir. Bu durumlarda ise kuyruklardan hangisine mesaj göndereceğini anlaması exchange türüne göre değişiklik gösterebilir. Bunlardan kısaca bahsedelim.

a) Direct Exchange; Mesajların direk olarak belirli bir kuyruğa gönderilmesini sağlayan exchange’dir.

Bir direct exchange örneği.

b) Fanout Exchange; Mesajların bu exchange’e bind olmuş tüm kuyruklara gönderilmesini sağlar. Publisher mesajlarının gönderildiği kuyruk isimleri dikkate almaz ve mesajları tüm kuyruklara gönderir.

Bir fanout exchange örneği.

c) Topic; Routing keyleri kullanarak mesajları kuyruklara yönlendirmek için kullanılan bir exchange’dir.

Bir topic exchange örneği.

d) Header Exchange; Routing key yerine headerleri kullanarak mesajları kuyruklara yönlendirmek için kullanılan exchange’dir.

Bir header exchange örneği.

6)Channel ; Bir bağlantı içinde birden fazla işlem yapmayı sağlayan sanal bir iletişim kanalıdır. Her kanal, kendi Exchange ve Queue bağlantılarına sahip olabilir.

Burada kavramları açıklamaya biraz ara vermek istiyorum, kod örneklerine başlamadan önce RabbitMQ kurulumunu da göstermek istiyorum. Docker üzerinde veya CloudAMQP üzerinden ayağa kaldırabiliriz.İkisini de göstereceğim.

İlk olarak docker üzerinde RabbitMQ’yi ayağa kaldıralım.CMD veya powershell üzerinde aşağıda paylaştığım komutu çalıştıralım .

docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.12-manag
Docker uygulaması üzerinden kontrol ettiğimizde başarılı bir şekilde RabbitMQ containerinin ayağa kalktığını görebiliyoruz.
Portumuz ile ilgili url’e gittiğimizde de rabbitmq servisimiz ile ilgili detaylı bilgi alabiliyoruz.

Şimdi ise CloudAMQP üzerinden nasıl ayağa kaldırabileceğimizden bahsedeceğim.

Öncelikle https://www.cloudamqp.com/ sitesine gidiyoruz ve üyelik işlemlerimizi gerçekleştiriyoruz. Daha sonra new instances diyerek sırasıyla ilgili alanları doldurup bir instances yaratıyoruz.Başarılı bir şekilde instances yarattıktan sonra RabbitMQ ile ilgili birçok detaylı connection bilgisi gibi işimize yarayacak bilgilere buradan artık erişebiliyoruz.

Şimdi ise basit düzeyde bir Publisher ve Consumer uygulamaları yapalım. Her ikisi içinde aşağıdaki gibi birer console uygulaması açalım.

Öncelikle projelerimizde RabbitMQ kullanabilmek için NuGet üzerinden veya terminal üzerinden RabbitMQ.Client kütüphanesini indirmemiz gerekiyor. Öncelikle publisher tarafını yazalım. Açıklamaları ile publisher tarafının kodunu paylaşıyorum.

class RabbitMQ
{
public static void Main()
{
SimpleLevelPublisher();
}
public static void SimpleLevelPublisher()
{
//öncelikle bir bağlantı oluşturuyoruz.

ConnectionFactory factory = new ConnectionFactory();
factory.Uri = new Uri("*****************");

//bağlantıyı aktifleştirme ve kanal açma

IConnection connection = factory.CreateConnection();

IModel channel = connection.CreateModel();

//Queue oluşturma(kuyruk oluşturma)

channel.QueueDeclare(queue: "example-queue", exclusive: false); //exclusive yapmamızın nedeni birden fazla bağlantıyla bu kuyrukta işlem yapıp yapamayacağımızı belirtmek için.

//Queue mesaj gönderme

//NOT! : Rabbitmq kuyruğa atacağı mesajları byte türünden kabul etmektedir. Haliyle mesajları byte'e dönüştürmemiz gerekiyor.

for (int i = 0; i < 100; i++)
{
Task.Delay(200); // bekletme

byte[] message = Encoding.UTF8.GetBytes("Merhaba "+i);

channel.BasicPublish(exchange: "", routingKey: "example-queue", body: message);
}

Console.Read();
}
}

Şimdi kuyruktaki mesajları tüketecek olan consumer tarafının kodlarını paylaşıyorum.

class RabbitMQ
{
public static void Main()
{
SimpleLevelConsumer();
}
public static void SimpleLevelConsumer()
{
//öncelikle bir bağlantı oluşturuyoruz.

ConnectionFactory factory = new ConnectionFactory();
factory.Uri = new Uri("***********");

//bağlantıyı aktifleştirme ve kanal açma

IConnection connection = factory.CreateConnection();

IModel channel = connection.CreateModel();

//Queue oluşturma(kuyruk oluşturma)

channel.QueueDeclare(queue: "example-queue", exclusive: false); //ÖNEMLİ ! Consumerdaki kuyruk pusblisherdaki gibi bire bir aynı tanımlanmalıdır.

//Queue(kuyruktan) mesaj okuma

EventingBasicConsumer consumer = new EventingBasicConsumer(channel);

channel.BasicConsume(queue: "example-queue",false,consumer);
consumer.Received += (sender, e) =>
{
//kuyruğa gelen mesajın işlendiği yerdir.
//e.Body : bize kuyruktaki mesajın verisini bütünsel olarak getirecektir.
//e.Body.Span veya e.Body.ToArray() : kuyruktaki verinin byte verisini getirecektir.

byte[] bodyData = e.Body.ToArray();
Console.WriteLine(Encoding.UTF8.GetString(bodyData));
};

Console.Read();
}
}

İki taraftaki connection bağlantısını da localhost portumuz üzerinden veya CloudAMQP paneli üzerinden edinebiliriz.

2 projeyi de çalıştırdığımızda basit bir düzeyde kuyruğa 100 adet Merhaba mesajı gönderen publisher ve bu kuyruktaki mesajları tek tek tüketen ve ekrana basan consumer uygulamamızın başarıyla çalıştığını görebilirsiniz.

Şimdi ise senaryomuzu biraz daha geliştirelim ve RabbitMQ’de gelişmiş kuyruk yapılarına bir bakalım.

Round Robin Dispatching; Mesajları farklı kuyruklar arasında eşit bir şekilde dağıtarak işlemeyi ifade eder. Bu yöntem, mesajları tüketmek için birden fazla tüketici (consumer) tarafından kullanıldığında kullanılır.

Varsayılan olarak, RabbitMQ’da bir kuyruk, mesajları eşit bir şekilde tüm bağlı tüketiciye göndermek için “round-robin dispatching” algoritmasını kullanır. Yani, eğer bir kuyruğa birden fazla tüketici bağlıysa, yeni bir mesaj kuyruğa geldiğinde, RabbitMQ sırasıyla her bir tüketiciye bu mesajı gönderir.

Message Ancknowledgement; Bu, gönderenin mesajın ulaştığından emin olmasını ve alıcının da mesajı aldığını ve okuduğunu belirtmesini sağlar.

RabbitMQ tüketiciye(consumer) gönderdiği mesajı başarılı bir şekilde işlensin ve işlenmesin hemen kuyruktan silinmesi üzerine işaretler.Yani tüketicilerin kuyruktan aldıkları mesajları işlemeleri sürecinde herhangi bir kesinti veya problem durumu meydana gelirse ilgili mesaj tam olarak işlenemeceği için esasında görev tamamlanmamış olacaktır.Bu tarz durumlara istinaden mesaj başarıyla işlendiyse eğer kuyruktan silinmesi için tüketiciden RabbitMQ’nun uyarılması gerekmektedir.

Eğer ki message ancknowledgement özelliğini kullanıyorsanız kesinlikle mesaj işleme başarıyla sonlandığı taktirde RabbitMQ’ya mesajın silinmesi için haber göndermeyi unutmamak gerekiyor.Aksi taktirde mesaj tekrar yayınlanacaktır ve başka bir tüketici tarafından tekrar işlenecektir.Ayrıca mesajlar onaylanarak silinmediği taktirde kuyrukta kalınmasına neden olacak ve bu durum kuyrukların büyümesi ve yavaşlamasına neden olup performans düşüklüğüne neden olabilir.

Anlayacağınız üzere bu özellik sayesinde bir mesajın kaybolmadığından emin olabilmekteyiz.Yani tüketici açısından bir mesajın alındığını, işlendiğini ve artık kuyruktan silinebilir olduğunu ifade ederek süreci daha güvenilir hale getirmekteyiz.Tabi ki her tüketicinin işlem yoğunluğuna göre RabbitMQ’ya karşılık onay bildirim süresi değişkenlik gösterecektir.RabbitMQ açısından tüketiciden gelecek olan onay bildirimi için bir zaman aşım süresi söz konusudur.Bu varsayılan olarak 30 dakikadır.

RabbitMQ’de mesaj onaylama sürecini aktifleştirebilmek için consumer uygulamasında “BasicConsume” methodundaki ‘autoAck’ parametresini false değerine getirebilirsiniz.Böylece varsayılan olarak mesajların kuyruktan silinme davranışı değiştirilecek ve consumerdan onay beklenecektir.

  channel.BasicConsume(queue: "example-queue",autoAck:false,consumer:consumer);

Consumer mesajı başarıyla işlediğine dair uyarıyı “channel.BasicAck” methodu ile gerçekleşir.

  channel.BasicAck(e.DeliveryTag,multiple:false)

Multiple parametresi birden fazla mesaja dair onay bilgisi gönderir.Eğer “true” değeri verilirse DeliverTag değerine sahip olan bu mesajla birlikte bundan önceki mesajlarında işlendiğini onaylar.Eğer “false” verilirse sadece bu mesaj için onay bildirisinde bulunacaktır.

Bazen consumerlar istemsiz durumların dışında kendi kontrollerimiz neticesinde mesajları işlememek isteyebilir veya mesajların başarıyla sonuçlanmayacağını anlayabiliriz.Böyle durumlarda “channel.BasicNack” methodunu kullanarak RabbitMQ’ya bilgi verebilir ve mesajı tekrardan işletebiliriz.

channel.BasicNack(deliveryTag: e.DeliveryTag, multiple: false,requeue:true);

Tabi burada requeue parametresi oldukça önem arz etmektedir.Bu parametre consumer tarafından işlenemeyeceği ifade edilen bu mesajın tekrar kuyruğa eklenip eklenmemesinin kararını vermektedir.

“True” değeri verildiği taktirde mesaj kuyruğa tekrardan işlenmek üzere eklenecek, “false” değerinde ise kuyruğa eklenmeyerek silinecektir.Sadece RabbitMQ’ya bu mesajın işlenemeyeceğine dair bilgi verilmiş olunacaktır.

  channel.BasicCancel(consumerTag: consumerTag);

BasicCancel methodu ile de verilen consumerTag değerine karşılık gelen queue’daki tüm mesajlar reddedilerek, işlenmez.

   channel.BasicReject(deliveryTag:3,requeue:true);

Son olarak kuyrukta bulunan mesajlardan belirli olanların consumer tarafından işlenmesini istemediğimiz durumlarda BasicReject methodunu kullanabiliriz.

Message Durability ; Consumerlar’ın sıkıntı yaşaması durumunda mesajların kaybolmayacağının garantisinin nasıl sağlanacağını açıklamış olduk.Ancak RabbitMQ sunucusunun kapanması/hataya düşmesi gibi durumlarda ne olacağını açıklamadık.Normal şartlarda kapanma durumu söz konusu olursa tüm kuyruklar ve mesajlar silinecektir.

Böyle bir durumda mesajların kaybolmaması, yani kalıcı olabilmesi için kuyruk ve mesaj açısından kalıcı olarak işaretleme yapmamız gerekmektedir.

  channel.QueueDeclare(queue: "example-queue", exclusive: false,durable:true);

IBasicProperties properties = channel.CreateBasicProperties();
properties.Persistent = true;

channel.BasicPublish(exchange: "", routingKey: "example-queue", body: message,basicProperties:properties);

Burada mesaj konfigürasyonu için properties.Persistent = true; ayarını kuyruk konfigürasyonu için de durable:true parametresini vermemiz yeterli olacaktır.

Ancak unutmamak gerekir ki bu şekilde kalıcı olarak işaretlemek iletinin kaybolmayacağını tam olarak garanti etmez.

Fair Dispatch ; RabbitMQ’da tüm consumerlara eşit bir şekilde mesajları iletebilirsiniz. Bu da kuyrukta bulunan mesajların mümkün olan en adil şekilde dağıtımını sağlamak için kullanılan bir özelliktir. Consumer’lara eşit bir şekilde mesajların iletilmesi sistemdeki performansı düzenli bir hale getirecektir.Böyle bir consumer’ın diğer consumerlardan daha fazla yük alması ve sistemdeki diğer consumerların kısmı aç kalması engellenmiş olur.

Bu durumlarda BasicQos methodu ile mesajların işleme hızını ve teslimat sırasını belirleyebiliriz. Böyle Fair Dispatch konfigüre edilebilmektedir.

channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

prefetchSize bir consumer tarafından alınabilecek en büyük mesaj boyutunu byte cinsinden belirler.0 ise sınırsız demektir. prefetchCount ise bir consumer tarafından aynı anda işleme alınabilecek mesaj sayısını belirler.Son olarak global ise bu konfigürasyonun tüm consumer’lar için mi yoksa sadece çağrı yapılan consumer için mi geçerli olacağını belirler.

Şimdi ise RabbitMQ’daki mesaj tasarımlarına bir bakalım.

RabbitMQ’da mesaj tasarımlarından kastedilen tıpkı design pattern’lerde olduğu gibi belli başlı senaryolara karşı gösterilebilecek önceden tanımlanmış,tarif edilmiş ve pratiksel olarak adımları saptanmış davranışlardır.

Belirli bir problemi çözmek için kullanılan bu tasarımlar, genel anlamda yapısal davranışı ve iletişim modelini ifade etmektedir.

Yani iki servis arasında message broker ile yapılacak haberleşme sürecinde iletilecek mesajların nasıl iletileceği,nasıl işleneceği, ne şekilde yapılandırılacağını ve ne tür bilgiler taşıyacağını belirler.

Her tasarım farklı bir uygulama senaryosuna ve gereksimine göre şekillenmekte ve en iyi sonuçlar alınabilecek şekilde yapılandırılmaktadır.

P2P(Point-to-Point) Tasarımı ; Bu tasarımda bir publisher ilgili mesajı direkt bir kuyruğa gönderir ve bu mesaj kuyruğu işleyen bir consumer tarafından tüketilir. Eğer ki senaryo gereği bir mesajın bir tüketici tarafından işlenmesi gerekiyorsa bu yaklaşım kullanılır.

P2P tasarımı gerektiren senaryolarda genellikle direct exchange kullanılmaktadır diyebiliriz.

P2P tasarım örneği

Publish/Subscribe(Pub/Sub) Tasarımı ; Bu tasarımda publisher mesajı bir exchange’e gönderir ve böylece bu mesaj bu exchange’e bind edilmiş olan tüm kuyruklara yönlendirilir. Bu tasarım, bir mesajın birçok tüketici tarafından işlenmesi gerektiği durumlarda kullanışlıdır.

Publish/Subscribe tasarımı gerektiren senaryolarda genellikle fanout exchange kullanılmaktadır diyebiliriz.

Publish/Subscribe tasarım örneği

Work Queue Tasarımı ; Bu tasarımda publisher mesajın birden fazla consumer arasından yalnızca biri tarafından tüketilmesi amaçlanmaktadır.Böylece mesajların işlenmesi sürecinde tüm consumer’lar aynı iş yüküne ve eşit görev dağılıma sahip olacaklardır.

Work Queue tasarımı gerektiren senaryolarda genellikle direct exchange kullanılmaktadır diyebiliriz.

Work Queue tasarım örneği

Request/Response Tasarımı ; Bu tasarımda publisher bir request yapar gibi kuyruğa mesaj gönderir ve bu mesajı tüketen consumer’dan sonuca dair başka kuyruktan bir yanıt/response beklenir. Bu tarz senaryolar için oldukça uygun bir tasarımdır.

Enterprise Service Bus Nedir ?

Enterprise Service Bus servisler arası entegrasyonu sağlayan komponentlerin bütünüdür diyebiliriz. Yani farklı yazılım sistemlerinin birbiriyle iletişim kurmasını sağlamak için kullanılan bir yazılım mimarisi ve araç setidir.

Burada şöyle bir örnek üzerinden anlatıma devam edebiliriz. RabbitMQ farklı sistemler arasında bir iletişim modeli ortaya koymamızı sağlayan bir teknolojidir. Enterprise Service Bus’da RabbitMQ gibi farklı sistemlerin birbiriyle etkileşime girmesini sağlayan teknolojilerin kullanımı ve yönetilebilirliğini kolaylaştırmakta ve buna bir ortam sağlamaktadır.

Yani Enterprise Service Bus’un temel amacı farklı yazılım uygulamalarının/servislerinin/sistemlerinin birbiriyle kolayca iletişim kurabilmesini sağlamaktır.

Bu mimariyi sunan bir çok kütüphane mevcuttur.Genelde en çok kullanılan kütüphane MassTransit kütüphanesidir.

MassTransit kütüphanesi .NET için geliştirilmiş olan, distributed uygulamaları rahatlıkla yönetmeyi ve çalıştırmayı hedefleyen ücretsiz open-source bir Enterprise Service Bus framework’dur.

Messaging tabanlı, gevşek bağlı(loosely coupled) ve asenkron olarak tasarlanmış dağınık sistemlerde yüksek dereceli kullanılabilirlik, güvenirlilik ve ölçeklenebilirlik sağlayabilmek için servisler oluşturmayı oldukça kolaylaştırmaktadır. Özellikleri ise ;

-Open source ve ücretsizdir.

-Kullanımı kolaydır.

-Güçlü mesaj desenleri(message pattern)destekler.

-Distributed transaction sağlar.

-Test edilebilirdir.

-Monitoring özelliği mevcuttur.

-Transport işlemlerinin kopleksliğini düşürür.

-Multiple transport desteği sağlar.

-Scheduling mevcuttur.

-Request/Response parttern’larını destekler.

-Message broker exchange’lerini yönetebilir.

Genel bir özet yapmak gerekirse tamamen farklı uygulamalar arasında message-based communication yapabilmemizi sağlayan bir transport gateway’dir.Peki transport gateway nedir ? Bu soruya da cevap vermiş olalım.

Transport Gateway farklı sistemler arasında farklı iletişim protokollerini kullanarak sistemler arasında iletişim kurmayı sağlayan bir araçtır.Bu araç; iletişim protokollerindeki farklılıkların gizleyerek sistemlerin birbiriyle sorunsuz bir şekilde çalışabilmesini sağlamaktadır.

Misal olarak; Windows işletim sistemi ile Linux arasında bir iletişim süreci söz konusuysa, farklı protokollere dayalı olan bu sistemler arasında ister istemez uyumsuzluk söz konusu olabilir ve bundan dolayı iletişimde sorunlar meydana gelebilir.İşte böyle durumlarda Transport Gateway kullanılarak süreç en ideal hale getirilebilir.

MassTransit uygulamanın message broker bağımlılığını ortadan kaldırmak için tercih edilebilir.

MassTransit özellikle microservis mimarisi gibi distributed sistemlerin oluşturulması ve bu sistemlerin kendi aralarında haberleşme sürecinde herhangi bir teknolojiye dair olabilecek bağımlığı soyutlamak için kullanılan bir kütüphanedir.

MassTransit’in kullanılabilmesi için öncelikle iki farklı uygulamaya/servise ihtiyaç vardır. Bu servisler .Net Core,Worker Service vs. olabileceği gibi tipik console uygulamaları da olabilirler. Biz ilk olarak console uygulamaları üzerinden bir örneklendirme de bulunalım.Daha sonra Worker Service üzerinden de bir örnek vereceğim.

İlk olarak aşağıdaki görseldeki gibi Consumer ve Producer olamak üzere iki uygulama oluşturalım.

Bu uygulamanın yanında bir de Shared isimli bir class library oluşturalım. Bu class library iki servis arasında gerçekleştirilecek iletişimin mesaj formatını belirliyor olacak.

Aşağıdaki görselde görüldüğü üzere her iki servis arasında içerisinde Text değeri olan ExampleMessage türünden bir mesaj iletimi gerçekleştireceğini ayarlamış oluyoruz.

Şimdi ise Consumer ve Producer uygulamaları arasında MassTransit ile iletişimi sağlayabilmek için her iki uygulamaya da MassTransit.RabbitMQ kütüphanesini yükleyelim.

Consumer ve Producer taraflarının kodlarını aşağıda açıklama satırlarıyla paylaşıyorum.

RabbitMQ.MassTransit.Publisher

using MassTransit;
using RabbitMQ.MassTransit.Shared.Messages;

// RabbitMQ'ya bağlantı oluştur

string rabbitMQUri = "*****";

string queueName = "example-queue";

IBusControl bus = Bus.Factory.CreateUsingRabbitMq(factory =>
{
factory.Host(rabbitMQUri);
});


// Gönderilecek mesajın tipini ve içeriğini belirle
ISendEndpoint sendEndpoint = await bus.GetSendEndpoint(new($"{rabbitMQUri}/{queueName}"));

// Kullanıcıdan mesaj al ve mesajı belirlenen kuyruğa gönder
Console.Write("Gönderilecek mesaj:");
string message = Console.ReadLine();
sendEndpoint.Send<IMessage>(new ExampleMessage() // IMassTransitMessage arayüzünü uygulayan ExampleMessage tipinde bir nesne oluşturup gönderilecek mesajı içerisine yerleştirir.
{
Text = message
});
Console.Read();

RabbitMQ.MassTransit.Consumer

using MassTransit;
using RabbitMQ.MassTransit.Consumer.Consumers;
using RabbitMQ.MassTransit.Shared.Messages;

string rabbitMQUri = "*****";

string queueName = "example-queue"; // kuyruk adı

// RabbitMQ'ya bağlantı oluştur ve mesajları alacak bir endpoint oluştur
IBusControl bus = Bus.Factory.CreateUsingRabbitMq(factory =>
{
factory.Host(rabbitMQUri); // Belirtilen RabbitMQ adresine bağlanmak için Host metodu kullanılır

// Belirtilen kuyruk adını dinlemek için bir ReceiveEndpoint oluştur
factory.ReceiveEndpoint(queueName: queueName, enpoint =>
{
enpoint.Consumer<ExampleMessageConsumer>(); // Alınan mesajları işleyecek olan ExampleMessageConsumer tüketici sınıfını endpoint'e ekle
});
});

await bus.StartAsync();

Console.Read();

RabbitMQ.MassTransit.Consumer tarafında Consumers diye bir klasörde açtım ayrıca aşağıdaki gibi ExampleMessageConsumer isimli bir class ekledim.

  public class ExampleMessageConsumer : IConsumer<IMessage>
{
public Task Consume(ConsumeContext<IMessage> context)
{
Console.WriteLine($"Gelen mesaj : {context.Message.Text}");
return Task.CompletedTask;
}
}

Console tarafını açıkladığımıza göre Worker Service tarafında da MassTransit’in kullanımına bir bakalım.

İlk olarak RabbitMQ.WorkerService.MassTransit.Publisher ve RabbitMQ.WorkerService.MassTransit.Consumer isimli iki WorkerService uygulaması açıyoruz.

Bu 2 projeye MassTransit.RabbitMQ bu kütüphaneyi kuruyoruz. Bu iki projemizde yine RabbitMQ.MassTransit.Shared isimli ortak class library’mizi referans edecek.

İlk olarak Publisher isimli worker service projemizden başlayalım.

Program.cs içerisinde worker service’mizin temel yapılandırmalar, konfigürasyon gibi işlemleri yer alır ve bu ilemler ConfigurationServices fonksiyonu üzerinden gerçekleştirilir.

Publisher’da ilk olarak MassTransit ile ilgili temel yapılandırmayı AddMassTransit fonksiyonuyla görseldeki gibi gerçekleştiriyoruz.

Bu işlemden sonra temel mesajı gönderecek bir servis oluşturmamız gerekiyor. Publisher projesi içerisinde Services isimli bir klasör ve içine PublishMessageService bir sınıf oluşturuyoruz. Bu sınıfı BackgroundService’den miras alarak yaratıyoruz.

Daha sonra ilgili methodları implemente ediyoruz. Yukarıda IPublishEndpoint’dan bir nesne yaratıyoruz ve bu nesneye constructor’da inject işlemi uyguyoruz.Basit bir şekilde 0'dan artacak şekilde mesaj gönderecek yapıyı aşağıdaki gibi kuruyoruz. _publishEndpoint.Publish(message) fonksiyonu ile de mesajımızı yolluyoruz.

Burada Publish fonksiyonunun genel davranışı bu exchange’lere bağlı olan bütün kuyruklara mesaj gönderme işlemidir.

Artık ilgili servisi Worker Service’e aşağıdaki gibi tanımlamak geriye kalıyor.

using MassTransit;
using MassTransit.Testing;
using RabbitMQ.WorkerService.MassTransit.Publisher;
using RabbitMQ.WorkerService.MassTransit.Publisher.Services;

IHost host = Host.CreateDefaultBuilder(args)
.ConfigureServices(services =>
{
//temel yapılandırma
services.AddMassTransit(configurator =>
{
configurator.UsingRabbitMq((context, _configurator) =>
{
_configurator.Host("*******");
});
});

services.AddHostedService<PublishMessageService>();

})
.Build();

host.Run();

Consomer tarafında da AddMassTransit üzerinden aşağıdaki gibi bir konfigürasyon yapmamız gerekiyor.

using MassTransit;
using RabbitMQ.WorkerService.MassTransit.Consumer;

IHost host = Host.CreateDefaultBuilder(args)
.ConfigureServices(services =>
{
//temel yapılandırma
services.AddMassTransit(configurator =>
{
configurator.UsingRabbitMq((context, _configurator) =>
{
_configurator.Host("*******");
});
});


})
.Build();

host.Run();

Yapılandırma kısmı aslında Publisher tarafı ile aynı. Ekstra olarak bu yapılandırmalara consumer’ları da tanımlamamız gerekiyor. Bunun için de proje içerisinde ilk olarak Consumer isimli bir klasör oluşturuyoruz ve içine türü IConsumer olan ExampleMessageConsumer isimli bir class oluşturuyoruz.

using MassTransit;
using RabbitMQ.MassTransit.Shared.Messages;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace RabbitMQ.WorkerService.MassTransit.Consumer.Consomer
{
public class ExampleMessageConsomer : IConsumer<IMessage>
{
public Task Consume(ConsumeContext<IMessage> context)
{
Console.WriteLine($"Gelen Mesaj : {context.Message.Text}");

return Task.CompletedTask;
}
}
}

Son olarak RabbitMQ.WorkerService.MassTransit.Consumer içerisinde ExampleMessageConsumer türünde bir tane consumer’imiz olacağını belirteceğiz .Configurator üzerinden bir consumer ekleyeceğiz ve bu consomer’in hangi kuyruğu tüketeceğini ReceiveEndPoint üzerinden yapılandıracağız.

using MassTransit;
using RabbitMQ.WorkerService.MassTransit.Consumer;
using RabbitMQ.WorkerService.MassTransit.Consumer.Consomer;

IHost host = Host.CreateDefaultBuilder(args)
.ConfigureServices(services =>
{
//temel yapılandırma
services.AddMassTransit(configurator =>
{
configurator.AddConsumer<ExampleMessageConsomer>();

configurator.UsingRabbitMq((context, _configurator) =>
{
_configurator.Host("*******");

_configurator.ReceiveEndpoint("example-message-queue", e=> e.ConfigureConsumer<ExampleMessageConsomer>(context));
});
});


})
.Build();

await host.RunAsync();

Dikkatiniz çektiyse MassTransit’i kullanırken servisler arasında mesaj iletimini Publish ve Send olmak üzere iki farklı yolla gerçekleştirebiliyoruz.

Publish ; Event tabanlı mesaj iletim yöntemini ifade eder. Özünde publish/subsribe pattern’ini uygulamaktadır. Event publish edildiğinde, o event subscribe olan tüm queue’lara mesaj iletecektir.

MassTransit’te IPublishEndpoint referansıyla kullanılabilmektedir.

Send; Command tabanlı mesaj iletim yöntemini ifade eder. Hangi kuyruğa mesaj iletişimi gerçekleştirilecekse endpoint olarak bildirilmesi gerekmektedir.

MassTransit’te ISendEndpointProvide referansı üzerinden ‘GetSendEndpoint’ metodu ile mesajın gönderileceği kuyruk ifade edilebilir.

Genel kavramları ve örnekleriyle beraber elimden geldiğinde RabbitMQ’u anlatmaya gayret gösterdim. Son olarak RabbitMQ teknolojisini kullanan günümüz projelerine daha yakın olacağını düşüneceğim için bir örnek projede reponun içine bırakacağım.

Biraz uzun bir yazı oldu farkındayım, ancak RabbitMQ ile alakalı tüm kavramlara elimizden geldiğince değindiğimizi düşünüyorum.Projeyle alakalı kodların olduğu github repomu aşağıya bırakıyorum, takıldığınız noktalarda bana iletişim adreslerimden ulaşabilirsiniz.

Kaynaklar;

--

--