WebApi’de Channel Kullanımı

Selamlar,

Bu makalede, in-memory calışması yeterli olan yapılarda, WebApi servisleri arasında haberleşmeyi sağlayabilecek “Channellar” hakkında konuşacağız. Son günlerde çalıştığım bazı projelerde, InMemory çalışması yeterli olan yapılarda boş yere Kafka, RabbitMQ gibi mekanizmalarının kullanıldığını gördüm. Bunun üzerine de, bu makaleyi yazmaya karar verdim.

Bu makalede .Net 9 ile bir WebApi(WebApiChannel) projesi yaratıyoruz. Bir Producer, bir de Consumer Controller yazacağız. Bunlar birbirileri ile in_memory çalışan, singelton bir channel üzerinden haberleşecekler.

1-) Öncelikle gönderilecek Mesaj modelimiz Model folderı altında aşağıdaki gibi tanımlayalım.

Model/Message.cs:

2-) Ortak kullanılacak channel tanımlanır ve Dependency Injection ile projeye eklenir.

Service/ChannelService: Aşağıda görüldüğü gibi  ilgili channel CreatUnbound() methodu ile sınırsız boyutta açılmıştır. OutofMemory durumunun olmaması için, güvenlik amaçlı istenir ise buna bir sınır da verilebilir. Ayrıca bir de alınan mesajların geçici bir memoryde depolanması ve bir ConsumerControllerdan okunması amacı ile, Thread safe bir ConcurrentQueu bu servis altında tanımlanmıştır. Sebeplerine makalenin devamında yer verilecektir.

Şimdi sıra geldi heryerde kullanılacak olan bu ChannelServices’i, DependencyInjection ile Singleton olarak eklemeye.

Program.cs:

3-) ProducerController’ı yaratıp, Post ile gönderilecek mesajı ilgili channel’a gönderelim.

Aşağıda görüldüğü gibi channelService ortak kullanım amacı ile, Dependency Injection ile constructor’dan alınmıştır. Gönderilecek mesaj serialize edilip, channel’a yazılmıştır. Ve geriye yazılan mesaj dönülmüştür.

Controllers/ProducerController.cs:

4-) İlgili channel’ı dinleyen ve gelen mesajı alan bir ConsumerService yazalım.

Aşağıda görüldüğü gibi ilgili channelService, Dependency Injection ile alınmış ve alınan mesajların geçici bir memoryde saklanması için bu serviste tanımlanan Thread safe ConcurrentQueue kullanılmıştır. Bunun sebepleri:

  1. Mesajın işlenmesi vakit alacak ise ve kaybolması istenmiyor ise, özellikle yoğun yük altında veriler, bu Queue’da birike bilirler. Consumer ne zaman müsait olur ise, bu queuedan sıradaki kaydı alıp, tüm kayıtlar bitene kadar tek tek işleyecektir.
  2. Parallel çalışan, çoklu iş parçacıklı yapıları ile uyumlu MultiThread sistemler için, Concurrent DataStructurel bir yapı kulanılmıştır.
  3. Son olarak mesajın işlenmesinde başarısız bir durum olması durumunda, Retry mekanizmasının devreye girebilmesi için gene bu Concurrent Queue kullanılmıştır.

Background servislerin ExecuteAsync methodu override edilip, global channels dinlemiş ve gelen mesaj önce “cosumedMessage” kuruğuna yazılmış, sonra da ekrana basılmıştır. Arkada sürekli çalışan ve proje ayağa kalktığı zaman onun da otomatik ayağa kalkması gereken bir servis olduğu için, BackgorundService olarak yaratılmıştır.

Service/ConsumerService.cs:

Oluşturulan bu servis, program.cs’e aşağıdaki gibi “AddHostedService” olarak tanımlanmıştır. Bu sayede servisin doğrudan çalıştırılması sağlanmıştır. Kısaca IIS üzerinde application ayağa kalktığı an, ilgili channel bu servis tarafından sürekli dinlenmeye başlıyacaktır. Bu nedenle HostedService, ChannelService’den sonra tanımlanmalıdır.

program.cs:

5-) Consumer Controllerin Yaratılması

Aşağıda görüldüğü gibi burda da aynı ProductControllerda olduğu gibi ChannelService, Dependency Injection ile çağrılmış ve ConcurrentQueue olarak ChannelServices’de tanımlanan “_ConsumedMessagelar” result olarak geri dönülmüştür.

Controllers/ConsumerController.cs:


