如何把数据快速批量添加到Elasticsearch中

问题来源

最近新做一个项目,有部分搜索比较频繁的数据,而且量级比较大,预计一两年时间很可能达到100G,项目要求不要存在数据库中,最终出来有两个方案,一个是使用Protocol Buffers存储在文件上,另外就是存在Elasticsearch中,也方便搜索,但这两个方案需要验证,到底哪个方案好,从存储速度,搜索响应,占用空间方面做对比,而我负责给出Elasticsearch的部分技术建议!

验证需求

1、数据量:初步只算52亿条

2、写数据速度:需要超过1W条每秒

遇到问题以及解决办法

而在验证过程中遇到了无论是使用Elasticsearch.Net或者PlainElastic.Net来写数据,并且是使用了Bulk的api,加上多线程,都是太慢了,粗略算了一下,大概一秒插入3千条左右,这样的话,52亿条数据,得插到何年何月啊,太慢了,根据查阅资料,网上也有人说插入数据还是挺快 的,一秒可以插入18w条,但具体也没说是用什么办法插入的,所以只能到官方看看了,发现用REST API的_bulk来批量插入,这样速度明显快了,可以达到5到10w条每秒,速度还可以,但问题是这方法是先定义一定格式的json文件,然后再用curl命令去执行Elasticsearch的_bulk来批量插入,所以得把数据写进json文件,然后再通过批处理,执行文件插入数据,另外在生成json文件,文件不能过大,过大会报错,所以建议生成10M一个文件,然后分别去执行这些小文件就可以了,说了这么多都是文字,真的有点晕乎乎的,看图吧!

json数据文件内容的定义

{"index":{"_index":"meterdata","_type":"autoData"}}
{"Mfid ":1,"TData":172170,"TMoney":209,"HTime":"2016-05-17T08:03:00"}
{"index":{"_index":"meterdata","_type":"autoData"}}
{"Mfid ":1,"TData":172170,"TMoney":209,"HTime":"2016-05-17T08:04:00"}
{"index":{"_index":"meterdata","_type":"autoData"}}
{"Mfid ":1,"TData":172170,"TMoney":209,"HTime":"2016-05-17T08:05:00"}
{"index":{"_index":"meterdata","_type":"autoData"}}
{"Mfid ":1,"TData":172170,"TMoney":209,"HTime":"2016-05-17T08:06:00"}
{"index":{"_index":"meterdata","_type":"autoData"}}
{"Mfid ":1,"TData":172170,"TMoney":209,"HTime":"2016-05-17T08:07:00"}

 

批处理内容的定义

cd E:\curl-7.50.3-win64-mingw\bin
curl 172.17.1.15:9200/_bulk?pretty --data-binary @E:\Bin\Debug\testdata\437714060.json
curl 172.17.1.15:9200/_bulk?pretty --data-binary @E:\Bin\Debug\testdata\743719428.json
curl 172.17.1.15:9200/_bulk?pretty --data-binary @E:\Bin\Debug\testdata\281679894.json
curl 172.17.1.15:9200/_bulk?pretty --data-binary @E:\Bin\Debug\testdata\146257480.json
curl 172.17.1.15:9200/_bulk?pretty --data-binary @E:\Bin\Debug\testdata\892018760.json
pause

 

