Redis Pub/Sub ile Console’da Chat ve ElasticSearch İle Loglama

Selamlar,

Producer Consumerlarına Seslenirken

Bu yazıda, daha çok cache olarak kullanılan Redis üzerinde, klasik producer(üretici) – consumer(tüketici) senaryosu yerine, 2 tarafın da birbirini dinlediği ve cevapladığı bir chat uygulaması üzerinde konuşacağız. Aslında bir chat uygulaması için, 2 tarafın da hem publisher hem subscriber olması gerekmektedir. Bunun yanına, arada geçen konuşmaları ElasticSearch’de saklayıp, son 5 mesajı başlangıçta konsol üzerinde listeleyeceğiz. Bu yazıya devam etmeden önce, makinanızda Redis yok ise nasıl kurulur ve Redis Nedir gibi sorulara aşina değil iseniz, önce aşağıdaki makaleleri okumanızı tavsiye ederim:

Bu solution’da, 4 Proje oluşturacağız. İlki ClassLibrary RedisCore projesi, diğer 2 si de Console Application Redis Producer ve Redis Consumer projeleri ve son olarak ElasticCore projesi.

İlk projemiz RedisCore Projesinden başlayalım.

RedisCore/RedisCore.cs: Burada amaç, Redis sunucusuna bağlanacak ve Global olarak kullanılacak bir Redis Client’ın oluşturulmasıdır. Öncelikle Nuget’den, aşağıda görüldüğü gibi “StackExchange” ve “ConfigurationManager” kütüphaneleri indirilir.

  • Memory’den tasarruf amaçlı, iş bitiminde GC’un doğrudan çağrılabilmesi için “Using()” içerisinde kullanılmıştır. Bu nedenle ilgili sınıf, IDisposable interface’inden türetilmiştir. Ve “Dispose()” methodu override edilmiştir.

  • 2 farklı channel tanımlanmıştır “Joe” ve “Elie”. Chat’e katılacak herbir kişi için, ayrı bir channel tanımlanabilir.

  • RedisClient() methodu ile, ilgili connectionString (sunucu yolu) ve Password ile Redis Server’a bağlanılır.

  • İlgili RedisSubscriber GetSubscirber() methodu ile geri dönülür.

RedisCore/RedisCore.cs:

RedisPubSub/App.conf: Redis ve ElasticSearch için gerekli Url, User ve Şifre bilgileri aşağıdaki gibi config dosyasında saklanır.

Not: “app.config”‘de saklanan kritik verilerin, güvenliği amacı ile Encrypted olmasına dikkat ediniz! Bu makalede, güvenlik konusu es geçilmiştir.

RedisPubSubChat/Program.cs: Bu console uygulamasının iki rolü vardır. Hem producer, hem de consumer. Kısaca hem mesaj alan, hem de mesaj yollayan mekanikleri tanımlayacağız.

  • RedisCore sınıfından redis subscriber alınır, ve sonsuz “while” döngüsü içerisinde ilgili channel dinlemeye başlanılır.

  • İşte konsol uygulamamızın ilk Rolü Subscriber. “SubscriberAsync()” ile Channel2 (“Elie”) sürekli olarak dinlenir. Eğer yeni bir mesaj gelir ise Console’a yazılır..

  • Eğer ilgili channel’a bir mesaj gelir ise gelen mesajın hemen altına yeni bir mesaj girilecek olan alan açılır. İşte Console uygulamamızın 2. Rolü Publisher. “PublishAsync()” ile Channel(“Joe”)’a istenen mesaj atılır. CommandFlags.FireAndForget ile, işlemin başarılı olup olunmadığına bakılmaksızın kod devam ettirilir.

  • Subscriber’ın dışındaki mesaj yazma özelliği, gene bir Publisher Rolüdür. Burada amaç yukarıdakinden farklı olarak, sayfa ilk açıldığında da mesaj yazabilmemizi sağlar. Yukarıdaki durumda ise, mesaj yazma özelliği yalnızca size bir mesaj geldiği zaman olabilmektedir.

