Sunucu Resetlendigi Zaman Channel Kullanan Yapılar Nasıl Kaldığı Yerden Devam Edebilir ?
Selamlar,
Bu makalede, bir önceki makalede anlatılan WebApi’de Channel Kullanımı konusunun devamı olarak in-memory olarak çalışan channel yapılarında ilgili sunucunun kapanması ya da channeldan alınan paketlerin yapılan işlemdeki kesintilerden dolayı işlenememesi durumunda, nasıl paketlerin saklanacağını ve sistemin düzelmesi ya da suncunun ayağa kalkması durumlarında, gönderilmiş paketlerde bir kayıp yaşanmadan nasıl kalındığı yerden devam edebileceği anlatılacaktır.
Bu makalede 3 farklı senaryoyu konuşacağız.
- Birincisi, her gelen mesajın işlenmesinin 5sn sürdüğü ve anlık yüksek trafiğe sahip sistemlerde kuyrağa alınmış onlarca mesajın, sunucun kapanıp açılması durumunda nasıl kaldığı yerden devam edebileceğini.
- İkincisi, Consumer olarak çalışan ve gelen mesajların bir MSSql DB’ye yazıldığı yapılarda, SQL sunucuya erişilemez ve mesaj gelmeye devam eder ise, alınan mesajların kaybolmadan SQL sunucu gelinceye kadar nasıl saklanıp DB sunucu ayağa kalkdığı zaman da işlenebileceğini tartışacağız.
- Son olarak anlık kesintinin uzun sürdüğü yapılarda biriken binlerce mesajın nasıl arkada eritebileceğini bu sırada sunucunun bu işlemden bağımsız nasıl hızlaca ayağa kalkabileceğini ve yeni mesaj gönderilmesi durumunda, arkada önceden gelmiş mesajlar eritilirken paralelinde nasıl bu yeni gelen mesajın bekletilmeden işlenebileceğini inceleyeceğiz.
Öncelikle gelin MessageConsumerServis’i, yani Consumer yaklaşımını biraz değiştirelim. İlk önceliğimiz, gelen mesajları bir MSSql DB’ye kaydetmek olsun. Bunun içinde gerekli ön hazırlıkları yapalım. Yani var olan bir DB’de yeni bir tablo oluşturup, .Net tarafında DAL projesi yaratıp, DB işlemlerini bu yeni DAL projesi üzerindeki Context üzerinden yürütürken, ilgili kayıtları Message Entity üzeriden yönetelim. Ayrıca DAL katmanını, DB first yaklaşımı ile oluşturalım.
1-) Yeni bir SQL Tablo ve Yeni bir DAL Projesi oluşturalım:
Bu örnekte DBFirst yaklaşımı uygulanmıştır. Aşağıdaki SQL script ile “Message” Tablosu Customer MSSql DB’sinde, aşağıdaki gibi oluşturulmuştur.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
/* SQL SCRIPT --------------------------------------------------------------------- */ USE [Customer] GO SET ANSI_NULLS ON GO SET QUOTED_IDENTIFIER ON GO CREATE TABLE [dbo].[Message]( [MessageID] [int] IDENTITY(1,1) NOT NULL, [Id] [int] NOT NULL, [Text] [nvarchar](500) NOT NULL, [CreatedDate] [datetime] NOT NULL, PRIMARY KEY CLUSTERED ( [MessageID] ASC )WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON, OPTIMIZE_FOR_SEQUENTIAL_KEY = OFF) ON [PRIMARY] ) ON [PRIMARY] GOUSE [Customer] GO SET ANSI_NULLS ON GO SET QUOTED_IDENTIFIER ON GO CREATE TABLE [dbo].[Message]( [MessageID] [int] IDENTITY(1,1) NOT NULL, [Id] [int] NOT NULL, [Text] [nvarchar](500) NOT NULL, [CreatedDate] [datetime] NOT NULL, PRIMARY KEY CLUSTERED ( [MessageID] ASC )WITH (PAD_INDEX = OFF, STATISTICS_NORECOMPUTE = OFF, IGNORE_DUP_KEY = OFF, ALLOW_ROW_LOCKS = ON, ALLOW_PAGE_LOCKS = ON, OPTIMIZE_FOR_SEQUENTIAL_KEY = OFF) ON [PRIMARY] ) ON [PRIMARY] GO ALTER TABLE [dbo].[Message] ADD CONSTRAINT [DF_Message_CreatedDate] DEFAULT (getdate()) FOR [CreatedDate] GO |
Yeni bir “WebApiChannelDAL” adında Class Library projesi yaratılıp, Nuget’den, aşağıdaki kütüphaneler indirilir.
Terminalden DAL Proje altına gelinip, aşağıdaki scaffold komut yazılarak ilgili CustomerContext ve Message Entitiy’nin oluşması sağlanır.
1 |
dotnet ef dbcontext scaffold "Server=192.168.50.173;Database=Customer; User ID=sa; Password=******;encrypt=false" Microsoft.EntityFrameworkCore.SqlServer --output-dir Models/DB --table Message --context CustomerContext |
Not: Makinanızda Scaffold komutu yüklü değil ise, aşağıdaki komut ile makinanıza indirebilirsiniz.
1 |
Install-Package Microsoft.EntityFrameworkCore.Tools |
appsettings.json: Database connection string, aşağıdaki gibi tanımlanır.
1 2 3 |
"ConnectionStrings": { "DefaultConnection": "Server=192.168.50.173;Database=Customer; User ID=sa; Password=*****;encrypt=false" }, |
program.cs: Son olarak CustomerContext, projeye Dependency Injection ile aşağıdaki gibi eklenir.
1 2 3 |
// CustomerContext'i DI container'a ekliyoruz builder.Services.AddDbContext<CustomerContext>(options => options.UseSqlServer(builder.Configuration.GetConnectionString("DefaultConnection"))); |
2-) ConsumerService’e Gelen Mesajların, Sunucu Kapandığı Veya Kayıtların İşlenemediği Zaman Kaybolmıyacağı Hale Getirilmesi.
Öncelikle sunucu kapandığı zaman gönderilen mesajların kaybolmaması içi bunları bir yere mesela bu örnekte “txt” dosyaya yazılması lazım. Benim “txt” dosya seçme sebebim, minimum bağımlılık, basit ve performanslı bir çözüm olarak görmemdi. Siz isterseniz MongoDB, Redis, CosmosDB gibi write’ı hızlı in_memory ya da DocumentDB tarzı çözümlere gidebilirsiniz. Redis ve RabbitMQ gibi yapılar da, serverın kapanmasına karşılık çözüm olarak, aynı benim tercih ettiğim gibi fiziksel saklama yolu olarak bir dosyayı tercih etmektedir. Redis “dump.rd” dosyası üzerinde datayı ikili (binary) formatında tutar. RabbitMQ “*.rdq” uzantılı dosyalarda, kuyruk verilerini “/var/lib/rabbitmq/mnesia/” folderı altında tutar. Ben de bu örneklerden yola çıkarak, tüm gelen mesajları daha en başda, ‘C:\\Logs\\consumerData.txt‘ altında sunucuda fiziksel olarak tutmaya karar verdim.
Öncelikle gelin, static Filepath’i ve txt dosyaya yazan methodu aşağıdaki gibi tanımlayalım. Aşağıda dikkatinizi çekmek istediğim konu, ilk başta singleton olarak yazılan “_writer”‘ın, sonradan using ile “{ }” sarmalanmadan methoddan çıkarken dispose olmasının sağlanmasıdır. .Net’e gelen bu yeni özellik ile, using’i çıkışta ayrıca sarmalamaya gerek yoktur. Gelen mesaj, string json’a çevrilip “.txt” dosyaya yazılmaktadır.
ConsumerBGService / WriteToFileAsync(1):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
private const string FilePath = "C:\\Logs\\consumerData.txt"; private async Task WriteToFileAsync(string json) { try { //if (_writer == null) // _writer = new StreamWriter(FilePath, append: true); using StreamWriter _writer = new StreamWriter(FilePath, append: true); await _writer.WriteLineAsync(json); await _writer.FlushAsync(); } catch (Exception ex) { Console.WriteLine($"Error writing to file: {ex.Message}"); } } |
ConsumerBGService / ExecuteAsync(2): Aşağıda görüldüğü gibi Channel’a mesaj geldiği zaman, her şeyden önce “.txt” file’a yazılmaktadır. Esas DB’ye yazılma işlemi, başka bir background serviste gerçekleşecektir. Burada amaç, gelen mesajın önce kaybolmaması için bir “.txt” dosyaya yazılması ve sonrasında da “_channelService” üzerindeki “_ConsumedMessage” ConcurrentQueue’ya ilgili mesajın koyulmasıdır. Bu şekilde başka bir background servis, ilgili Queue’yu dinleyerek alınan mesajları DB’ye yazacaktır.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
protected override async Task ExecuteAsync(CancellationToken stoppingToken) { try { await foreach (var json in _channelService._Channel.Reader.ReadAllAsync(stoppingToken)) { stoppingToken.ThrowIfCancellationRequested(); try { var data = JsonSerializer.Deserialize<Message>(json); if (data != null) { await WriteToFileAsync(json); //Guvenlik amacli ilkin dosyaya yaz. _channelService._ConsumedMessage.Enqueue(data); Console.WriteLine($"Consumed: Id={data.Id}, Message={data.Text}"); } } catch (JsonException ex) { Console.WriteLine($"Invalid JSON format: {ex.Message}"); } catch (Exception ex) { Console.WriteLine($"Unexpected error processing message: {ex.Message}"); } } } catch (OperationCanceledException) { Console.WriteLine("ConsumerService is stopping."); } catch (Exception ex) { Console.WriteLine($"Critical error in ExecuteAsync: {ex.Message}"); } } |
ConsumerBGService / LoadPreviousMessagesAsync(3): Şimdi sıra geldi bu işlemin paralelinde, “.txt” file’a yazılmış ve herhangi bir nedenle işlenememiş, yani DB’ye yazılmamış kayıtların check edilip işlenmesine. SqlDB durmuş ya da sunucu restart olmuş olabilir, bu durumda da “.txt” dosyadaki mesajlar eritilmemiş olduğu için, bunların yeni gelen mesajların işlenmesi işlemine parallel olarak arkadan alınarak tekrar Queue’ya konması gerekmektedir.
Aşağıda görüldüğü gibi “.txt” file’a yazılmış tüm mesajlar, asenkron bir şekilde tek tek okunarak “_ConsumedMessage” queue’ya konmaktadır. Bu işlem 1 dakka ara ile arkada ve sürekli olarak, ilgili “.txt” file kontrol edilerek yapılmaktadır.
- Böylece makina ilk açıldığında işlenmeyip “.txt” file’a yazılan tüm mesajlar, bu method ile tek tek alınıp Concured Queue’ya konacaktır.
- Ayrıca Queuedan alınıp herhangi bir sebetden dolayı işlenemeyen kayıtlar, bu method içinde tekrardan “.txt” file’dan okunup işlenmesi için ilgili Queue’ya tekrardan konacaktır. Örnegin Sql sunucusuna erişilememesi ya da Conusmer servisdeki operasyonun çok uzun sürmesiniden dolayı alınan timeout hataları buna güncel hayatta hep karşılaştığımız örneklerdir.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
private async Task LoadPreviousMessagesAsync(CancellationToken stoppingToken) { try { while (!stoppingToken.IsCancellationRequested) // Amac islenemeyen mesajlari tekrar islemek { await Task.Delay(45000, stoppingToken); // Simulate delay if (!File.Exists(FilePath)) { await File.WriteAllTextAsync(FilePath, ""); // Create file if it doesn't exist } else { await foreach (var line in ReadLinesAsync(FilePath, stoppingToken)) { try { var data = JsonSerializer.Deserialize<Message>(line); if (data != null && !_channelService._ConsumedMessage.Any(m => m.CreatedDate == data.CreatedDate)) { _channelService._ConsumedMessage.Enqueue(data); } } catch (JsonException ex) { Console.WriteLine($"Skipping invalid JSON: {ex.Message}"); } } } await Task.Delay(TimeSpan.FromMinutes(1), stoppingToken); // 1 dakika bekle ve tekrar çalıştır } } catch (Exception ex) { Console.WriteLine($"Error reading from file: {ex.Message}"); } } |
3-) Ortak Bir Queuedan Alınan Mesajların, MsSql DB’ye Başarılı Bir Şekilde Kaydedildikten sonra “txt” File’dan Silinmesi
MessageConsumerBGService.cs / ExecuteAsync(1): Ortak “_channelService” üstündeki tüm mesajlar, sıra ile alınarak MSSql DB’ye sıra ile kaydedilirler. Tüm işlemler askenron olarak, bir background servis arkasında gerçekleşir.
- _channelService: Projenin tamamında ortak kullanılan Channel ve Queue’nin tutulduğu servisdir.
- _scopeFactory: CustomerContext’in her seferinde ayağa kalkmasını sağlar. DBContext nesnesini, singleton olarak DependencyInjection ile de alabilirdik. Ama o zaman DBcontext açık kalırsa bağlantı hataları yaşanabilir, aynı nesne birden fazla thread tarafından kullanılabilir ve son olarak servisin uzun süre çalışması durumunda eski DBContext nesneleri yönetilemez hale gelebilirdi.
- scopeFactory Kulanılarak:
- Her çağrıda yeni bir DbContext örneği oluşturulur.
- Connection life cycle düzgün yönetilir ve işlem tamamlandıktan sonra bağlantı kapatılır.
- Tüm DB işlemleri Thread-safe olur, her işlem kendine özel bir DbContext nesnesi kullanır.
- scopeFactory Kulanılarak:
- “while (!stoppingToken.IsCancellationRequested)”: Servisin while ile çalıştığı için, istendiği zaman durdurulması sağlanmıştır.
- “await SaveMessageToDatabaseAsync(data,json)” Queue’dan alınan Message datası hem object hem de json text olarak DB’ye kaydeden Save methoduna parametre olarak gönderilmiştir.
- “await Task.Delay(100, stoppingToken)”: Eğer Queue’da bir mesaj yok ise, CPU’yu yormamak adına küçük bir gecikme süresi tanımlanmıştır.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
public class MessageConsumerBGService : BackgroundService { private readonly ChannelService _channelService; private readonly IServiceScopeFactory _scopeFactory; private const string FilePath = "C:\\Logs\\consumerData.txt"; public MessageConsumerBGService(ChannelService channelService, IServiceScopeFactory scopeFactory) { _channelService = channelService; _scopeFactory = scopeFactory; } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { try { while (!stoppingToken.IsCancellationRequested) { if (_channelService._ConsumedMessage.TryDequeue(out var data)) { try { var json = JsonSerializer.Serialize(data); await SaveMessageToDatabaseAsync(data,json); Console.WriteLine($"[Processed] Id={data.Id}, Message={data.Text}"); } catch (Exception ex) { Console.WriteLine($"Unexpected error while processing message: {ex.Message}"); } } else { await Task.Delay(100, stoppingToken); // Boşta beklerken CPU'yu yormamak için küçük bir gecikme } } } catch (OperationCanceledException) { Console.WriteLine("MessageConsumer is stopping."); } catch (Exception ex) { Console.WriteLine($"Critical error in ExecuteAsync: {ex.Message}"); } } |
SaveMessageToDatabaseAsync(): Aşağıdaki Save methodunda CustomerContext scoped olarak bu method içinde kullanılmak üzere yaratılmış, ve DB’ye ilgili mesaj kaydedilince de silinmiştir.
Kaydetme işlemi sırasında gelen Message model tipi, bizim bir önceki makalede tanımladığımız Pocomuzdur. Bu modelin, DB’ye kaydedilmeden önce Entity Message tipine otomatik maplenmesi için AutoMapper kütüphanesi kullanılmıştır. DB’ye kaydetme işi tamamlandıktan sonra, ilgili mesaj kaydı Json formatı ile “.txt” file dosyasından bulunarak temizlenmiştir.
catch’e yani hataya düşülmesi durumunda, ilgili txt dosyadan kayıt çıkarılmayacağı için, arkada çalışan “ConsumerBGService / LoadPreviousMessagesAsync” Queueden çekilmiş olan message tekrar queue’ya konarak, ileride tekrar MessageConsumerBGService tarafından işlenmesine olanak sağlanmıştır.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
private async Task SaveMessageToDatabaseAsync(Models.Message data, string? json) { try { using var scope = _scopeFactory.CreateScope(); var dbContext = scope.ServiceProvider.GetRequiredService<CustomerContext>(); var mapper = scope.ServiceProvider.GetRequiredService<IMapper>(); var dbMessage = mapper.Map<WebApiChannelDAL.Models.DB.Message>(data); dbContext.Messages.Add(dbMessage); await dbContext.SaveChangesAsync(); await RemoveFromFileAsync(json); } catch (Exception ex) { Console.WriteLine($"Database error: {ex.Message}"); } } |
Automapper işlemi için bir mappingProfile dosyası gerekmektedir. Aşağıdaki gibi düzenlenebilir.
MappingProfile.cs:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
using AutoMapper; namespace WebApiChannel { public class MappingProfile : Profile { public MappingProfile() { // Models.Message ile WebApiChannelDAL.Models.DB.Message arasında dönüşüm kuralı CreateMap<Models.Message, WebApiChannelDAL.Models.DB.Message>() .ForMember(dest => dest.MessageId, opt => opt.Ignore()); // MessageId IDENTITY olduğu için elle ayarlamıyoruz } } } |
Ayrıca Automapper’in program.cs altında singleton olarak, aşağıdaki gibi tanımlanması gerekmketedir.
program.cs:
1 2 3 4 5 6 7 |
//Automapper var mappingConfig = new MapperConfiguration(mc => { mc.AddProfile(new MappingProfile()); }); IMapper mapper = mappingConfig.CreateMapper(); builder.Services.AddSingleton(mapper); |
MessageConsumerBGService / RemoveFromFileAsync():
Aşağıda görüldüğü gibi DB’ye kaydedilen mesaj datasının Json hali hariç, “.txt” dosyadaki tüm kayıtlar tek tek gezilerek yeni bir temp dosyaya yazılır. Daha sonra eski dosya silinip, temp dosya eskisi yerine kaydedilir.
Aşağıdaki kodda büyük bir sorun görülüyor. MessageConsumerBGService’i txt dosyadan bir kayıt silerken, ConsumerBGService aynı dosyaya yazmaktadır. 2 farklı thread’in ortak bir kaynakta çalışması sorununa “Race Condition” denir ve Data Consistency bozulmasına neden olur.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
private async Task RemoveFromFileAsync(string json) { try { if (!File.Exists(FilePath)) return; string tempFilePath = FilePath + ".tmp"; using (var reader = new StreamReader(FilePath)) using (var writer = new StreamWriter(tempFilePath, false)) { string? line; while ((line = await reader.ReadLineAsync()) != null) { if (line != json) // Eşleşmeyenleri yeni dosyaya yaz await writer.WriteLineAsync(line); } } File.Delete(FilePath); // Eski dosyayı sil File.Move(tempFilePath, FilePath); // Yeni dosyayı eski dosyanın yerine koy } catch (Exception ex) { Console.WriteLine($"Error removing from file: {ex.Message}"); } } |
Şimdi bu Data Consistency Bozan Kodu Düzeltelim:
Öncelikle bir dosyaya yazarken başka bir servisin aynı dosyaya yazmaması gerekiyor. Bu nedenle akla ilkin, Lock Objectler gelir. Bu Lock objesi, her yerden erişilebilir ortak bir sınıf üzerinde ve tek bir nesne olmalıdır.
ChannelService’e, aşağıdaki gibi SemaphoreSlim objesi eklenir. .Net 9.0 ile Lock sınıfı hayatımız dahil oldu ama maalesef Lock sınıfı, henüz asenkron yapılarda ve Threadlerde çalışmamaktadır. Bu nedenle, farklı Threadlerin ortak bir nesneye erişim problemi için SemaphoreSlim sınıfı kullanılmaktadır.
1 2 3 4 5 6 |
public class ChannelService { public readonly ConcurrentQueue<Message> _ConsumedMessage = new(); public Channel<string> _Channel { get; } = Channel.CreateUnbounded<string>(); public readonly SemaphoreSlim FileLock = new(1, 1); } |
Buna bağlı olarak “.txt” dosyaya mesajları yazan “ConsumerBGService” sevise ait “WriteFileAsync()” methodu, aşağıdaki gibi güncellenmiştir. Dosyaya yazmadan önce başkasının erişimi engellenmiş, işlem bitince de ilgili lock yani engel kaldırılmıştır.
ConsumerBGService / WriteFileAsync():
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
private async Task WriteToFileAsync(string json) { await _channelService.FileLock.WaitAsync(); try { //if (_writer == null) // _writer = new StreamWriter(FilePath, append: true); using StreamWriter _writer = new StreamWriter(FilePath, append: true); await _writer.WriteLineAsync(json); await _writer.FlushAsync(); } catch (Exception ex) { Console.WriteLine($"Error writing to file: {ex.Message}"); } finally { _channelService.FileLock.Release(); } } |
Son olarak aşağıda görüldüğü gibi “MessageConsumerBGService”‘e ait “RemoveFromFileAsync()” methodunda, işlem yapılan mesajların dosyadan silinmesi anında dışarıdan ilgili dosyaya erişim kitlenmiş dosya üzerindeki işlem bitince de ilgili kilit kaldırılmıştır.
MessageConsumerBGService.cs / RemoveFromFileAsync():
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 |
private async Task RemoveFromFileAsync(string json) { await _channelService.FileLock.WaitAsync(); try { if (!File.Exists(FilePath)) return; string tempFilePath = FilePath + ".tmp"; using (var reader = new StreamReader(FilePath)) using (var writer = new StreamWriter(tempFilePath, false)) { string? line; while ((line = await reader.ReadLineAsync()) != null) { if (line != json) // Eşleşmeyenleri yeni dosyaya yaz await writer.WriteLineAsync(line); } } File.Delete(FilePath); // Eski dosyayı sil File.Move(tempFilePath, FilePath); // Yeni dosyayı eski dosyanın yerine koy } catch (Exception ex) { Console.WriteLine($"Error removing from file: {ex.Message}"); } finally { _channelService.FileLock.Release(); } } |
Böylece uçtan uca in_memory olarak çalışan Channel yapısında server kapanıp açıldığı veya consumer sınıfında queuedan çekilen bir datanın işlenemediği bir durumda nasıl data kaybının önlenebileceğini dışarıdan ayrıca bir 3th party ürün kullanmadan gördük.
Geldik bir makalenin daha sonuna. Yeni bir makalede görüşmek üzere hepinize hoşçakalın. Daha detaylı bilgili için, aşağıdaki videoyu izleyebilirsiniz.
Github: https://github.com/borakasmer/WebApiChannelWorkingOffline
Son Yorumlar