SignalR Chatleşme sırasında Yazışmaları RabbitMQ Kullanılarak Sql Server’a Kaydetme Bölüm2
Bölüm 2 Büyük Yükler Altında, Yapılması Gereken Asenkron İşleri, Bir Queue’ya Atıp Sıra ile Yapma:
Selamlar,
Bu makaleye geçmeden önce İlk makaleyi okumanızı tavsiye ederim. Bu makalede 1. makalede kalınan yerden devam edilecektir.
Artık ortak yazışılan bir chat uygulamamız var. Şimdi diyelim ki 10 bin kişi aynı anda yazışsın. İşte tam bu durumda her yazılan mesajın logunun, senkron olarak bir DB’de saklanması aynı sunucu üzerinden denenir ise, ilgili DB makinasının rem ve cpu’yusunun tavan yapması ve bir süre sonra cevap veremeyecek hale gelmesi yüksek bir ihtimaldir. Bu durumda hali hazırda yazışmaya çalışan 10bin kişi, bekletilecek ve yazılan mesajlar anlık olarak ilgili kişilere iletilemiyecektir. Ayrıca çıkabilecek hata loglarının da ayrı bir sistemde tutulması gibi bir durum söz konusu ise, bu sefer işler daha da kötüye gidecek, sorunun geçici çözümü için IIS restart hatta sunucunun restart edilmesi gerekebilecektir. İşte bu ve bunun gibi istenmiyen durumların olmaması için var olan sistemden bağımsız çalışan bir Queue mekanizmasının kullanılması güzel çözüm yöntemlerden birisi olabilir.
Bu makalede queue olarak RabbitMQ kullanılmıştır. İstenir ise Kafka, MSMQ gibi farklı Queue uygulamaları da kullanılabilir. Gelin isterseniz öncelikle DAL diye geçen uygulamanın data katmanını kodlayalım :) Database adı “Message” tablo adı “MessageLog” aşağıda görüldüğü gibidir.
Şimdi sıra geldi RabbitMQ projemizi oluşturmaya. Ben bunun için bir Windows Service oluşturucağım. RabbitMQ’nun çalışması için makinanızda önceden yukarıda görülen RabbitMQ servisinin ayağa kaldırılmış olması gerekmektedir. Bu anlamanın en iyi yolu “http://localhost:15672/” adresi browserdan çağrıldığı zaman aşağıdaki gibi bir ekranın gelmesi gerekmektedir.
Şimdi sıra geldi “MessageLogService” adında “RabbitMQ“‘daki mesajları dinleyip, “Message” DB’ye yazacak bir Windows Services’i yazmaya. Öncelikle aşağıdaki kütüphaneler NuGet Package Manager’dan Download Edilir:
Ben performansı mümkün olduğunca arttırmak için DB’ye data yazmak için Micro Orm toolarından Dapper‘ı kullanacağım. Dapper hakında bilgiyi bu makaleden edinebilirsiniz. http://www.borakasmer.com/dapper-nedir/ Ayrıca projede RabbitMQ kullanılabilmesi için RabbitMQ.Client’ın indirilmesi gerekmektedir.
Gelin öncelikle “MessageListener” adında bir sınıf yazalım.
MessageListener: Aşağıdaki codelara bakıldığında bir “DoWork()” methodu ve arkada çalışan bir “Thread()” ilk dikkati çeken öğelerdir. RabbitMQ için ConnectionFactory==> ile”localhost”‘a bağlanılıp, data çekilecek “MessageLog” channel’ının, Queue özellikleri tanımlandıktan sonra “while(true)” ile ilgili kanalı dinleme işine başlanır.
İlgili paket “MessageLog” channel’ından yakalandığı zaman, Deserialize edilip”MessageLog” modeline cast edilir. Daha sonra alınan model Dapper ile Sql’e bağlanılıp, yine Dapper Text Query ile Insert edilir. Ayrıca da Console’a bir log yazılır.
- “Connection ve channel” using ile oluşturulur.
- “channel.QueueDeclare” ile dinlenecek sıranın MessageLog olduğu, “queue: MessageLog” ile tanımlanır.
- “channel.BasicConsume” ile ilgili mesaj çekme işlemine başlanır.
- “consumer.Queue.Dequeue()” methodu ile ilgili queue’den sıradaki “MessageLog” alınmıştır.
- İlgili paket içeriği “message“‘a atanmış ve Newtonsoft.Json kullanılarak ilgili message “<MessageLog>” tipine Deserialize edilmiştir.
- Queue’den gelen pakete göre yeni bir “MessageLog” kaydı oluşturulur. Ve “log” değişkenine atanır.
- Dapper sayesinde “MessageLog” tablosuna kaydedilecek insert cümlesi oluşturulur. “Insert into [dbo].[MessageLog]([Text],[CreatedDate]) VALUES (@Text,@CreatedDate)“
- Son olarak ilgili query, yandaki gibi execute edilir. “sqlConnection.Execute(sqlQuery, log);“
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 |
using Dapper; using Newtonsoft.Json; using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Collections.Generic; using System.Configuration; using System.Data.SqlClient; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; namespace MessageLogService { class MessageListener { public void DoWork() { try { new Thread(() => { var factory = new ConnectionFactory() { HostName = "localhost" }; using (IConnection connection = factory.CreateConnection()) using (IModel channel = connection.CreateModel()) { channel.QueueDeclare(queue: "MessageLog", durable: false, exclusive: false, autoDelete: false, arguments: null); var consumer = new QueueingBasicConsumer(channel); channel.BasicConsume(queue: "MessageLog", noAck: true, consumer: consumer); while (true) { var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); var body = ea.Body; var message = Encoding.UTF8.GetString(body); MessageLog _messageLog = JsonConvert.DeserializeObject<MessageLog>(message); using (SqlConnection sqlConnection = new SqlConnection(ConfigurationManager.ConnectionStrings["Message"].ToString())) { byte[] data = Convert.FromBase64String(_messageLog.Text); string decodedString = Encoding.UTF8.GetString(data); sqlConnection.Open(); MessageLog log = new MessageLog() { Nick = _messageLog.Nick, Text = decodedString, CreatedDate = DateTime.Now }; string sqlQuery = "Insert into [dbo].[MessageLog]([Nick],[Text],[CreatedDate]) VALUES (@Nick,@Text,@CreatedDate)"; sqlConnection.Execute(sqlQuery, log); Console.WriteLine($" From: {_messageLog.Nick} Say:[{decodedString}]"); sqlConnection.Close(); } } } }).Start(); } catch (Exception ex) { string error = ex.Message; } } } } |
MessageLog.cs: RabbitMQ’den alınan kaydedilecek Message paket modeli aşağıdaki gibi tanımlanır.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; namespace MessageLogService { public partial class MessageLog { public int ID { get; set; } public string Nick { get; set; } public string Text { get; set; } public DateTime? CreatedDate { get; set; } } } |
Windows Service Program.cs: Aşağıda görüldüğü gibi “Debug” ve “Release” durumlarında farklı code blokları çalışmaktadır. Release yani servis olarak çalışırken ilgili ServiceBase devreye girer ve Debug Model’da direk “DoWork()” methodu çağrılır.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
static class Program { /// <summary> /// The main entry point for the application. /// </summary> static void Main() { #if DEBUG var service = new MessageListener(); service.DoWork(); #else ServiceBase[] ServicesToRun; ServicesToRun = new ServiceBase[] { new MessageLogService() }; ServiceBase.Run(ServicesToRun); #endif } } |
App.config(connectionStrings):
1 2 3 4 |
<connectionStrings> <add name="Error" connectionString="Server=(local);Database=Message; Trusted_Connection=True;" providerName="System.Data.SqlClient" /> </connectionStrings> |
MessageLogService.cs: Aşağıda görüldüğü gibi ilgili servisin OnStart() durumunda “DoWork()” methodu çağrılmış ve RabbitMQ’daki “Message” kanalı dinlenmeye başlanmıştır.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
using System; using System.Collections.Generic; using System.ComponentModel; using System.Data; using System.Diagnostics; using System.Linq; using System.ServiceProcess; using System.Text; using System.Threading.Tasks; namespace MessageLogService { public partial class MessageLogService : ServiceBase { public MessageLogService() { InitializeComponent(); } protected override void OnStart(string[] args) { var service = new MessageListener(); service.DoWork(); } protected override void OnStop() { } } } |
MessageLog Windows Servis için ProjectInstaller Oluşturma:
MessageLogService sağ tıklanıp “Add Installer” seçilir. Projeye “ProjectInstaller” adında bir dosya eklenir. “serviceProcessInstaller1” ve “MessageLogService“‘in property ekranları aşağıdak gibidir.
“MessageLogService”‘in, install işleminden sonra Servisler kısmında aşağıdaki gibi gözükmesi gerekmektedir. Servise ait “Yetkili User”‘ı değiştirmeyi ve ve tabi ki start etmeyi unutmayın:)
Windows Service’nin yani MessageLogService’in Install edilmesi:
- Öncelikle .NetFrameWork’ün kurulu olduğu folder’a command prompt’dan yandaki gibi “Admin” yetkisi ile açılıp gelinir. Şu anki güncel versiyon. “C:\WINDOWS\Microsoft.NET\Framework\v4.0.30319\“. Sizde daha farklı olabilir
- InstallUtil.exe “C:\Users\Bora KASMER\Documents\Visual Studio 2017\Projects\SignalR_RabbitMQ\MessageLogService\bin\Release\MessageLogService.exe” komutu çalıştırılır.
- Önemli bir nokta”services.msc” komutu ile ilgili “MessageService”‘e gidilip sağ tıklanıp “Log On” sekmesindeki “This account” SqlServer’ınızdaki User ile aynı olmalıdır. Yoksa ilgili servis DB’ye yazma sırasında hata vermektedir.
İlgiliMessageService’in User’ı, sağ tıklanıp aşağıdaki gibi değiştirilir:
SqlDB’ye hangi user ile bağlanıldığı, property penceresinden aşağıdaki gibi görülmektedir:
Şimdi sıra geldi Yazışmalar sırasındaki tüm mesajları local’deki RabbitMQ’ya atmaya:
RabbitMQ Publisher: Burada “localhost“‘da çalışan ve credential istemeyen bir RabbitMQ servisine bağlanılmaktadır. Parametre olarak alınan “MessageLog” sınıfı Newtonsoft ile Serialized edilip byte’a çevrilir ve “BasicPublish()” methodu ile eklenecek olan “MessageLog” kanalına gönderilir.
Bu sırada QueueDeclare ayarları için:
- durable ile in-memory mi yoksa fiziksel olarak mı RabbitMQ daki verinin saklanacağı belirlenir.
- exclusive İlgili Queue’nin diğer connectionlar ile kullanılması izni belirlenir.
- autoDelete ile işlemi bitenQueue RabbitMq’dan silinir ya da istenir ise silinmez.
- arguments ile belirlenen exchanges ile alakalı parametrelerin tanımlandığı yerdir.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
using Newtonsoft.Json; using RabbitMQ.Client; using SignalR_RabbitMQ.Models; using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Web; namespace SignalR_RabbitMQ { public static class RabbitMQPublisher { public static void SendMessage(MessageLog log) { var factory = new ConnectionFactory() { HostName = "localhost" }; using (IConnection connection = factory.CreateConnection()) using (IModel channel = connection.CreateModel()) { channel.QueueDeclare(queue: "MessageLog", durable: false, exclusive: false, autoDelete: false, arguments: null); string _messageLog = JsonConvert.SerializeObject(log); var body = Encoding.UTF8.GetBytes(_messageLog); channel.BasicPublish(exchange: "", routingKey: "MessageLog", basicProperties: null, body: body); Console.WriteLine($" From: {log.Nick} [{log.Text}]"); } } } } |
Chat:Hub(Class) Son Hali:
- Aşağıda görüldüğü gibi herhangi bir client’ın gönderdiği mesaj, önce DB’ye yazılması için RabbitMQ’a atılır. Daha sonra da “Clienats.All.SendMessage()” ile de, connected olan tüm clientlardaki “SendMessagAll()” function’ı, yazılan Message ve UserName parametreleri ile trigger olunur. Böylece bir kişnin yazdığı mesaj, bağlı olan tüm clientlara gönderilmiş olunur.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
public class Chat : Hub { public async override Task OnConnected() { await Clients.Caller.connected(Context.ConnectionId); } public async Task SendMessageToAll(string Message, string UserName) { //İlgili Mesaj Loglanır. RabbitMQPublisher.SendMessage(new MessageLog() { Nick = UserName, Text = Message, CreatedDate = DateTime.Now }); await Clients.All.SendMessageAll(Message, UserName); } } |
Başta da söylediğim gibi, bir sorunun tek bir çözüm yolu yoktur. Ya da tek başına muhteşem bir teknoloji de, nerde ise imkansızdır. Önemli olan amaca, ihtiyaca ve maddi imkanlara göre en iyi yapıyı oluşturmaktır. Amacın hız veya güvenlik olmasına göre seçilicek teknolojiler farklılık gösterebilecektir. Ama değişmeyen tek bir şey vardır. O da yumurtaların hepsinin tek bir sepete konmaması, ve dağıtık mimari ile çalışmanın getirdiği tonla avantajlardır. Tabi herşeyin fazlası zarardır. Çok fazla parçaya ayırmakta, yönetim maliyeti getirebilmektedir.
Geldik bir makalenin daha sonuna. Yeni bir makalede görüşmek üzere hoşçakalın.
Source: https://www.rabbitmq.com/tutorials/tutorial-two-dotnet.html, https://stackoverflow.com/questions/52820548/correct-way-to-start-windows-service, https://docs.microsoft.com/tr-tr/aspnet/core/signalr/hubs?view=aspnetcore-2.2
Hocam ellerinize sağlık çok teşekkür ederim. Benim bir sorum olacaktı. Son kısımdaki SendMessageToAll method da RabbitMQ publish ettikten sonra db’ye yazarken hata oluştuğunu var sayalım. Bu durumda kullanıcıya geri nasıl mesaj döndürebiliriz?
Öncelikle Teşekkürler,
Soru çok güzel. Bu zaten gerçekte çalşan bir örnek. Sorduğunuz bu kısımı makaleye katmamıştım :) Hata durumu için de ayrı bir ErrorQuee var. catch{} durumunda ilgili Nick ve Text o quee’ya yazılıyor. Bir de yanına counter ekleniyor. Eğer 3 denemede kaydedilemezler ise artık denenmiyor. Bu durumda da ilgili kayıt admine mail atılıyor.
İyi çalışmalar.
Hocam projeyi indirdim ama çalışmıyor hata veriyor, dapper,owin vs.. gibi nugetlerin reference hatası veriyor
Sanırım ilgili paketleri, yeni versiyonları ile güncellemen gerekiyor.
Öncelikle çok teşekkürler makale için. Hocam en son MessageLogService\bin\Release\MessageLogService.exe command üzerinde çalıştırdımız da dosya olmadığı için sıkıntı yapıyor ne yapmamız gerekiyor bu durumda?
Selamlar Ahmet,
Uygulama macOS ortamında yazılmıştır. Doğal olarak bu exe olmaz. Önce projenin bir build edilmesi gerekiyor gibime geldi..
İyi çalışmalar.
Merhaba Bora Bey, paylaşmış olduğunuz source kodda consumer windows servicete ; QueingBasingConsumer kullanmışşınız ama rabbitmq de bu kaldırılmış sanırım.Sadece consumer olarak windows service uygulamanizi kullandım,bende ayni şekilde yapmıştım.Kuyruktan verileri cekip dogru bi sekilde eklettirilmiyor,eksik kaydediyor veri kaybi yaşıyorum.Bir senkronize sorunu yaşıyorum.Yardimci olabilir misiniz?
Selamlar,
Alınan hatayı belirtirseniz, ekran da olur, çok daha sağlıklı bir cevap verebilirim.
İyi çalışmalar.
Merhaba Bora Bey, buradan ekran fotoğraflarını paylaşamadığım için iletişim kısmında bulunan gmail adresinize mail atmış bulunmaktayım. Spam klasörüne düşmüş olabilir, kontol edebilir misiniz?