RedisPubSubChat/Program.cs: Kısaca Channel(“Joe”) kanalına yazılıp, Channel2(“Elie”) kanalı aynı zamanda dinlenir. Böylece çift taraflı görev tanımı ile Console Mesaj ekranı yapılmış olunur.

RedisSubscriber/program.cs: Bu Console uygulaması, 2. client için oluşturulmuştur. Yukarıdaki uygulamanın eşleniği yanlızca bu sefer dinlenen kanal Channel(“Joe”), yazılan kanal Channel2(“Elie”)’dir. Yani tam tersi bir durum söz konusudur.

ElasticSearch ile Mesajları Loglama Ve Listeleme:

Öncelikle Elasticsearch hakkında pek bir fikriniz yok ise aşağıdaki makalelere bir bakmanızı tavsiye ediyorum:

Solution üzerinde, ElasticCore adında yeni bir Class Library Proje aşağıdaki gibi yaratılır.

ElasticCore/IElasticCoreService: Öncelikle, elastic ile yapılacak işlerin bir listesinin çıkarılması gerekmektedir. Aşağıdaki interface’e göre <T> tipi ile verilecek herhangi bir Index ya sorgulanıcak “SearchChatLog()“, ya da yok ise ilgili Index yaratılıp ilgili documentler kaydedilecektir. “CheckExistsAndInsertLog()

ElasticCore/ChatModel: Clientlar arası geçecek konuşmaların kaydedileceği model, aşağıda görüldüğü gibi “ChatModel”‘dir.

ElasticCore/ElasticClientProvider.cs:

  • Öncelikle ElasticSearch için Nest kütüphanesi ve ilgili configlerin app.config’den okunması için, ConfigurationManager kütüphaneleri Nuget’den aşağıdaki gibi indirilir.

  • ElasticClient değişkenine, app.config’den alınan elastic host dosyası ile oluşurulan elastic client atanır.

  • BasicAuthentication ile ElasticSearch’e tanımlanmış user ve password ile bağlanılır.
  • DisablePing():  İlk request’den sonra, belirlenen standart sürenin üstünde bir sürede hata fırlatılması sağlanır.
  • DisableDirectStreaming() Bu, Elastic’e request ve response’un ara belleğe alınmasını sağlar ve her iki değerin de sırasıyla “RequestBodyInBytes” ve “ResponseBodyInBytes” propertylerinde çağrılabilmesine imkan verir. Bunu elasticsearch’de hata alındığı zaman, daha detaylı hatayı alabilmek adına eklenmiştir. Memoryde performans kaybına neden olabilir. Sadece ihtiyaç anında kullanılmalıdır.
  • SniffOnStartup(false): İlk connection’ın çekilme anında, havuzun kontrol edilmesini engeller. Amaç performanstır.
  • SniffOnConnectionFault(false): Bağlantı havuzu yeniden beslemeyi destekliyorsa, bir arama başarısız olduğunda ilgili connection havuzundan yeniden denetlenmesini engeller. Amaç yine performanstır.

