Sunucu Resetlendigi Zaman Channel Kullanan Yapılar Nasıl Kaldığı Yerden Devam Edebilir ?

Selamlar,

Bu makalede, bir önceki makalede anlatılan WebApi’de Channel Kullanımı konusunun devamı olarak in-memory olarak çalışan channel yapılarında ilgili sunucunun kapanması ya da channeldan alınan paketlerin yapılan işlemdeki kesintilerden dolayı işlenememesi durumunda, nasıl paketlerin saklanacağını ve sistemin düzelmesi ya da suncunun ayağa kalkması durumlarında, gönderilmiş paketlerde bir kayıp yaşanmadan nasıl kalındığı yerden devam edebileceği anlatılacaktır.

Bu makalede 3 farklı senaryoyu konuşacağız.

  • Birincisi, her gelen mesajın işlenmesinin 5sn sürdüğü ve anlık yüksek trafiğe sahip sistemlerde kuyrağa alınmış onlarca mesajın, sunucun kapanıp açılması durumunda nasıl kaldığı yerden devam edebileceğini.
  • İkincisi, Consumer olarak çalışan ve gelen mesajların bir MSSql DB’ye yazıldığı yapılarda, SQL sunucuya erişilemez ve mesaj gelmeye devam eder ise, alınan mesajların kaybolmadan SQL sunucu gelinceye kadar nasıl saklanıp DB sunucu ayağa kalkdığı zaman da işlenebileceğini tartışacağız.
  • Son olarak anlık kesintinin uzun sürdüğü yapılarda biriken binlerce mesajın nasıl arkada eritebileceğini bu sırada sunucunun bu işlemden bağımsız nasıl hızlaca ayağa kalkabileceğini ve yeni mesaj gönderilmesi durumunda, arkada önceden gelmiş mesajlar eritilirken paralelinde nasıl bu yeni gelen mesajın bekletilmeden işlenebileceğini inceleyeceğiz.

Öncelikle gelin MessageConsumerServis’i, yani Consumer yaklaşımını biraz değiştirelim. İlk önceliğimiz, gelen mesajları bir MSSql DB’ye kaydetmek olsun. Bunun içinde gerekli ön hazırlıkları yapalım. Yani var olan bir DB’de yeni bir tablo oluşturup, .Net tarafında DAL projesi yaratıp, DB işlemlerini bu yeni DAL projesi üzerindeki Context üzerinden yürütürken, ilgili kayıtları Message Entity üzeriden yönetelim. Ayrıca DAL katmanını, DB first yaklaşımı ile oluşturalım.

1-) Yeni bir SQL Tablo ve Yeni bir DAL Projesi oluşturalım:

Bu örnekte DBFirst yaklaşımı uygulanmıştır. Aşağıdaki SQL script ile “Message” Tablosu Customer MSSql DB’sinde, aşağıdaki gibi oluşturulmuştur.

Yeni bir “WebApiChannelDAL” adında Class Library projesi yaratılıp, Nuget’den, aşağıdaki kütüphaneler indirilir.

Terminalden DAL Proje altına gelinip, aşağıdaki scaffold komut yazılarak ilgili CustomerContext ve Message Entitiy’nin oluşması sağlanır.

Not: Makinanızda Scaffold komutu yüklü değil ise, aşağıdaki komut ile makinanıza indirebilirsiniz.

appsettings.json: Database connection string, aşağıdaki gibi tanımlanır.

program.cs: Son olarak CustomerContext, projeye Dependency Injection ile aşağıdaki gibi eklenir.

2-) ConsumerService’e Gelen Mesajların, Sunucu Kapandığı Veya Kayıtların İşlenemediği Zaman Kaybolmıyacağı Hale Getirilmesi.

how to save Arduino Serial data to TXT file

Öncelikle sunucu kapandığı zaman gönderilen mesajların kaybolmaması içi bunları bir yere mesela bu örnekte “txt” dosyaya yazılması lazım. Benim “txt” dosya seçme sebebim, minimum bağımlılık, basit ve performanslı bir çözüm olarak görmemdi. Siz isterseniz MongoDB, Redis, CosmosDB gibi write’ı hızlı in_memory ya da DocumentDB tarzı çözümlere gidebilirsiniz. Redis ve RabbitMQ gibi yapılar da, serverın kapanmasına karşılık çözüm olarak, aynı benim tercih ettiğim gibi fiziksel saklama yolu olarak bir dosyayı tercih etmektedir. Redis “dump.rd” dosyası üzerinde datayı ikili (binary) formatında tutar. RabbitMQ “*.rdq” uzantılı dosyalarda, kuyruk verilerini “/var/lib/rabbitmq/mnesia/” folderı altında tutar. Ben de bu örneklerden yola çıkarak, tüm gelen mesajları daha en başda, ‘C:\\Logs\\consumerData.txt‘ altında sunucuda fiziksel olarak tutmaya karar verdim.

