官方提供的是 Java SDK,并支持以 JDBC 方式输出查询结果;但是却没有提供 .NET SDK 的支持。
这里为第三方开源项目 ElasticSearch-Sql 开发了 .NET SDK,未来将集成到 BSF 框架中。(已在使用,但尚未经过大范围的使用测试,欢迎交流,权当抛砖引玉)
参考ElasticSearch-Sql 开源地址:https://github.com/NLPchina/elasticsearch-sql
1)支持将查询结果转换成datatable形式,便于界面绑定和数据导出等。
2)代码简单易懂,便于改进,以提高稳定性和性能。(拷贝后即可使用)
using System;
using System.Collections.Generic;
using System.Data;
using System.Linq;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Reflection;
using System.Text;
using System.Threading.Tasks;

namespace BSF.ElasticSearch
{
    /// <summary>
    /// Minimal client for an ElasticSearch "_sql" HTTP endpoint: posts a SQL string,
    /// deserializes the JSON reply into <see cref="Result"/>, and can flatten that
    /// reply into a <see cref="DataTable"/>.
    /// </summary>
    /// <remarks>
    /// JSON (de)serialization is delegated to BSF.Serialization.JsonProvider, which is
    /// declared outside this file. NOTE(review): the code assumes that provider maps JSON
    /// objects to Dictionary&lt;string, dynamic&gt; for dynamic members - confirm.
    /// </remarks>
    public class EsSqlProvider
    {
        // Full endpoint URL, always ending in "/_sql".
        private readonly string url;

        // One HttpClient per provider instance, reused for every request it sends.
        private readonly HttpClient client = new HttpClient();

        /// <summary>Targets <c>http://{ip}:{port}/_sql</c>.</summary>
        /// <param name="ip">Host name or IP address of the ElasticSearch node.</param>
        /// <param name="port">HTTP port of the ElasticSearch node.</param>
        public EsSqlProvider(string ip, int port)
        {
            this.url = string.Format("http://{0}:{1}/_sql", ip, port);
        }

        /// <summary>Targets <c>{url}/_sql</c>; trailing slashes on <paramref name="url"/> are dropped first.</summary>
        /// <param name="url">Base URL of the ElasticSearch node, without the "_sql" segment.</param>
        /// <exception cref="ArgumentNullException"><paramref name="url"/> is null.</exception>
        public EsSqlProvider(string url)
        {
            if (url == null)
            {
                throw new ArgumentNullException("url");
            }

            // The published listing had typographic quotes here, which does not compile.
            this.url = url.TrimEnd('/') + "/_sql";
        }

        /// <summary>Sends <paramref name="sql"/> as the request body and deserializes the reply.</summary>
        /// <param name="sql">SQL text understood by the "_sql" endpoint.</param>
        /// <returns>The deserialized response. The HTTP status code is not inspected.</returns>
        public Result Post(string sql)
        {
            using (var content = new StringContent(sql))
            {
                // The body is raw SQL, but it is sent with a JSON content type.
                content.Headers.ContentType = new MediaTypeHeaderValue("application/json");
                var json = PostBase(content);

                return new BSF.Serialization.JsonProvider().Deserialize<Result>(json);
            }
        }

        /// <summary>Runs <paramref name="sql"/> and flattens the reply into a table.</summary>
        /// <param name="sql">SQL text understood by the "_sql" endpoint.</param>
        /// <returns>
        /// A table named "es-groupby" when the first aggregation has buckets, "es-aggregations"
        /// when the first aggregation has a non-null <c>value</c>, or "es-sources" when there are
        /// no aggregations but there are hits. <c>null</c> in every other case.
        /// </returns>
        public DataTable Query(string sql)
        {
            var result = Post(sql);
            if (result == null)
            {
                return null;
            }

            if (result.aggregations != null && result.aggregations.Count > 0)
            {
                // The shape of the first aggregation decides how all of them are rendered.
                var first = result.aggregations.First();
                if (first.Value != null && first.Value.buckets != null && first.Value.buckets.Count > 0)
                {
                    return BuildGroupByTable(first.Key, first.Value.buckets);
                }

                if (first.Value != null && first.Value.value != null)
                {
                    return BuildMetricsTable(result.aggregations);
                }

                // Aggregations are present but in neither recognized shape; hits are not consulted.
                return null;
            }

            // The getter builds a fresh list on every access, so read it once.
            var sources = result.sources;
            if (sources != null && sources.Count > 0)
            {
                return BuildSourcesTable(sources);
            }

            return null;
        }

        /// <summary>Builds the "es-groupby" table: one row per bucket of the first aggregation.</summary>
        private DataTable BuildGroupByTable(string keyColumn, List<Dictionary<string, dynamic>> buckets)
        {
            var table = new DataTable("es-groupby");

            // Columns and their types are taken from the first bucket only.
            var template = buckets[0];
            table.Columns.Add(new DataColumn(keyColumn, ColumnType(GetValue((object)template["key"]).GetType())));
            foreach (var cell in template)
            {
                if (cell.Key != "doc_count" && cell.Key != "key")
                {
                    table.Columns.Add(new DataColumn(cell.Key, ColumnType(GetValue((object)cell.Value).GetType())));
                }
            }

            foreach (var bucket in buckets)
            {
                DataRow row = table.NewRow();
                // Normalize the key the same way its column type was derived.
                row[keyColumn] = GetValue((object)bucket["key"]);
                foreach (var cell in bucket)
                {
                    // Entries without a matching column (e.g. "doc_count", "key") are skipped.
                    if (table.Columns.Contains(cell.Key))
                    {
                        row[cell.Key] = GetValue((object)cell.Value);
                    }
                }
                table.Rows.Add(row);
            }

            return table;
        }