工具代码

 1      private void button1_Click(object sender, EventArgs e)
 2         {
 3             //Application.StartupPath + "\\" + NextFile.Name
 4             Task.Run(() => { CreateDataToFile(); });
 5         }
 6         public void CreateDataToFile()
 7         {
 8             StringBuilder sb = new StringBuilder();
 9             StringBuilder sborder = new StringBuilder();
10             int flag = 1;
11             sborder.Append(@"cd E:\curl-7.50.3-win64-mingw\bin" + Environment.NewLine);
12             DateTime endDate = DateTime.Parse("2016-10-22");
13             for (int i = 1; i <= 10000; i++)//1w个点
14             {
15                 DateTime startDate = DateTime.Parse("2016-10-22").AddYears(-1);
16                 this.Invoke(new Action(() => { label1.Text = "生成第" + i + "个"; }));
17
18                 while (startDate <= endDate)//每个点生成一年数据,每分钟一条
19                 {
20                     if (flag > 100000)//大于10w分割一个文件
21                     {
22                         string filename = new Random(GetRandomSeed()).Next(900000000) + ".json";
23
24                         FileStream fs3 = new FileStream(Application.StartupPath + "\\testdata\\" + filename, FileMode.OpenOrCreate);
25                         StreamWriter sw = new StreamWriter(fs3, Encoding.GetEncoding("GBK"));
26                         sw.WriteLine(sb.ToString());
27                         sw.Close();
28                         fs3.Close();
29                         sb.Clear();
30                         flag = 1;
31                         sborder.Append(@"curl 172.17.1.15:9200/_bulk?pretty --data-binary @E:\Bin\Debug\testdata\" + filename + Environment.NewLine);
32
33                     }
34                     else
35                     {
36                         sb.Append("{\"index\":{\"_index\":\"meterdata\",\"_type\":\"autoData\"}}" + Environment.NewLine);
37                         sb.Append("{\"Mfid \":" + i + ",\"TData\":" + new Random().Next(1067500) + ",\"TMoney\":" + new Random().Next(1300) + ",\"HTime\":\"" + startDate.ToString("yyyy-MM-ddTHH:mm:ss") + "\"}" + Environment.NewLine);
38                         flag++;
39                     }
40                     startDate = startDate.AddMinutes(1);//
41                 }
42
43             }
44             sborder.Append("pause");
45             FileStream fs1 = new FileStream(Application.StartupPath + "\\testdata\\order.bat", FileMode.OpenOrCreate);
46             StreamWriter sw1 = new StreamWriter(fs1, Encoding.GetEncoding("GBK"));
47             sw1.WriteLine(sborder.ToString());
48             sw1.Close();
49             fs1.Close();
50             MessageBox.Show("生成完毕");
51
52         }
53         static int GetRandomSeed()
54         {//随机生成不重复的编号
55             byte[] bytes = new byte[4];
56             System.Security.Cryptography.RNGCryptoServiceProvider rng = new System.Security.Cryptography.RNGCryptoServiceProvider();
57             rng.GetBytes(bytes);
58             return BitConverter.ToInt32(bytes, 0);
59         }

总结

本次测试结果,发现Elasticsearch的搜索速度是挺快的,生成过程中,在17亿数据时查了一下,根据Mid和时间在几个月范围的数据,查十条数据两秒多完成查询,而且同一查询条件查询越多,查询就越快,应该是Elasticsearch缓存了,52亿条数据,大概占用500G空间左右,还是挺大的,相比Protocol Buffers存储的数据,要大三倍左右,但搜索速度还是比较满意的。

时间: 2024-08-01 10:32:35

如何把数据快速批量添加到Elasticsearch中的相关文章

本质是获得数据,然后添加到页面中,并加入样式

js代码: $("#table").on("mouseover", "tr", function () { var data = this.getAttribute('data-points'); if (!data) return; data = JSON.parse(data); var ht = ''; var i = 0; var parse = function (data) { if (data.children.length) {/

批量添加操作

批量添加原理类似于批量删除,批量添加可以用在Excel文件导入时过滤出导入成功的数据进行批量添加到数据库,只对数据库进行一次访问. 1.mapper.xml层代码 1 <insert id="saveRosters" parameterType="com.example.pojo.Roster"> 2 insert into roster(customer_id, job_number, full_name, sex, mobile, id_type,

java操作elasticsearch实现批量添加数据(bulk)

java操作elasticsearch实现批量添加主要使用了bulk 代码如下: //bulk批量操作(批量添加) @Test public void test7() throws IOException { //1.指定es集群 cluster.name 是固定的key值,my-application是ES集群的名称 Settings settings = Settings.builder().put("cluster.name", "my-application"

Yii2如何批量添加数据

批量添加这个操作,在实际开发中经常用得到,今天小编抽空给大家整理些有关yii2批量添加的问题,感兴趣的朋友一起看看吧. 在上篇文章给大家介绍了关于浅析Yii2 gridview实现批量删除教程,当然,着重点在于怎么去操作gridview了,今儿我们来好好谈谈yii2如何批量添加数据? 有同学嚷嚷了,这还不简单,我foreach一循环,每个循环里面直接把数据插入到数据库,简单粗暴完事!我擦嘞,哥,你要是跟我在一个公司,我觉得第二天见到你的概率可就不大了! 话不多说,说多了你在骂我,我们步入正题,先

ASP.NET MVC用存储过程批量添加修改数据

用Entity Framework 进行数据库交互,在代码里直接用lamda表达式和linq对数据库操作,中间为程序员省去了数据库访问的代码时间,程序员直接可以专注业务逻辑层的编写.但是对于比较复杂的表关系关联查询或者修改就比较费劲了.通常可以采用的方式是用EF执行SQL语句或者"存储过程",特别是执行复杂批量任务,当然也可以在MVC底层用ADO.NET,这里就不多说了.怎么做批量呢?这里讲讲在EF下用存储过程批量添加修改数据. 需求是这样的:需要批量添加修改产品类别的投放任务数额,每

EF批量添加数据性能慢的问题的解决方案

//EF批量添加数据性能慢的问题的解决方案 public ActionResult BatchAdd() { using (var db = new ToneRoad.CEA.DbContext.DbContext()) { //**********************第一种解决方案 直接使用sql********************** string sqls = ""; for (int i = 0; i < 100000; i++) { sqls += "

.Net中批量添加数据的几种实现方法比较

在.Net中经常会遇到批量添加数据,如将Excel中的数据导入数据库,直接在DataGridView控件中添加数据再保存到数据库等等. 方法一:一条一条循环添加 通常我们的第一反应是采用for或foreach循环一条一条的添加. for (int i = 0; i < dgv.Rows.Count; i++) { string sql = "insert into ....."; SqlHelper.ExcuteNonQuery(CommandType.Text, sql, nu

利用DataTable快速批量导数据

DataSet ds = new DataSet();            using (SqlConnection conn = new SqlConnection(@"data source=.\sqlxu;initial catalog=NationalUnion20140717;persist security info=True;user id=sa;password=101;MultipleActiveResultSets=True"))            {    

ADO.NET批量添加数据到SQL Server—BulkCopy使用指南

BulkCopy位于System.Data.SqlClient命名空间,允许你使用其他源的数据有效地批量加载 SQL Server 表. 属性: BatchSize :每个批处理中的行数. 在每个批处理结束时,批处理中的行将发送到服务器. BulkCopyTimeout:超时之前可用于完成操作的秒数. ColumnMappings:返回 SqlBulkCopyColumnMapping 项的集合. 列映射定义数据源中的列和目标中的列之间的关系. DestinationTableName:服务器上