由于每秒数据吞吐量巨大,需要将实时数据存到event hubs,再由event hubs定时定量保存到document DB。
event hubs的介绍详见微软官页:https://azure.microsoft.com/zh-tw/services/event-hubs/
事件中樞
從網站、應用程式和裝置擷取雲端等級的遙測數據
- 幾乎每秒即時地記錄數百萬個事件
- 使用靈活的授權與節流來連接裝置
- 以時間為基礎處理事件緩衝
- 利用彈性化的規模來管理服務
- 使用原生用戶端程式庫連接廣泛的平台
- 其他雲端服務隨插即用的配接器
每秒串流處理數百萬個事件
Azure 事件中樞是極具調整規模彈性的「發佈-訂閱」服務,其每秒可吸取上百萬項事件,並將這些事件串流至多項應用程式。如此可讓您處理及分析由所連接之裝置與應用程式所產生的大量資料。當事件中樞收集到資料之後,即可利用任何即時分析提供者或以批次/儲存裝置介面卡,來轉換及儲存資料。
處理具變動式負載數據來源的事件
現今的互聯世界即是由巨量資料所構成。巨量資料源自許多變動式負載數據來源,像是每隔幾分鐘即產生遙測資料的互聯車輛與控溫器、每秒產生事件的應用程式效能計數器,以及擷取每位使用者個別動作遙測資料的行動應用程式。事件中樞是一款受管理的服務,其能吸取規模具彈性的事件,以容納這些變動式負載數據來源以及由間歇連接所引發的尖峰情況。
跨平台連接數百萬個裝置
因為所連接的裝置種類成長快速,且涉及各種平台與通訊協定,所以讓吸取來自各種裝置的資料形成相當大的挑戰。而事件中樞不僅能處理各式規模的彙總串流,同時也可迎接這項連接不同資料來源的挑戰。事件中樞可輕鬆佈建容量,處理來自上百萬項裝置的事件,同時也能以裝置為單位維持事件順序。對進階訊息佇列通訊協定 (AMQP) 與 HTTP 所提供的支援,能讓許多平台皆可使用事件中樞。常見的平台也有原生用戶端程式庫可用。
定义event hubs:
private EventHubClientMapping _eventHubClient = new EventHubClientMapping();
保存:
1 protected void SaveDomainUserToFile(DomainUser user, bool isReceive = false) 2 { 3 System.Runtime.Serialization.Json.DataContractJsonSerializer json = 4 new System.Runtime.Serialization.Json.DataContractJsonSerializer(typeof(DomainUser)); 5 6 json.WriteObject(stream, user); 7 stream.Write(_cn, 0, _cn.Length); 8 9 byte[] bs = stream.ToArray(); 10 String contentStr = System.Text.Encoding.UTF8.GetString(bs); 11 //Event Hub 12 try 13 { 14 Notify(contentStr); 15 } 16 catch (Exception ex) 17 { 18 throw ex; 19 } 20 21 StartSave(); 22 }
调用接口方法:
1 public void Notify(string cookie) 2 { 3 _eventHubClient.AddMessage(cookie); 4 }
webconfig中要加入这两句:
1 </configuration> 2 <appSettings> 3 <add key="MappingDataFlowEventHubConnection" value="Endpoint=sb://xxx.servicebus.chinacloudapi.cn/;SharedAccessKeyName=sender;SharedAccessKey=xxx;TransportType=Amqp"/> 4 <add key="MappingDataFlowEventHubName" value="mapping-response-eventhub" /> 5 </appSettings> 6 </configuration>
azure cloud service上的配置:
继承的接口azure core中建立interface:
1 public interface IMappingNotify 2 { 3 void Notify(string cookie); 4 } 5 6 public class MappingNotify 7 { 8 public CookieSource CookieSource { get; set; } 9 10 public string Company { get; set; } 11 12 public string Domain { get; set; } 13 14 public string IP { get; set; } 15 16 public string Lang { get; set; } 17 18 public string PUID { get; set; } 19 20 public string Refer { get; set; } 21 22 public string ScreenHeight { get; set; } 23 24 public string ScreenWidth { get; set; } 25 26 public string TimeStamp { get; set; } 27 28 public string Title { get; set; } 29 30 public string UA { get; set; } 31 32 public string UID { get; set; } 33 34 public string URL { get; set; } 35 36 public string Version { get; set; } 37 38 public static byte[] Serialize(string cookie) 39 { 40 var message = string.Format("{0}", cookie); 41 42 return Encoding.UTF8.GetBytes(message); 43 } 44 45 public static IList<MappingNotify> Deserialize(byte[] message) 46 { 47 if (message == null || message.Length == 0) 48 throw new Exception("message is null."); 49 50 var mns = new List<MappingNotify>(); 51 52 var str = Encoding.UTF8.GetString(message); 53 string[] jsons = str.Trim(‘\n‘).Trim(‘\r‘).Split(new string[] { "\r\n" }, StringSplitOptions.RemoveEmptyEntries); 54 55 foreach (string json in jsons) 56 { 57 var jObject = JObject.Parse(json); 58 59 var mn = new MappingNotify(); 60 61 if (jObject["company"] != null) 62 mn.Company = jObject["company"].ToString(); 63 if (jObject["domain"] != null) 64 mn.Domain = jObject["domain"].ToString(); 65 if (jObject["ip"] != null) 66 mn.IP = jObject["ip"].ToString(); 67 if (jObject["lang"] != null) 68 mn.Lang = jObject["lang"].ToString(); 69 if (jObject["puid"] != null) 70 mn.PUID = jObject["puid"].ToString(); 71 if (jObject["refer"] != null) 72 mn.Refer = jObject["refer"].ToString(); 73 if (jObject["screenHeight"] != null) 74 mn.ScreenHeight = jObject["screenHeight"].ToString(); 75 if (jObject["screenWidth"] != null) 76 mn.ScreenWidth = jObject["screenWidth"].ToString(); 77 if (jObject["timestamp"] != null) 78 mn.TimeStamp = jObject["timestamp"].ToString(); 79 if (jObject["title"] != null) 80 mn.Title = jObject["title"].ToString(); 81 if (jObject["ua"] != null) 82 mn.UA = jObject["ua"].ToString(); 83 if (jObject["uid"] != null) 84 mn.UID = jObject["uid"].ToString(); 85 if (jObject["url"] != null) 86 mn.URL = jObject["url"].ToString(); 87 if (jObject["version"] != null) 88 mn.Version = jObject["version"].ToString(); 89 90 mn.CookieSource = CookieSource.mapping; 91 92 mns.Add(mn); 93 } 94 95 return mns; 96 } 97 }
在azure cloud service上的configuration文件加上这两句:
1 <ConfigurationSettings> 2 <Setting name="MappingDataFlowEventHubConnection" value="Endpoint=sb://xxx.servicebus.chinacloudapi.cn/;SharedAccessKeyName=sender;SharedAccessKey=xxx;TransportType=Amqp"/> 3 <Setting name="MappingDataFlowEventHubName" value="mapping-response-eventhub" /> 4 </ConfigurationSettings>
在azure business中加入一个cs文件:
1 using System; 2 using System.Collections.Generic; 3 using System.Linq; 4 using System.Text; 5 using System.Threading.Tasks; 6 using System.ComponentModel.Composition; 7 8 using xxx.Core.Interface; 9 using xxx.Core.Entity; 10 using xxx.Core; 11 using xxx.Core.Client; 12 13 namespace xxx.Business 14 { 15 [Export(typeof(IMappingNotify))] 16 public class EventHubMappingNotify : IMappingNotify 17 { 18 private EventHubClientMapping _eventHubClient = new EventHubClientMapping(); 19 20 public void Notify(string cookie) 21 { 22 _eventHubClient.AddMessage(cookie); 23 } 24 } 25 26 }
在azure core中config.cs文件中加入config信息:
1 //EventHub链接 2 public static string MappingDataFlowEventHubConnection 3 { 4 get 5 { 6 var config = CloudConfigurationManager.GetSetting("MappingDataFlowEventHubConnection"); 7 if (string.IsNullOrEmpty(config)) 8 throw new Exception("MappingDataFlowEventHubConnection hasn‘t configured!"); 9 10 return config; 11 } 12 }
EventHub链接
1 //EventHub存储配置 2 public static string MappingDataFlowEventHubName 3 { 4 get 5 { 6 var config = CloudConfigurationManager.GetSetting("MappingDataFlowEventHubName"); 7 if (string.IsNullOrEmpty(config)) 8 { 9 return "mapping-response-eventhub"; 10 } 11 12 return config; 13 } 14 }
EventHub存储配置
1 //EventHub消费者组 2 public static string MappingDataFlowEventHubConsumer 3 { 4 get 5 { 6 var config = CloudConfigurationManager.GetSetting("MappingDataFlowEventHubConsumer"); 7 if (string.IsNullOrEmpty(config)) 8 throw new Exception("MappingDataFlowEventHubConsumer hasn‘t configured!"); 9 10 return config; 11 } 12 }
EventHub消费者组
在azure core中加入client.cs文件:
1 using System; 2 using System.Collections.Generic; 3 using System.Linq; 4 using System.Text; 5 using System.Threading.Tasks; 6 7 using Microsoft.Azure; 8 using Microsoft.ServiceBus; 9 using Microsoft.ServiceBus.Messaging; 10 11 using xxx.Core.Entity; 12 using xxx.Core; 13 using xxx.Core.Interface; 14 15 namespace xxx.Core.Client 16 { 17 public class EventHubClientMapping 18 { 19 private Microsoft.ServiceBus.Messaging.EventHubClient _eventHubClient; 20 21 public EventHubClientMapping() 22 { 23 var factory = MessagingFactory.CreateFromConnectionString(DANDaasConfig.MappingDataFlowEventHubConnection); 24 _eventHubClient = factory.CreateEventHubClient(DANDaasConfig.MappingDataFlowEventHubName); 25 26 _eventHubClient.RetryPolicy = RetryPolicy.Default; 27 } 28 29 public void AddMessage(string cookie) 30 { 31 var message = new EventData(MappingNotify.Serialize(cookie)); 32 33 _eventHubClient.Send(message); 34 } 35 } 36 37 }
额。。差不多了。。