        /// <summary>Builds the single-row "es-aggregations" table: one column per aggregation.</summary>
        private DataTable BuildMetricsTable(Dictionary<string, Result.aggregation> aggregations)
        {
            var table = new DataTable("es-aggregations");
            foreach (var agg in aggregations)
            {
                table.Columns.Add(new DataColumn(agg.Key, ColumnType(GetValue(agg.Value).GetType())));
            }

            DataRow row = table.NewRow();
            foreach (var agg in aggregations)
            {
                if (table.Columns.Contains(agg.Key))
                {
                    row[agg.Key] = GetValue(agg.Value);
                }
            }
            table.Rows.Add(row);

            return table;
        }

        /// <summary>Builds the "es-sources" table: one row per hit's <c>_source</c>.</summary>
        private DataTable BuildSourcesTable(List<dynamic> sources)
        {
            var table = new DataTable("es-sources");

            // Columns and their types are taken from the first document only;
            // fields that appear only in later documents are dropped.
            var template = (Dictionary<string, dynamic>)sources[0];
            foreach (var field in template)
            {
                table.Columns.Add(new DataColumn(field.Key, ColumnType(GetValue((object)field.Value).GetType())));
            }

            foreach (dynamic source in sources)
            {
                DataRow row = table.NewRow();
                foreach (var field in (Dictionary<string, dynamic>)source)
                {
                    if (table.Columns.Contains(field.Key))
                    {
                        row[field.Key] = GetValue((object)field.Value);
                    }
                }
                table.Rows.Add(row);
            }

            return table;
        }

        /// <summary>Maps array types to <see cref="string"/>; every other type is returned unchanged.</summary>
        private Type ColumnType(Type type)
        {
            return type.IsArray ? typeof(string) : type;
        }

        /// <summary>
        /// Reduces a deserialized JSON value to something storable in a table cell:
        /// arrays are serialized to a string, value types and strings pass through,
        /// dictionaries and objects exposing "value_as_string"/"value" are unwrapped
        /// (preferring "value_as_string"), and anything else, including null, is serialized.
        /// </summary>
        private Object GetValue(Object value)
        {
            if (value != null)
            {
                if (value is Array)
                {
                    return new BSF.Serialization.JsonProvider().Serializer(value);
                }

                if (value.GetType().IsValueType || value is String)
                {
                    return value;
                }

                var map = value as Dictionary<string, dynamic>;
                if (map != null)
                {
                    dynamic inner;
                    if (map.TryGetValue("value_as_string", out inner))
                    {
                        return GetValue((object)inner);
                    }
                    if (map.TryGetValue("value", out inner))
                    {
                        return GetValue((object)inner);
                    }
                }

                // Typed objects such as Result.aggvalue: read the properties by name and
                // return the first non-null one as-is.
                Type type = value.GetType();
                if (type.GetProperty("value") != null)
                {
                    PropertyInfo asString = type.GetProperty("value_as_string");
                    object text = asString != null ? asString.GetValue(value) : null;
                    if (text != null)
                    {
                        return text;
                    }

                    object raw = type.GetProperty("value").GetValue(value);
                    if (raw != null)
                    {
                        return raw;
                    }
                }
            }

            return new BSF.Serialization.JsonProvider().Serializer(value);
        }

        /// <summary>Reads a public property by name via reflection; null when the property does not exist.</summary>
        private Object GetPropertyValue(Object obj, string property)
        {
            var p = obj.GetType().GetProperty(property);
            return p != null ? p.GetValue(obj) : null;
        }

        /// <summary>Posts <paramref name="content"/> to the endpoint and returns the response body as text.</summary>
        /// <remarks>
        /// Blocks the calling thread until the request completes. Failures surface as
        /// <see cref="AggregateException"/> because the tasks are awaited through <c>.Result</c>.
        /// </remarks>
        private string PostBase(HttpContent content)
        {
            // Dispose the response so its content stream and connection are released promptly.
            using (var response = client.PostAsync(url, content).Result)
            {
                return response.Content.ReadAsStringAsync().Result;
            }
        }

        /// <summary>
        /// Shape of the "_sql" JSON reply. Member names are lower-case on purpose;
        /// NOTE(review): presumably they must match the JSON keys for the deserializer - confirm.
        /// </summary>
        public class Result
        {
            public Hits hits { get; set; }

            public Dictionary<string, aggregation> aggregations { get; set; }

            /// <summary>
            /// The <c>_source</c> of every hit, in order. A new list is built on each access;
            /// it is empty (never null) when the reply carries no hits.
            /// </summary>
            public List<dynamic> sources
            {
                get
                {
                    List<dynamic> rs = new List<dynamic>();
                    if (hits == null || hits.hits == null)
                    {
                        return rs;
                    }

                    foreach (var h in hits.hits)
                    {
                        rs.Add(h._source);
                    }
                    return rs;
                }
            }

            /// <summary>A single aggregation value with its optional string rendering.</summary>
            public class aggvalue
            {
                public dynamic value { get; set; }
                public string value_as_string { get; set; }
            }

            /// <summary>An aggregation: either a plain value (inherited members) or a list of buckets.</summary>
            public class aggregation : aggvalue
            {
                public List<Dictionary<string, dynamic>> buckets { get; set; }
            }

            /// <summary>The "hits" envelope of the reply.</summary>
            public class Hits
            {
                public int total { get; set; }
                public List<hit> hits { get; set; }
            }

            /// <summary>One matched document.</summary>
            public class hit
            {
                public string _index { get; set; }
                public string _type { get; set; }
                public string _id { get; set; }

                public string _score { get; set; }

                public dynamic _source { get; set; }
            }
        }
    }
}
时间: 2024-10-22 17:12:14