WebApi’de Channel Kullanımı
Selamlar,
Bu makalede, in-memory calışması yeterli olan yapılarda, WebApi servisleri arasında haberleşmeyi sağlayabilecek “Channellar” hakkında konuşacağız. Son günlerde çalıştığım bazı projelerde, InMemory çalışması yeterli olan yapılarda boş yere Kafka, RabbitMQ gibi mekanizmalarının kullanıldığını gördüm. Bunun üzerine de, bu makaleyi yazmaya karar verdim.
Bu makalede .Net 9 ile bir WebApi(WebApiChannel) projesi yaratıyoruz. Bir Producer, bir de Consumer Controller yazacağız. Bunlar birbirileri ile in_memory çalışan, singelton bir channel üzerinden haberleşecekler.
1-) Öncelikle gönderilecek Mesaj modelimiz Model folderı altında aşağıdaki gibi tanımlayalım.
Model/Message.cs:
1 2 3 4 5 |
public class Message { public int Id { get; set; } public string Text { get; set; } } |
2-) Ortak kullanılacak channel tanımlanır ve Dependency Injection ile projeye eklenir.
Service/ChannelService: Aşağıda görüldüğü gibi ilgili channel CreatUnbound() methodu ile sınırsız boyutta açılmıştır. OutofMemory durumunun olmaması için, güvenlik amaçlı istenir ise buna bir sınır da verilebilir. Ayrıca bir de alınan mesajların geçici bir memoryde depolanması ve bir ConsumerControllerdan okunması amacı ile, Thread safe bir ConcurrentQueu bu servis altında tanımlanmıştır. Sebeplerine makalenin devamında yer verilecektir.
1 2 3 4 5 6 7 8 9 10 |
using System.Threading.Channels; namespace WebApiChannel.Services { public class ChannelService { public readonly ConcurrentQueue _ConsumedMessage = new(); public Channel<string> _Channe{ get;} = Channel.CreateUnbounded<string>(); } } |
Şimdi sıra geldi heryerde kullanılacak olan bu ChannelServices’i, DependencyInjection ile Singleton olarak eklemeye.
Program.cs:
1 |
builder.Services.AddSingleton<ChannelService>(); |
3-) ProducerController’ı yaratıp, Post ile gönderilecek mesajı ilgili channel’a gönderelim.
Aşağıda görüldüğü gibi channelService ortak kullanım amacı ile, Dependency Injection ile constructor’dan alınmıştır. Gönderilecek mesaj serialize edilip, channel’a yazılmıştır. Ve geriye yazılan mesaj dönülmüştür.
Controllers/ProducerController.cs:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
using Microsoft.AspNetCore.Mvc; using System.Text.Json; using WebApiChannel.Models; using WebApiChannel.Services; namespace WebApiChannel.Controllers { [ApiController] [Route("[controller]")] public class ProducerController : ControllerBase { private readonly ChannelService _channelService; public ProducerController(ChannelService channelService) { _channelService = channelService; } [HttpPost(Name = "PostData")] public async Task<IActionResult> Post([FromBody] Message _message) { var jsonData = JsonSerializer.Serialize(_message); await _channelService._Channel.Writer.WriteAsync(jsonData); return Ok(new { message = "Data added to channel", _message }); } } } |
4-) İlgili channel’ı dinleyen ve gelen mesajı alan bir ConsumerService yazalım.
Aşağıda görüldüğü gibi ilgili channelService, Dependency Injection ile alınmış ve alınan mesajların geçici bir memoryde saklanması için bu serviste tanımlanan Thread safe ConcurrentQueue kullanılmıştır. Bunun sebepleri:
- Mesajın işlenmesi vakit alacak ise ve kaybolması istenmiyor ise, özellikle yoğun yük altında veriler, bu Queue’da birike bilirler. Consumer ne zaman müsait olur ise, bu queuedan sıradaki kaydı alıp, tüm kayıtlar bitene kadar tek tek işleyecektir.
- Parallel çalışan, çoklu iş parçacıklı yapıları ile uyumlu MultiThread sistemler için, Concurrent DataStructurel bir yapı kulanılmıştır.
- Son olarak mesajın işlenmesinde başarısız bir durum olması durumunda, Retry mekanizmasının devreye girebilmesi için gene bu Concurrent Queue kullanılmıştır.
Background servislerin ExecuteAsync methodu override edilip, global channels dinlemiş ve gelen mesaj önce “cosumedMessage” kuruğuna yazılmış, sonra da ekrana basılmıştır. Arkada sürekli çalışan ve proje ayağa kalktığı zaman onun da otomatik ayağa kalkması gereken bir servis olduğu için, BackgorundService olarak yaratılmıştır.
Service/ConsumerService.cs:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
using System.Collections.Concurrent; using System.Text.Json; using WebApiChannel.Models; namespace WebApiChannel.Services { public class ConsumerService:BackgroundService { private readonly ChannelService _channelService; public ConsumerService(ChannelService channelService) { _channelService = channelService; } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { await foreach (var json in _channelService._Channel.Reader.ReadAllAsync(stoppingToken)) { var data = JsonSerializer.Deserialize<Message>(json); _channelService._ConsumedMessage.Enqueue(data); Console.WriteLine($"Consumed: Id={data.Id}, Message={data.Text}"); } } } } |
Oluşturulan bu servis, program.cs’e aşağıdaki gibi “AddHostedService” olarak tanımlanmıştır. Bu sayede servisin doğrudan çalıştırılması sağlanmıştır. Kısaca IIS üzerinde application ayağa kalktığı an, ilgili channel bu servis tarafından sürekli dinlenmeye başlıyacaktır. Bu nedenle HostedService, ChannelService’den sonra tanımlanmalıdır.
program.cs:
1 2 |
builder.Services.AddSingleton<ChannelService>(); builder.Services.AddHostedService<ConsumerService>(); |
5-) Consumer Controllerin Yaratılması
Aşağıda görüldüğü gibi burda da aynı ProductControllerda olduğu gibi ChannelService, Dependency Injection ile çağrılmış ve ConcurrentQueue olarak ChannelServices’de tanımlanan “_ConsumedMessagelar” result olarak geri dönülmüştür.
Controllers/ConsumerController.cs:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
using Microsoft.AspNetCore.Mvc; using WebApiChannel.Services; namespace WebApiChannel.Controllers { [ApiController] [Route("[controller]")] public class ConsumerController : ControllerBase { private readonly ChannelService _channelService; public ConsumerController(ChannelService channelService) { _channelService = channelService; } [HttpPost(Name = "GetData")] public IActionResult Get() { return Ok(_channelService._ConsumedMessage); } } } |
6-) Son olarak .Net 9.0 ile yaratılan bir projede, artık default olarak swagger bulunmamaktadır. Bunun yerine OpenApi bulunmaktadır. Hadi gelin projemize Swagger desteği de sağlıyalım.
Scalar.AspNetCore projesi, aşağıdaki gibi Nuget’den indirilir.
program.cs: “app.MapScalarApiReference();” kodu eklenir.
1 2 3 4 5 |
if (app.Environment.IsDevelopment()) { app.MapOpenApi(); app.MapScalarApiReference(); } |
https://localhost:44395/scalar/v1
İlgili güncellemeler yapıldıktan sonra yukarıdaki adres çağrıldığında, aşağıdaki gibi bir sayfa ile karşılaşılır:
7-) Test Etme
Hadi gelin şimdi Producer’dan istediğimiz mesajları Channela atıp, Consumer’dan okunmasını sağlayalım .
Producer: Aşağıda görüldüğü gibi, Producer’dan 2 adet test mesajı başarılı bir şekilde atılmıştır.
Gönderilen 1. Mesaj:
Gönderilen 2. Mesaj:
Consumer: Aşağıda görüldüğü gibi Producer’dan atılan mesajlar, ilgili channel tarafından dinlenerek düzenli bir şekilde bir Queue’ya konmuş ve daha sonra ConsumerControllerdan çağrılan Get() methodu ile Queudaki tüm mesajlar, ekrana result olarak basılmıştır.
Bu makalede, in-memory çalışabilen yani sunucu kapandığı zaman verilerin kaybolması ihtimali kabul edilebilen sistemlerde channel yöntemi ile Producer & Consumer ilişkisinin nasıl olabileceğini örneklendirdik. Bu sistemde Channe’ınl singelton olarak yönetildiği, Producer Apinin farklı Actionlar ile veri ekleyebildiği, Consumer servisin arka planda çalışarak veriyi tükettiği bir yapıyı gördük. Bu sistem yüksek performans ile, asenkron iş kuyruğundaki herbir elemanı hızlıca işler.
Eğer verilerin sunucunun kapanması durumunda kaybolması ihtimaline toleransınız yok ise, bu durumda verileri kalıcı disk veya database gibi bir yere yazmanız gerekmektedir. Bu durumda Consumer ilk açıldığında, iönce veritabanında ya da diskteki verileri okur ve işler, sonradan channel’ı tüketmeye devam eder. Bunun için Sql Server, Redis, Kafka, RabbitMQ gibi kalıcı mesaj kuyrukları kullanılabilir.
Ayrıca BackgroundServis kullanımında esas amaç, zamanlanmış yani Scheduled edilmiş işlerin kolayca yönetilebilmesidir. Ayrıca istenir ise, gerçek zamanlı (Streaming, WebSocket, SignalR) gibi verileri de bu sistem ile işleyebiliriz. Farklı kaynaklardan beslenebilen bir kuyruk yapısı ile, örneğin Iot cihazlardan, dosya izleme veya loging gibi.. toplanan bu dataların asenkron olarak arkada çalışan background bir servis ile işlenmesini sağlayabiliriz.
Geldik bir makalenin daha sonuna. Yeni bir makalede görüşmek üzere hepinize hoşçakalın.
Github: https://github.com/borakasmer/ChannelApi
Selamlar,
Öncelikle emeğine sağlık Bora abi. Makaleyi yeni okumaya başlarken sunucunun restart edilmesi, uygulamanın durması durumu aklıma gelmişti ki yazının sonunda bunun tolere edilebilir olması gerektiğini yazmışsın. Burada tolere edilebilecek bir sistem aklıma gelmedi, channel sistemi belki de bu yüzden çok bilinmiyor olarak yorumladım.
Selamlar Rezan,
Bu konuda cok soru geldi. Bir sonraki maakele olarak, “Sunucu Resetlendigi Zaman Channel Kullanan Yapılar Nasıl Kaldığı Yerden Devam Edebilir ?” adli makaleyi yazacağım. Ordan konuyu takip edebilirsin.
Umarim isine yarar.
İyi calismalar.
Hoscakal.
Merhaba hocam masstransit in-memory producer-consumer mimarisi de kullanılabilir.
Selamlar Omer,
Aynen kullanilabilir, Redis Pub-Sub da kullanilabilir.
İyi calislamar.
Bora hocam elinize saglik. Peki hangi durunlarda inmemory hangi durumlarda rabbitmq kullanmaliyiz. Kuyruk sistemi secimini nasil yapmaliyiz? Teşekkürler.