Öncelikle gelin, static Filepath’i ve txt dosyaya yazan methodu aşağıdaki gibi tanımlayalım. Aşağıda dikkatinizi çekmek istediğim konu, ilk başta singleton olarak yazılan “_writer”‘ın, sonradan using ile “{ }” sarmalanmadan methoddan çıkarken dispose olmasının sağlanmasıdır. .Net’e gelen bu yeni özellik ile, using’i çıkışta ayrıca sarmalamaya gerek yoktur. Gelen mesaj, string json’a çevrilip “.txt” dosyaya yazılmaktadır.

ConsumerBGService / WriteToFileAsync(1):

ConsumerBGService / ExecuteAsync(2): Aşağıda görüldüğü gibi Channel’a mesaj geldiği zaman, her şeyden önce “.txt” file’a yazılmaktadır. Esas DB’ye yazılma işlemi, başka bir background serviste gerçekleşecektir. Burada amaç, gelen mesajın önce kaybolmaması için bir “.txt” dosyaya yazılması ve sonrasında da “_channelService” üzerindeki “_ConsumedMessage” ConcurrentQueue’ya ilgili mesajın koyulmasıdır. Bu şekilde başka bir background servis, ilgili Queue’yu dinleyerek alınan mesajları DB’ye yazacaktır.

ConsumerBGService / LoadPreviousMessagesAsync(3): Şimdi sıra geldi bu işlemin paralelinde, “.txt” file’a yazılmış ve herhangi bir nedenle işlenememiş, yani DB’ye yazılmamış kayıtların check edilip işlenmesine. SqlDB durmuş ya da sunucu restart olmuş olabilir, bu durumda da “.txt” dosyadaki mesajlar eritilmemiş olduğu için, bunların yeni gelen mesajların işlenmesi işlemine parallel olarak arkadan alınarak tekrar Queue’ya konması gerekmektedir.

Aşağıda görüldüğü gibi “.txt” file’a yazılmış tüm mesajlar, asenkron bir şekilde tek tek okunarak “_ConsumedMessage” queue’ya konmaktadır. Bu işlem 1 dakka ara ile arkada ve sürekli olarak, ilgili “.txt” file kontrol edilerek yapılmaktadır.

  1. Böylece makina ilk açıldığında işlenmeyip “.txt” file’a yazılan tüm mesajlar, bu method ile tek tek alınıp Concured Queue’ya konacaktır.
  2. Ayrıca Queuedan alınıp herhangi bir sebetden dolayı işlenemeyen kayıtlar, bu method içinde tekrardan “.txt” file’dan okunup işlenmesi için ilgili Queue’ya tekrardan konacaktır. Örnegin Sql sunucusuna erişilememesi ya da Conusmer servisdeki operasyonun çok uzun sürmesiniden dolayı alınan timeout hataları buna güncel hayatta hep karşılaştığımız örneklerdir.

3-) Ortak Bir Queuedan Alınan Mesajların, MsSql DB’ye Başarılı Bir Şekilde Kaydedildikten sonra “txt” File’dan Silinmesi

MessageConsumerBGService.cs / ExecuteAsync(1): Ortak “_channelService” üstündeki tüm mesajlar, sıra ile alınarak MSSql DB’ye sıra ile kaydedilirler. Tüm işlemler askenron olarak, bir background servis arkasında gerçekleşir.

  • _channelService: Projenin tamamında ortak kullanılan Channel ve Queue’nin tutulduğu servisdir.
  • _scopeFactory: CustomerContext’in her seferinde ayağa kalkmasını sağlar. DBContext nesnesini, singleton olarak DependencyInjection ile de alabilirdik. Ama o zaman DBcontext açık kalırsa bağlantı hataları yaşanabilir, aynı nesne birden fazla thread tarafından kullanılabilir ve son olarak servisin uzun süre çalışması durumunda eski DBContext nesneleri yönetilemez hale gelebilirdi.
    • scopeFactory Kulanılarak:
      •  Her çağrıda yeni bir DbContext örneği oluşturulur.
      • Connection life cycle düzgün yönetilir ve işlem tamamlandıktan sonra bağlantı kapatılır.
      • Tüm DB işlemleri Thread-safe olur, her işlem kendine özel bir DbContext nesnesi kullanır.
  • “while (!stoppingToken.IsCancellationRequested)”: Servisin while ile çalıştığı için, istendiği zaman durdurulması sağlanmıştır.
  • “await SaveMessageToDatabaseAsync(data,json)” Queue’dan alınan Message datası hem object hem de json text olarak DB’ye kaydeden Save methoduna parametre olarak gönderilmiştir.
  • “await Task.Delay(100, stoppingToken)”: Eğer Queue’da bir mesaj yok ise, CPU’yu yormamak adına küçük bir gecikme süresi tanımlanmıştır.

