RabbitMQ Nedir
Selamlar,
Bugün RabbitMQ hakkında, hem Windows10 üzerine olan kurulumunu hem de .Net ortamında nasıl çalıştığı konusunu, örnekler ile derinlemesine inceleyeceğiz.
RabbitMQ bir mesaj kuyruğu sistemidir. Benzerleri Apache Kafka, Msmq, Microsoft Azure Service Bus, Kestrel, ActiveMQ olarak sıralanabilir. Amacı herhangi bir kaynaktan alınan bir mesajın, bir başka kaynağa sırası geldiği anda iletilmesidir. Mantık olarak Redis Pub/Sub’a benzemektedir. Ama burada yapılacak işler bir sıraya alınmaktadır. Yani iletimin yapılacağı kaynak ayağa kalkana kadar, tüm işlemler bir quee’de sıralanabilir. Fakat aynı durum Redis Pub’Sub için geçerli değildir. RabbitMQ çoklu işletim sistemine destek vermesi ve açık kaynak kodlu olması da en büyük tercih sebeplerinden birisidir.
Peki neden kullanılmalıdır: Bazı işlemlerin anlık yapılmasına ihtiyaç yoktur. Örnek vermek istenir ise sisteme yeni bir haber girildiğinde, ya da var olan bir haberin güncellenmesi anında cache’in düşürülmesi, bir başka örnek de upload edilen”Gif” dosyalarının scale işleminin yapılması gibi düşünülebilir. Hatta zaman ayarlı message ve otomatik mailler de yine RabbitMq’ya güzel bir örnek olabilir. Sıraya alınan bu işlemlerin asenkron bir şekilde yapılması, hem çalışan uygulamanın boş yere bekletilmemesinden hem de sunucu üzerindeki işlem maliyetinin minimuma indirilmesinden dolayı RabbitMQ iyi bir tercih sebebi olabilir. Ayrıca scalable olmasından dolayı da değişen trafikli yapılarda ayrıca tercih edilebilir.
Öncelikle Windows 10 İşletim Sistemine Kurulum :
- RabbitMQ Erlang virtual runtime’da çalışır. Bunun için öncelikle Erlang‘in ilgili link’den yüklenmesi gerekmektedir. Peki Erlang nedir : Ericsson’da çalışan Joe Armstrong, Mike Williams ve Robert Virding in tarafından geliştirilen aynı zamanda message broker olan RabbitMQ için muhteşem bir platform olan bir programlama dilidir.
- RabbitMQ ilgili adresden indirlir.
-
1"C:\Program Files\RabbitMQ Server\rabbitmq_server-3.6.6\sbin\rabbitmq-plugins.bat enable rabbitmq_management
Yukarıdaki kod çalıştırılarak RabbitMQ için gerekli olan pluginlere izin verilir.
4. Son olarak oluşturulan RabbitMQ servisi aşağıda görüldüğü gibi tekrardan başlatılır.
Internet browser’a gelinip http://localhost:15672 yazılır ise aşağıdaki gibi bir monitor ekranı ile karşılaşılır.
Böylece gerekli laboratuvar ortamı hazırlanmış olunur.
Şimdi sıra geldi RabbitMQ’nun çalışma mantığına ve bilinmesi gereken bazı terimlerine:
- Producer: Mesajı atan kaynak yani uygulamadır. Redis’deki Pub/Sub düşünüldüğünde Publisher tarafıdır.
- Queue : Gönderilen mesajlar alıcaya ulaştırılmadan önce bir sıraya konur. Gelen yoğunluğa göre veya alıcıya erişilemediği durumlarda, gelen tüm mesajlar Queue’de yani memory’de saklanır. Eğer bu süreç uzun sürer ise memory şişebilir. Ayrıca server’ın restart edilmesi durumunda ilgili mesajlar kaybolabilir.
- Consumer: Gönderilen mesajı karşılayan sunucudur. Yani Redis Pub/Sub’daki Subscribe’dır. Kısaca ilgili kuyruğu(Queue)’yu dinleyen taraftır.
- Fifo: RabbitMQ’da giden mesajların işlem sırası first in first out yani ilk giren ilk çıkar şeklindedir.
Producer olarak ilk paketin gönderilmesi [POST]:
Bu örnekde Visual Studio 2015 kullanılarak bir console application yazılmıştır. Öncelikle indirilecek paketler, aşağıdaki gibidir. RabbitMQ.Client ve Newtonsoft.Json Nuget package manager kullanılarak indirilebilir.
Bu örnekde Person tipinde bir sınıf doldurularak “Borsoft” adında bir queue’ye gönderilecektir.
- ConnectionFactory :RabbitMQ hostuna bağlanmak için kullanılır. Bulunulan sunucudaki host name (localhost),virtual host ve credentials (password) girilir.
- CreateModel() methodu ile RabbitMQ üzerinde yeni bir channel yaratılır. İşte bu açılan channel yani session ile yeni bir queue oluşturulup istenen mesaj bu channel üzerinden gönderilmektedir.
- QueueDeclare() methodu ile oluşturulacak olan queue‘nin ismi tanımlanır. “durable” ile in-memory mi yoksa fiziksel olarak mı saklanacağı belirlenir. Genel de RabbitMQ’da hız amcı ile ilgili queuelerin memory’de saklanması tercih edilse de sunucunun restart olması durumunda ilgili mesajların kaybolmasından dolayı da, hızdan ödün verilerek fiziksel olarak bir hard diskte saklanması tercih edilebilir. “exclusive” parametresi ile diğer connectionlar ile kullanılması izni belirlenir. Eğer queue deleted olarak işaretlenmiş ise ve tüm consumerlar bunu kullanmayı bitirmiş ise ya da son consumer iptal edilmiş veya channel kapanmış ise silinmez. İşte bu gibi durumlarda “autoDelete” ile delete method’u normal olarak çalıştırılır. Ve ilgili queueler silinir. “arguments” Belirlenen exchanges ile alakalı parametrelerdir. Exchangeler birazdan inceleyeceğiz.
- İlgili doldurulan “Person” sınıfı JsonConvert ile Serialize edilir ve byte[] dizisine çevrilip “body“‘e atanır.
- BasicPublish() methodu “exchange” aslında mesajın alınıp bir veya daha fazla queue’ya konmasını sağlar. Bu route algoritması exchange tipine ve bindinglere göre farklılık gösterir. “Direct, Fanout ,Topic ve Headers” tiplerinde exchangeler mevcuttur.
Direct exchange: Yapılacak işlere göre bir routing key belirlenir ve buna göre ilgili direct exchange ile amaca en uygun queue gidilir.
Fanout exchange: Burada routing key’in bir önemi yoktur. Daha çok broadcast yayınlar için uygundur. Özellikle (MMO) oyunlarda top10 güncellemeleri ve global duyurular için kullanılır. Yine real-time spor haberleri gibi yayınlarda fanout exchange kullanılır.
Topic Exchange: Bir route mesajın bir veya daha çok queue’ye gitmesi amacı ile kullanılır. Publish/Subscribe pattern’in bir varyasyonudur. Eğer ilgili sorun birkaç consumer’i alakadar ediyor ise, hangi çeşit mesajı almak istediklerini belirlemek için Topic Exchange kullanılmalıdır.
Örnek stok fiyatlarının değişmesi veya arkada çalışan birkaç farklı task process’in farklı workerlar ile tamamlanması ve hangi taskin hangi worker tarafından ele alınması gibi durumlarda kullanılır.
Headers Exchange: Yine bu exchange de routing key’i kullanmaz ve message headers’daki birkaç özellik ve tanımlama ile doğru queue’ye iletim yapar. Header üzerindeki attributeler ile queue üzerindeki attributelerin, tamamının değerlerinin birbirini tutması gerekmektedir. Bir tanımlamada Header Exchange’in Direct Exchange’in Steroidli hali dendiğini gördüm :)
“BasicPublish() methoddaki routingKey” : Girilen key’e göre ilgili queue’ye gidilmesi sağlanır. “body:” Queue’ye gönderilecek mesaj byte[] tipinde gönderilir. Mesaj denince aklınıza sadece Text gelmesin. Her türlü object’i gönderebiliriz. Örneğin bu uygulamada “Person” class’ı gönderilecektir.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
using Newtonsoft.Json; using RabbitMQ.Client; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; namespace RabbitMq_Sender_Example { class Program { static void Main(string[] args) { Person person = new Person() { Name = "Bora", SurName = "Kasmer", ID = 1, BirthDate = new DateTime(1978, 6, 3),Message="İlgili aday yakınımdır :)" }; var factory = new ConnectionFactory() { HostName = "localhost" }; using (IConnection connection = factory.CreateConnection()) using (IModel channel = connection.CreateModel()) { channel.QueueDeclare(queue: "Borsoft", durable: false, exclusive: false, autoDelete: false, arguments: null); string message = JsonConvert.SerializeObject(person); var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "", routingKey: "Borsoft", basicProperties: null, body: body); Console.WriteLine($"Gönderilen kişi: {person.Name}-{person.SurName}"); } Console.WriteLine(" İlgili kişi gönderildi..."); Console.ReadLine(); } } public class Person { public int ID { get; set; } public string Name { get; set; } public string SurName { get; set; } public DateTime BirthDate { get; set; } public string Message { get; set; } } } |
Gönderme işleminden sonraki Monitor ekranınındaki Queue’lerin görüntüsü aşağıdaki gibidir: Görüldüğü gibi “Borsoft” isimli queue’de gönderilmeyi bekleyen 1 paket bulunmaktadır. Çünkü henüz consumer uygulamasını çalıştırmadık. Ve hiç bir paket henüz iletilmedi.
Consumer olarak Queue’den ilgili paketin alınması [RECEIVE] :
Receive işleminde bağlanılacak host(“localhost”) ve Queue (“Borsoft”) belirlendikten sonra “consumer.Received” event sayesinde ilgili queue sürekli bir dinleme modunda olacaktır. İlgili mesaj geldiğinde öncelikle “byte[]” dizisi olarak alınan mesaj string bir veriye çevrilecek daha sonra “deserialize” işlemi ile beklenen “Person” sınıfına dönüştürülecektir.
channel.BasicConsume(): Methodu ile ilgili Queue’den mesajları çekme işlemine başlanır. Burada “noAck” parametresi true olarak atanır ise, ilgili mesaj alındıktan sonra Queue’den otomatik olarak silinir.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 |
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using RabbitMQ.Client; using RabbitMQ.Client.Events; using Newtonsoft.Json; namespace RabbitMQ_Receive { class Program { static void Main(string[] args) { var factory = new ConnectionFactory() { HostName = "localhost" }; using (IConnection connection = factory.CreateConnection()) using (IModel channel = connection.CreateModel()) { channel.QueueDeclare(queue: "Borsoft", durable: false, exclusive: false, autoDelete: false, arguments: null); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Person person = JsonConvert.DeserializeObject<Person>(message); Console.WriteLine($" Adı: {person.Name} Soyadı:{person.SurName} [{person.Message}]"); }; channel.BasicConsume(queue: "Borsoft", noAck: true, consumer: consumer); Console.WriteLine(" İşe Alındınız. Teşekkürler :)"); Console.ReadLine(); } } } public class Person { public int ID { get; set; } public string Name { get; set; } public string SurName { get; set; } public DateTime BirthDate { get; set; } public string Message { get; set; } } } |
İlgili code çalıştırıldığında aşağıdaki gibi bir sonuç ekranı alınır:
Ayrıca monitor ekranına bakılır ise ilgili Queue’nin ilgili consumer’a iletiminden sonra silindiği görülür:
RabbitMQ günümüzün en populer mesaj kuyruğu sistemlerinden biri demek hiç de yanlış olmaz. Apache Kafka kadar hızlı olmasa da Erlang ile yazılmış olması, inanılmaz esnek bir yapısı olması, tamamı ile transactional olması, istenirse queuelerin bir storageda da saklanabilmesi ve muhteşem pluginleri ile benim favorim diyebilirim.
Böylece geldik bir makalenin daha sonuna. Yeni bir makalede görüşmek üzere hoşçakalın.
Source:
Hocam merhaba,sizin bir projeniz vardı.İşe alınma projesi.Merkez bankasından veri çekiyordunuz,
O konunun linkini bulamadım.Link verebilirmisiniz?
Teşekkür Ederim
Selam Yunus,
Buyurun : http://www.borakasmer.com/bir-is-gorusmesinde-detayli-istenen-proje-bolum1/
Merhaba hocam,
Bu işlerde henüz yeniyim ve aklıma takılan sorular var. Mesela ben kendi bilgisayarımda VMware ile yeni bir debian yada ubuntu oluştursam.Rabbitmq buraya kursam. Publish ve Consume işleri için de kendi windows um üzerinde C# ile yazıp konsol uygulamasında nasıl kullanabilirim. Yukarıdaki örnekte localhost kısmına ne yazmam gerekir. İyi çalışmalar
Selamlar Huseyin,
VMware ile hiç denemedim. Ama Vm makinanın IP’si ile ilgili RabbitMQ channel’a ulaşman yüksek ihtimal. Yani kısaca değişen sadece erişim adresi olucak.
İyi çalışmalar.
Başlanğıç için çok faydalı oldu.
Teşekkürler..
Merhaba Hocam.
RabbitMQ Error kuyruğunda olan işlemler için nasıl bir yol izlememizi tavsiye dersiniz? Error kuyruğundaki işlemleri de consumer etmem gerekiyor. Verilermi kaybolmadan nasıl yaparım?
İyi çalışmalar.
Selamlar,
Başka bir Windows Service ya da Cloudeda, mesela Azure’da WebJobs ile Error Queeleri tek tek işleyebilirsin.
Merhaba,
channel.BasicConsume(queue: “Borsoft”, noAck: true, consumer: consumer); satırında BasicConsume method’unun noAck diye bir parametresi bulunmuyor. Library değişmiş olmalı.
İyi çalışmalar
Merhaba, RabbitMQ.Client Versiyon 5.1.0 için autoAck: true seçeneğini kullanabilirsin.
üstadım c# mvc de yogun bir sayfadan sql den gelen istekleri yada başka kaynakdan gelen istekleri RabbitMQ ile kuyruga alabilirmiyiz ? alamaz isek örerin nedir?
Selamlar,
Sql’den derken sanırım Sql’e yapılan istekleri bir Queue’ye almayı kastediyorsunuz. Bunu Sql’e gelmeden önce istekleri bir microservis ile karşılıyarak, yükü dağıtabilirsiniz. Ya da araya bir cache koyaabilirsiniz. Ama Sql’e gelen yükü bir Queue ile sanırım dağıtamassınız..
İyi çalışmalar.
Merhabalar hocam. Size bir kaç sorum olacak. Öncelikle yazılarınız için teşekkür ediyorum çok yardımcı oluyor. Sorumun ilki şudur. Bu yapıyı yani kuyruk yapılarını servis (rest servis) yapılı bir mimaride nereye oturtmam lazım? Ben de her modül için bir servis var. O servis içine o modüle özgü talepleri alıyor. Yani Token yapısı gibi, talepte bulunan kullanıcı bilgisi gibi. Bu durumda MQ yapımı nereye kurmam lazım? Servisin önüne koyup talepleri servise iletmesi mi yoksa servisim talepleri alıp MQ yapısına mı iletmesi?
Buna bağlı olarak 2. sorum da şu olacaktır. Birden fazla server kullanıp yük dağılımı yapmak mı yoksa MQ yapısı kullanmak mı? Yoksa ikisi bir arada mı? Eğer ikisi bir arada olacaksa her server için ayrı bir MQ yapısı mı kurmam gerekli bunlar nasıl ortak çalışacak?
Cevap için şimdiden teşekkürler.
Selam Oğuz,
Öncelikle teşekkürler. Soruların çok güzel. Konuyu anladığın belli.
Bu sorularının cevabını ancak bir seminer ile verebilirim. Önümüzdeki günlerde yapacağım seminerlerimi takip etmeni tavsiye ederim.
İyi çalışmalar.
Hocam Selam
Yazı için çok teşekkürler. Makalenin sonlarında “Tamami ile transactional olması” gibi bir ifade kullanmışsınız. RabbitMQ transactional queue ve transactional işlemleri destekliyor mu? Örneğin para cekme aksiyonu yapıldı ve bir event fırlatıldı ve rabbitmq da ilgili kuyrugu yazıldı diyelim, bu event i işleyen consumer ilgili muhasebe işlemlerini yaparken exception alırsa ilgili event rollback edilir ve kuyruktan da silinir mi ?
Teşekkürler
Selam Mehmet,
Öncelikle güzel yorumlarınız için teşekkürler :) RabbitMQ, maalesef kendi başına transactinal değildir. Ama siz Hata zamanında Consumer – Publisher yolu ile bu işi transactinal hale getirebilirsiniz.
Şunu da şuraya bırakayım :) https://www.rabbitmq.com/confirms.html
iyi çalışmalar.
Hocam yazınız için çok teşekkür ederim .Sorum şu; biraz araştıma yaptığımda CallbackException karşıma çıkmakta .CallbackException görevi hakkında ufak bir açıklama rica etsem?
Hocam merhabalar,
Bir projemde queue üzerinde günlük 400 500k gelen kaydı tutuyorum ve bunları consumer da işliyorum. Burada aklıma takılan ve oturtamadığım konu ise queue da prefetch count değerim 16 benim. Bu sayısının ideal değerini nasıl hesaplayabilirim. Konuyla ilgili bir öngörünüz var mıdır?
Teşekkürler,
Ellerinize sağlık, Hocam
Tesekkurler Nurullah..