Azure Event Hubs Nedir?
Selamlar,
Bu makalede RabbitMQ’da ya da Redisde olduğu gibi publisher’dan subscriber’a olan, tek yönlü iletişim araçlarında Event Hubs’ı inceleyeceğiz. Event Hubs, ilgili datayı subscriberlarına büyük bir performance ile yayımlar. Ayrıca Kafka ile de entegrasyonu bulunmaktadır.Bu zaman kadar karşılaştığım, gerçek zamanlı veri alım araçlarından en performanslısı Event Hubs diyebilirim.
Azure Event Hubs “Partition Consumer Pattern” kullanmaktadır. Consumerlar datayı, birden fazla partition’dan okuyabilirler. Partitionlar, datayı fiziki bir storage’da günlerce tutabilirler. Yani Partition sayısı arttırıldığı zaman, buna bağlı olarak storage’ın da arttırılması gerekmektedir. Consumerlar da, datayı aldıkları noktayı tutarlar. Yani herhangi bir nedenden dolayı, partitionlardan data akışı kesilir ise, kaldıkları yerden devam edebilirler. Özellike IOT cihazlardan yüklüce veri akışında, Event Hubs çok başarılı çalışmaktadır.
Image Source: http://stimms.files.wordpress.com/2014/08/event-hubs.png
Azure Üzerinde Event Hubs Oluşturma:
- Azure üzerinde menüden Create Resource seçildikten sonra “Event Hubs” seçilerek “Create” butonuna basılır.
- Karşımıza aşağıdaki gibi bir ekran gelir. Event Hubs servisinin adı ve gurubu ve bölgesi seçilerek oluşturulur.
Events Hub’a erişim için gerekli olan Primary key ve Secondry key, aşağıdaki gibi alınır.
Şimdi Gelin bir Event Hub yaratalım:
Events Hub altından “+” tuşuna basılarak Event Hub yaratılır.
Aşağıdaki gibi bir ekran ile karşılaşılır. Create işleminden sonra artık Azure üzerinde Event Hub yaratılmış olunur.
Şimdi sıra geldi, örneğimize. “ExchangeEventHubs” adında bir Console Application aşağıdaki gibi yaratılır. Amaç değişen kur bilgilerini Consumerlar’a bildirmektir.
Nuget’den “WindowsAzure.ServiceBus” aşağıdaki gibi indirilir.
Aşağıda görüldüğü gibi Producer/Publisher 10 random kur değeri, consumer’a gönderilmektedir.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
public static void GenerateRandomMessages() { var primaryConnectionString = "****************************"; var client = EventHubClient.CreateFromConnectionString(primaryConnectionString,"exchangeeventhub"); Random randomNumberGenerator = new Random(); int counter = 100; while (true) { try { counter++; // 10 tane random kur değeri üretilir var randomMessage = string.Format("Son Güncel Kur {0}", 5.4 + (double)(randomNumberGenerator.Next(100, 1000))/1000); Console.WriteLine("Generated message: {0}", randomMessage); client.Send(new EventData(Encoding.UTF8.GetBytes(randomMessage))); if(counter>110) { return; } } catch (Exception exception) { Console.WriteLine(exception.Message); } Thread.Sleep(1000); } Console.ReadLine(); } |
Image Source: https://eax360.com/wp-content/uploads/2019/04/image-7.png
Şimdi sıra geldi, eğer sizde de yok ise Azure üzerinde “Storage” yaratmaya. Event Hubs üzerine gelen datayı, Azure üzerinde ya Table’da ya da Blob üzerinde tutmaktadır. Aşağıda görüldüğü gibi search ekranına “storage” yazılarak, “Storage account” seçilir.
Şimdi sıra geldi Storage Account’un yaratılması için gerekli alanların doldurulmasına. Aşağıda görüldüğü gibi account name olarak “exchangecontainer” atanmıştır.
Daha sonra ilgili storage’a erişim için “Acess keys” tıklandığında, aşağıdaki gibi “key1” ve “key2″‘ye erişilir. Bu keyler uygulamada, erişim için kullanılacaktır.
Sıra geldi Service Storage’dan “Blobs” Blob eklemeye: Aşağıda görüldüğü gibi Blobs sekmesi seçilir.
Ve Container “+” butonuna basılır. Bu uygulamada container ismi olarak “exchangehubcontainer” kullanılmıştır. Ve ilgili Blob Container’ı oluşturulur.
Şimdi sıra geldi gönderdiğimiz değerleri yakalamaya. Yani Consumer Application’ı yazmaya.
Yeni bir “EventHubConsumer” adından Console Application oluşturulur. Aşağıdaki kütüphaneler Nuget’den indirilir.
- Microsoft.Azure.Eventhubs.
- Microsoft.Azure.Eventhubs.Processor.
Processor: Tüm eventlerin dinlendiği, “IEventProcessor” interface’inden türeyen sınıftır. Process’in başlama, kapanma eventleri ve hata duruları nedenleri ile yakalanır. Ayrıca “ProcessEventsAsync” ile kendisine gönderilen data paketi ekrana basılır.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
public class Processor : IEventProcessor { public Task CloseAsync(PartitionContext context, CloseReason reason) { Console.WriteLine($"Process Kapanıyor. Partition '{context.PartitionId}', Sebep: '{reason}'."); return Task.CompletedTask; } public Task OpenAsync(PartitionContext context) { Console.WriteLine($"Processor Ayağa Kalkıyor. Partition: '{context.PartitionId}'"); return Task.CompletedTask; } public Task ProcessErrorAsync(PartitionContext context, Exception error) { Console.WriteLine($"Partition'da Hata Var: {context.PartitionId}, Hata: {error.Message}"); return Task.CompletedTask; } public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages) { foreach (var eventData in messages) { var data = Encoding.UTF8.GetString(eventData.Body.Array, eventData.Body.Offset, eventData.Body.Count); Console.WriteLine($"Message alındı. Partition: '{context.PartitionId}', Kur Değeri: '{data}'"); } return context.CheckpointAsync(); } } |
Şimdi sıra geldi, yukarıda tanımlı “Processor” sınıfının register edilip, gönderilecek paketlerin dinlenmesine.
En başta Azure üzerindeki Credential, Account ve Container isimleri aşağıdaki gibi tanımlanır:
- EventHubConnectionString
- EventHubName
- StorageContainerName
- StorageAccountName
- StorageAccountKey
- StorageConnectionString
- Asenkron bir MainAsync() methodu tanımlanmıştır.
- “EventProcessorHost()” : Azure üzerindeki EventProcessorHost credentiallar ile tanımlanır.
- “await eventProcessorHost.RegisterEventProcessorAsync<Processor>()”: Mesaj alma işlemini yapacak Consumer Processor sınıfı ile Register edilir.
- Processor sınıfının ProcessEventsAsync() methodu ile gelen mesaj ekrana basılır.
- Uygulama herhangi bir tuşa basılına kadar, dinleme modunda bırakılır.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
private const string EventHubConnectionString = "Endpoint=sb://exchangenotifyeventhub.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=*****************"; private const string EventHubName = "exchangeeventhub"; private const string StorageContainerName = "exchangehubcontainer"; private const string StorageAccountName = "exchangecontainer"; private const string StorageAccountKey = "*****************************"; private static readonly string StorageConnectionString = string.Format("DefaultEndpointsProtocol=https;AccountName={0};AccountKey={1}", StorageAccountName, StorageAccountKey); private static async Task MainAsync(string[] args) { Console.WriteLine("Processor Ayağa Kalkıyor..."); var eventProcessorHost = new EventProcessorHost( EventHubName, PartitionReceiver.DefaultConsumerGroupName, EventHubConnectionString, StorageConnectionString, StorageContainerName); // Event Processor mesaj almaya başlaması için Host Ediliyor. await eventProcessorHost.RegisterEventProcessorAsync<Processor>(); Console.WriteLine("Yeni paket bekleniyor. Durdurmak için Lütfen Bir Tuşa Basın."); Console.ReadLine(); // Disposes of the Event Processor Host await eventProcessorHost.UnregisterEventProcessorAsync(); } |
MainAsync() methodu aşağıdaki gibi çağrılarak Consumer ayağa kaldırılır.
1 2 3 4 |
static void Main(string[] args) { MainAsync(args).GetAwaiter().GetResult(); } |
Bu makalede genelde Redis veya RabbitMQ ile yaptığım Pub/Sub işlemini bu sefer Event Hubs ile tecrübe ettim ve tek kelime ile performans canavarı olduğunu anladım. Azure üzerinde serverless olarak çalışan, anlık olarak veri almanız gereken durumlardan gözü kapalı tercih edilebilecek çözümlerden biridir.
Yeni bir makalede görüşmek üzere hoşçakalın.
Source Code : https://github.com/borakasmer/ExchangeNotifyEventhubs
Source :
- https://docs.microsoft.com/tr-tr/azure/event-hubs/event-hubs-create
- https://www.c-sharpcorner.com/blogs/azure-event-hub
- https://www.simontimms.com/page/9/
- https://eax360.com/azure-events-hub/
Çok bilgilendirici ve güzel bir paylaşım olmuş.