.Net Core Üzerinde Kafka

Selamlar,

Bu makalede dağıtık mesajlaşma sistemlerinden Kafkayı, .Net Core üzerinde incelemeye çalışacağız.

Öncelikle Kafka, aynı RabbitMQ‘de olduğu gibi BÜYÜK veri sistemlerinde message(queue) kullanarak datayı toplayıp, diğer sistemlere dağıtmak amacı ile kullanılır. Örneğin Microservicesler’e aktarılacak data, ElasticSerach’de tutulacak loglar ya da data analizinin yapılacağı Hadoop gibi. Uzun lafın kısası büyük data akışının olduğu yapılarda, mesela spotify search altında, anlık yapılan 40bin requestin minimum gecikme ile real-time data akışının sağlanması için biçilmiş kaftandır. Kafka, ilk olarak LinkedIn tarafından 2011 yılında open source olarak Scala ve Java ile geliştirilmeye başlanmıştır. Adını, Jay Kreps tarafından yazar “Franz Kafka”‘dan almıştır. Nedeni, Apache Kafkanın “Yazmak için en uygun sistem olmasıdır.” Ayrıca Kreps, yazar Kafka’nın çalışmalarına hayrandır.

Kodları anlamanız adına, Kafkada kullanılan bazı temel kavramları inceleyelim:

  • Topic : Queue’ya atılacak mesajları bir gurup altında toplamak için kullanılır. RabbitMQ’daki channel’a denk gelmektedir. Kısaca mesajların tutulduğu yerdir. Loglar için “log_data” topic ya da mesajlar için “message” topic gibi.

  • Partition : Topicler, performans amaçlı farklı parçalardan oluşabilirler. Bunu, aynı harddisk’deki partition gibi düşüne bilirsiniz. Her bir gelen mesaj’a otomatik artan ve değişmeyen “ID” değeri verilir. Buna “offset” denir.
  • Replication Factor  : Burada amaç data güvenliği için yedeklemektir. Partition içindeki veriler çoklanarak saklanırlar. Yani başka bir node’da(makinada) saklanır denebilir. Kafka içinde her bir node’a “Broker” denir.
  • Leader: Her bir Node’da tek bir Leader vardır. Diğerleri Follower’dır. Kafkada, tüm okuma ve yazma işlemlerinden Leader sorumludur. Kafkada diğerleri(followerlar) sadece yedekleme amaçlı kullanılırlar. Tüm mesajları sıraya koymak ve bunları diğer followerlara yine aynı sırada göndermek, gene Leader’ın görevlerindendir. Failure durumu için ise bir broker’da herhangi bir Leader hata verir ise, ona bağlı diğer follower’larından bir tanesi Leader seçilir ve hizmet vermeye devam edilir.

Image Source:https://habrastorage.org/webt/ly/hd/ml/lyhdmlstwyv-tf_-ts54gife3cw.png

Yukarıdaki resimde görüldüğü gibi, her bir follower başka bir Broker’daki Leader’a bağlanmıştır. Peki bundaki amaç nedir? Tabi ki amaç tüm yumurtaları aynı sepete koymamak :) Yani tamamen felaket senaryosuna karşı yapılan bir hazılıktır. Eğer ilgili Broker çökerse, Leader’da gidecektir. Fail olan Leader’a bağlı Followerlar da, aynı gemide olsalar idi:) sorun çok daha büyük olabilirdi. Ama yukarıdaki yapıda farklı Brokerlardaki followerlardan biri, bozulan Leader’ın yerini alacaktır.

  • Producer : Topiclere mesaj gönderen yapılardır. RabbitMQ’daki channel.BasicPublish() gibi düşünülebilir.
  • Consumer : Topicleri dinleyen yapılardır. RabbitMQ’daki consumer ile aynı mantıktadır.

Şimdi sıra geldi kuruluma.

Kurulum: 

1-) Siz bu makaleyi okurkenki en son güncel Kafya 2.0.1 versiyonu :) şu link’den  indirilir.

2-) Yukarıda görüldüğü gibi, Kafka’nın kendi sitesinde de açıkça belirtiği gibi Zookeeper‘a ihtiyaç duymaktadır. Peki bu Zookeper nedir?

Zookeeper: Dağıtık uygulamarı geliştirmeye yarayan, bir koordinasyon merkezi gibi düşünülebilir. Kafka da, dağıtık bir sistem üzerinde çalıştığı için, çeşitli Konfigurasyon bilgilerini bu Zookeeper üzerinde saklamakta ve ona ihtiyaç duymaktadır. Kafka Zookeeper’da ne gibi konfigürasyon datalarını tuttuğunu merak edenler için, bu link’i de buraya iliştireyim :)

Öncelikle Zookeeper’ın kurulması gerekmektedir. Ama ayrıca Zookeeper’ın indirlemsine gerek yoktur. Çünkü Kafka indirildiği zaman Zookeeper’da onunla birlikte gelmektedir. O zaman gelin önce, Zookeeper’ı aşağıdaki komut ile ayağa kaldıralım.

Zookeeper ayağa kaldırıldıktan sonra, aşağıdaki gibi mesaj silsilesinin alınması gerekmektedir.