SaveMessageToDatabaseAsync(): Aşağıdaki Save methodunda CustomerContext scoped olarak bu method içinde kullanılmak üzere yaratılmış, ve DB’ye ilgili mesaj kaydedilince de silinmiştir.

Kaydetme işlemi sırasında gelen Message model tipi, bizim bir önceki makalede tanımladığımız Pocomuzdur. Bu modelin, DB’ye kaydedilmeden önce Entity Message tipine otomatik maplenmesi için AutoMapper kütüphanesi kullanılmıştır. DB’ye kaydetme işi tamamlandıktan sonra, ilgili mesaj kaydı Json formatı ile “.txt” file dosyasından bulunarak temizlenmiştir.

catch’e yani hataya düşülmesi durumunda, ilgili txt dosyadan kayıt çıkarılmayacağı için, arkada çalışan “ConsumerBGService / LoadPreviousMessagesAsync” Queueden çekilmiş olan message tekrar queue’ya konarak, ileride tekrar MessageConsumerBGService tarafından işlenmesine olanak sağlanmıştır.

Automapper işlemi için bir mappingProfile dosyası gerekmektedir. Aşağıdaki gibi düzenlenebilir.

MappingProfile.cs:

Ayrıca Automapper’in program.cs altında singleton olarak, aşağıdaki gibi tanımlanması gerekmketedir.

program.cs:

MessageConsumerBGService / RemoveFromFileAsync():

Aşağıda görüldüğü gibi DB’ye kaydedilen mesaj datasının Json hali hariç, “.txt” dosyadaki tüm kayıtlar tek tek gezilerek yeni bir temp dosyaya yazılır. Daha sonra eski dosya silinip, temp dosya eskisi yerine kaydedilir.

Aşağıdaki kodda büyük bir sorun görülüyor. MessageConsumerBGService’i txt dosyadan bir kayıt silerken, ConsumerBGService aynı dosyaya yazmaktadır. 2 farklı thread’in ortak bir kaynakta çalışması sorununa “Race Condition” denir ve Data Consistency bozulmasına neden olur.

Şimdi bu Data Consistency Bozan Kodu Düzeltelim:

Öncelikle bir dosyaya yazarken başka bir servisin aynı dosyaya yazmaması gerekiyor. Bu nedenle akla ilkin, Lock Objectler gelir. Bu Lock objesi, her yerden erişilebilir ortak bir sınıf üzerinde ve tek bir nesne olmalıdır.

ChannelService’e, aşağıdaki gibi SemaphoreSlim objesi eklenir. .Net 9.0 ile Lock sınıfı hayatımız dahil oldu ama maalesef Lock sınıfı, henüz asenkron yapılarda ve Threadlerde çalışmamaktadır. Bu nedenle, farklı Threadlerin ortak bir nesneye erişim problemi için SemaphoreSlim sınıfı kullanılmaktadır.

Buna bağlı olarak “.txt” dosyaya mesajları yazan “ConsumerBGService” sevise ait “WriteFileAsync()” methodu, aşağıdaki gibi güncellenmiştir. Dosyaya yazmadan önce başkasının erişimi engellenmiş, işlem bitince de ilgili lock yani engel kaldırılmıştır.

ConsumerBGService / WriteFileAsync():

Son olarak aşağıda görüldüğü gibi “MessageConsumerBGService”‘e ait “RemoveFromFileAsync()” methodunda, işlem yapılan mesajların dosyadan silinmesi anında dışarıdan ilgili dosyaya erişim kitlenmiş dosya üzerindeki işlem bitince de ilgili kilit kaldırılmıştır.

MessageConsumerBGService.cs / RemoveFromFileAsync():

Böylece uçtan uca in_memory olarak çalışan Channel yapısında server kapanıp açıldığı veya consumer sınıfında queuedan çekilen bir datanın işlenemediği bir durumda nasıl data kaybının önlenebileceğini dışarıdan ayrıca bir 3th party ürün kullanmadan gördük.

Geldik bir makalenin daha sonuna. Yeni bir makalede görüşmek üzere hepinize hoşçakalın. Daha detaylı bilgili için, aşağıdaki videoyu izleyebilirsiniz.

Github: https://github.com/borakasmer/WebApiChannelWorkingOffline

 

Herkes Görsün:

Bunlar da hoşunuza gidebilir...

Bir cevap yazın

E-posta hesabınız yayımlanmayacak.