6-) Son olarak .Net 9.0 ile yaratılan bir projede, artık default olarak swagger bulunmamaktadır. Bunun yerine OpenApi bulunmaktadır. Hadi gelin projemize Swagger desteği de sağlıyalım.

Scalar.AspNetCore projesi, aşağıdaki gibi Nuget’den indirilir.

program.cs: “app.MapScalarApiReference();” kodu eklenir.

https://localhost:44395/scalar/v1

İlgili güncellemeler yapıldıktan sonra yukarıdaki adres çağrıldığında, aşağıdaki gibi bir sayfa ile karşılaşılır:

7-) Test Etme

Hadi gelin şimdi Producer’dan istediğimiz mesajları Channela atıp, Consumer’dan okunmasını sağlayalım .

Producer: Aşağıda görüldüğü gibi, Producer’dan 2 adet test mesajı başarılı bir şekilde atılmıştır.

Gönderilen 1. Mesaj:

Gönderilen 2. Mesaj:

Consumer: Aşağıda görüldüğü gibi Producer’dan atılan mesajlar, ilgili channel tarafından dinlenerek düzenli bir şekilde bir Queue’ya konmuş ve daha sonra ConsumerControllerdan çağrılan Get() methodu ile Queudaki tüm mesajlar, ekrana result olarak basılmıştır.

Bu makalede, in-memory çalışabilen yani sunucu kapandığı zaman verilerin kaybolması ihtimali kabul edilebilen sistemlerde channel yöntemi ile Producer & Consumer ilişkisinin nasıl olabileceğini örneklendirdik. Bu sistemde Channe’ınl singelton olarak yönetildiği, Producer Apinin farklı Actionlar ile veri ekleyebildiği, Consumer servisin arka planda çalışarak veriyi tükettiği bir yapıyı gördük. Bu sistem yüksek performans ile, asenkron iş kuyruğundaki herbir elemanı hızlıca işler.

Eğer verilerin sunucunun kapanması durumunda kaybolması ihtimaline toleransınız yok ise, bu durumda verileri kalıcı disk veya database gibi bir yere yazmanız gerekmektedir. Bu durumda Consumer ilk açıldığında, iönce veritabanında ya da diskteki verileri okur ve işler, sonradan channel’ı tüketmeye devam eder. Bunun için Sql Server, Redis, Kafka, RabbitMQ gibi kalıcı mesaj kuyrukları kullanılabilir.

Ayrıca BackgroundServis kullanımında esas amaç, zamanlanmış yani Scheduled edilmiş işlerin kolayca yönetilebilmesidir. Ayrıca istenir ise, gerçek zamanlı (Streaming, WebSocket, SignalR) gibi verileri de bu sistem ile işleyebiliriz. Farklı kaynaklardan beslenebilen bir kuyruk yapısı ile, örneğin Iot cihazlardan, dosya izleme veya loging gibi.. toplanan bu dataların asenkron olarak arkada çalışan background bir servis ile işlenmesini sağlayabiliriz.

Geldik bir makalenin daha sonuna. Yeni bir makalede görüşmek üzere hepinize hoşçakalın.

Github: https://github.com/borakasmer/ChannelApi

 

 

Herkes Görsün:

Bunlar da hoşunuza gidebilir...

5 Cevaplar

  1. Rezan Yıldız dedi ki:

    Selamlar,
    Öncelikle emeğine sağlık Bora abi. Makaleyi yeni okumaya başlarken sunucunun restart edilmesi, uygulamanın durması durumu aklıma gelmişti ki yazının sonunda bunun tolere edilebilir olması gerektiğini yazmışsın. Burada tolere edilebilecek bir sistem aklıma gelmedi, channel sistemi belki de bu yüzden çok bilinmiyor olarak yorumladım.

    • borsoft dedi ki:

      Selamlar Rezan,
      Bu konuda cok soru geldi. Bir sonraki maakele olarak, “Sunucu Resetlendigi Zaman Channel Kullanan Yapılar Nasıl Kaldığı Yerden Devam Edebilir ?” adli makaleyi yazacağım. Ordan konuyu takip edebilirsin.
      Umarim isine yarar.

      İyi calismalar.
      Hoscakal.

  2. Ömer dedi ki:

    Merhaba hocam masstransit in-memory producer-consumer mimarisi de kullanılabilir.

  3. Egemen dedi ki:

    Bora hocam elinize saglik. Peki hangi durunlarda inmemory hangi durumlarda rabbitmq kullanmaliyiz. Kuyruk sistemi secimini nasil yapmaliyiz? Teşekkürler.

Bir cevap yazın

E-posta hesabınız yayımlanmayacak.