Redis Pub/Sub ile Console’da Chat ve ElasticSearch İle Loglama
Selamlar,
Bu yazıda, daha çok cache olarak kullanılan Redis üzerinde, klasik producer(üretici) – consumer(tüketici) senaryosu yerine, 2 tarafın da birbirini dinlediği ve cevapladığı bir chat uygulaması üzerinde konuşacağız. Aslında bir chat uygulaması için, 2 tarafın da hem publisher hem subscriber olması gerekmektedir. Bunun yanına, arada geçen konuşmaları ElasticSearch’de saklayıp, son 5 mesajı başlangıçta konsol üzerinde listeleyeceğiz. Bu yazıya devam etmeden önce, makinanızda Redis yok ise nasıl kurulur ve Redis Nedir gibi sorulara aşina değil iseniz, önce aşağıdaki makaleleri okumanızı tavsiye ederim:
Bu solution’da, 4 Proje oluşturacağız. İlki ClassLibrary RedisCore projesi, diğer 2 si de Console Application Redis Producer ve Redis Consumer projeleri ve son olarak ElasticCore projesi.
İlk projemiz RedisCore Projesinden başlayalım.
RedisCore/RedisCore.cs: Burada amaç, Redis sunucusuna bağlanacak ve Global olarak kullanılacak bir Redis Client’ın oluşturulmasıdır. Öncelikle Nuget’den, aşağıda görüldüğü gibi “StackExchange” ve “ConfigurationManager” kütüphaneleri indirilir.
- Memory’den tasarruf amaçlı, iş bitiminde GC’un doğrudan çağrılabilmesi için “Using()” içerisinde kullanılmıştır. Bu nedenle ilgili sınıf, IDisposable interface’inden türetilmiştir. Ve “Dispose()” methodu override edilmiştir.
- 2 farklı channel tanımlanmıştır “Joe” ve “Elie”. Chat’e katılacak herbir kişi için, ayrı bir channel tanımlanabilir.
- RedisClient() methodu ile, ilgili connectionString (sunucu yolu) ve Password ile Redis Server’a bağlanılır.
- İlgili RedisSubscriber GetSubscirber() methodu ile geri dönülür.
RedisCore/RedisCore.cs:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 |
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using System.Configuration; using StackExchange.Redis; using System.Runtime.InteropServices; namespace RedisPubSubChat { public class RedisClient : IDisposable { public ConnectionMultiplexer connection; public string Channel = "Joe"; public string Channel2 = "Elie"; public RedisClient() { string connectionString = ConfigurationManager.AppSettings["RedisConnectionString"]; var options = ConfigurationOptions.Parse(connectionString); options.Password = ConfigurationManager.AppSettings["RedisPassword"]; connection = ConnectionMultiplexer.Connect(options); } private bool _disposedValue; ~RedisClient() => Dispose(false); // Public implementation of Dispose pattern callable by consumers. public void Dispose() { Dispose(true); GC.SuppressFinalize(this); } // Protected implementation of Dispose pattern. protected virtual void Dispose(bool disposing) { if (!_disposedValue) { if (disposing) { // TODO: dispose managed state (managed objects) } // TODO: free unmanaged resources (unmanaged objects) and override finalizer // TODO: set large fields to null _disposedValue = true; } } public ISubscriber GetSubscriber() { return connection.GetSubscriber(); } } } |
RedisPubSub/App.conf: Redis ve ElasticSearch için gerekli Url, User ve Şifre bilgileri aşağıdaki gibi config dosyasında saklanır.
Not: “app.config”‘de saklanan kritik verilerin, güvenliği amacı ile Encrypted olmasına dikkat ediniz! Bu makalede, güvenlik konusu es geçilmiştir.
1 2 3 4 5 6 7 8 9 10 11 |
<?xml version="1.0" encoding="utf-8" ?> <configuration> <appSettings> <add key="RedisConnectionString" value="localhost:6379" /> <add key="RedisPassword" value="RedisPassword" /> <add key="ElasticSearchHost" value="http://localhost:9200" /> <add key="ElasticSearchUser" value="elastic" /> <add key="ElasticSearchPassword" value="E1asticPassword" /> <add key="ElasticIndexName" value="chatredis_log" /> </appSettings> </configuration> |
RedisPubSubChat/Program.cs: Bu console uygulamasının iki rolü vardır. Hem producer, hem de consumer. Kısaca hem mesaj alan, hem de mesaj yollayan mekanikleri tanımlayacağız.
- RedisCore sınıfından redis subscriber alınır, ve sonsuz “while” döngüsü içerisinde ilgili channel dinlemeye başlanılır.
- İşte konsol uygulamamızın ilk Rolü Subscriber. “SubscriberAsync()” ile Channel2 (“Elie”) sürekli olarak dinlenir. Eğer yeni bir mesaj gelir ise Console’a yazılır..
- Eğer ilgili channel’a bir mesaj gelir ise gelen mesajın hemen altına yeni bir mesaj girilecek olan alan açılır. İşte Console uygulamamızın 2. Rolü Publisher. “PublishAsync()” ile Channel(“Joe”)’a istenen mesaj atılır. CommandFlags.FireAndForget ile, işlemin başarılı olup olunmadığına bakılmaksızın kod devam ettirilir.
- Subscriber’ın dışındaki mesaj yazma özelliği, gene bir Publisher Rolüdür. Burada amaç yukarıdakinden farklı olarak, sayfa ilk açıldığında da mesaj yazabilmemizi sağlar. Yukarıdaki durumda ise, mesaj yazma özelliği yalnızca size bir mesaj geldiği zaman olabilmektedir.
RedisPubSubChat/Program.cs: Kısaca Channel(“Joe”) kanalına yazılıp, Channel2(“Elie”) kanalı aynı zamanda dinlenir. Böylece çift taraflı görev tanımı ile Console Mesaj ekranı yapılmış olunur.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 |
using ElasticCore; using System; using System.Configuration; using System.Threading.Tasks; namespace RedisPubSubChat { internal class Program { static async Task Main(string[] args) { using (RedisClient client = new RedisClient()) { var pubSub = client.GetSubscriber(); bool isStay = true; while (isStay) { //Subscriber await pubSub.SubscribeAsync(client.Channel2, (cannel, message) => { Console.WriteLine(Environment.NewLine + "Elie: " + message); //Publisher Console.Write("Write Message : "); var message2 = Console.ReadLine(); isStay = message2.ToLower() != "exit" ? true : false; pubSub.PublishAsync(client.Channel, message2, StackExchange.Redis.CommandFlags.FireAndForget); }); //Publisher Console.Write("Write Message : "); var message = Console.ReadLine(); isStay = message.ToLower() != "exit" ? true : false; await pubSub.PublishAsync(client.Channel, message, StackExchange.Redis.CommandFlags.FireAndForget); } } } } } |
RedisSubscriber/program.cs: Bu Console uygulaması, 2. client için oluşturulmuştur. Yukarıdaki uygulamanın eşleniği yanlızca bu sefer dinlenen kanal Channel(“Joe”), yazılan kanal Channel2(“Elie”)’dir. Yani tam tersi bir durum söz konusudur.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
using ElasticCore; using RedisPubSubChat; using System; using System.Configuration; using System.Threading.Tasks; namespace RedisSubscriber { internal class Program { static async Task Main(string[] args) { using (RedisClient client = new RedisClient()) using (ElasticCoreService<ChatModel> elastic = new ElasticCoreService<ChatModel>()) { //Get Chat History From Elastic var chats = elastic.SearchChatLog(5); Console.WriteLine("TOP 5 Message History:"); Console.WriteLine("".PadRight(60, '*')); foreach (var chat in chats) { Console.WriteLine($"-{chat.From}({chat.PostDate}): {chat.Message}"); } Console.WriteLine("".PadRight(60, '*')); Console.WriteLine(); //------------------------------ var pubSub = client.GetSubscriber(); bool isStay = true; while (isStay) { await pubSub.SubscribeAsync(client.Channel, (cannel, message) => { Console.WriteLine(Environment.NewLine + "Joe: " + message); Console.Write("Write Message : "); var message2 = Console.ReadLine(); isStay = message2.ToLower() != "exit" ? true : false; pubSub.PublishAsync(client.Channel2, message2, StackExchange.Redis.CommandFlags.FireAndForget); ChatModel chatModel = new ChatModel() { From = "Joe", To = "Elie", Message = message2, PostDate = DateTime.Now }; elastic.CheckExistsAndInsertLog(chatModel, ConfigurationManager.AppSettings["ElasticIndexName"]); }); Console.Write("Write Message : "); var message = Console.ReadLine(); isStay = message.ToLower() != "exit" ? true : false; await pubSub.PublishAsync(client.Channel2, message, StackExchange.Redis.CommandFlags.FireAndForget); ChatModel chatModel = new ChatModel() { From = "Joe", To = "Elie", Message = message, PostDate = DateTime.Now }; elastic.CheckExistsAndInsertLog(chatModel, ConfigurationManager.AppSettings["ElasticIndexName"]); } } } } } |
ElasticSearch ile Mesajları Loglama Ve Listeleme:
Öncelikle Elasticsearch hakkında pek bir fikriniz yok ise aşağıdaki makalelere bir bakmanızı tavsiye ediyorum:
- https://www.borakasmer.com/net-coreda-elasticsearch/
- https://www.borakasmer.com/elasticsearch-ile-farkli-indexler-ile-net-coreda-loglama/
- https://www.borakasmer.com/redis-elasticsearch-ve-kibananin-windows-uzerinde-kurulum-uzaktan-erisim-ve-net-ile-guvenligi/
- https://www.borakasmer.com/net-core-uzerinde-config-redis-ve-elasticsearch-icin-guvenlik/
Solution üzerinde, ElasticCore adında yeni bir Class Library Proje aşağıdaki gibi yaratılır.
ElasticCore/IElasticCoreService: Öncelikle, elastic ile yapılacak işlerin bir listesinin çıkarılması gerekmektedir. Aşağıdaki interface’e göre <T> tipi ile verilecek herhangi bir Index ya sorgulanıcak “SearchChatLog()“, ya da yok ise ilgili Index yaratılıp ilgili documentler kaydedilecektir. “CheckExistsAndInsertLog()”
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; namespace ElasticCore { public interface IElasticCoreService<T> where T : class { public IReadOnlyCollection<ChatModel> SearchChatLog(int rowCount); public void CheckExistsAndInsertLog(T logMode, string indexName); } } |
ElasticCore/ChatModel: Clientlar arası geçecek konuşmaların kaydedileceği model, aşağıda görüldüğü gibi “ChatModel”‘dir.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; namespace ElasticCore { public class ChatModel { public string From { get; set; } public string To { get; set; } public string Message { get; set; } public DateTime PostDate { get; set; } } } |
ElasticCore/ElasticClientProvider.cs:
- Öncelikle ElasticSearch için Nest kütüphanesi ve ilgili configlerin app.config’den okunması için, ConfigurationManager kütüphaneleri Nuget’den aşağıdaki gibi indirilir.
- ElasticClient değişkenine, app.config’den alınan elastic host dosyası ile oluşurulan elastic client atanır.
- BasicAuthentication ile ElasticSearch’e tanımlanmış user ve password ile bağlanılır.
- DisablePing(): İlk request’den sonra, belirlenen standart sürenin üstünde bir sürede hata fırlatılması sağlanır.
- DisableDirectStreaming() Bu, Elastic’e request ve response’un ara belleğe alınmasını sağlar ve her iki değerin de sırasıyla “RequestBodyInBytes” ve “ResponseBodyInBytes” propertylerinde çağrılabilmesine imkan verir. Bunu elasticsearch’de hata alındığı zaman, daha detaylı hatayı alabilmek adına eklenmiştir. Memoryde performans kaybına neden olabilir. Sadece ihtiyaç anında kullanılmalıdır.
- SniffOnStartup(false): İlk connection’ın çekilme anında, havuzun kontrol edilmesini engeller. Amaç performanstır.
- SniffOnConnectionFault(false): Bağlantı havuzu yeniden beslemeyi destekliyorsa, bir arama başarısız olduğunda ilgili connection havuzundan yeniden denetlenmesini engeller. Amaç yine performanstır.
ElasticCore/ElasticClientProvider.cs:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 |
using Nest; using System; using System.Collections.Generic; using System.Configuration; using System.Linq; using System.Text; using System.Threading.Tasks; namespace ElasticCore { public class ElasticClientProvider:IDisposable { public ElasticClientProvider() { ElasticSearchHost = ConfigurationManager.AppSettings["ElasticSearchHost"]; ElasticClient = CreateClient(); } private ElasticClient CreateClient() { var connectionSettings = new ConnectionSettings(new Uri(ElasticSearchHost)) .BasicAuthentication(ConfigurationManager.AppSettings["ElasticSearchUser"], ConfigurationManager.AppSettings["ElasticSearchPassword"]) .DisablePing() .DisableDirectStreaming(true) .SniffOnStartup(false) .SniffOnConnectionFault(false); return new ElasticClient(connectionSettings); } public ElasticClient ElasticClient { get; } public string ElasticSearchHost { get; set; } private bool _disposedValue; ~ElasticClientProvider() => Dispose(false); // Public implementation of Dispose pattern callable by consumers. public void Dispose() { Dispose(true); GC.SuppressFinalize(this); } // Protected implementation of Dispose pattern. protected virtual void Dispose(bool disposing) { if (!_disposedValue) { if (disposing) { // TODO: dispose managed state (managed objects) } // TODO: free unmanaged resources (unmanaged objects) and override finalizer // TODO: set large fields to null _disposedValue = true; } } } } |
- CheckExistsAndInsertLog(): Amaç gönderilen <T> tipindeki LogModel’in, ElasticSearch’e yoksa ilgili Index yaratılarak saklanmasıdır.
- ElasticClientProvider Scoped olarak oluşturulur.
- _client.Indices.Exist(): İlgili index’ın var olup olmadığına bakılır.
- var newIndexName = indexName + System.DateTime.Now.Ticks: İlgili index yok ise, Unique bir isim verilir.
- indexSettings.NumberOfReplicas = 1 : Herbir sunucunun, güvenlik amaçlı bir de yedeği olacaktır. Biri çöker ise, yedeyi devreye girecektir.
- indexSettings.NumberOfShards = 3: 3 sharding ve 1 replika ile toplamda 6 sunucu bulunmaktadır. Bir diğer dikkat edilecek konu da, her bir Shard’ın Replicasının başka bir Node’da bulunması gerektiğidir. Bütün yumurtaları aynı sepete koymanın anlamı yoktur :)
-
.Aliases(a => a.Alias(indexName))) : ElasticIndex’e yaratılırken Alias vermenizi şiddetle tavsiye ederim. Alies verildiği takdirde, indexde olan bir değişiklikten dolayı yeni bir index yaratılıp eskisi kolaylıkla silinebilir.
- SearchChat(): “chatredis_log” adındaki Elastic Index’i içinden, son “rowCount” olarak tanımlanan paramterik sayıdaki ChatModel, kaydedilme tarihine göre tersten sıralanarak geriye dönülür.
ElasticCore/ElasticCoreService.cs: Burada konuşma sırasında geçen ChatModel loglama amacı ile ElasticSearch’e, ilgili index yok ise yaratılıp atılmaktadır. Ayrıca history göstermek amacı ile, son 5 konuşma ElasticSearch’den çekilerek consola basılmaktadır.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 |
using Nest; using System; using System.Collections.Generic; using System.Configuration; using System.Linq; using System.Text; using System.Threading.Tasks; namespace ElasticCore { public class ElasticCoreService<T> : IDisposable, IElasticCoreService<T> where T : class { //Elastic üzerinde Indexden bağımız Document atmaya yarar. Yoksa Index yaratır. public void CheckExistsAndInsertLog(T logModel, string indexName) { using (ElasticClientProvider provider = new ElasticClientProvider()) { ElasticClient _client = provider.ElasticClient; if (!_client.Indices.Exists(indexName).Exists) { var newIndexName = indexName + System.DateTime.Now.Ticks; var indexSettings = new IndexSettings(); indexSettings.NumberOfReplicas = 1; indexSettings.NumberOfShards = 3; var response = _client.Indices.Create(newIndexName, index => index.Map<T>(m => m.AutoMap() ) .InitializeUsing(new IndexState() { Settings = indexSettings }) .Aliases(a => a.Alias(indexName))); } IndexResponse responseIndex = _client.Index<T>(logModel, idx => idx.Index(indexName)); } } public IReadOnlyCollection<ChatModel> SearchChatLog(int rowCount) { using (ElasticClientProvider provider = new ElasticClientProvider()) { ElasticClient _client = provider.ElasticClient; string indexName = ConfigurationManager.AppSettings["ElasticIndexName"]; var response = _client.Search<ChatModel>(s => s .Size(rowCount) .Sort(ss => ss.Descending(p => p.PostDate)) .Index(indexName) ); return response.Documents; } } private bool _disposedValue; ~ElasticCoreService() => Dispose(false); // Public implementation of Dispose pattern callable by consumers. public void Dispose() { Dispose(true); GC.SuppressFinalize(this); } // Protected implementation of Dispose pattern. protected virtual void Dispose(bool disposing) { if (!_disposedValue) { if (disposing) { // TODO: dispose managed state (managed objects) } // TODO: free unmanaged resources (unmanaged objects) and override finalizer // TODO: set large fields to null _disposedValue = true; } } } } |
Şimdi sıra geldi, yazdığımız Console Applicationlarına, ElasticSearch entegrasyonunun yapılmasına:
-
“ElasticCoreService<ChatModel> elastic”: ElasticCoreService “ChatModel” tipinde bir Index ile Scoped olarak oluşturulur.
-
“var chats = elastic.SearchChatLog(5)”: Son 5 “ChatModel” datası ElasticSearch’den çekilir.
-
“foreach (var chat in chats)”: Çekilen chat logu consola basılır.
- Mesaj olarak karşı tarafa atılan ChatModel, loglanma amaçlı ElasticSearch’e de atılır.
Aşağıda Kibana’da(ElasticSearch Monitor Tool’u), son 15 dakikadır ElasticSearch’ün “chatredis” indexine atılan mesajlaşma logları listelenmektedir.
Tüm bu Elasticsearch implementasyonu, diğer console uygulaması için de aynen geçerlidir.
RedisPubSubChat/Program.cs:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
using ElasticCore; using System; using System.Configuration; using System.Threading.Tasks; namespace RedisPubSubChat { internal class Program { static async Task Main(string[] args) { using (RedisClient client = new RedisClient()) using (ElasticCoreService<ChatModel> elastic = new ElasticCoreService<ChatModel>()) { //Get Chat History From Elastic var chats = elastic.SearchChatLog(5); Console.WriteLine("TOP 5 Message History:"); Console.WriteLine("".PadRight(60, '*')); foreach (var chat in chats) { Console.WriteLine($"-{chat.From}({chat.PostDate}): {chat.Message}"); } Console.WriteLine("".PadRight(60, '*')); Console.WriteLine(); //--------------------------- var pubSub = client.GetSubscriber(); bool isStay = true; while (isStay) { await pubSub.SubscribeAsync(client.Channel2, (cannel, message) => { Console.WriteLine(Environment.NewLine + "Elie: " + message); Console.Write("Write Message : "); var message2 = Console.ReadLine(); isStay = message2.ToLower() != "exit" ? true : false; pubSub.PublishAsync(client.Channel, message2, StackExchange.Redis.CommandFlags.FireAndForget); ChatModel chatModel = new ChatModel() { From = "Elie", To = "Joe", Message = message2, PostDate = DateTime.Now }; elastic.CheckExistsAndInsertLog(chatModel, ConfigurationManager.AppSettings["ElasticIndexName"]); }); Console.Write("Write Message : "); var message = Console.ReadLine(); isStay = message.ToLower() != "exit" ? true : false; await pubSub.PublishAsync(client.Channel, message, StackExchange.Redis.CommandFlags.FireAndForget); ChatModel chatModel = new ChatModel() { From = "Elie", To = "Joe", Message = message, PostDate = DateTime.Now }; elastic.CheckExistsAndInsertLog(chatModel, ConfigurationManager.AppSettings["ElasticIndexName"]); } } } } } |
Geldik bir makalenin daha sonuna. Bu makalede Redis Pub/Sub kullanarak, iki client arasında mesajlaştık. Bu aşamada herbir client, hem Producer hem de Consumer rollerini almıştır. DB üzerindeki yükü almak, hem de istendiğinde konuşmalar arasında istenen bir kelimenin aranmasının maliyetinin DB üzerinde çok fazla olmasından dolayı, Elasticsearch kullanılmıştır. Giden ve Gelen mesajlar FireAndForget, yani herhangi bir doğrulama yapılmadan gönderilmektedir. Bu yöntem hızı arttırmasına rağmen, doğrulama anlamında hiçbir işlemin yapılmadığı anlamına gelmektedir. Böylece Redis’in, sadece “In Memory Database” olmadığı, socket işlemlerinde de kullanıldığı gösterilmiştir. İstenirse DB’nin güncellendiği durumlarda, Redisin de “Write Through” yani anlık olarak güncellenmesi sağlanabilir. Tabi bu durumda, “FireAndForget” yaklaşımı pek de sağlıklı olmayacaktır :)
Yeni bir makalede görüşmek üzere hepinize hoşçakalın.
Source Code: https://github.com/borakasmer/RedisPubSubChat
Elinize sağlık, her zamanki gibi detay atlamadan adım adım her bilgiyi paylaştığınız için teşekkürler
Ben teşekkür ederim Kerem.
İşinize yararsa, ne mutlu bana :)
Cok Bilgilendirici.
Teşekkürler..
Hocam çok teşekkürler, farklı bir bakış açısı kazandırdınız. Emeğinize sağlık
Cok teşekkürler..