3-) Kafka’nın ayağa kaldırılması, aşağıdaki gibi bir komut ile gerçekleştirilir.

Kafka ayağa kaldırıldığı zaman, aşağıdaki gibi bir mesajın alınması gerekmektedir.

Kurulum işleri bittiğine göre artık, örnek amaçlı kodlara geçebiliriz. Kodlar .Net Core 2.1.302 üzerinde yazılmıştır. Kafka 9092 portundan ayağa kalkmaktadır. Eğer aşağıdaki gibi ilgili komut bash’de çalıştırılır ise “lsof -i : 9092” ilgili portun kullanılmaya başlandığı görülür.

Öncelikle Console bir .Net Core projesi, aşağıdaki gibi oluşturulur.

İlgili projeye Kafka kütüphanesi, aşağıdaki komut ile kurulur. NET Core için Confluent Kafka bize ünlü C++ ile non-JVM uygulamalar için yazılan librdkafka’ yı sunar. Örneğin C++, C#, Python ve Node gibi.

Bu ilk Console uygulamasında, mesaj gönderen Producer kısmı yazılacaktır.

Aşağıda görüldüğü gibi :

  • “var config = new Dictionary<string, object> { { “bootstrap.servers”, “localhost:9092″ } }” : Öncelikle Kafka configuration’ı yapılarak, “9092” portuna bağlantı yapılmıştır.
  • “using (var producer = new Producer<Null, string>(config, null, new StringSerializer(Encoding.UTF8)))” : Mesaj gönderecek ilgili producer oluşturulur.
  • “while (text != “quit”)” : Console’da “quit” yazana kadar işleme devam edilir.
  • “producer.ProduceAsync(“message”, null, text)” : Console’da yazılan string message, Kafkaya’da “message” topic’ing gönderilir. Ve bu işlem, her yeni yazılan mesaj için tekrarlanır.

Kafka/Program.cs :

Sıra geldi ilgili message, Topic’i dinleyen Consumer’ı yazmaya. Aşadağıdaki komut ile yeni bir .Net Core Console Application oluşturulur.

Yine ilgili kafka kütüphaneleri aşağıdaki komut ile bu proje için de indirilir.

kafkaConsumer/Program.cs : Bu projede, Kafka “message” Topic consume edilmekte yani dinlenmektedir.

  • Oluşturulacak consumer’ın config’i tanımlanır.
    • “group.id”, “messageConsumer”: Consumer’ın unique ID’sidir.
    • “bootstrap.servers”, “localhost:9092” : Erişilecek server localdeki 9092 portudur.
    • “enable.auto.commit”, “false” : Bir consumer işlem yaparken çöker ise, işleme kaldığı offsetden başlayacak ve hiçbir mesaj kaybolmayacaktır. Bunun için offset manuel commitlenmelidir.
  • “using (var consumer = new Consumer<Null, string>(config, null, new StringDeserializer(Encoding.UTF8)))”  : Consumer tanımlanan configuration’a gore oluşturulur.
  • “consumer.OnMessage += (_, msg) =>” : “message” Topic’den bir mesaj alındığında ekrana aşağıdaki bilgileri basar :
    • “Console.WriteLine($”Message: {msg.Topic} Partition: {msg.Partition} Offset: {msg.Offset} {msg.Value}”)” :
      • msg.Topic: Gelen mesajın topic’i.
      • msg.Partition : Broker içindeki partion ID’si.
      • msg.Offset : İlgili Partition’daki kaçıncı offset ID’li kayıt olduğunu.
      • msg.Value : Gönderilen mesajı.
  • “while (true) { consumer.Poll(100); }” : 100 milisaniye aralıklar ile ilgili “message” Topic dinlenir.

Geldik bir makalenin daha sonuna. Bu makalede özellikle java platformunda sıkça kullanılan, dağıtık mesajlaşma sistemlerinden Kafkayı .Net Core üzerinde inceledik.

Yeni makalede görüşmek üzere hepinize hoşçakalın.

Kaynaklar :

Herkes Görsün:

Bunlar da hoşunuza gidebilir...

9 Cevaplar

  1. Samet dedi ki:

    Güzel konulara değiniyorsunuz hocam, teşekkürler.

  2. alper dedi ki:

    elinize sağlık. örnekte birden fazla consumer ile aynı groyp-id ile scale örneği pek güzel olacağını düşünüyorum.

  3. Mustafa dedi ki:

    Kafka ile uğraşmıştım zamanında mesaj gönder,mesaj al vs işlemleri. 3-4 terminal üzerinde çalıştırmıştım :) .NET Core üzerinde böyle bir örnek yapmanız gayet güzel olmuş elinize sağlık.

  4. Ercan dedi ki:

    Biraz zaman geçmiş siz bunu yazalı Bora hocam ama yine de teşekkür etmek istedim :)
    Elinize sağlık.

  5. Mehmet dedi ki:

    Hocam makale üzerinden baya geçmiş ama :) monitoring için tool kullanıyor musunuz kullanıyorsanız hangi toolu neden kullanıyorsunuz ?

borsoft için bir cevap yazın Cevabı iptal et

E-posta hesabınız yayımlanmayacak.