1. Create an Event Hub on Azure.
2. Create a project for the Event Hub sender and install the Azure Service Bus NuGet package.
3. Look up the Event Hub connection string.
4. Sender sample code:
// Event Hub settings; replace both placeholders with the values shown in the
// Event Hub's connection information.
static string eventHubName = "get from event hub connection information";
static string connectionString = "get from event hub connection information";

/// <summary>
/// Entry point of the sender: prints usage hints, waits for Enter, then starts sending.
/// </summary>
/// <param name="args">Command-line arguments (unused).</param>
static void Main(string[] args)
{
    Console.WriteLine("Press Ctrl-C to stop the sender process");
    Console.WriteLine("Press Enter to start now");
    Console.ReadLine();

    SendingRandomMessages();
}

/// <summary>
/// Sends a freshly generated GUID string to the Event Hub in an endless loop,
/// pausing 200 ms after each attempt. This method never returns.
/// </summary>
static void SendingRandomMessages()
{
    EventHubClient client = EventHubClient.CreateFromConnectionString(connectionString, eventHubName);

    for (;;)
    {
        try
        {
            string payload = Guid.NewGuid().ToString();
            Console.WriteLine("{0} > Sending message: {1}", DateTime.Now, payload);

            byte[] body = Encoding.UTF8.GetBytes(payload);
            client.Send(new EventData(body));
        }
        catch (Exception ex)
        {
            // Report the failure in red and keep looping so the sender stays alive.
            Console.ForegroundColor = ConsoleColor.Red;
            Console.WriteLine("{0} > Exception: {1}", DateTime.Now, ex.Message);
            Console.ResetColor();
        }

        Thread.Sleep(200);
    }
}
5. Create a storage account.
6. Install the NuGet package for the receiver project.
7. Look up the storage account keys.
8. Sample code for the receiver:
/// <summary>
/// Event processor that prints every received event to the console and
/// checkpoints the partition once more than five minutes have elapsed since the last checkpoint.
/// </summary>
class SimpleEventProcessor : IEventProcessor
{
    // Measures the time elapsed since the processor was opened or last checkpointed.
    Stopwatch checkpointStopWatch;

    /// <summary>
    /// Logs the shutdown and, when the reason is <c>CloseReason.Shutdown</c>,
    /// writes a final checkpoint.
    /// </summary>
    async Task IEventProcessor.CloseAsync(PartitionContext context, CloseReason reason)
    {
        Console.WriteLine("Processor Shutting Down. Partition ‘{0}‘, Reason: ‘{1}‘.", context.Lease.PartitionId, reason);

        if (reason != CloseReason.Shutdown)
        {
            return;
        }

        await context.CheckpointAsync();
    }

    /// <summary>
    /// Logs the partition and offset being opened and starts the checkpoint timer.
    /// </summary>
    Task IEventProcessor.OpenAsync(PartitionContext context)
    {
        Console.WriteLine("SimpleEventProcessor initialized. Partition: ‘{0}‘, Offset: ‘{1}‘", context.Lease.PartitionId, context.Lease.Offset);

        this.checkpointStopWatch = Stopwatch.StartNew();

        return Task.FromResult<object>(null);
    }

    /// <summary>
    /// Decodes each event body as UTF-8 text, prints it, and checkpoints when
    /// more than five minutes have passed since the previous checkpoint.
    /// </summary>
    async Task IEventProcessor.ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
    {
        foreach (EventData received in messages)
        {
            string text = Encoding.UTF8.GetString(received.GetBytes());
            string line = string.Format("Message received. Partition: ‘{0}‘, Data: ‘{1}‘", context.Lease.PartitionId, text);
            Console.WriteLine(line);
        }

        // Checkpoint every 5 minutes, so that a restarted worker can resume
        // processing from at most 5 minutes back.
        if (this.checkpointStopWatch.Elapsed <= TimeSpan.FromMinutes(5))
        {
            return;
        }

        await context.CheckpointAsync();
        this.checkpointStopWatch.Restart();
    }
}

/// <summary>
/// Entry point of the receiver: registers <see cref="SimpleEventProcessor"/> with an
/// <c>EventProcessorHost</c>, waits for Enter, then unregisters it.
/// </summary>
/// <param name="args">Command-line arguments (unused).</param>
static void Main(string[] args)
{
    // Replace the placeholders with the values from the Azure portal.
    string eventHubConnectionString = "get from azure event hub connection information";
    string eventHubName = "get from azure event hub connection information";
    string storageAccountName = "get from azure storage keys";
    string storageAccountKey = "get from azure storage keys";

    string storageConnectionString = string.Format(
        "DefaultEndpointsProtocol=https;AccountName={0};AccountKey={1}",
        storageAccountName,
        storageAccountKey);

    // A new GUID is used as the host name on every run.
    string hostName = Guid.NewGuid().ToString();
    var host = new EventProcessorHost(
        hostName,
        eventHubName,
        EventHubConsumerGroup.DefaultGroupName,
        eventHubConnectionString,
        storageConnectionString);

    Console.WriteLine("Registering EventProcessor...");

    var processorOptions = new EventProcessorOptions();
    processorOptions.ExceptionReceived += (sender, e) => Console.WriteLine(e.Exception);
    host.RegisterEventProcessorAsync<SimpleEventProcessor>(processorOptions).Wait();

    Console.WriteLine("Receiving. Press enter key to stop worker.");
    Console.ReadLine();

    host.UnregisterEventProcessorAsync().Wait();
}
时间: 2024-10-26 21:51:11