ElasticCore/ElasticClientProvider.cs:

  • CheckExistsAndInsertLog(): Amaç gönderilen <T> tipindeki LogModel’in, ElasticSearch’e yoksa ilgili Index yaratılarak saklanmasıdır.
    • ElasticClientProvider Scoped olarak oluşturulur.
    • _client.Indices.Exist(): İlgili index’ın var olup olmadığına bakılır.
    • var newIndexName = indexName + System.DateTime.Now.Ticks: İlgili index yok ise, Unique bir isim verilir.
    • indexSettings.NumberOfReplicas = 1 : Herbir sunucunun, güvenlik amaçlı bir de yedeği olacaktır. Biri çöker ise, yedeyi devreye girecektir.
    • indexSettings.NumberOfShards = 3: 3 sharding ve 1 replika ile toplamda 6 sunucu bulunmaktadır.  Bir diğer dikkat edilecek konu da, her bir Shard’ın Replicasının başka bir Node’da bulunması gerektiğidir. Bütün yumurtaları aynı sepete koymanın anlamı yoktur :)
    • .Aliases(a => a.Alias(indexName))) : ElasticIndex’e yaratılırken Alias vermenizi şiddetle tavsiye ederim. Alies verildiği takdirde, indexde olan bir değişiklikten dolayı yeni bir index yaratılıp eskisi kolaylıkla silinebilir.

  • SearchChat(): “chatredis_log” adındaki Elastic Index’i içinden, son “rowCount” olarak tanımlanan paramterik sayıdaki ChatModel, kaydedilme tarihine göre tersten sıralanarak geriye dönülür.

ElasticCore/ElasticCoreService.cs: Burada konuşma sırasında geçen ChatModel loglama amacı ile ElasticSearch’e, ilgili index yok ise yaratılıp atılmaktadır. Ayrıca history göstermek amacı ile, son 5 konuşma ElasticSearch’den çekilerek consola basılmaktadır.

Şimdi sıra geldi, yazdığımız Console Applicationlarına, ElasticSearch entegrasyonunun yapılmasına:

  • “ElasticCoreService<ChatModel> elastic”: ElasticCoreService “ChatModel” tipinde bir Index ile Scoped olarak oluşturulur.

  • var chats = elastic.SearchChatLog(5)”: Son 5 “ChatModel” datası ElasticSearch’den çekilir.

  • “foreach (var chat in chats)”: Çekilen chat logu consola basılır.

  • Mesaj olarak karşı tarafa atılan ChatModel, loglanma amaçlı ElasticSearch’e de atılır.

Aşağıda Kibana’da(ElasticSearch Monitor Tool’u), son 15 dakikadır ElasticSearch’ün “chatredis” indexine atılan mesajlaşma logları listelenmektedir.

Tüm bu Elasticsearch implementasyonu, diğer console uygulaması için de aynen geçerlidir.

RedisPubSubChat/Program.cs:

 

Geldik bir makalenin daha sonuna. Bu makalede Redis Pub/Sub kullanarak, iki client arasında mesajlaştık. Bu aşamada herbir client, hem Producer hem de Consumer rollerini almıştır. DB üzerindeki yükü almak, hem de istendiğinde konuşmalar arasında istenen bir kelimenin aranmasının maliyetinin DB üzerinde çok fazla olmasından dolayı, Elasticsearch kullanılmıştır. Giden ve Gelen mesajlar FireAndForget, yani herhangi bir doğrulama yapılmadan gönderilmektedir. Bu yöntem hızı arttırmasına rağmen, doğrulama anlamında hiçbir işlemin yapılmadığı anlamına gelmektedir. Böylece Redis’in, sadece “In Memory Database” olmadığı, socket işlemlerinde de kullanıldığı gösterilmiştir. İstenirse DB’nin güncellendiği durumlarda, Redisin de “Write Through” yani anlık olarak güncellenmesi sağlanabilir. Tabi bu durumda, “FireAndForget” yaklaşımı pek de sağlıklı olmayacaktır :)

Yeni bir makalede görüşmek üzere hepinize hoşçakalın.

Source Code: https://github.com/borakasmer/RedisPubSubChat

 

Herkes Görsün:

Bunlar da hoşunuza gidebilir...

6 Cevaplar

  1. Kerem Anka dedi ki:

    Elinize sağlık, her zamanki gibi detay atlamadan adım adım her bilgiyi paylaştığınız için teşekkürler

  2. Tarik dedi ki:

    Cok Bilgilendirici.

  3. Tahsin dedi ki:

    Hocam çok teşekkürler, farklı bir bakış açısı kazandırdınız. Emeğinize sağlık

Bir cevap yazın

E-posta hesabınız yayımlanmayacak.